diff --git a/maxun-core/src/interpret.ts b/maxun-core/src/interpret.ts index faa473c25..6826b06ec 100644 --- a/maxun-core/src/interpret.ts +++ b/maxun-core/src/interpret.ts @@ -64,6 +64,8 @@ export default class Interpreter extends EventEmitter { private concurrency: Concurrency; private stopper: Function | null = null; + + private isAborted: boolean = false; private log: typeof log; @@ -114,6 +116,13 @@ export default class Interpreter extends EventEmitter { }) } + /** + * Sets the abort flag to immediately stop all operations + */ + public abort(): void { + this.isAborted = true; + } + private async applyAdBlocker(page: Page): Promise<void> { if (this.blocker) { try { @@ -372,6 +381,11 @@ export default class Interpreter extends EventEmitter { * @param steps Array of actions. */ private async carryOutSteps(page: Page, steps: What[]): Promise<void> { + if (this.isAborted) { + this.log('Workflow aborted, stopping execution', Level.WARN); + return; + } + /** * Defines overloaded (or added) methods/actions usable in the workflow. * If a method overloads any existing method of the Page class, it accepts the same set @@ -433,6 +447,11 @@ }, scrapeSchema: async (schema: Record<string, any>) => { + if (this.isAborted) { + this.log('Workflow aborted, stopping scrapeSchema', Level.WARN); + return; + } + if (this.options.debugChannel?.setActionType) { this.options.debugChannel.setActionType('scrapeSchema'); } @@ -468,6 +487,11 @@ }, scrapeList: async (config: { listSelector: string, fields: any, limit?: number, pagination: any }) => { + if (this.isAborted) { + this.log('Workflow aborted, stopping scrapeList', Level.WARN); + return; + } + if (this.options.debugChannel?.setActionType) { this.options.debugChannel.setActionType('scrapeList'); } @@ -622,6 +646,11 @@ limit?: number, pagination: any }) { + if (this.isAborted) { + this.log('Workflow aborted, stopping pagination', Level.WARN); + return []; + } + let allResults: Record<string, any>[] = []; let previousHeight = 0; let scrapedItems: Set<string> = new Set(); @@ -635,6 +664,12 @@ }; const scrapeCurrentPage = async () => { + // Check abort flag before scraping current page + if (this.isAborted) { + debugLog("Workflow aborted, stopping scrapeCurrentPage"); + return; + } + const results = await page.evaluate((cfg) => window.scrapeList(cfg), config); const newResults = results.filter(item => { const uniqueKey = JSON.stringify(item); @@ -723,7 +758,12 @@ let unchangedResultCounter = 0; try { - while (true) { + while (true) { + if (this.isAborted) { + this.log('Workflow aborted during pagination loop', Level.WARN); + return allResults; + } + switch (config.pagination.type) { case 'scrollDown': { let previousResultCount = allResults.length; @@ -969,6 +1009,11 @@ // const MAX_NO_NEW_ITEMS = 2; while (true) { + if (this.isAborted) { + this.log('Workflow aborted during pagination loop', Level.WARN); + return allResults; + } + // Find working button with retry mechanism const { button: loadMoreButton, workingSelector, updatedSelectors } = await findWorkingButton(availableSelectors); @@ -1120,6 +1165,11 @@ } private async runLoop(p: Page, workflow: Workflow) { + if (this.isAborted) { + this.log('Workflow aborted in runLoop', Level.WARN); + return; + } + let workflowCopy: Workflow = JSON.parse(JSON.stringify(workflow)); workflowCopy = this.removeSpecialSelectors(workflowCopy); @@ -1150,6 +1200,11 @@ const MAX_LOOP_ITERATIONS = 1000; // Circuit breaker while (true) { + if (this.isAborted) { + this.log('Workflow aborted during step execution', Level.WARN); + return; + } + // Circuit breaker to prevent infinite loops if (++loopIterations > MAX_LOOP_ITERATIONS) { this.log('Maximum loop iterations reached, terminating to prevent infinite loop', Level.ERROR); @@ -1232,6 +1287,11 @@ } lastAction = action; + if (this.isAborted) { + this.log('Workflow aborted before action execution', Level.WARN); + return; + } + try { console.log("Carrying out:", action.what); await this.carryOutSteps(p, action.what);
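The core of the maxun-core change is cooperative cancellation: `abort()` only flips a flag, and every long-running loop (`carryOutSteps`, the pagination `while (true)` loops, `runLoop`) polls that flag at iteration boundaries and returns whatever partial results it has. A minimal sketch of the pattern, with illustrative names rather than Maxun's actual classes:

```typescript
// Sketch of the cooperative-cancellation pattern this diff applies.
// Names here are illustrative, not Maxun's real API.
class CancellableWorker {
  private isAborted = false;

  /** Flips the flag; running loops observe it at their next iteration. */
  public abort(): void {
    this.isAborted = true;
  }

  public async run(pages: string[]): Promise<string[]> {
    const results: string[] = [];
    for (const page of pages) {
      // The flag is polled at every loop boundary, so the worker stops
      // within one iteration instead of blocking until completion.
      if (this.isAborted) return results;
      results.push(await this.process(page));
    }
    return results;
  }

  private async process(page: string): Promise<string> {
    return `processed:${page}`;
  }
}
```

Returning the accumulated results instead of throwing is what lets an aborted run keep the rows scraped so far, mirroring the `return allResults;` in the pagination loops above.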
diff --git a/server/src/api/record.ts b/server/src/api/record.ts index e05aa8ceb..a10a1c432 100644 --- a/server/src/api/record.ts +++ b/server/src/api/record.ts @@ -597,65 +597,53 @@ async function executeRun(id: string, userId: string) { } const workflow = AddGeneratedFlags(recording.recording); + + browser.interpreter.setRunId(id); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {}, - }; - await destroyRemoteBrowser(plainRun.browserId, userId); const updatedRun = await run.update({ - ...run, status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, }); - let totalSchemaItemsExtracted = 0; - let totalListItemsExtracted = 0; - let extractedScreenshotsCount = 0; - - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; - } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; - } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; - } - - const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; - - console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`); - console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); + let totalSchemaItemsExtracted = 0; + let totalListItemsExtracted = 0; + let extractedScreenshotsCount = 0; + + const finalRun = await Run.findOne({ where: { runId: id } }); + if (finalRun) { + if (finalRun.serializableOutput) { + if (finalRun.serializableOutput.scrapeSchema) { + Object.values(finalRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); + } + + if (finalRun.serializableOutput.scrapeList) { + Object.values(finalRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); + } + } + + if (finalRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(finalRun.binaryOutput).length; + } + } + + const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; capture('maxun-oss-run-created-api',{ runId: id, @@ -668,7 +656,6 @@ } ) - // Trigger webhooks for run completion const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: plainRun.runId, @@ -677,8 +664,8 @@ started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: categorizedOutput.scrapeList, + captured_texts: finalRun?.serializableOutput?.scrapeSchema ? Object.values(finalRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: finalRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted,
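`executeRun` no longer computes its metrics from in-memory interpreter output; it re-reads the `Run` row that the interpreter now persists incrementally and derives the counts from `serializableOutput`. A sketch of that counting logic, with the output shape assumed from this diff rather than from a declared type:

```typescript
// Counting logic as performed against the persisted run row; the
// serializableOutput shape is inferred from the diff, not a declared interface.
interface SerializableOutput {
  scrapeSchema?: Record<string, unknown>;
  scrapeList?: Record<string, unknown>;
}

function countExtractedItems(
  output: SerializableOutput | null
): { schemaItems: number; listItems: number } {
  let schemaItems = 0;
  let listItems = 0;

  for (const result of Object.values(output?.scrapeSchema ?? {})) {
    // An array counts each element; a lone object counts as one captured text.
    if (Array.isArray(result)) schemaItems += result.length;
    else if (result && typeof result === 'object') schemaItems += 1;
  }

  for (const result of Object.values(output?.scrapeList ?? {})) {
    if (Array.isArray(result)) listItems += result.length;
  }

  return { schemaItems, listItems };
}
```

Reading the counts back from the database means the same numbers survive a crash mid-run, which is the point of the real-time persistence introduced later in this diff.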
diff --git a/server/src/browser-management/classes/BrowserPool.ts b/server/src/browser-management/classes/BrowserPool.ts index eb99f2df7..286ef6812 100644 --- a/server/src/browser-management/classes/BrowserPool.ts +++ b/server/src/browser-management/classes/BrowserPool.ts @@ -36,6 +36,14 @@ interface BrowserPoolInfo { * Can be "reserved", "initializing", "ready" or "failed". */ status?: "reserved" | "initializing" | "ready" | "failed", + /** + * Timestamp when the browser slot was created/reserved + */ + createdAt?: number, + /** + * Timestamp when the browser was last accessed + */ + lastAccessed?: number, } /** @@ -66,6 +74,12 @@ export class BrowserPool { */ private userToBrowserMap: Map<string, string[]> = new Map(); + /** + * Locks for atomic operations to prevent race conditions + * Key format: "userId-state", Value: timestamp when lock was acquired + */ + private reservationLocks: Map<string, number> = new Map(); + /** * Adds a remote browser instance to the pool for a specific user. * If the user already has two browsers, the oldest browser will be closed and replaced. @@ -570,7 +584,7 @@ }; /** - * Reserves a browser slot immediately without creating the actual browser. + * Reserves a browser slot atomically to prevent race conditions. * This ensures slot counting is accurate for rapid successive requests. 
* * @param id browser ID to reserve @@ -578,31 +592,65 @@ export class BrowserPool { * @param state browser state ("recording" or "run") * @returns true if slot was reserved, false if user has reached limit */ - public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => { - // Check if user has available slots first - if (!this.hasAvailableBrowserSlots(userId, state)) { - logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`); + public reserveBrowserSlotAtomic = (id: string, userId: string, state: BrowserState = "run"): boolean => { + const lockKey = `${userId}-${state}`; + + if (this.reservationLocks.has(lockKey)) { + logger.log('debug', `Reservation already in progress for user ${userId} state ${state}`); return false; } + + try { + this.reservationLocks.set(lockKey, Date.now()); + + if (!this.hasAvailableBrowserSlots(userId, state)) { + logger.log('debug', `Cannot reserve slot for user ${userId}: no available slots`); + return false; + } - // Reserve the slot with null browser - this.pool[id] = { - browser: null, - active: false, - userId, - state, - status: "reserved" - }; + if (this.pool[id]) { + logger.log('debug', `Browser slot ${id} already exists`); + return false; + } - // Update the user-to-browser mapping - let userBrowserIds = this.userToBrowserMap.get(userId) || []; - if (!userBrowserIds.includes(id)) { - userBrowserIds.push(id); - this.userToBrowserMap.set(userId, userBrowserIds); + const now = Date.now(); + + this.pool[id] = { + browser: null, + active: false, + userId, + state, + status: "reserved", + createdAt: now, + lastAccessed: now + }; + + const userBrowserIds = this.userToBrowserMap.get(userId) || []; + if (!userBrowserIds.includes(id)) { + userBrowserIds.push(id); + this.userToBrowserMap.set(userId, userBrowserIds); + } + + logger.log('info', `Atomically reserved browser slot ${id} for user ${userId} in state ${state}`); + return true; + + } catch (error: any) { + logger.log('error', `Error during atomic slot reservation: ${error.message}`); + if (this.pool[id] && this.pool[id].status === "reserved") { + this.deleteRemoteBrowser(id); + } + return false; + } finally { + this.reservationLocks.delete(lockKey); } + }; - logger.log('info', `Reserved browser slot ${id} for user ${userId} in state ${state}`); - return true; + /** + * Legacy method - kept for backwards compatibility but now uses atomic version + * @deprecated Use reserveBrowserSlotAtomic instead + */ + public reserveBrowserSlot = (id: string, userId: string, state: BrowserState = "run"): boolean => { + return this.reserveBrowserSlotAtomic(id, userId, state); }; /** @@ -630,17 +678,89 @@ export class BrowserPool { }; /** - * Marks a reserved slot as failed and removes it. + * Marks a reserved slot as failed and removes it with proper cleanup. 
* * @param id browser ID to mark as failed */ public failBrowserSlot = (id: string): void => { if (this.pool[id]) { logger.log('info', `Marking browser slot ${id} as failed`); + + // Attempt to cleanup browser resources before deletion + const browserInfo = this.pool[id]; + if (browserInfo.browser) { + try { + // Try to close browser if it exists + browserInfo.browser.switchOff?.().catch((error: any) => { + logger.log('warn', `Error closing failed browser ${id}: ${error.message}`); + }); + } catch (error: any) { + logger.log('warn', `Error during browser cleanup for ${id}: ${error.message}`); + } + } + this.deleteRemoteBrowser(id); } }; + /** + * Cleanup stale browser slots that have been in reserved/initializing state too long + * This prevents resource leaks from failed initializations + */ + public cleanupStaleBrowserSlots = (): void => { + const now = Date.now(); + const staleThreshold = 5 * 60 * 1000; // 5 minutes + + const staleSlots: string[] = []; + + for (const [id, info] of Object.entries(this.pool)) { + const isStale = info.status === "reserved" || info.status === "initializing"; + const createdAt = info.createdAt || 0; + const age = now - createdAt; + + if (isStale && info.browser === null && age > staleThreshold) { + staleSlots.push(id); + } + } + + staleSlots.forEach(id => { + const info = this.pool[id]; + logger.log('warn', `Cleaning up stale browser slot ${id} with status ${info.status}, age: ${Math.round((now - (info.createdAt || 0)) / 1000)}s`); + this.failBrowserSlot(id); + }); + + if (staleSlots.length > 0) { + logger.log('info', `Cleaned up ${staleSlots.length} stale browser slots`); + } + + this.cleanupStaleReservationLocks(); + }; + + /** + * Cleans up reservation locks that are older than 1 minute + * This prevents locks from being held indefinitely due to crashes + */ + private cleanupStaleReservationLocks = (): void => { + const now = Date.now(); + const lockTimeout = 60 * 1000; // 1 minute + + const staleLocks: string[] = []; + + for (const [lockKey, timestamp] of this.reservationLocks.entries()) { + if (now - timestamp > lockTimeout) { + staleLocks.push(lockKey); + } + } + + staleLocks.forEach(lockKey => { + this.reservationLocks.delete(lockKey); + }); + + if (staleLocks.length > 0) { + logger.log('warn', `Cleaned up ${staleLocks.length} stale reservation locks`); + } + }; + /** * Gets the current status of a browser slot. * @@ -653,4 +773,22 @@ export class BrowserPool { } return this.pool[id].status || null; }; + + /** + * Returns all browser instances in the pool. + * Used for cleanup operations like graceful shutdown. 
+ * + * @returns Map of browser IDs to browser instances + */ + public getAllBrowsers = (): Map<string, RemoteBrowser> => { + const browsers = new Map<string, RemoteBrowser>(); + + for (const [id, info] of Object.entries(this.pool)) { + if (info.browser) { + browsers.set(id, info.browser); + } + } + + return browsers; + }; } \ No newline at end of file diff --git a/server/src/models/Run.ts b/server/src/models/Run.ts index dc371e8e8..1e292dbbf 100644 --- a/server/src/models/Run.ts +++ b/server/src/models/Run.ts @@ -25,6 +25,7 @@ interface RunAttributes { runByAPI?: boolean; serializableOutput: Record<string, any>; binaryOutput: Record<string, any>; + retryCount?: number; } interface RunCreationAttributes extends Optional<RunAttributes, 'id'> { } @@ -46,6 +47,7 @@ class Run extends Model<RunAttributes, RunCreationAttributes> implements RunAttr public runByAPI!: boolean; public serializableOutput!: Record<string, any>; public binaryOutput!: Record<string, any>; + public retryCount!: number; } Run.init( @@ -120,6 +122,11 @@ allowNull: true, defaultValue: {}, }, + retryCount: { + type: DataTypes.INTEGER, + allowNull: true, + defaultValue: 0, + }, }, { sequelize,
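Note that `retryCount` is a new column on an existing table, so deployments that do not recreate their schema will need it added out of band. A hypothetical migration sketch follows; Maxun's actual migration mechanism is not shown in this diff, and the `runs` table name is an assumption:

```typescript
// Hypothetical Sequelize migration for the new column; treat the table name
// and migration style as illustrative only, since neither appears in the diff.
import { QueryInterface, DataTypes } from 'sequelize';

export async function up(queryInterface: QueryInterface): Promise<void> {
  await queryInterface.addColumn('runs', 'retryCount', {
    type: DataTypes.INTEGER,
    allowNull: true,
    defaultValue: 0, // existing rows start with zero recorded retries
  });
}

export async function down(queryInterface: QueryInterface): Promise<void> {
  await queryInterface.removeColumn('runs', 'retryCount');
}
```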
diff --git a/server/src/pgboss-worker.ts b/server/src/pgboss-worker.ts index c8baa9c0d..7e5687125 100644 --- a/server/src/pgboss-worker.ts +++ b/server/src/pgboss-worker.ts @@ -14,11 +14,9 @@ import Run from './models/Run'; import Robot from './models/Robot'; import { browserPool } from './server'; import { Page } from 'playwright'; -import { BinaryOutputService } from './storage/mino'; import { capture } from './utils/analytics'; import { googleSheetUpdateTasks, processGoogleSheetUpdates } from './workflow-management/integrations/gsheet'; import { airtableUpdateTasks, processAirtableUpdates } from './workflow-management/integrations/airtable'; -import { RemoteBrowser } from './browser-management/classes/RemoteBrowser'; import { io as serverIo } from "./server"; import { sendWebhook } from './routes/webhook'; @@ -85,107 +83,6 @@ function AddGeneratedFlags(workflow: WorkflowFile) { return copy; }; -/** - * Helper function to extract and process scraped data from browser interpreter - */ -async function extractAndProcessScrapedData( - browser: RemoteBrowser, - run: any -): Promise<{ - categorizedOutput: any; - uploadedBinaryOutput: any; - totalDataPointsExtracted: number; - totalSchemaItemsExtracted: number; - totalListItemsExtracted: number; - extractedScreenshotsCount: number; -}> { - let categorizedOutput: { - scrapeSchema: Record<string, any>; - scrapeList: Record<string, any>; - } = { - scrapeSchema: {}, - scrapeList: {} - }; - - if ((browser?.interpreter?.serializableDataByType?.scrapeSchema ?? []).length > 0) { - browser?.interpreter?.serializableDataByType?.scrapeSchema?.forEach((schemaItem: any, index: any) => { - categorizedOutput.scrapeSchema[`schema-${index}`] = schemaItem; - }); - } - - if ((browser?.interpreter?.serializableDataByType?.scrapeList ?? []).length > 0) { - browser?.interpreter?.serializableDataByType?.scrapeList?.forEach((listItem: any, index: any) => { - categorizedOutput.scrapeList[`list-${index}`] = listItem; - }); - } - - const binaryOutput = browser?.interpreter?.binaryData?.reduce( - (reducedObject: Record<string, any>, item: any, index: number): Record<string, any> => { - return { - [`item-${index}`]: item, - ...reducedObject, - }; - }, - {} - ) || {}; - - let totalDataPointsExtracted = 0; - let totalSchemaItemsExtracted = 0; - let totalListItemsExtracted = 0; - let extractedScreenshotsCount = 0; - - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - schemaResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; - } - }); - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalDataPointsExtracted += Object.keys(schemaResult).length; - totalSchemaItemsExtracted += 1; - } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - listResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; - } - }); - totalListItemsExtracted += listResult.length; - } - }); - } - - if (binaryOutput) { - extractedScreenshotsCount = Object.keys(binaryOutput).length; - totalDataPointsExtracted += extractedScreenshotsCount; - } - - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput( - run, - binaryOutput - ); - - return { - categorizedOutput: { - scrapeSchema: categorizedOutput.scrapeSchema || {}, - scrapeList: categorizedOutput.scrapeList || {} - }, - uploadedBinaryOutput, - totalDataPointsExtracted, - totalSchemaItemsExtracted, - totalListItemsExtracted, - extractedScreenshotsCount - }; -} // Helper function to handle integration updates async function triggerIntegrationUpdates(runId: string, robotMetaId: string): Promise<void> { @@ -234,6 +131,11 @@ async function processRunExecution(job: Job) { return { success: true }; } + if (run.status === 'queued') { + logger.log('info', `Run ${data.runId} has status 'queued', skipping stale execution job - processQueuedRuns will handle it`); + return { success: true }; + } + const plainRun = run.toJSON(); const browserId = data.browserId || plainRun.browserId; @@ -309,6 +211,9 @@ // Execute the workflow const workflow = AddGeneratedFlags(recording.recording); + + browser.interpreter.setRunId(data.runId); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); @@ -326,79 +231,49 @@ logger.log('info', `Workflow execution completed for run ${data.runId}`); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {} - }; - if (await isRunAborted()) { logger.log('info', `Run ${data.runId} was aborted while processing results, not updating status`); return { success: true }; } await run.update({ - ...run, 
status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, + log: interpretationInfo.log.join('\n') }); - // Track extraction metrics - let totalDataPointsExtracted = 0; let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - schemaResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; + const updatedRun = await Run.findOne({ where: { runId: data.runId } }); + if (updatedRun) { + if (updatedRun.serializableOutput) { + if (updatedRun.serializableOutput.scrapeSchema) { + Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; } }); - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalDataPointsExtracted += Object.keys(schemaResult).length; - totalSchemaItemsExtracted += 1; } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - listResult.forEach(obj => { - if (obj && typeof obj === 'object') { - totalDataPointsExtracted += Object.keys(obj).length; + + if (updatedRun.serializableOutput.scrapeList) { + Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; } }); - totalListItemsExtracted += listResult.length; } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; - totalDataPointsExtracted += extractedScreenshotsCount; + } + + if (updatedRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + } } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; - - console.log(`Extracted Schema Items Count: ${totalSchemaItemsExtracted}`); - console.log(`Extracted List Items Count: ${totalListItemsExtracted}`); - console.log(`Extracted Screenshots Count: ${extractedScreenshotsCount}`); - console.log(`Total Rows Extracted: ${totalRowsExtracted}`); - console.log(`Total Data Points Extracted: ${totalDataPointsExtracted}`); // Capture metrics capture( @@ -415,7 +290,6 @@ async function processRunExecution(job: Job) { } ); - // Trigger webhooks for run completion const webhookPayload = { robot_id: plainRun.robotMetaId, run_id: data.runId, @@ -424,13 +298,12 @@ async function processRunExecution(job: Job) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: categorizedOutput.scrapeList, + captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? 
Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted, screenshots_count: extractedScreenshotsCount, - total_data_points_extracted: totalDataPointsExtracted, }, metadata: { browser_id: plainRun.browserId, @@ -475,30 +348,18 @@ }; try { - if (browser && browser.interpreter) { - const hasSchemaData = (browser.interpreter.serializableDataByType?.scrapeSchema ?? []).length > 0; - const hasListData = (browser.interpreter.serializableDataByType?.scrapeList ?? []).length > 0; - const hasBinaryData = (browser.interpreter.binaryData ?? []).length > 0; - - if (hasSchemaData || hasListData || hasBinaryData) { - logger.log('info', `Extracting partial data from failed run ${data.runId}`); - - partialData = await extractAndProcessScrapedData(browser, run); - - partialUpdateData.serializableOutput = { - scrapeSchema: Object.values(partialData.categorizedOutput.scrapeSchema), - scrapeList: Object.values(partialData.categorizedOutput.scrapeList), - }; - partialUpdateData.binaryOutput = partialData.uploadedBinaryOutput; - - partialDataExtracted = true; - logger.log('info', `Partial data extracted for failed run ${data.runId}: ${partialData.totalDataPointsExtracted} data points`); - - await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); - } + const hasData = (run.serializableOutput && + ((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) || + (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) || + (run.binaryOutput && Object.keys(run.binaryOutput).length > 0); + + if (hasData) { + logger.log('info', `Partial data found in failed run ${data.runId}, triggering integration updates`); + await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId); + partialDataExtracted = true; } - } catch (partialDataError: any) { - logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`); + } catch (dataCheckError: any) { + logger.log('warn', `Failed to check for partial data in run ${data.runId}: ${dataCheckError.message}`); } await run.update(partialUpdateData); @@ -652,7 +513,9 @@ async function abortRun(runId: string, userId: string): Promise<boolean> { - const run = await Run.findOne({ where: { runId: runId } }); + const run = await Run.findOne({ + where: { runId: runId } + }); if (!run) { logger.log('warn', `Run ${runId} not found or does not belong to user ${userId}`); @@ -702,24 +565,18 @@ return true; } - let currentLog = 'Run aborted by user'; - const extractedData = await extractAndProcessScrapedData(browser, run); - - console.log(`Total Data Points Extracted in aborted run: ${extractedData.totalDataPointsExtracted}`); - await run.update({ status: 'aborted', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, - log: currentLog, - serializableOutput: { - scrapeSchema: Object.values(extractedData.categorizedOutput.scrapeSchema), - scrapeList: Object.values(extractedData.categorizedOutput.scrapeList), - }, - binaryOutput: extractedData.uploadedBinaryOutput, + log: 'Run aborted by user' }); - if (extractedData.totalDataPointsExtracted > 0) { + const hasData = (run.serializableOutput && + 
((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) || + (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) || + (run.binaryOutput && Object.keys(run.binaryOutput).length > 0); + + if (hasData) { await triggerIntegrationUpdates(runId, plainRun.robotMetaId); } @@ -751,9 +608,52 @@ async function abortRun(runId: string, userId: string): Promise { } } +// Track registered queues globally for individual queue registration +const registeredUserQueues = new Map(); +const registeredAbortQueues = new Map(); + +async function registerWorkerForQueue(queueName: string) { + if (!registeredUserQueues.has(queueName)) { + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const singleJob = Array.isArray(job) ? job[0] : job; + return await processRunExecution(singleJob); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Run execution job failed in ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredUserQueues.set(queueName, true); + logger.log('info', `Registered worker for queue: ${queueName}`); + } +} + +async function registerAbortWorkerForQueue(queueName: string) { + if (!registeredAbortQueues.has(queueName)) { + await pgBoss.work(queueName, async (job: Job | Job[]) => { + try { + const data = extractJobData(job); + const { userId, runId } = data; + + logger.log('info', `Processing abort request for run ${runId} by user ${userId}`); + const success = await abortRun(runId, userId); + return { success }; + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + logger.log('error', `Abort run job failed in ${queueName}: ${errorMessage}`); + throw error; + } + }); + + registeredAbortQueues.set(queueName, true); + logger.log('info', `Registered abort worker for queue: ${queueName}`); + } +} + async function registerRunExecutionWorker() { try { - const registeredUserQueues = new Map(); // Worker for executing runs (Legacy) await pgBoss.work('execute-run', async (job: Job | Job[]) => { @@ -951,9 +851,6 @@ async function startWorkers() { } } -// Start all workers -startWorkers(); - pgBoss.on('error', (error) => { logger.log('error', `PgBoss error: ${error.message}`); }); @@ -972,4 +869,4 @@ process.on('SIGINT', async () => { }); // For use in other files -export { pgBoss }; +export { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers }; diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index 35491f8ca..bc33f3dc8 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -17,7 +17,7 @@ import { capture } from "../utils/analytics"; import { encrypt, decrypt } from '../utils/auth'; import { WorkflowFile } from 'maxun-core'; import { cancelScheduledWorkflow, scheduleWorkflow } from '../schedule-worker'; -import { pgBoss } from '../pgboss-worker'; +import { pgBoss, registerWorkerForQueue, registerAbortWorkerForQueue } from '../pgboss-worker'; chromium.use(stealthPlugin()); export const router = Router(); @@ -573,6 +573,7 @@ router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => try { const userQueueName = `execute-run-user-${req.user.id}`; await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -690,6 +691,7 @@ router.post('/runs/run/:id', requireSignIn, async (req: 
AuthenticatedRequest, re // Queue the execution job await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -949,8 +951,20 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, }); } + // Immediately stop interpreter like cloud version + try { + const browser = browserPool.getRemoteBrowser(run.browserId); + if (browser && browser.interpreter) { + logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`); + await browser.interpreter.stopInterpretation(); + } + } catch (immediateStopError: any) { + logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`); + } + const userQueueName = `abort-run-user-${req.user.id}`; await pgBoss.createQueue(userQueueName); + await registerAbortWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: req.user.id, @@ -961,7 +975,7 @@ router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, return res.send({ success: true, - message: 'Abort signal sent', + message: 'Run stopped immediately, cleanup queued', jobId, isQueued: false }); @@ -1018,6 +1032,7 @@ async function processQueuedRuns() { const userQueueName = `execute-run-user-${userId}`; await pgBoss.createQueue(userQueueName); + await registerWorkerForQueue(userQueueName); const jobId = await pgBoss.send(userQueueName, { userId: userId, @@ -1041,4 +1056,81 @@ async function processQueuedRuns() { } } +/** + * Recovers orphaned runs that were left in "running" status due to instance crashes + * This function runs on server startup to ensure data reliability + */ +export async function recoverOrphanedRuns() { + try { + logger.log('info', 'Starting recovery of orphaned runs...'); + + const orphanedRuns = await Run.findAll({ + where: { + status: ['running', 'scheduled'] + }, + order: [['startedAt', 'ASC']] + }); + + if (orphanedRuns.length === 0) { + logger.log('info', 'No orphaned runs found'); + return; + } + + logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`); + + for (const run of orphanedRuns) { + try { + const runData = run.toJSON(); + logger.log('info', `Recovering orphaned run: ${runData.runId}`); + + const browser = browserPool.getRemoteBrowser(runData.browserId); + + if (!browser) { + const retryCount = runData.retryCount || 0; + + if (retryCount < 3) { + await run.update({ + status: 'queued', + retryCount: retryCount + 1, + serializableOutput: {}, + binaryOutput: {}, + browserId: undefined, + log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` + }); + + logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${retryCount + 1}/3)`); + } else { + const crashRecoveryMessage = `Max retries exceeded (3/3) - Run failed after multiple server crashes.`; + + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: runData.log ? 
`${runData.log}\n${crashRecoveryMessage}` : crashRecoveryMessage + }); + + logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`); + } + + if (runData.browserId) { + try { + browserPool.deleteRemoteBrowser(runData.browserId); + logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`); + } catch (cleanupError: any) { + logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${cleanupError.message}`); + } + } + } else { + logger.log('info', `Run ${runData.runId} browser still active, not orphaned`); + } + } catch (runError: any) { + logger.log('error', `Failed to recover run ${run.runId}: ${runError.message}`); + } + } + + logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`); + } catch (error: any) { + logger.log('error', `Failed to recover orphaned runs: ${error.message}`); + } +} + export { processQueuedRuns }; diff --git a/server/src/server.ts b/server/src/server.ts index 5b729b721..c49b367b1 100644 --- a/server/src/server.ts +++ b/server/src/server.ts @@ -20,7 +20,8 @@ import connectPgSimple from 'connect-pg-simple'; import pg from 'pg'; import session from 'express-session'; import Run from './models/Run'; -import { processQueuedRuns } from './routes/storage'; +import { processQueuedRuns, recoverOrphanedRuns } from './routes/storage'; +import { startWorkers } from './pgboss-worker'; const app = express(); app.use(cors({ @@ -143,6 +144,12 @@ if (require.main === module) { await connectDB(); await syncDB(); + logger.log('info', 'Cleaning up stale browser slots...'); + browserPool.cleanupStaleBrowserSlots(); + + await recoverOrphanedRuns(); + await startWorkers(); + io = new Server(server); io.of('/queued-run').on('connection', (socket) => { @@ -211,20 +218,6 @@ if (require.main === module) { if (require.main === module) { process.on('SIGINT', async () => { console.log('Main app shutting down...'); - try { - await Run.update( - { - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Process interrupted during execution - worker shutdown' - }, - { - where: { status: 'running' } - } - ); - } catch (error: any) { - console.error('Error updating runs:', error); - } try { console.log('Closing PostgreSQL connection pool...'); diff --git a/server/src/workflow-management/classes/Interpreter.ts b/server/src/workflow-management/classes/Interpreter.ts index f249f26eb..0ed19f19d 100644 --- a/server/src/workflow-management/classes/Interpreter.ts +++ b/server/src/workflow-management/classes/Interpreter.ts @@ -4,6 +4,7 @@ import { Socket } from "socket.io"; import { Page } from "playwright"; import { InterpreterSettings } from "../../types"; import { decrypt } from "../../utils/auth"; +import Run from "../../models/Run"; /** * Decrypts any encrypted inputs in the workflow. If checkLimit is true, it will also handle the limit validation for scrapeList action. @@ -112,6 +113,11 @@ export class WorkflowInterpreter { */ private currentScrapeListIndex: number = 0; + /** + * Current run ID for real-time persistence + */ + private currentRunId: string | null = null; + /** * An array of id's of the pairs from the workflow that are about to be paused. * As "breakpoints". @@ -128,10 +134,12 @@ export class WorkflowInterpreter { /** * A public constructor taking a socket instance for communication with the client. * @param socket Socket.io socket instance enabling communication with the client (frontend) side. 
+ * @param runId Optional run ID for real-time data persistence * @constructor */ - constructor(socket: Socket) { + constructor(socket: Socket, runId?: string) { this.socket = socket; + this.currentRunId = runId || null; } /** @@ -202,8 +210,14 @@ this.currentActionType = type; } }, - serializableCallback: (data: any) => { + serializableCallback: async (data: any) => { if (this.currentActionType === 'scrapeSchema') { + const cumulativeScrapeSchemaData = Array.isArray(data) && data.length > 0 ? data : [data]; + + if (cumulativeScrapeSchemaData.length > 0) { + await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData); + } + if (Array.isArray(data) && data.length > 0) { this.socket.emit('serializableCallback', { type: 'captureText', @@ -216,13 +230,24 @@ }); } } else if (this.currentActionType === 'scrapeList') { + if (data && Array.isArray(data) && data.length > 0) { + // Use the current index for persistence + await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex); + } + this.socket.emit('serializableCallback', { type: 'captureList', data }); } }, - binaryCallback: (data: string, mimetype: string) => { + binaryCallback: async (data: string, mimetype: string) => { + const binaryItem = { mimetype, data: JSON.stringify(data) }; + this.binaryData.push(binaryItem); + + // Persist binary data to database + await this.persistBinaryDataToDatabase(binaryItem); + this.socket.emit('binaryCallback', { data, mimetype, @@ -272,6 +297,10 @@ public stopInterpretation = async () => { if (this.interpreter) { logger.log('info', 'Stopping the interpretation.'); + + this.interpreter.abort(); + logger.log('info', 'maxun-core interpreter aborted - data collection stopped immediately'); + await this.interpreter.stop(); this.socket.emit('log', '----- The interpretation has been stopped -----', false); this.clearState(); @@ -294,8 +323,115 @@ }; this.binaryData = []; this.currentScrapeListIndex = 0; + this.currentRunId = null; } + /** + * Sets the current run ID for real-time persistence. + * @param runId The run ID to set + */ + public setRunId = (runId: string): void => { + this.currentRunId = runId; + logger.log('debug', `Set run ID for real-time persistence: ${runId}`); + }; + + /** + * Persists data to database in real-time during interpretation + * @private + */ + private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => { + if (!this.currentRunId) { + logger.log('debug', 'No run ID available for real-time persistence'); + return; + } + + try { + const run = await Run.findOne({ where: { runId: this.currentRunId } }); + + if (!run) { + logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`); + return; + } + + const currentSerializableOutput = run.serializableOutput ? + JSON.parse(JSON.stringify(run.serializableOutput)) : + { scrapeSchema: [], scrapeList: [] }; + + if (actionType === 'scrapeSchema') { + const newSchemaData = Array.isArray(data) ? data : [data]; + const updatedOutput = { + ...currentSerializableOutput, + scrapeSchema: newSchemaData + }; + + await run.update({ + serializableOutput: updatedOutput + }); + + logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`); + + } else if (actionType === 'scrapeList' && typeof listIndex === 'number') { + if (!Array.isArray(currentSerializableOutput.scrapeList)) { + currentSerializableOutput.scrapeList = []; + } + + const updatedList = [...currentSerializableOutput.scrapeList]; + updatedList[listIndex] = data; + + const updatedOutput = { + ...currentSerializableOutput, + scrapeList: updatedList + }; + + await run.update({ + serializableOutput: updatedOutput + }); + + logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`); + } + } catch (error: any) { + logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`); + } + };
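Each persistence call above does a read-modify-write (`findOne` followed by `update`), so two overlapping callbacks could read the same row and the later write would silently win. If that ever becomes an issue, one option is to serialize writes through a promise chain; a sketch with illustrative names:

```typescript
// Sketch of a promise-chain serializer for read-modify-write persistence
// callbacks; names are illustrative, not part of Maxun's API.
class WriteSerializer {
  private tail: Promise<void> = Promise.resolve();

  /** Enqueue a write; it runs only after all previously enqueued writes. */
  public enqueue(write: () => Promise<void>): Promise<void> {
    const next = this.tail.then(write, write);
    // Keep the chain alive even if a write rejects.
    this.tail = next.catch(() => undefined);
    return next;
  }
}

// Hypothetical usage inside a callback:
// serializer.enqueue(() => persistDataToDatabase('scrapeList', data, index));
```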
+ + /** + * Persists binary data to database in real-time + * @private + */ + private persistBinaryDataToDatabase = async (binaryItem: { mimetype: string, data: string }): Promise<void> => { + if (!this.currentRunId) { + logger.log('debug', 'No run ID available for binary data persistence'); + return; + } + + try { + const run = await Run.findOne({ where: { runId: this.currentRunId } }); + if (!run) { + logger.log('warn', `Run not found for binary data persistence: ${this.currentRunId}`); + return; + } + + const currentBinaryOutput = run.binaryOutput ? + JSON.parse(JSON.stringify(run.binaryOutput)) : + {}; + + const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`; + + const updatedBinaryOutput = { + ...currentBinaryOutput, + [uniqueKey]: binaryItem + }; + + await run.update({ + binaryOutput: updatedBinaryOutput + }); + + logger.log('debug', `Persisted binary data for run ${this.currentRunId}: ${binaryItem.mimetype}`); + } catch (error: any) { + logger.log('error', `Failed to persist binary data in real-time for run ${this.currentRunId}: ${error.message}`); + } + }; + /** * Interprets the recording as a run. * @param workflow The workflow to interpret. @@ -333,7 +469,7 @@ this.currentScrapeListIndex++; } }, - serializableCallback: (data: any) => { + serializableCallback: async (data: any) => { if (this.currentActionType === 'scrapeSchema') { if (Array.isArray(data) && data.length > 0) { mergedScrapeSchema = { ...mergedScrapeSchema, ...data[0] }; @@ -342,14 +478,29 @@ mergedScrapeSchema = { ...mergedScrapeSchema, ...data }; this.serializableDataByType.scrapeSchema.push([data]); } + + // Persist the cumulative scrapeSchema data + const cumulativeScrapeSchemaData = Object.keys(mergedScrapeSchema).length > 0 ? 
[mergedScrapeSchema] : []; + if (cumulativeScrapeSchemaData.length > 0) { + await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData); + } } else if (this.currentActionType === 'scrapeList') { + if (data && Array.isArray(data) && data.length > 0) { + // Use the current index for persistence + await this.persistDataToDatabase('scrapeList', data, this.currentScrapeListIndex); + } this.serializableDataByType.scrapeList[this.currentScrapeListIndex] = data; } this.socket.emit('serializableCallback', data); }, binaryCallback: async (data: string, mimetype: string) => { - this.binaryData.push({ mimetype, data: JSON.stringify(data) }); + const binaryItem = { mimetype, data: JSON.stringify(data) }; + this.binaryData.push(binaryItem); + + // Persist binary data to database + await this.persistBinaryDataToDatabase(binaryItem); + this.socket.emit('binaryCallback', { data, mimetype }); } } diff --git a/server/src/workflow-management/scheduler/index.ts b/server/src/workflow-management/scheduler/index.ts index b40e55f2e..7dbafab68 100644 --- a/server/src/workflow-management/scheduler/index.ts +++ b/server/src/workflow-management/scheduler/index.ts @@ -106,6 +106,39 @@ async function executeRun(id: string, userId: string) { const plainRun = run.toJSON(); + if (run.status === 'aborted' || run.status === 'aborting') { + logger.log('info', `Scheduled Run ${id} has status ${run.status}, skipping execution`); + return { + success: false, + error: `Run has status ${run.status}` + } + } + + if (run.status === 'queued') { + logger.log('info', `Scheduled Run ${id} has status 'queued', skipping stale execution - will be handled by recovery`); + return { + success: false, + error: 'Run is queued and will be handled by recovery' + } + } + + const retryCount = plainRun.retryCount || 0; + if (retryCount >= 3) { + logger.log('warn', `Scheduled Run ${id} has exceeded max retries (${retryCount}/3), marking as failed`); + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId, userId }, raw: true }); + + await run.update({ + status: 'failed', + finishedAt: new Date().toLocaleString(), + log: plainRun.log ? 
`${plainRun.log}\nMax retries exceeded (3/3) - Run failed after multiple attempts.` : `Max retries exceeded (3/3) - Run failed after multiple attempts.` + }); + + return { + success: false, + error: 'Max retries exceeded' + } + } + const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); if (!recording) { return { @@ -127,58 +160,52 @@ async function executeRun(id: string, userId: string) { } const workflow = AddGeneratedFlags(recording.recording); + + // Set run ID for real-time data persistence + browser.interpreter.setRunId(id); + const interpretationInfo = await browser.interpreter.InterpretRecording( workflow, currentPage, (newPage: Page) => currentPage = newPage, plainRun.interpreterSettings ); - const binaryOutputService = new BinaryOutputService('maxun-run-screenshots'); - const uploadedBinaryOutput = await binaryOutputService.uploadAndStoreBinaryOutput(run, interpretationInfo.binaryOutput); - - const categorizedOutput = { - scrapeSchema: interpretationInfo.scrapeSchemaOutput || {}, - scrapeList: interpretationInfo.scrapeListOutput || {}, - }; - await destroyRemoteBrowser(plainRun.browserId, userId); await run.update({ - ...run, status: 'success', finishedAt: new Date().toLocaleString(), - browserId: plainRun.browserId, log: interpretationInfo.log.join('\n'), - serializableOutput: { - scrapeSchema: Object.values(categorizedOutput.scrapeSchema), - scrapeList: Object.values(categorizedOutput.scrapeList), - }, - binaryOutput: uploadedBinaryOutput, }); - // Track extraction metrics + // Get metrics from persisted data for analytics and webhooks let totalSchemaItemsExtracted = 0; let totalListItemsExtracted = 0; let extractedScreenshotsCount = 0; - if (categorizedOutput.scrapeSchema) { - Object.values(categorizedOutput.scrapeSchema).forEach((schemaResult: any) => { - if (Array.isArray(schemaResult)) { - totalSchemaItemsExtracted += schemaResult.length; - } else if (schemaResult && typeof schemaResult === 'object') { - totalSchemaItemsExtracted += 1; + const updatedRun = await Run.findOne({ where: { runId: id } }); + if (updatedRun) { + if (updatedRun.serializableOutput) { + if (updatedRun.serializableOutput.scrapeSchema) { + Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => { + if (Array.isArray(schemaResult)) { + totalSchemaItemsExtracted += schemaResult.length; + } else if (schemaResult && typeof schemaResult === 'object') { + totalSchemaItemsExtracted += 1; + } + }); } - }); - } - - if (categorizedOutput.scrapeList) { - Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => { - if (Array.isArray(listResult)) { - totalListItemsExtracted += listResult.length; + + if (updatedRun.serializableOutput.scrapeList) { + Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => { + if (Array.isArray(listResult)) { + totalListItemsExtracted += listResult.length; + } + }); } - }); - } - - if (uploadedBinaryOutput) { - extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length; + } + + if (updatedRun.binaryOutput) { + extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length; + } } const totalRowsExtracted = totalSchemaItemsExtracted + totalListItemsExtracted; @@ -204,8 +231,8 @@ async function executeRun(id: string, userId: string) { started_at: plainRun.startedAt, finished_at: new Date().toLocaleString(), extracted_data: { - captured_texts: Object.values(categorizedOutput.scrapeSchema).flat() || [], - captured_lists: 
categorizedOutput.scrapeList, + captured_texts: updatedRun?.serializableOutput?.scrapeSchema ? Object.values(updatedRun.serializableOutput.scrapeSchema).flat() : [], + captured_lists: updatedRun?.serializableOutput?.scrapeList || {}, total_rows: totalRowsExtracted, captured_texts_count: totalSchemaItemsExtracted, captured_lists_count: totalListItemsExtracted,
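The same `extracted_data` webhook payload is now assembled from the persisted run row in record.ts, pgboss-worker.ts, and the scheduler. Its shape, as implied by the three call sites in this diff (field types are inferred, not taken from a declared interface):

```typescript
// Webhook payload shape implied by the three call sites; types are inferred.
interface RunWebhookPayload {
  robot_id: string;
  run_id: string;
  robot_name: string;
  status: string;
  started_at: string;
  finished_at: string;
  extracted_data: {
    captured_texts: unknown[];
    captured_lists: Record<string, unknown>;
    total_rows: number;
    captured_texts_count: number;
    captured_lists_count: number;
    screenshots_count: number;
  };
  metadata: {
    browser_id: string;
  };
}
```

Note that the pgboss-worker payload drops the old `total_data_points_extracted` field, so consumers should rely only on the counts above.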