62 changes: 61 additions & 1 deletion maxun-core/src/interpret.ts
@@ -64,6 +64,8 @@ export default class Interpreter extends EventEmitter {
private concurrency: Concurrency;

private stopper: Function | null = null;

private isAborted: boolean = false;

private log: typeof log;

@@ -114,6 +116,13 @@ export default class Interpreter extends EventEmitter {
})
}

/**
* Sets the abort flag to stop all operations at their next checkpoint
*/
public abort(): void {
this.isAborted = true;
}

private async applyAdBlocker(page: Page): Promise<void> {
if (this.blocker) {
try {
@@ -372,6 +381,11 @@
* @param steps Array of actions.
*/
private async carryOutSteps(page: Page, steps: What[]): Promise<void> {
if (this.isAborted) {
this.log('Workflow aborted, stopping execution', Level.WARN);
return;
}

/**
* Defines overloaded (or added) methods/actions usable in the workflow.
* If a method overloads any existing method of the Page class, it accepts the same set
@@ -433,6 +447,11 @@
},

scrapeSchema: async (schema: Record<string, { selector: string; tag: string, attribute: string; shadow: string}>) => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeSchema', Level.WARN);
return;
}

if (this.options.debugChannel?.setActionType) {
this.options.debugChannel.setActionType('scrapeSchema');
}
@@ -468,6 +487,11 @@
},

scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => {
if (this.isAborted) {
this.log('Workflow aborted, stopping scrapeList', Level.WARN);
return;
}

if (this.options.debugChannel?.setActionType) {
this.options.debugChannel.setActionType('scrapeList');
}
@@ -622,6 +646,11 @@
limit?: number,
pagination: any
}) {
if (this.isAborted) {
this.log('Workflow aborted, stopping pagination', Level.WARN);
return [];
}

let allResults: Record<string, any>[] = [];
let previousHeight = 0;
let scrapedItems: Set<string> = new Set<string>();
@@ -635,6 +664,12 @@
};

const scrapeCurrentPage = async () => {
// Check abort flag before scraping current page
if (this.isAborted) {
debugLog("Workflow aborted, stopping scrapeCurrentPage");
return;
}

const results = await page.evaluate((cfg) => window.scrapeList(cfg), config);
const newResults = results.filter(item => {
const uniqueKey = JSON.stringify(item);
@@ -723,7 +758,12 @@
let unchangedResultCounter = 0;

try {
while (true) {
while (true) {
if (this.isAborted) {
this.log('Workflow aborted during pagination loop', Level.WARN);
return allResults;
}

switch (config.pagination.type) {
case 'scrollDown': {
let previousResultCount = allResults.length;
@@ -969,6 +1009,11 @@
// const MAX_NO_NEW_ITEMS = 2;

while (true) {
if (this.isAborted) {
this.log('Workflow aborted during pagination loop', Level.WARN);
return allResults;
}

// Find working button with retry mechanism
const { button: loadMoreButton, workingSelector, updatedSelectors } = await findWorkingButton(availableSelectors);

@@ -1120,6 +1165,11 @@
}

private async runLoop(p: Page, workflow: Workflow) {
if (this.isAborted) {
this.log('Workflow aborted in runLoop', Level.WARN);
return;
}

let workflowCopy: Workflow = JSON.parse(JSON.stringify(workflow));

workflowCopy = this.removeSpecialSelectors(workflowCopy);
@@ -1150,6 +1200,11 @@
const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker

while (true) {
if (this.isAborted) {
this.log('Workflow aborted during step execution', Level.WARN);
return;
}

// Circuit breaker to prevent infinite loops
if (++loopIterations > MAX_LOOP_ITERATIONS) {
this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR);
@@ -1232,6 +1287,11 @@
}
lastAction = action;

if (this.isAborted) {
this.log('Workflow aborted before action execution', Level.WARN);
return;
}

try {
console.log("Carrying out:", action.what);
await this.carryOutSteps(p, action.what);
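For context, a minimal sketch of how a caller is expected to consume the new `abort()` method (the `stopRun` helper and import path below are hypothetical and not part of this diff). Since `isAborted` is only polled at loop boundaries (the top of `carryOutSteps`, inside the pagination loops, and before each action in `runLoop`), the Playwright call that is currently awaited is allowed to finish, and the interpreter then returns whatever it has collected so far.

```ts
// Hypothetical caller; the import path and wiring are illustrative only.
import Interpreter from 'maxun-core/src/interpret';

async function stopRun(interpreter: Interpreter): Promise<void> {
  // Flip the flag; every interpreter loop checks it on its next iteration,
  // so no new steps, scrapeSchema/scrapeList calls, or pagination rounds start.
  interpreter.abort();
}
```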
87 changes: 37 additions & 50 deletions server/src/api/record.ts
@@ -597,65 +597,53 @@ async function executeRun(id: string, userId: string) {
}

const workflow = AddGeneratedFlags(recording.recording);

browser.interpreter.setRunId(id);

const interpretationInfo = await browser.interpreter.InterpretRecording(
workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings
);

const binaryOutputService = new BinaryOutputService('maxun-run-screenshots');
const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput);

const categorizedOutput = {
scrapeSchema: interpretationInfo.scrapeSchemaOutput || {},
scrapeList: interpretationInfo.scrapeListOutput || {},
};

await destroyRemoteBrowser(plainRun.browserId, userId);

const updatedRun = await run.update({
...run,
status: 'success',
finishedAt: new Date().toLocaleString(),
browserId: plainRun.browserId,
log: interpretationInfo.log.join('\n'),
serializableOutput: {
scrapeSchema: Object.values(categorizedOutput.scrapeSchema),
scrapeList: Object.values(categorizedOutput.scrapeList),
},
binaryOutput: uploadedBinaryOutput,
});

let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;

if (categorizedOutput.scrapeSchema) {
Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}

if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}

const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;

console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`);
console.log(`Extracted List Items Count: ${totalListItemsExtracted}`);
console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`);
console.log(`Total Rows Extracted: ${totalRowsExtracted}`);
let totalSchemaItemsExtracted = 0;
let totalListItemsExtracted = 0;
let extractedScreenshotsCount = 0;
const finalRun = await Run.findOne({ where: { runId: id } });
if (finalRun) {
if (finalRun.serializableOutput) {
if (finalRun.serializableOutput.scrapeSchema) {
Object.values(finalRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
if (finalRun.serializableOutput.scrapeList) {
Object.values(finalRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
}
if (finalRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(finalRun.binaryOutput).length;
}
}
const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted;

capture('maxun-oss-run-created-api',{
runId: id,
@@ -668,7 +656,6 @@ async function executeRun(id: string, userId: string) {
}
)

// Trigger webhooks for run completion
const webhookPayload = {
robot_id: plainRun.robotMetaId,
run_id: plainRun.runId,
@@ -677,8 +664,8 @@ async function executeRun(id: string, userId: string) {
started_at: plainRun.startedAt,
finished_at: new Date().toLocaleString(),
extracted_data: {
captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [],
captured_lists: categorizedOutput.scrapeList,
captured_texts: finalRun?.serializableOutput?.scrapeSchema ? Object.values(finalRun.serializableOutput.scrapeSchema).flat() : [],
captured_lists: finalRun?.serializableOutput?.scrapeList || {},
total_rows: totalRowsExtracted,
captured_texts_count: totalSchemaItemsExtracted,
captured_lists_count: totalListItemsExtracted,
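For reference, the tally logic that record.ts now runs against the re-fetched `finalRun` row can be summarized as a small standalone helper (a sketch restating the diff's rules; `RunLike` is an illustrative shape, not the actual Sequelize model): each `scrapeSchema` entry counts its array length when it is an array and 1 when it is a plain object, each `scrapeList` entry counts its array length, and screenshots are the number of keys in `binaryOutput`.

```ts
// Sketch of the counting rules from executeRun; RunLike is illustrative only.
interface RunLike {
  serializableOutput?: {
    scrapeSchema?: Record<string, unknown>;
    scrapeList?: Record<string, unknown>;
  };
  binaryOutput?: Record<string, unknown>;
}

function countExtractedRows(run: RunLike) {
  let schemaItems = 0;
  let listItems = 0;

  for (const schemaResult of Object.values(run.serializableOutput?.scrapeSchema ?? {})) {
    if (Array.isArray(schemaResult)) {
      schemaItems += schemaResult.length; // one row per captured element
    } else if (schemaResult && typeof schemaResult === 'object') {
      schemaItems += 1; // a single captured record
    }
  }

  for (const listResult of Object.values(run.serializableOutput?.scrapeList ?? {})) {
    if (Array.isArray(listResult)) {
      listItems += listResult.length;
    }
  }

  const screenshots = Object.keys(run.binaryOutput ?? {}).length;

  return {
    schemaItems,
    listItems,
    screenshots,
    totalRows: schemaItems + listItems,
  };
}
```

Reading these counts from the persisted `Run` row rather than from the in-memory `categorizedOutput` presumably lets the reported totals and webhook payload reflect whatever partial output was stored if the interpreter was aborted mid-run.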