From 74094fa115e4ec02a144df62e8e1acd976ba1269 Mon Sep 17 00:00:00 2001 From: xiezc Date: Fri, 16 Aug 2024 15:34:03 +0800 Subject: [PATCH] =?UTF-8?q?FileCacheQueueScheduler=E4=BD=BF=E7=94=A8BloomF?= =?UTF-8?q?ilter=E8=BF=9B=E8=A1=8C=E5=8E=BB=E9=87=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/us/codecraft/webmagic/Spider.java | 3 +- .../scheduler/FileCacheQueueScheduler.java | 103 +++++------------- 2 files changed, 30 insertions(+), 76 deletions(-) diff --git a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java index a35af70af..a71166421 100644 --- a/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java +++ b/webmagic-core/src/main/java/us/codecraft/webmagic/Spider.java @@ -187,7 +187,7 @@ public Spider scheduler(Scheduler scheduler) { */ public Spider setScheduler(Scheduler updateScheduler) { checkIfRunning(); - SpiderScheduler oldScheduler = this.scheduler; + Scheduler oldScheduler = scheduler.getScheduler(); scheduler.setScheduler(updateScheduler); if (oldScheduler != null) { Request request; @@ -458,7 +458,6 @@ private void onDownloadSuccess(Request request, Page page) { logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode()); } sleep(site.getSleepTime()); - return; } private void onDownloaderFail(Request request) { diff --git a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java index fec3c1db9..0dabdd954 100644 --- a/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java +++ b/webmagic-extension/src/main/java/us/codecraft/webmagic/scheduler/FileCacheQueueScheduler.java @@ -1,29 +1,13 @@ package us.codecraft.webmagic.scheduler; -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.LinkedHashSet; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.math.NumberUtils; - import us.codecraft.webmagic.Request; import us.codecraft.webmagic.Task; -import us.codecraft.webmagic.scheduler.component.DuplicateRemover; + +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** @@ -32,7 +16,7 @@ * @author code4crafter@gmail.com
* @since 0.2.0 */ -public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler,Closeable { +public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler, Closeable { private String filePath = System.getProperty("java.io.tmpdir"); @@ -52,8 +36,6 @@ public class FileCacheQueueScheduler extends DuplicateRemovedScheduler implement private BlockingQueue queue; - private Set urls; - private ScheduledExecutorService flushThreadPool; public FileCacheQueueScheduler(String filePath) { @@ -83,36 +65,13 @@ private void init(Task task) { } private void initDuplicateRemover() { - setDuplicateRemover( - new DuplicateRemover() { - @Override - public boolean isDuplicate(Request request, Task task) { - if (!inited.get()) { - init(task); - } - return !urls.add(request.getUrl()); - } - - @Override - public void resetDuplicateCheck(Task task) { - urls.clear(); - } - - @Override - public int getTotalRequestsCount(Task task) { - return urls.size(); - } - }); + BloomFilterDuplicateRemover bloomFilterDuplicateRemover = new BloomFilterDuplicateRemover(this.filePath.hashCode()); + setDuplicateRemover(bloomFilterDuplicateRemover); } private void initFlushThread() { - flushThreadPool = Executors.newScheduledThreadPool(1); - flushThreadPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - flush(); - } - }, 10, 10, TimeUnit.SECONDS); + flushThreadPool = Executors.newScheduledThreadPool(1); + flushThreadPool.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS); } private void initWriter() { @@ -127,7 +86,6 @@ private void initWriter() { private void readFile() { try { queue = new LinkedBlockingQueue(); - urls = new LinkedHashSet(); readCursorFile(); readUrlFile(); // initDuplicateRemover(); @@ -140,46 +98,43 @@ private void readFile() { } private void readUrlFile() throws IOException { - String line; - BufferedReader fileUrlReader = null; - try { - fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName))); + try (BufferedReader fileUrlReader = new BufferedReader(new FileReader(getFileName(fileUrlAllName)))) { + String line; int lineReaded = 0; while ((line = fileUrlReader.readLine()) != null) { - urls.add(line.trim()); + Request request = deserializeRequest(line); + this.getDuplicateRemover().isDuplicate(request, null); lineReaded++; if (lineReaded > cursor.get()) { - queue.add(deserializeRequest(line)); + queue.add(request); } } - } finally { - if (fileUrlReader != null) { - IOUtils.closeQuietly(fileUrlReader); - } } } private void readCursorFile() throws IOException { - BufferedReader fileCursorReader = null; - try { - fileCursorReader = new BufferedReader(new FileReader(getFileName(fileCursor))); + String fileName = getFileName(fileCursor); + try (BufferedReader fileCursorReader = new BufferedReader(new FileReader(fileName))) { String line; + String lastLine = null; //read the last number while ((line = fileCursorReader.readLine()) != null) { - cursor = new AtomicInteger(NumberUtils.toInt(line)); + line = line.trim(); + if (!line.isEmpty()) { + lastLine = line; + } } - } finally { - if (fileCursorReader != null) { - IOUtils.closeQuietly(fileCursorReader); + if (lastLine != null) { + cursor.set(NumberUtils.toInt(line)); } } } - + public void close() throws IOException { - flushThreadPool.shutdown(); - fileUrlWriter.close(); - fileCursorWriter.close(); - } + flushThreadPool.shutdown(); + fileUrlWriter.close(); + fileCursorWriter.close(); + } private String getFileName(String filename) { return filePath + task.getUUID() + filename;