From 36d70901bea99b8cdc872a9b5dd523eadc02a33c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9C=A3=E6=89=8B=E4=B9=A6=E7=94=9F?= Date: Wed, 11 Sep 2019 12:05:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=94=AF=E6=8C=81=E6=89=B9?= =?UTF-8?q?=E9=87=8F=E4=BF=9D=E5=AD=98=E4=B8=BAJSON=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E7=9A=84Pipeline=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline/BatchJsonFilePipeline.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 webmagic-extension/src/main/java/us/codecraft/webmagic/pipeline/BatchJsonFilePipeline.java diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/pipeline/BatchJsonFilePipeline.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/pipeline/BatchJsonFilePipeline.java new file mode 100644 index 000000000..35a191e4e --- /dev/null +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/pipeline/BatchJsonFilePipeline.java @@ -0,0 +1,106 @@ +package us.codecraft.webmagic.pipeline; + +import com.alibaba.fastjson.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import us.codecraft.webmagic.ResultItems; +import us.codecraft.webmagic.Task; +import us.codecraft.webmagic.utils.FilePersistentBase; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Store results to files in JSON format.
+ * + * @author zhanglubing927@163.com
+ * @since 0.7.3 + */ +public class BatchJsonFilePipeline extends FilePersistentBase implements Pipeline, Closeable { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + private boolean running; + private int batchSize; + + private BlockingQueue> queue = new LinkedBlockingDeque>(); + private ExecutorService executorService = Executors.newFixedThreadPool(1); + private AtomicInteger index = new AtomicInteger(0); + private List> list = new ArrayList>(); + + public BatchJsonFilePipeline(int batchSize) { + this("/data/webmagic", batchSize); + } + + public BatchJsonFilePipeline(String path, int batchSize) { + this.batchSize = batchSize; + setPath(path); + + startWriteTask(); + } + + @Override + public void process(ResultItems resultItems, Task task) { + try { + queue.put(resultItems.getAll()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void close() { + this.running = false; + + executorService.shutdown(); + + if (!list.isEmpty()) { + write(list); + } + } + + protected String filename(int currentIndex) { + return currentIndex + ".json"; + } + + private void startWriteTask() { + running = true; + executorService.submit(new Runnable() { + @Override + public void run() { + while (running) { + Map fields = null; + try { + fields = queue.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + } + + if (fields != null && !fields.isEmpty()) { + list.add(fields); + if (list.size() >= batchSize) { + write(list); + list.clear(); + } + } + } + } + }); + } + + private void write(List> data) { + try { + String filename = filename(index.getAndAdd(1)); + File file = getFile(getPath() + PATH_SEPERATOR + filename); + PrintWriter printWriter = new PrintWriter(new FileWriter(file)); + printWriter.write(JSON.toJSONString(data)); + printWriter.close(); + } catch (IOException e) { + logger.warn("write file error", e); + } + } + +}