`

Java nio导入csv对账文件

阅读更多

公司业务上传支付宝、微信交易记录,并和系统进行对账 功能

个人使用了,java 的nio  和 多线程进行扫描文件 并装载bean对象,具体代码如下:

附件有对应的工具类,一直上传失败,放到百度云盘,有需要的可以取下载:链接: https://pan.baidu.com/s/1y-I36iUAQbx2_Ss1Ih8ASQ 提取码: meng 

 

@RequestMapping(value = "checkReconciliationData/{channel}", method = RequestMethod.POST)
public CResponse checkReconciliationDataNew(@RequestParam("file") MultipartFile file, @PathVariable String channel)throws IOException {
    if (file != null && !file.isEmpty()) {
        String fileName = file.getOriginalFilename();
        UploadPayDataRequest uploadPayDataRequest = new UploadPayDataRequest();
        uploadPayDataRequest.setFileName(fileName);
        List<UploadPayDataEntity> uploadPayDataEntities = uploadPayDataService.selectInfoByList(uploadPayDataRequest);
        if (uploadPayDataEntities.size() > 0) {
            return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件已存在,无需重复上传!");
        }
        String keyName = MD5.MD5Encode(fileName);
        System.out.println("keyName:" + keyName);
        String keyValue = jedisCluster.get(keyName);
        if (keyValue == null) {
            jedisCluster.set(keyName, "PROCESSING");//文件处理中
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            UploadPayDataDto uploadPayDataDto = new UploadPayDataDto();
            executorService.execute(() -> {
                try {
                    CommonsMultipartFile cf = (CommonsMultipartFile) file;
                    DiskFileItem fi = (DiskFileItem) cf.getFileItem();
                    File f = fi.getStoreLocation();
                    BigFileReader.Builder builder = new BigFileReader.Builder(f, line -> {
                        if (line.indexOf("渠道") == -1) {
                            String[] strs = line.split(",");
                            String json = JSONObject.toJSONString(setItem(strs, channel, fileName));
                            jedisCluster.lpush(fileName, json);
                        }
                    });
                    BigFileReader bigFileReader = builder
                            .threadPoolSize(10)
                            .charset(Charset.forName("GBK"))
                            .bufferSize(1024).build();
                    bigFileReader.start(jedisCluster, keyName);
                    upload(file);
                } catch (Exception e) {
                    log.error("异步操作文件内容失败,原因:" + e.getMessage());
                    jedisCluster.del(keyName);
                }
            });
            BufferedInputStream bis = new BufferedInputStream(file.getInputStream());
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bis, "GBK"), 30 * 1024 * 1024);//10M缓存
            uploadPayDataDto.setTotalNum(bufferedReader.lines().count());
            uploadPayDataDto.setUrl(ALIYUN_IMAGE_URL + fileName);
            uploadPayDataDto.setFileName(fileName);
            return CResponse.success(uploadPayDataDto);
        } else {
            return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件处理中");
        }
    } else {
        return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件数据为空");
    }
}



public class BigFileReader {
    private int threadPoolSize;
    private Charset charset;
    private int bufferSize;
    private IFileHandle handle;
    private ExecutorService executorService;
    private long fileLength;
    private RandomAccessFile rAccessFile;
    private Set<StartEndPair> startEndPairs;
    private CyclicBarrier cyclicBarrier;
    private AtomicLong counter = new AtomicLong(0);


    public BigFileReader(File file, IFileHandle handle, Charset charset, int bufferSize, int threadPoolSize) {
        this.fileLength = file.length();
        this.handle = handle;
        this.charset = charset;
        this.bufferSize = bufferSize;
        this.threadPoolSize = threadPoolSize;
        try {
            this.rAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
        startEndPairs = new HashSet<>();
    }

    public void start(JedisCluster jedisCluster, String fileName) {
        long everySize = this.fileLength / this.threadPoolSize;
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        final long startTime = System.currentTimeMillis();
        cyclicBarrier = new CyclicBarrier(startEndPairs.size(), () -> {
            System.out.println("use time: " + (System.currentTimeMillis() - startTime));
            System.out.println("all line: " + counter.get());
            System.out.println(fileName);
            jedisCluster.set(fileName, "SUCCESS");
            shutdown();
        });
        for (StartEndPair pair : startEndPairs) {
            System.out.println("分配分片:" + pair);
            this.executorService.execute(new SliceReaderTask(pair));
        }
    }

    private void calculateStartEnd(long start, long size) throws IOException {
        if (start > fileLength - 1) {
            return;
        }
        StartEndPair pair = new StartEndPair();
        pair.start = start;
        long endPosition = start + size - 1;
        if (endPosition >= fileLength - 1) {
            pair.end = fileLength - 1;
            startEndPairs.add(pair);
            return;
        }

        rAccessFile.seek(endPosition);
        byte tmp = (byte) rAccessFile.read();
        while (tmp != '\n' && tmp != '\r') {
            endPosition++;
            if (endPosition >= fileLength - 1) {
                endPosition = fileLength - 1;
                break;
            }
            rAccessFile.seek(endPosition);
            tmp = (byte) rAccessFile.read();
        }
        pair.end = endPosition;
        startEndPairs.add(pair);

        calculateStartEnd(endPosition + 1, size);

    }

    public void shutdown() {
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    private void handle(byte[] bytes) throws UnsupportedEncodingException {
        String line = null;
        if (this.charset == null) {
            line = new String(bytes);
        } else {
            line = new String(bytes, charset);
        }
        if (line != null && !"".equals(line)) {
            this.handle.handle(line);
            counter.incrementAndGet();
        }
    }


    private static class StartEndPair {
        public long start;
        public long end;

        @Override
        public String toString() {
            return "star=" + start + ";end=" + end;
        }
    }

    private class SliceReaderTask implements Runnable {
        private long start;
        private long sliceSize;
        private byte[] readBuff;

        public SliceReaderTask(StartEndPair pair) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
            this.readBuff = new byte[bufferSize];
        }


        @Override
        public void run() {
            try {
                MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.sliceSize);
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                for (int offset = 0; offset < sliceSize; offset += bufferSize) {
                    int readLength;
                    if (offset + bufferSize <= sliceSize) {
                        readLength = bufferSize;
                    } else {
                        readLength = (int) (sliceSize - offset);
                    }
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        //碰到换行符
                        if (tmp == '\n' || tmp == '\r') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                if (bos.size() > 0) {
                    handle(bos.toByteArray());
                }
                cyclicBarrier.await();//测试性能用
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class Builder {
        private int threadSize = 1;
        private Charset charset;
        private int bufferSize = 1024 * 1024;
        private IFileHandle handle;
        private File file;

        public Builder(File file, IFileHandle handle) {
            this.file = file;
            if (!this.file.exists())
                throw new IllegalArgumentException("文件不存在!");
            this.handle = handle;
        }

        public Builder threadPoolSize(int size) {
            this.threadSize = size;
            return this;
        }

        public Builder charset(Charset charset) {
            this.charset = charset;
            return this;
        }

        public Builder bufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        public BigFileReader build() {
            return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
        }
    }

}

public interface IFileHandle {
    void handle(String line);
}
0
0
分享到:
评论

相关推荐

    java nio 包读取超大数据文件

    Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据文件Java nio 超大数据文件 超大数据...

    java NIO和java并发编程的书籍

    java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java NIO和java并发编程的书籍java...

    java nio 读文件

    java nio 读文件,java nio 读文件

    JavaNIO chm帮助文档

    Java NIO系列教程(一) Java NIO 概述 Java NIO系列教程(二) Channel Java NIO系列教程(三) Buffer Java NIO系列教程(四) Scatter/Gather Java NIO系列教程(五) 通道之间的数据传输 Java NIO系列教程(六)...

    JAVA NIO 按行读取大文件,支持 GB级别

    本类,是专门为了处理大文件,按行读取开发的类。 采用读文件的缓存 fbb 1024*5 行缓存 bb 256 字节 设计思想: 每次通过nio读取字节到 fbb中 然后对fbb自己中的内容进行行判断即 10 回车 13 行号 0 文件...

    JAVA NIO 按行读取大文件支持 GB级别-修正版

    本类,是专门为了处理大文件,按行读取开发的类。 采用读文件的缓存 fbb 1024*5 行缓存 bb 256 字节 设计思想: 每次通过nio读取字节到 fbb中 然后对fbb自己中的内容进行行判断即 10 回车 13 行号 0 文件结束 ...

    Java NIO英文高清原版

    Java NIO英文高清原版

    JAVA NIO 简单PFT 文件服务

    JAVA NIO 简单PFT 文件服务 上传 下载 列表

    java nio 写文件

    java nio 写文件,java nio 写文件

    java NIO 中文版

    讲解了 JavaIO 与 JAVA NIO区别,JAVA NIO设计理念,以及JDK中java NIO中语法的使用

    java NIO.zip

    java NIO.zip

    Java NIO 中文 Java NIO 中文 Java NIO 中文文档

    Java NIO 深入探讨了 1.4 版的 I/O 新特性,并告诉您如何使用这些特性来极大地提升您所写的 Java 代码的执行效率。这本小册子就程序员所面临的有代表性的 I/O 问题作了详尽阐述,并讲解了 如何才能充分利用新的 I/O ...

    java NIO 视频教程

    Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。 Java NIO: Channels and Buffers(通道和缓冲区) 标准的IO基于字节流和字符流进行操作的,...

    java nio 实现socket

    java nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socket

    java nio中文版

    java NIO是 java New IO 的简称,在 jdk1.4 里提供的新 api 。 Sun 官方标榜的特性如下: – 为所有的原始类型提供 (Buffer) 缓存支持。 – 字符集编码解码解决方案。 – Channel :一个新的原始 I/O 抽象。 – 支持...

    java NIO实例

    实例介绍了一个简单的nio实例,适合刚接触nio的童鞋们....

    Java用NIO读取文件示范

    简单的用Java的NIO读取文件的程序,给大家参考。

    Java Nio selector例程

    java侧起server(NioUdpServer1.java),基于Java Nio的selector 阻塞等候,一个android app(NioUdpClient1文件夹)和一个java程序(UI.java)作为两个client分别向该server发数据,server收到后分别打印收到的消息...

    java NIO文件操作(中文版pdf)

    java NIO文件操作(中文版pdf),希望对大家有帮助,(转载)

    java NIO 写文件

    java nio 写文件样例,java写大数据文件时提高性能

Global site tag (gtag.js) - Google Analytics