Draft
Changes from all commits (23 commits)
00eca53
dedup work:
ikreymer Aug 30, 2025
eb6b87f
args: add separate --dedupIndexUrl to support separate redis for dedup
ikreymer Sep 17, 2025
2ecf290
add indexer entrypoint:
ikreymer Sep 18, 2025
87c9487
keep skipping dupe URLs as before
ikreymer Sep 18, 2025
bbe084d
warc writing:
ikreymer Sep 18, 2025
db9e78e
rename --dedupStoreUrl -> redisDedupUrl
ikreymer Sep 18, 2025
2f81798
update to latest warcio (2.4.7) to fix issues when returning payload o…
ikreymer Sep 18, 2025
c447428
bump to 2.4.7
ikreymer Sep 18, 2025
0cadf37
tests: add dedup-basic.test for simple dedup, ensure number of revisi…
ikreymer Sep 18, 2025
db4393c
deps update
ikreymer Sep 20, 2025
ca02f09
dedup indexing: strip hash prefix from digest, as cdx does not have it
ikreymer Sep 23, 2025
78b8847
use dedup redis for queue up wacz files that need to be updated
ikreymer Sep 23, 2025
8d53399
dedup post requests and non-404s as well!
ikreymer Sep 25, 2025
298b901
- track source index for each hash, so entry becomes '<source index> …
ikreymer Oct 18, 2025
6579b2d
update to new data model:
ikreymer Oct 24, 2025
9fba5da
cleanup, keep compatibility with redis 6 still
ikreymer Oct 24, 2025
c4f07c4
always return wacz, store wacz depends only for current wacz
ikreymer Oct 24, 2025
dd8d2e1
rename 'dedup' -> 'dedupe' for consistency
ikreymer Oct 25, 2025
0d414f7
indexer optimize: commit only if added
ikreymer Oct 25, 2025
7c37672
add removing option to also remove unused crawls if doing a full sync…
ikreymer Oct 25, 2025
9db0872
rebase fix
ikreymer Nov 28, 2025
f090360
generate wacz filename if deduping
ikreymer Nov 28, 2025
1b2ca23
cleanup pass:
ikreymer Nov 28, 2025
Dockerfile: 5 changes (3 additions & 2 deletions)
@@ -44,11 +44,12 @@ ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/ui.js /app/html/rw
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/sw.js /app/html/rwp/
ADD https://cdn.jsdelivr.net/npm/replaywebpage@${RWP_VERSION}/adblock/adblock.gz /app/html/rwp/adblock.gz

RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js && chmod a+r /app/html/rwp/*
RUN chmod a+x /app/dist/main.js /app/dist/create-login-profile.js /app/dist/indexer.js && chmod a+r /app/html/rwp/*

RUN ln -s /app/dist/main.js /usr/bin/crawl; \
ln -s /app/dist/main.js /usr/bin/qa; \
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile
ln -s /app/dist/create-login-profile.js /usr/bin/create-login-profile; \
ln -s /app/dist/indexer.js /usr/bin/indexer;

RUN mkdir -p /app/behaviors

src/crawler.ts: 98 changes (62 additions & 36 deletions)
@@ -31,7 +31,7 @@ import {
} from "./util/storage.js";
import { ScreenCaster, WSTransport } from "./util/screencaster.js";
import { Screenshots } from "./util/screenshots.js";
import { initRedis } from "./util/redis.js";
import { initRedisWaitForSuccess } from "./util/redis.js";
import { logger, formatErr, LogDetails, LogContext } from "./util/logger.js";
import { WorkerState, closeWorkers, runWorkers } from "./util/worker.js";
import { sleep, timedRun, secondsElapsed } from "./util/timing.js";
@@ -202,6 +202,7 @@ export class Crawler {
| null = null;

recording: boolean;
deduping = false;

constructor() {
const args = this.parseArgs();
@@ -341,32 +342,30 @@

async initCrawlState() {
const redisUrl = this.params.redisStoreUrl || "redis://localhost:6379/0";
const dedupeRedisUrl = this.params.redisDedupeUrl || redisUrl;

this.deduping = dedupeRedisUrl !== redisUrl;

if (!redisUrl.startsWith("redis://")) {
logger.fatal(
"stateStoreUrl must start with redis:// -- Only redis-based store currently supported",
);
}

let redis;

while (true) {
try {
redis = await initRedis(redisUrl);
break;
} catch (e) {
//logger.fatal("Unable to connect to state store Redis: " + redisUrl);
logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
await sleep(1);
}
}
const redis = await initRedisWaitForSuccess(redisUrl);

logger.debug(
`Storing state via Redis ${redisUrl} @ key prefix "${this.crawlId}"`,
{},
"state",
);

let dedupeRedis = redis;

if (redisUrl !== dedupeRedisUrl) {
dedupeRedis = await initRedisWaitForSuccess(dedupeRedisUrl);
}

logger.debug(`Max Page Time: ${this.maxPageTime} seconds`, {}, "state");

this.crawlState = new RedisCrawlState(
@@ -375,6 +374,7 @@
this.maxPageTime,
os.hostname(),
this.params.maxPageRetries,
dedupeRedis,
);

if (this.params.logErrorsToRedis) {
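
Note: initRedisWaitForSuccess is defined in src/util/redis.js and its body is not part of this diff. A minimal sketch, assuming it simply wraps the retry loop that this hunk removes from initCrawlState:

```ts
// Hypothetical reconstruction of initRedisWaitForSuccess, inferred from the
// inline loop removed above; the real definition in src/util/redis.js is
// not shown in this diff.
import { Redis } from "ioredis";
import { initRedis } from "./util/redis.js";
import { logger } from "./util/logger.js";
import { sleep } from "./util/timing.js";

export async function initRedisWaitForSuccess(redisUrl: string): Promise<Redis> {
  // keep retrying until the connection succeeds, as the old loop did
  while (true) {
    try {
      return await initRedis(redisUrl);
    } catch (e) {
      logger.warn(`Waiting for redis at ${redisUrl}`, {}, "state");
      await sleep(1);
    }
  }
}
```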
@@ -1075,7 +1075,7 @@ self.__bx_behaviors.selectMainBehavior();
const { page, cdp, data, workerid, callbacks, recorder } = opts;
data.callbacks = callbacks;

const { url, seedId } = data;
const { url, seedId, depth } = data;

const auth = this.seeds[seedId].authHeader();

@@ -1148,6 +1148,7 @@ self.__bx_behaviors.selectMainBehavior();

if (recorder) {
recorder.pageSeed = seed;
recorder.pageSeedDepth = depth;
}

// run custom driver here, if any
@@ -1326,6 +1327,7 @@ self.__bx_behaviors.selectMainBehavior();
} else {
if (pageSkipped) {
await this.crawlState.markExcluded(url);
this.limitHit = false;
} else {
const retry = await this.crawlState.markFailed(url);

@@ -1691,8 +1693,11 @@ self.__bx_behaviors.selectMainBehavior();
this.storage = initStorage();
}

if (this.params.generateWACZ && this.storage) {
await this.crawlState.setWACZFilename();
if (this.params.generateWACZ && (this.storage || this.deduping)) {
const filename = await this.crawlState.setWACZFilename();
if (this.deduping) {
await this.crawlState.addSourceWACZForDedupe(filename);
}
}

if (POST_CRAWL_STATES.includes(initState)) {
@@ -1891,21 +1896,37 @@ self.__bx_behaviors.selectMainBehavior();
}

if (this.params.generateWACZ && generateFiles) {
const uploaded = await this.generateWACZ();
const wacz = await this.generateWACZ();

if (uploaded && this.uploadAndDeleteLocal) {
await this.crawlState.setArchiveSize(0);
logger.info(
`Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
);
try {
fs.rmSync(this.collDir, { recursive: true, force: true });
} catch (e) {
logger.warn(`Unable to clear ${this.collDir} before exit`, e);
if (wacz) {
await this.crawlState.clearWACZFilename();

if (this.deduping) {
await this.crawlState.updateDedupeSourceWACZ(wacz);
}

if (this.storage && this.uploadAndDeleteLocal) {
await this.crawlState.setArchiveSize(0);

logger.info(
`Uploaded WACZ, deleting local data to free up space: ${this.collDir}`,
);
try {
fs.rmSync(this.collDir, { recursive: true, force: true });
} catch (e) {
logger.warn(`Unable to clear ${this.collDir} before exit`, e);
}
}
}
}

if (this.deduping) {
//await this.crawlState.clearDupeCrawlRef();

// commit crawl data to main index
await this.crawlState.commitDedupeDone();
}

if (this.finalExit && generateFiles && this.params.saveProfile) {
const resource = await this.browser.saveProfile(
this.params.saveProfile,
@@ -1943,7 +1964,7 @@ self.__bx_behaviors.selectMainBehavior();
await streamFinish(logFH);
}

async generateWACZ() {
async generateWACZ(): Promise<WACZ | null> {
logger.info("Generating WACZ");
await this.crawlState.setStatus("generate-wacz");

@@ -1957,11 +1978,11 @@ self.__bx_behaviors.selectMainBehavior();
if (!warcFileList.length) {
// if finished, just return
if (isFinished || (await this.crawlState.isCrawlCanceled())) {
return;
return null;
}
// possibly restarted after committing, so assume done here!
if ((await this.crawlState.numDone()) > 0) {
return;
return null;
}
// fail crawl otherwise
logger.fatal("No WARC Files, assuming crawl failed");
@@ -1981,6 +2002,8 @@ self.__bx_behaviors.selectMainBehavior();

await this.closeLog();

const requires = await this.crawlState.getDupeDependentCrawls();

const waczOpts: WACZInitOpts = {
input: warcFileList.map((x) => path.join(this.archivesDir, x)),
output: waczPath,
@@ -1989,6 +2012,7 @@ self.__bx_behaviors.selectMainBehavior();
warcCdxDir: this.warcCdxDir,
indexesDir: this.indexesDir,
softwareString: this.infoString,
requires,
};

if (process.env.WACZ_SIGN_URL) {
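
Note: the new requires field threads the dedupe dependencies into WACZ creation, but the full WACZInitOpts type lives in the WACZ util module and is not shown here. A partial, hypothetical sketch, assuming requires is the list of source WACZ filenames returned by getDupeDependentCrawls():

```ts
// Partial, inferred shape of WACZInitOpts: only fields visible in the hunk
// above are listed, and requires is assumed to be string[] of source WACZ
// filenames that this crawl's revisit records depend on.
interface WACZInitOpts {
  input: string[]; // WARC files to package
  output: string; // path of the WACZ to write
  warcCdxDir: string;
  indexesDir: string;
  softwareString: string;
  requires: string[]; // new in this PR: dedupe dependencies
}
```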
@@ -2018,13 +2042,8 @@ self.__bx_behaviors.selectMainBehavior();
const targetFilename = await this.crawlState.getWACZFilename();

await this.storage.uploadCollWACZ(wacz, targetFilename, isFinished);

await this.crawlState.clearWACZFilename();

return true;
}

return false;
return wacz;
} catch (e) {
logger.error("Error creating WACZ", e);
if (!streaming) {
Expand All @@ -2033,6 +2052,8 @@ self.__bx_behaviors.selectMainBehavior();
await this.setStatusAndExit(ExitCodes.UploadFailed, "interrupted");
}
}

return null;
}

logMemory() {
@@ -2198,7 +2219,12 @@ self.__bx_behaviors.selectMainBehavior();
// excluded in recorder
if (msg.startsWith("net::ERR_BLOCKED_BY_RESPONSE")) {
data.pageSkipped = true;
logger.warn("Page Load Blocked, skipping", { msg, loadState });
logger.warn(
"Page Load Blocked, skipping",
{ msg, loadState },
"pageStatus",
);
throw new Error("logged");
} else {
return this.pageFailed("Page Load Failed", retry, {
msg,
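
Note: taken together, the dedupe-related calls added in this file form a small lifecycle. A sketch of that flow, with method names taken from the hunks above and signatures inferred from the call sites (not the PR's literal code):

```ts
// Stand-in for the subset of RedisCrawlState used by the dedupe flow;
// all names come from the diff above, signatures are inferred.
interface DedupeState {
  setWACZFilename(): Promise<string>;
  addSourceWACZForDedupe(filename: string): Promise<void>;
  getDupeDependentCrawls(): Promise<string[]>;
  clearWACZFilename(): Promise<void>;
  updateDedupeSourceWACZ(wacz: object): Promise<void>;
  commitDedupeDone(): Promise<void>;
}

// Order of operations as it appears in crawler.ts above.
async function dedupeFlow(state: DedupeState, deduping: boolean) {
  // crawl start: reserve a WACZ filename; when deduping, register it as a
  // pending dedupe source so other crawls can reference it
  const filename = await state.setWACZFilename();
  if (deduping) {
    await state.addSourceWACZForDedupe(filename);
  }

  // ...crawl runs; duplicate payloads become revisit records...

  // WACZ generation: record which other WACZ files this crawl now depends
  // on; requires is passed into WACZInitOpts and written into the archive
  const requires = await state.getDupeDependentCrawls();
  const wacz = { requires }; // placeholder for the generated WACZ object

  // post-crawl, only after a WACZ was produced: release the reserved
  // filename and finalize this crawl's dedupe source entry
  await state.clearWACZFilename();
  if (deduping) {
    await state.updateDedupeSourceWACZ(wacz);
  }

  // finally, commit this crawl's hashes to the main dedupe index
  if (deduping) {
    await state.commitDedupeDone();
  }
}
```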