
Conversation


@RohitR311 RohitR311 commented Sep 9, 2025

What this PR does

  1. Persists a run's data to the database immediately after each action executes successfully.
  2. Gets all running browsers from the pool on interruption and stores their run data in the database.
  3. Re-queues in-progress runs if the server crashes midway.
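
The append-on-each-action idea in point 1 can be sketched as follows; the `RunRecord` shape and `mergeActionOutput` name are illustrative stand-ins for the project's Sequelize `Run` model, not its actual code:

```typescript
// Minimal sketch of "persist after each action": merge the new chunk into
// the run's existing output and write the whole record back, so a crash
// between actions loses at most the in-flight chunk.
// RunRecord is a hypothetical stand-in for the Sequelize Run model.
interface RunRecord {
  serializableOutput: Record<string, unknown[]>;
}

function mergeActionOutput(
  run: RunRecord,
  kind: string,
  chunk: unknown[]
): RunRecord {
  const existing = run.serializableOutput[kind] ?? [];
  return {
    ...run,
    serializableOutput: {
      ...run.serializableOutput,
      // Append rather than overwrite, so earlier captures survive.
      [kind]: [...existing, ...chunk],
    },
  };
}
```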

Summary by CodeRabbit

  • New Features

    • Added run abortion that stops workflows immediately and propagates across steps.
    • Live, in-run data persistence for texts, lists, and screenshots.
    • Automatic recovery of orphaned runs on startup with retry policy.
  • Improvements

    • Webhook payloads now include extracted row counts and screenshot counts.
    • More reliable queue worker registration and startup sequencing.
    • Browser slot cleanup runs on startup; stale reservations auto-purged.
    • Clearer abort response: “Run stopped immediately, cleanup queued.”
  • Bug Fixes

    • Prevents duplicate browser reservations and cleans up failed slots.
    • Skips execution for queued/aborted runs and handles partial data consistently.

@RohitR311 RohitR311 marked this pull request as draft September 9, 2025 18:44

coderabbitai bot commented Sep 9, 2025

Walkthrough

Implements cooperative aborts in core interpreter; adds real-time persistence of extracted data tied to Run IDs; reworks server workflows to compute metrics from persisted DB state; introduces atomic browser-slot reservation with cleanup; adds retryCount to Run; overhauls PgBoss worker registration and startup; adds orphaned-run recovery and startup cleanup.

Changes

Cohort / File(s) Summary
Core interpreter aborts
maxun-core/src/interpret.ts
Adds isAborted flag and abort() method; injects abort checks across step execution, WAW actions, pagination, and run loop; logs WARN on early exits.
Run execution API updates
server/src/api/record.ts
Stops in-process output assembly; reads finalRun for metrics; updates webhook payload fields; sets interpreter RunId; narrows run.update fields; route now uses AuthenticatedRequest.
BrowserPool slot mgmt
server/src/browser-management/classes/BrowserPool.ts
Adds createdAt/lastAccessed to pool info; introduces atomic reservation with per-user/state lock; stale-slot and lock cleanup; enhanced fail cleanup; exposes getAllBrowsers().
Run model retry counter
server/src/models/Run.ts
Adds retryCount (attr, class prop, DB column with default 0).
PgBoss workers and persistence shift
server/src/pgboss-worker.ts
Removes BinaryOutputService path; derives metrics/webhooks from DB; skips queued runs; propagates runId to interpreter; exports registerWorkerForQueue, registerAbortWorkerForQueue, startWorkers; lazy per-queue registration.
Queues, aborts, and recovery
server/src/routes/storage.ts
Registers per-user workers; enqueues abort workers; attempts immediate interpreter stop on abort; exports recoverOrphanedRuns with retry logic and browser cleanup; integrates worker registration in queued-run paths.
Server startup orchestration
server/src/server.ts
On startup: cleanup stale browser slots, recover orphaned runs, start workers; removes SIGINT mass-fail logic; retains processQueuedRuns loop.
Workflow interpreter persistence
server/src/workflow-management/classes/Interpreter.ts
Constructor accepts optional runId; setRunId(); async callbacks persist serializable/binary outputs to Run during interpretation; stop triggers abort; interpretRecording returns structured result.
Scheduler run flow updates
server/src/workflow-management/scheduler/index.ts
Skips aborted/queued; enforces max 3 retries; sets interpreter runId; success path relies on persisted outputs for metrics/webhooks; no inline output storage.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant API as Server API
  participant Queue as PgBoss
  participant Worker
  participant Pool as BrowserPool
  participant Browser
  participant Interp as WorkflowInterpreter
  participant DB as DB (Run)

  Client->>API: POST /robots/:id/runs
  API->>DB: Create Run (status=scheduled/queued)
  API->>Queue: Enqueue user run queue
  API->>Worker: registerWorkerForQueue(userQueue)

  Note over Worker,Queue: Lazy, per-user queue registration

  Worker->>DB: Fetch Run
  Worker->>Pool: reserveBrowserSlotAtomic(userId,state)
  Pool-->>Worker: slot reserved / browser ready
  Worker->>Browser: attach interpreter
  Worker->>Interp: setRunId(runId)
  Worker->>Interp: interpretRecording()

  par Live persistence
    Interp->>DB: Update Run.serializableOutput (scrapeSchema/list)
    Interp->>DB: Update Run.binaryOutput (screenshots)
  end

  Interp-->>Worker: result (logs/status)
  Worker->>DB: Update Run (status, finishedAt, log)
  Worker->>DB: Read persisted outputs
  Worker-->>Client: Webhook with counts (texts/lists/screenshots)
sequenceDiagram
  autonumber
  participant Client
  participant API as Server API
  participant Queue as PgBoss
  participant AbortW as Abort Worker
  participant Pool as BrowserPool
  participant Browser
  participant Interp as WorkflowInterpreter
  participant Core as maxun-core Interpreter

  Client->>API: POST /runs/:id/abort
  API->>Queue: Enqueue abort queue
  API->>AbortW: registerAbortWorkerForQueue
  AbortW->>Pool: find browser by run.browserId
  AbortW->>Browser: locate interpreter
  AbortW->>Interp: stopInterpretation() / abort
  Interp->>Core: abort()
  Core-->>Interp: early exits with WARN logs
  AbortW-->>Client: "Run stopped immediately, cleanup queued"

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested labels

Type: Feature, Type: Infra, Scope: Infra

Suggested reviewers

  • amhsirak

Pre-merge checks (1 passed, 1 warning, 1 inconclusive)

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title Check ❓ Inconclusive The current title uses the broad terms “extraction” and “platform stability”, which touch on the PR’s work but remain vague and do not convey key features such as real-time run data persistence or crash recovery. Consider revising the title to clearly reflect the main change, for example “Add real-time run data persistence and orphaned run recovery for improved extraction stability.”
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

Poem

I thump my paws—queues spin, then stall,
A whisker-twitch: “Abort!” I call.
Slots reserved, the clocks align,
Runs recovered, metrics fine.
I nibble logs, persist each bite—
Three hops, retried, then sets to right.
Carrot-webhooks shipped tonight! 🥕


@RohitR311 RohitR311 changed the title feat: extraction and platform stability feat(maxun-core): extraction and platform stability Sep 9, 2025
@RohitR311 RohitR311 marked this pull request as ready for review September 10, 2025 04:52
@RohitR311 RohitR311 requested a review from amhsirak September 10, 2025 04:52

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
server/src/workflow-management/classes/Interpreter.ts (2)

29-45: Decryption clobbers plaintext inputs on failure.

Blindly decrypting any string sets args to '' on error, breaking normal (unencrypted) inputs.

Apply guarded decrypt with fallback:

-      if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) {
+      if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) {
         try {
           const encryptedValue = action.args[1];
           if (typeof encryptedValue === 'string') {
-            const decryptedValue = decrypt(encryptedValue);
-            action.args[1] = decryptedValue;
+            // Decrypt only if value is marked/enveloped as encrypted; otherwise keep as-is
+            if (encryptedValue.startsWith('enc:')) {
+              const decryptedValue = decrypt(encryptedValue);
+              action.args[1] = decryptedValue;
+            }
           } else {
             logger.log('error', 'Encrypted value is not a string');
-            action.args[1] = '';
+            // Keep original; do not clobber
           }
         } catch (error: unknown) {
           const errorMessage = error instanceof Error ? error.message : String(error);
           logger.log('error', `Failed to decrypt input value: ${errorMessage}`);
-          action.args[1] = '';
+          // Keep original on failure
         }
       }

If you use a different envelope than enc:, adjust the predicate accordingly.


213-237: Editor flow: scrapeSchema persistence is not cumulative.

You persist the latest chunk only, overwriting prior schema data. This loses earlier captures during long runs.

Apply array-append merge:

-          const cumulativeScrapeSchemaData = Array.isArray(data) && data.length > 0 ? data : [data];
-          
-          if (cumulativeScrapeSchemaData.length > 0) {
-            await this.persistDataToDatabase('scrapeSchema', cumulativeScrapeSchemaData);
-          }
+          const chunk = Array.isArray(data) && data.length > 0 ? data : [data];
+          if (chunk.length > 0) {
+            await this.persistDataToDatabase('scrapeSchema', chunk);
+          }

And update persistDataToDatabase (see below) to append, not replace, for scrapeSchema.

server/src/workflow-management/scheduler/index.ts (1)

150-156: Run never marked 'running' in DB; risk of double-execution.

Only plainRun.status is updated locally. Without an atomic DB claim, multiple workers can execute the same run.

Claim the run atomically:

-    plainRun.status = 'running';
+    // Atomically claim the run to avoid double execution
+    const [affected] = await Run.update(
+      { status: 'running', startedAt: new Date().toISOString() },
+      { where: { runId: id, status: ['scheduled', 'queued'] } }
+    );
+    if (!affected) {
+      logger.log('info', `Run ${id} could not be claimed (status changed), skipping`);
+      return { success: false, error: 'Run already claimed' };
+    }

If using Sequelize < v6 returning semantics differ; I can tailor to your version.

server/src/routes/storage.ts (2)

645-651: Bug: wrong route param used when fetching a run.

The route is /runs/run/:id but the code reads req.params.runId, which will always be undefined.

-    const run = await Run.findOne({ where: { runId: req.params.runId }, raw: true });
+    const run = await Run.findOne({ where: { runId: req.params.id }, raw: true });

523-526: Avoid TOCTOU on slot check → creation; reserve atomically.

Checking hasAvailableBrowserSlots and then creating a browser can race under concurrent requests. Prefer reserving a slot atomically before spinning up a browser.

-    const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run");
+    // Reserve atomically; if false, fall back to queue path
+    const tentativeBrowserId = uuid();
+    const canCreateBrowser = browserPool.reserveBrowserSlot(tentativeBrowserId, req.user.id, "run");

Follow-up: pass tentativeBrowserId through createRemoteBrowserForRun (or upgrade the reserved slot) and delete the reservation on failure.

🧹 Nitpick comments (17)
maxun-core/src/interpret.ts (1)

384-387: Consider adding abort status to the completion event.

When the workflow is aborted, you might want to emit a specific event or pass the abort status to allow callers to distinguish between normal completion and aborted workflows.

 private async carryOutSteps(page: Page, steps: What[]): Promise<void> {
   if (this.isAborted) {
     this.log('Workflow aborted, stopping execution', Level.WARN);
+    this.emit('workflow-aborted');
     return;
   }
server/src/server.ts (2)

124-133: Redundant CORS headers alongside cors() middleware.

This manual header block duplicates the cors middleware and can introduce inconsistencies. Prefer configuring only the middleware.


219-236: Graceful shutdown: close HTTP/Socket.IO and pool.

Close io and server to stop accepting connections before exiting. Optionally, drain the BrowserPool.

Apply:

   process.on('SIGINT', async () => {
     console.log('Main app shutting down...');
 
     try {
+      if (io) {
+        console.log('Closing Socket.IO...');
+        await new Promise<void>((resolve) => io.close(() => resolve()));
+      }
+      console.log('Closing HTTP server...');
+      await new Promise<void>((resolve) => server.close(() => resolve()));
       console.log('Closing PostgreSQL connection pool...');
       await pool.end();
       console.log('PostgreSQL connection pool closed');
     } catch (error) {
       console.error('Error closing PostgreSQL connection pool:', error);
     }
 
     if (!isProduction && process.platform === 'win32') {
       if (workerProcess) workerProcess.kill();
-      if (recordingWorkerProcess) recordingWorkerProcess.kill();
     }
     process.exit();
   });
server/src/workflow-management/classes/Interpreter.ts (1)

401-433: Same race/overwrite issue for binary output; consolidate.

Use the same atomic merge pattern as for serializableOutput. Also consider offloading blobs to object storage and persist only references.

server/src/workflow-management/scheduler/index.ts (2)

227-246: Use ISO timestamps in webhooks.

toLocaleString() varies by locale/timezone. Prefer ISO for interoperability.

-      finished_at: new Date().toLocaleString(),
+      finished_at: new Date().toISOString(),

125-140: Minor: unnecessary recording fetch in max-retries block.

The recording loaded at Line 128 isn’t used in this block.

Remove to save a query.

server/src/routes/storage.ts (2)

954-964: Abort: add null-safe guard for interpreter and avoid blocking request path.

stopInterpretation() is awaited in the request path; if it blocks, the API call waits. Consider making the stop fire-and-forget with a timeout and null-safe access.

-      if (browser && browser.interpreter) {
-        logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
-        await browser.interpreter.stopInterpretation();
-      }
+      if (browser?.interpreter?.stopInterpretation) {
+        logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`);
+        const stop = browser.interpreter.stopInterpretation();
+        await Promise.race([stop, new Promise(r => setTimeout(r, 1500))]);
+      }

573-601: Normalize timestamps for new runs and queued runs.

startedAt/finishedAt should be ISO strings, not locale strings; finishedAt should be null until set.

-          startedAt: new Date().toLocaleString(),
-          finishedAt: '',
+          startedAt: new Date().toISOString(),
+          finishedAt: null,
...
-        startedAt: new Date().toLocaleString(),
-        finishedAt: '',
+        startedAt: new Date().toISOString(),
+        finishedAt: null,

Also applies to: 612-627

server/src/browser-management/classes/BrowserPool.ts (4)

39-46: Track lastAccessed consistently.

You introduced createdAt/lastAccessed; update lastAccessed when the browser is retrieved or upgraded so cleanup heuristics stay meaningful.

@@ public getRemoteBrowser = (id: string): RemoteBrowser | undefined => {
-        return poolInfo.browser || undefined;
+        // touch lastAccessed when served
+        this.pool[id].lastAccessed = Date.now();
+        return this.pool[id].browser || undefined;
}
@@ public upgradeBrowserSlot = (id: string, browser: RemoteBrowser): boolean => {
         this.pool[id].browser = browser;
         this.pool[id].status = "ready";
+        this.pool[id].lastAccessed = Date.now();

77-82: In-memory locks only protect a single process.

reservationLocks avoids same-process races but won’t help in multi-instance deployments. Consider a cross-process lock (e.g., Postgres advisory locks) or a PgBoss “stately/singleton” policy per user to guarantee global exclusivity.
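
A cross-process variant could hash the user ID to an int4 key for Postgres advisory locks. The helper below is a hypothetical sketch: the FNV-1a choice and the node-postgres query shown in comments are assumptions, not project code.

```typescript
// Derive a stable signed 32-bit key per user for pg_advisory_lock, since
// advisory locks take integer keys while user IDs here are strings.
// FNV-1a is used purely for illustration.
function advisoryKeyForUser(userId: string): number {
  let h = 0x811c9dc5; // FNV-1a offset basis
  for (let i = 0; i < userId.length; i++) {
    h ^= userId.charCodeAt(i);
    h = Math.imul(h, 0x01000193); // FNV prime, 32-bit multiply
  }
  return h | 0; // coerce to signed 32-bit, fits Postgres int4
}

// Hypothetical usage with node-postgres:
//   const { rows } = await client.query(
//     "SELECT pg_try_advisory_lock($1) AS locked",
//     [advisoryKeyForUser(userId)]
//   );
//   if (!rows[0].locked) { /* another instance holds this user's slot */ }
```

The lock is held by the Postgres session, so it naturally releases if the instance dies, which in-memory locks cannot guarantee.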


686-703: Fail path should await shutdown to prevent leaks.

switchOff() is async; currently it’s fire-and-forget inside a non-async method. Either make failBrowserSlot async and await shutdown or clearly document the intentional detach.

-    public failBrowserSlot = (id: string): void => {
+    public failBrowserSlot = async (id: string): Promise<void> => {
...
-            if (browserInfo.browser) {
-                try {
-                    // Try to close browser if it exists
-                    browserInfo.browser.switchOff?.().catch((error: any) => {
-                        logger.log('warn', `Error closing failed browser ${id}: ${error.message}`);
-                    });
-                } catch (error: any) {
-                    logger.log('warn', `Error during browser cleanup for ${id}: ${error.message}`);
-                }
-            }
+            if (browserInfo.browser?.switchOff) {
+                try {
+                    await browserInfo.browser.switchOff();
+                } catch (error: any) {
+                    logger.log('warn', `Error during browser cleanup for ${id}: ${error.message}`);
+                }
+            }

Note: adjust call sites (e.g., cleanupStaleBrowserSlots) to await this.


295-306: Use logger instead of console.

Keep logging consistent.

-                console.log(`Active browser Id ${browserId} found for user: ${userId}`);
+                logger.log('debug', `Active browser Id ${browserId} found for user: ${userId}`);
...
-            console.log(`No active browser found, returning most recent browser Id ${mostRecentId} for user: ${userId}`);
+            logger.log('debug', `No active browser found, returning most recent browser Id ${mostRecentId} for user: ${userId}`);
server/src/pgboss-worker.ts (5)

215-216: Guard setRunId call.

Avoids crashes if interpreter is missing.

-      browser.interpreter.setRunId(data.runId);
+      browser.interpreter?.setRunId?.(data.runId);

712-760: Avoid shadowing registeredAbortQueues.

registerAbortRunWorker defines a local registeredAbortQueues Map that shadows the module-scoped one, leading to inconsistent state tracking.

-  try {
-    const registeredAbortQueues = new Map();
+  try {
+    // reuse the module-scoped registry

57-61: Env var hard-fail on import can break tests/tools.

Throwing at module load time prevents importing helpers in environments without DB config. Consider deferring the check to startWorkers().

-if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
-    throw new Error('Failed to start pgboss worker: one or more required environment variables are missing.');
-}
+export function assertPgBossEnv() {
+  if (!process.env.DB_USER || !process.env.DB_PASSWORD || !process.env.DB_HOST || !process.env.DB_PORT || !process.env.DB_NAME) {
+    throw new Error('Failed to start pgboss worker: one or more required environment variables are missing.');
+  }
+}
...
-async function startWorkers() {
+async function startWorkers() {
+  assertPgBossEnv();

766-846: Consider queue-level concurrency/policies for fairness.

If the intention is “one run at a time per user,” set queue policy to singleton/stately on createQueue or send with singletonKey=userId.
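
For illustration, singletonKey semantics amount to “at most one pending job per key.” The in-memory model below is a hypothetical sketch of that behavior only; the real enforcement happens inside pg-boss/Postgres, e.g. `boss.send(userQueue, payload, { singletonKey: userId })`.

```typescript
// Toy model of singleton-key queueing: a second job for the same key is
// rejected while one is still pending, restoring fairness across users.
class SingletonQueue<T> {
  private active = new Set<string>();
  private jobs: Array<{ key: string; data: T }> = [];

  // Returns false when a job with this key is already pending.
  send(key: string, data: T): boolean {
    if (this.active.has(key)) return false;
    this.active.add(key);
    this.jobs.push({ key, data });
    return true;
  }

  // Marks the key's job done, allowing the next send for that key.
  complete(key: string): void {
    this.active.delete(key);
  }
}
```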


118-173: Timeouts: consider logging final browser status and slot state.

You already log finalStatus; also log pool.getBrowserStatus() and whether slot exists to aid debugging.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24fd059 and b31daa6.

📒 Files selected for processing (9)
  • maxun-core/src/interpret.ts (12 hunks)
  • server/src/api/record.ts (2 hunks)
  • server/src/browser-management/classes/BrowserPool.ts (5 hunks)
  • server/src/models/Run.ts (3 hunks)
  • server/src/pgboss-worker.ts (9 hunks)
  • server/src/routes/storage.ts (7 hunks)
  • server/src/server.ts (2 hunks)
  • server/src/workflow-management/classes/Interpreter.ts (9 hunks)
  • server/src/workflow-management/scheduler/index.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
server/src/workflow-management/scheduler/index.ts (1)
server/src/browser-management/controller.ts (1)
  • destroyRemoteBrowser (92-126)
server/src/server.ts (1)
server/src/routes/storage.ts (1)
  • recoverOrphanedRuns (1063-1134)
server/src/browser-management/classes/BrowserPool.ts (1)
server/src/browser-management/classes/RemoteBrowser.ts (1)
  • RemoteBrowser (128-2027)
server/src/pgboss-worker.ts (1)
maxun-core/src/interpret.ts (1)
  • run (1340-1376)
server/src/workflow-management/classes/Interpreter.ts (1)
maxun-core/src/interpret.ts (1)
  • run (1340-1376)
server/src/routes/storage.ts (2)
server/src/server.ts (1)
  • browserPool (85-85)
maxun-core/src/interpret.ts (1)
  • run (1340-1376)
server/src/api/record.ts (1)
server/src/browser-management/controller.ts (1)
  • destroyRemoteBrowser (92-126)
🔇 Additional comments (23)
server/src/models/Run.ts (1)

28-28: LGTM! Retry counter implementation looks good.

The addition of the retryCount field enables proper retry tracking for orphaned runs. The default value of 0 and nullable configuration are appropriate for this use case.

Also applies to: 50-50, 125-129

maxun-core/src/interpret.ts (3)

67-68: Well-implemented abort mechanism.

The abort flag provides a clean way to stop workflow execution cooperatively. The public abort() method allows external control over the interpreter's execution state.

Also applies to: 122-124
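
The cooperative-abort pattern praised here reduces to a small sketch; the class and method names echo the review text, but the body is illustrative, not the actual maxun-core implementation.

```typescript
// Cooperative abort: a flag checked at step boundaries, so abort() stops
// the run at the next safe point instead of killing work mid-step.
type Step = () => Promise<void>;

class CooperativeInterpreter {
  private isAborted = false;
  public executed = 0;

  public abort(): void {
    this.isAborted = true;
  }

  public async carryOutSteps(steps: Step[]): Promise<void> {
    for (const step of steps) {
      if (this.isAborted) {
        console.warn("Workflow aborted, stopping execution");
        return;
      }
      await step();
      this.executed++;
    }
  }
}
```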


384-387: Comprehensive abort checks across all critical paths.

The abort checks are strategically placed at all major execution points including:

  • Main execution loops
  • Scraping operations (scrapeSchema, scrapeList)
  • Pagination handling
  • Step execution

This ensures clean termination without resource leaks or partial operations.

Also applies to: 449-453, 490-493, 649-652, 762-765, 1012-1015, 1168-1171, 1203-1206, 1290-1293


667-671: Good defensive programming in pagination.

The abort check before evaluating in scrapeCurrentPage prevents unnecessary browser operations when the workflow is being terminated.

server/src/api/record.ts (5)

601-601: Good integration of RunId with interpreter.

Setting the RunId on the interpreter before execution enables proper association of extracted data with the run record for persistence.


615-646: Robust extraction metrics computation from persisted data.

The implementation correctly:

  1. Fetches the final run state from the database
  2. Safely extracts metrics from the persisted serializableOutput and binaryOutput
  3. Handles both array and object formats for schema results
  4. Computes total rows by combining schema and list items

This approach ensures metrics are derived from the actual persisted data rather than in-memory state.


609-613: Clean separation between persistence and status update.

The update now only modifies status fields (status, finishedAt, log) without touching the data fields, which aligns with the PR's goal of immediate persistence during execution rather than at completion.


884-884: Type safety improvement with AuthenticatedRequest.

Good use of the typed request interface to ensure user authentication is properly validated.


667-668: Null-safety covered – no changes needed
The ternary on scrapeSchema and || {} on scrapeList guarantee captured_texts always yields an array and captured_lists always yields an object, matching all other usages in the codebase.

server/src/server.ts (1)

147-149: No await needed for cleanupStaleBrowserSlots
cleanupStaleBrowserSlots is declared as a synchronous method returning void (BrowserPool.ts line 710–716), so awaiting it is unnecessary.

Likely an incorrect or invalid review comment.

server/src/workflow-management/classes/Interpreter.ts (2)

333-336: Run context propagation looks good.

setRunId is a clear, explicit binding for live persistence.


223-231: Unify serializableCallback payload format
You’re emitting { type: 'captureText'|'captureList', data } at lines 222–231 but raw data at line 495. Confirm all socket listeners can handle both shapes—or normalize to a single {type, data} format.

server/src/workflow-management/scheduler/index.ts (2)

109-116: Good pre-flight guards.

Skipping runs in aborted/aborting/queued states prevents duplicate work during recovery.

Also applies to: 117-123


164-166: Binding runId before execution is correct.

This aligns interpreter persistence with the active Run.

Also applies to: 167-169

server/src/routes/storage.ts (4)

576-577: Good: per-user worker registration is idempotent and localized.

Calling registerWorkerForQueue right after createQueue is the right move; the helper guards duplicate registrations.


693-699: Good: worker is registered before enqueue.

Same pattern here; this prevents “no worker” races on first enqueue.


966-971: Good: per-user abort worker registration.

Ensures abort jobs don’t pile up without a worker.


1081-1125: processQueuedRuns is already invoked on startup
recoverOrphanedRuns is called on server start (server/src/server.ts:150–151) and processQueuedRuns is scheduled every 5 s in server/src/server.ts (lines 136–139), so queued runs will be processed.

Likely an incorrect or invalid review comment.

server/src/browser-management/classes/BrowserPool.ts (1)

710-737: Nice: stale-slot cleanup + lock cleanup.

This will reduce zombie reservations over time.

server/src/pgboss-worker.ts (4)

134-137: Good: skip stale jobs for queued runs.

Prevents duplicate execution when state changes between enqueue and work.


249-274: Metrics extraction looks solid.

Reading from persisted output is the right source of truth for counts.

Also applies to: 301-303


611-653: Nice: lazy, per-queue registration helpers.

This reduces startup coupling and avoids blanket workers.


239-244: Large logs: ensure log size won’t exceed DB/text column limits.

Joining interpretation logs can overflow if verbose. Consider truncation with a tail window.
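
A tail-window truncation could look like the sketch below; the 5,000-character default is an assumption for illustration, not a known column limit.

```typescript
// Keep only the tail of joined logs when they exceed the column budget;
// the most recent lines are usually the most useful for debugging.
function truncateLogTail(lines: string[], maxChars = 5_000): string {
  const joined = lines.join("\n");
  if (joined.length <= maxChars) return joined;
  const marker = "...[truncated]...\n";
  return marker + joined.slice(joined.length - (maxChars - marker.length));
}
```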

Comment on lines +351 to 363
const hasData = (run.serializableOutput &&
((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
(run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
(run.binaryOutput && Object.keys(run.binaryOutput).length > 0);

if (hasData) {
logger.log('info', `Partial data found in failed run ${data.runId}, triggering integration updates`);
await triggerIntegrationUpdates(plainRun.runId, plainRun.robotMetaId);
partialDataExtracted = true;
}
} catch (partialDataError: any) {
logger.log('warn', `Failed to extract partial data for run ${data.runId}: ${partialDataError.message}`);
} catch (dataCheckError: any) {
logger.log('warn', `Failed to check for partial data in run ${data.runId}: ${dataCheckError.message}`);
}

⚠️ Potential issue

Fix partial-data detection (object vs length).

scrapeSchema/scrapeList are objects keyed by extractor; .length checks won’t work.

-        const hasData = (run.serializableOutput && 
-          ((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
-           (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
-          (run.binaryOutput && Object.keys(run.binaryOutput).length > 0);
+        const hasSchema =
+          !!run.serializableOutput?.scrapeSchema &&
+          Object.values(run.serializableOutput.scrapeSchema).some((v: any) =>
+            Array.isArray(v) ? v.length > 0 : v && typeof v === 'object'
+          );
+        const hasList =
+          !!run.serializableOutput?.scrapeList &&
+          Object.values(run.serializableOutput.scrapeList).some((v: any) => Array.isArray(v) && v.length > 0);
+        const hasBinary = !!run.binaryOutput && Object.keys(run.binaryOutput).length > 0;
+        const hasData = hasSchema || hasList || hasBinary;
🤖 Prompt for AI Agents
In server/src/pgboss-worker.ts around lines 351 to 363, the partial-data
detection currently uses .length on scrapeSchema/scrapeList which are objects
keyed by extractor; change the checks to verify presence and non-empty objects
via Object.keys(...).length > 0 (use optional chaining to guard against
undefined), e.g. replace (run.serializableOutput.scrapeSchema &&
run.serializableOutput.scrapeSchema.length > 0) with
(run.serializableOutput?.scrapeSchema &&
Object.keys(run.serializableOutput.scrapeSchema).length > 0) and do the same for
scrapeList, keeping the existing binaryOutput check and error handling.

Comment on lines +574 to +578
const hasData = (run.serializableOutput &&
((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
(run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
(run.binaryOutput && Object.keys(run.binaryOutput).length > 0);

⚠️ Potential issue

Same bug in abort path: fix partial-data detection.

-    const hasData = (run.serializableOutput && 
-      ((run.serializableOutput.scrapeSchema && run.serializableOutput.scrapeSchema.length > 0) ||
-       (run.serializableOutput.scrapeList && run.serializableOutput.scrapeList.length > 0))) ||
-      (run.binaryOutput && Object.keys(run.binaryOutput).length > 0);
+    const hasSchema =
+      !!run.serializableOutput?.scrapeSchema &&
+      Object.values(run.serializableOutput.scrapeSchema).some((v: any) =>
+        Array.isArray(v) ? v.length > 0 : v && typeof v === 'object'
+      );
+    const hasList =
+      !!run.serializableOutput?.scrapeList &&
+      Object.values(run.serializableOutput.scrapeList).some((v: any) => Array.isArray(v) && v.length > 0);
+    const hasBinary = !!run.binaryOutput && Object.keys(run.binaryOutput).length > 0;
+    const hasData = hasSchema || hasList || hasBinary;
🤖 Prompt for AI Agents
In server/src/pgboss-worker.ts around lines 574 to 578, the partial-data
detection expression is wrong in the abort path; replace it with the same robust
check used elsewhere: compute hasData by first checking run.serializableOutput
exists and then whether scrapeSchema or scrapeList are non-empty arrays (use
optional chaining/Array.isArray and length>0), OR check run.binaryOutput exists
and Object.keys(run.binaryOutput).length>0; ensure proper parentheses so the ORs
are evaluated between the serializableOutput block and the binaryOutput check.

Comment on lines +1068 to +1071
where: {
status: ['running', 'scheduled']
},
order: [['startedAt', 'ASC']]
⚠️ Potential issue

Fix status filter: use Op.in and import it.

Sequelize needs Op.in for multi-value filters; array shorthand won’t work reliably.

+import { Op } from 'sequelize';
...
-    const orphanedRuns = await Run.findAll({
-      where: { 
-        status: ['running', 'scheduled'] 
-      },
+    const orphanedRuns = await Run.findAll({
+      where: { 
+        status: { [Op.in]: ['running', 'scheduled'] }
+      },
🤖 Prompt for AI Agents
In server/src/routes/storage.ts around lines 1068 to 1071, the status filter
currently uses an array shorthand which is unreliable; change the where clause
to use Sequelize's Op.in (status: { [Op.in]: ['running', 'scheduled'] }) and add
the Op import at the top of the file (e.g., import { Op } from 'sequelize' or
const { Op } = require('sequelize') depending on module style) so the
multi-value filter works correctly.

Comment on lines +1097 to +1099
browserId: undefined,
log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash`
});
🛠️ Refactor suggestion

Persist nulls, not undefined; use ISO timestamps.

Undefined won’t clear a column in Sequelize; toLocaleString() is locale-dependent and hard to sort. Use null and ISO 8601.

-              serializableOutput: {},
-              binaryOutput: {},
-              browserId: undefined,
+              serializableOutput: {},
+              binaryOutput: {},
+              browserId: null,
...
-              finishedAt: new Date().toLocaleString(),
+              finishedAt: new Date().toISOString(),

Also applies to: 1105-1109

🤖 Prompt for AI Agents
In server/src/routes/storage.ts around lines 1097-1099 (and similarly
1105-1109), persist null instead of undefined for browserId so Sequelize will
clear the column (i.e., set browserId: null), and replace any use of
toLocaleString() in generated log entries with ISO 8601 timestamps (e.g.,
Date.toISOString()) so logs are sortable and consistent; update the log
construction to include the ISO timestamp and ensure retry messages remain
intact.

Comment on lines +147 to +152
logger.log('info', 'Cleaning up stale browser slots...');
browserPool.cleanupStaleBrowserSlots();

await recoverOrphanedRuns();
await startWorkers();

🛠️ Refactor suggestion

Avoid double-starting queue workers; unify one startup path.

await startWorkers() is invoked (Line 151), and later the dev path also starts the same workers via dynamic import (Lines 201-203) and, on Windows dev, via fork (Lines 186-197). Additionally, processQueuedRuns() runs every 5s (Lines 135-139). This can lead to duplicate processing, race conditions, and job thrashing.

  • Pick a single worker bootstrap. Recommend: keep await startWorkers() and remove the extra dev starts (dynamic import and Windows fork for pgboss-worker). Also relocate or remove the processQueuedRuns() loop to avoid overlap with PgBoss.

Apply minimal diffs to stop duplicate worker starts:

@@
-const recordingWorkerPath = path.resolve(__dirname, isProduction ? './pgboss-worker.js' : './pgboss-worker.ts');
+// pgboss workers are started via startWorkers(); do not import/fork separately
@@
-          recordingWorkerProcess = fork(recordingWorkerPath, [], {
-            execArgv: ['--inspect=5860'],
-          });
-          recordingWorkerProcess.on('message', (message: any) => {
-            console.log(`Message from recording worker: ${message}`);
-          });
-          recordingWorkerProcess.on('error', (error: any) => {
-            console.error(`Error in recording worker: ${error}`);
-          });
-          recordingWorkerProcess.on('exit', (code: any) => {
-            console.log(`Recording worker exited with code: ${code}`);
-          });
@@
-            await import('./pgboss-worker');
+            // Workers already started via startWorkers(); skip importing pgboss-worker here

Also remove or gate the polling loop:

@@
-if (require.main === module) {
-  setInterval(() => {
-    processQueuedRuns();
-  }, 5000);
-}

If you still need the loop in non-PgBoss environments, start it after DB connect/sync and guard with an env flag.

Also applies to: 171-208, 135-139

🤖 Prompt for AI Agents
In server/src/server.ts around lines 147-152 (and also affecting 135-139 and
171-208): the code currently starts queue workers in multiple places (await
startWorkers() at ~151, a dynamic import at ~201-203, and a Windows fork at
~186-197) and also runs a separate processQueuedRuns() loop (~135-139), causing
duplicate processing and race conditions; keep the single canonical bootstrap
path by retaining await startWorkers() and remove the dynamic import and
fork-based worker starts (delete or comment out lines 171-208 that perform those
dev-specific starts), and remove or gate the processQueuedRuns() polling loop
(lines 135-139) so it only runs when PgBoss is not used — implement a simple env
flag (eg. USE_PGBOSS=true) or feature-flag and, if needed, start the polling
loop only after DB connect/sync and when the flag indicates no PgBoss; ensure no
other code path launches workers.
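The env-flag gating suggested above can be sketched as a tiny pure predicate. `USE_PGBOSS` and its default of `"true"` are assumptions for illustration, not an existing setting in this repo:

```typescript
// Hypothetical env-flag gate: start the polling fallback only when PgBoss is
// disabled. USE_PGBOSS and its "true" default are assumptions.
function shouldStartPollingLoop(env: Record<string, string | undefined>): boolean {
  const usePgBoss = (env.USE_PGBOSS ?? 'true').toLowerCase() === 'true';
  return !usePgBoss; // poll only when PgBoss workers are not running
}
```

With PgBoss enabled (the assumed default), the loop never starts, so queued runs are processed by exactly one path.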

Comment on lines +150 to +152
await recoverOrphanedRuns();
await startWorkers();

⚠️ Potential issue

Recovery handler currently wipes outputs.

recoverOrphanedRuns() (routes/storage.ts) resets serializableOutput and binaryOutput to {} when re-queuing. This contradicts the PR goal “so it is not lost” and will drop partially persisted data.

Suggestion: preserve existing outputs on requeue and append recovery notes only to log. If cleanup is truly required, move data to a partialOutput field instead of discarding. Want a patch for routes/storage.ts?

🤖 Prompt for AI Agents
In server/src/server.ts around lines 150-152: the call to recoverOrphanedRuns()
triggers logic in routes/storage.ts that currently wipes serializableOutput and
binaryOutput when re-queuing runs; update routes/storage.ts so requeue preserves
any existing serializableOutput and binaryOutput (do not set them to {}), only
append a recovery note to the run's log field; if a cleanup/move is required
instead of preservation, move existing data to a new partialOutput field rather
than discarding it; ensure the requeue path only alters the log and queue
metadata, leaving persisted outputs intact.
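A minimal sketch of an output-preserving requeue payload, assuming the run model has the `retryCount`, `log`, `serializableOutput`, and `binaryOutput` fields this PR describes; `buildRequeueUpdate` is an illustrative helper, not code from the repo:

```typescript
// Illustrative only: compute the update payload for re-queuing an orphaned run
// without wiping persisted data. Field names follow the run model in this PR.
interface RunLike {
  serializableOutput?: Record<string, unknown>;
  binaryOutput?: Record<string, unknown>;
  log?: string;
  retryCount?: number;
}

function buildRequeueUpdate(run: RunLike, maxRetries = 3) {
  const retryCount = (run.retryCount ?? 0) + 1;
  // ISO 8601 so log lines stay sortable, per the review note above
  const note = `[RETRY ${retryCount}/${maxRetries}] ${new Date().toISOString()} Re-queuing due to server crash`;
  return {
    status: 'queued',
    retryCount,
    browserId: null, // null (not undefined) so Sequelize actually clears the column
    // Preserve outputs instead of resetting them to {}
    serializableOutput: run.serializableOutput ?? {},
    binaryOutput: run.binaryOutput ?? {},
    log: run.log ? `${run.log}\n${note}` : note,
  };
}
```

Only the log and queue metadata change; partially persisted outputs pass through untouched.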

Comment on lines +244 to +250
binaryCallback: async (data: string, mimetype: string) => {
const binaryItem = { mimetype, data: JSON.stringify(data) };
this.binaryData.push(binaryItem);

// Persist binary data to database
await this.persistBinaryDataToDatabase(binaryItem);

🛠️ Refactor suggestion

Binary persistence: avoid JSON.stringify and weak keys.

  • JSON.stringify(data) stores quoted strings; consumers likely expect raw base64/bytes.
  • item-${Date.now()}-${length} risks collisions under concurrency.

Use UUID keys and store raw string:

@@
-      binaryCallback: async (data: string, mimetype: string) => {
-        const binaryItem = { mimetype, data: JSON.stringify(data) };
+      binaryCallback: async (data: string, mimetype: string) => {
+        const binaryItem = { mimetype, data };
         this.binaryData.push(binaryItem);
@@
-        await this.persistBinaryDataToDatabase(binaryItem);
+        await this.persistBinaryDataToDatabase(binaryItem);

And in persistBinaryDataToDatabase:

+import { randomUUID } from 'crypto';
@@
-      const uniqueKey = `item-${Date.now()}-${Object.keys(currentBinaryOutput).length}`;
+      const uniqueKey = `item-${randomUUID()}`;

Also applies to: 498-503

🤖 Prompt for AI Agents
In server/src/workflow-management/classes/Interpreter.ts around lines 244-250
(and similarly at 498-503): the code JSON.stringifys the binary string and
generates weak keys using item-${Date.now()}-${length}, which can produce quoted
base64 and collide under concurrency; change to push the raw string (no
JSON.stringify) and generate a UUID (v4) for the item key, then pass the raw
base64/string to persistBinaryDataToDatabase; update persistBinaryDataToDatabase
to accept and store the raw string and the UUID key (and ensure any DB column
types expect plain text/blob), and remove any logic that assumes a JSON-encoded
value or time-based key.
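The UUID-keyed, raw-string storage can be sketched as follows; `makeBinaryItemKey` and `addBinaryItem` are hypothetical helpers, and the in-memory store stands in for the Run's `binaryOutput` JSON column:

```typescript
import { randomUUID } from 'crypto';

interface BinaryItem {
  mimetype: string;
  data: string; // raw base64 string, not JSON.stringify'd
}

// Hypothetical helpers: a UUID v4 key cannot collide even when two callbacks
// fire within the same millisecond, unlike `item-${Date.now()}-${count}`.
function makeBinaryItemKey(): string {
  return `item-${randomUUID()}`;
}

function addBinaryItem(
  store: Record<string, BinaryItem>,
  mimetype: string,
  data: string
): string {
  const key = makeBinaryItemKey();
  store[key] = { mimetype, data }; // store the raw string as-is
  return key;
}
```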

Comment on lines +338 to +395
/**
* Persists data to database in real-time during interpretation
* @private
*/
private persistDataToDatabase = async (actionType: string, data: any, listIndex?: number): Promise<void> => {
if (!this.currentRunId) {
logger.log('debug', 'No run ID available for real-time persistence');
return;
}

try {
const run = await Run.findOne({ where: { runId: this.currentRunId } });

if (!run) {
logger.log('warn', `Run not found for real-time persistence: ${this.currentRunId}`);
return;
}

const currentSerializableOutput = run.serializableOutput ?
JSON.parse(JSON.stringify(run.serializableOutput)) :
{ scrapeSchema: [], scrapeList: [] };

if (actionType === 'scrapeSchema') {
const newSchemaData = Array.isArray(data) ? data : [data];
const updatedOutput = {
...currentSerializableOutput,
scrapeSchema: newSchemaData
};

await run.update({
serializableOutput: updatedOutput
});

logger.log('debug', `Persisted scrapeSchema data for run ${this.currentRunId}: ${newSchemaData.length} items`);

} else if (actionType === 'scrapeList' && typeof listIndex === 'number') {
if (!Array.isArray(currentSerializableOutput.scrapeList)) {
currentSerializableOutput.scrapeList = [];
}

const updatedList = [...currentSerializableOutput.scrapeList];
updatedList[listIndex] = data;

const updatedOutput = {
...currentSerializableOutput,
scrapeList: updatedList
};

await run.update({
serializableOutput: updatedOutput
});

logger.log('debug', `Persisted scrapeList data for run ${this.currentRunId} at index ${listIndex}: ${Array.isArray(data) ? data.length : 'N/A'} items`);
}
} catch (error: any) {
logger.log('error', `Failed to persist data in real-time for run ${this.currentRunId}: ${error.message}`);
}
};
🛠️ Refactor suggestion

JSON overwrite + race window can drop data; merge atomically.

Current pattern:

  • Read whole JSON -> mutate -> run.update(). Parallel callbacks will race; last write wins.

  • Merge scrapeSchema by appending to the existing array.

  • Use optimistic locking (version column) or a DB transaction with SELECT ... FOR UPDATE.

  • Prefer JSONB merge at DB level to avoid round-trips.

Minimal in-place fix (append on the server side):

-      if (actionType === 'scrapeSchema') {
-        const newSchemaData = Array.isArray(data) ? data : [data];
-        const updatedOutput = {
-          ...currentSerializableOutput,
-          scrapeSchema: newSchemaData
-        };
+      if (actionType === 'scrapeSchema') {
+        const newSchemaData = Array.isArray(data) ? data : [data];
+        const existing = Array.isArray(currentSerializableOutput.scrapeSchema)
+          ? currentSerializableOutput.scrapeSchema
+          : [];
+        const updatedOutput = {
+          ...currentSerializableOutput,
+          scrapeSchema: [...existing, ...newSchemaData],
+        };
         await run.update({
           serializableOutput: updatedOutput
         });

If you want a transaction, I can draft the Sequelize/PG JSONB approach.
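The append-merge semantics can be isolated into a pure function for clarity. The names here (`SerializableOutput`, `mergeSerializableOutput`) are illustrative, and a real fix should still wrap the read-modify-write in a transaction, or push the merge into JSONB, so concurrent callbacks cannot interleave:

```typescript
// Illustrative pure function capturing the append-merge semantics. A real fix
// must still serialize writes (transaction, version column, or JSONB merge).
interface SerializableOutput {
  scrapeSchema: unknown[];
  scrapeList: unknown[][];
}

function mergeSerializableOutput(
  current: Partial<SerializableOutput> | null,
  actionType: 'scrapeSchema' | 'scrapeList',
  data: unknown,
  listIndex?: number
): SerializableOutput {
  // Copy instead of mutating, so callers can retry safely on write conflicts
  const out: SerializableOutput = {
    scrapeSchema: [...(current?.scrapeSchema ?? [])],
    scrapeList: [...(current?.scrapeList ?? [])],
  };
  if (actionType === 'scrapeSchema') {
    // Append to existing schema rows instead of overwriting them
    out.scrapeSchema.push(...(Array.isArray(data) ? data : [data]));
  } else if (typeof listIndex === 'number') {
    out.scrapeList[listIndex] = data as unknown[];
  }
  return out;
}
```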


Comment on lines +184 to 209
const updatedRun = await Run.findOne({ where: { runId: id } });
if (updatedRun) {
if (updatedRun.serializableOutput) {
if (updatedRun.serializableOutput.scrapeSchema) {
Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
if (Array.isArray(schemaResult)) {
totalSchemaItemsExtracted += schemaResult.length;
} else if (schemaResult && typeof schemaResult === 'object') {
totalSchemaItemsExtracted += 1;
}
});
}
});
}

if (categorizedOutput.scrapeList) {
Object.values(categorizedOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
if (updatedRun.serializableOutput.scrapeList) {
Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => {
if (Array.isArray(listResult)) {
totalListItemsExtracted += listResult.length;
}
});
}
});
}

if (uploadedBinaryOutput) {
extractedScreenshotsCount = Object.keys(uploadedBinaryOutput).length;
}
if (updatedRun.binaryOutput) {
extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length;
}
}
🛠️ Refactor suggestion

Schema metrics undercount when using merged object.

If scrapeSchema is [ {k1: v1, k2: v2} ], the loop counts 1, not the number of keys.

Robust counting:

-    const updatedRun = await Run.findOne({ where: { runId: id } });
+    const updatedRun = await Run.findOne({ where: { runId: id }, raw: true });
     if (updatedRun) {
-      if (updatedRun.serializableOutput) {
-        if (updatedRun.serializableOutput.scrapeSchema) {
-          Object.values(updatedRun.serializableOutput.scrapeSchema).forEach((schemaResult: any) => {
-            if (Array.isArray(schemaResult)) {
-              totalSchemaItemsExtracted += schemaResult.length;
-            } else if (schemaResult && typeof schemaResult === 'object') {
-              totalSchemaItemsExtracted += 1;
-            }
-          });
-        }
-        
-        if (updatedRun.serializableOutput.scrapeList) {
-          Object.values(updatedRun.serializableOutput.scrapeList).forEach((listResult: any) => {
-            if (Array.isArray(listResult)) {
-              totalListItemsExtracted += listResult.length;
-            }
-          });
-        }
-      }
-      
-      if (updatedRun.binaryOutput) {
-        extractedScreenshotsCount = Object.keys(updatedRun.binaryOutput).length;
-      }
+      const so = updatedRun.serializableOutput || {};
+      const ss = so.scrapeSchema;
+      if (ss) {
+        if (Array.isArray(ss)) {
+          if (ss.length && typeof ss[0] === 'object') {
+            totalSchemaItemsExtracted += Object.keys(ss[0]).length;
+          } else {
+            totalSchemaItemsExtracted += ss.length;
+          }
+        } else if (typeof ss === 'object') {
+          totalSchemaItemsExtracted += Object.keys(ss).length;
+        }
+      }
+      const sl = so.scrapeList;
+      if (sl) {
+        const lists = Array.isArray(sl) ? sl : Object.values(sl);
+        for (const list of lists) {
+          if (Array.isArray(list)) totalListItemsExtracted += list.length;
+        }
+      }
+      const bo = updatedRun.binaryOutput || {};
+      extractedScreenshotsCount = Object.keys(bo).length;
     }
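The counting rules above can be captured in a small, testable helper. `countExtraction` is a hypothetical name, and the merged-object heuristic (counting the keys of the first element) mirrors the review suggestion rather than any repo API:

```typescript
// Hypothetical helper mirroring the suggested counting rules: scrapeSchema may
// be an array of rows, an array holding one merged object, or a plain object;
// scrapeList may be an array of lists or a map of lists.
function countExtraction(
  so: any,
  binaryOutput?: Record<string, unknown>
): { schemaItems: number; listItems: number; screenshots: number } {
  let schemaItems = 0;
  let listItems = 0;

  const ss = so?.scrapeSchema;
  if (Array.isArray(ss)) {
    // Heuristic from the review: a single merged object counts by its keys
    schemaItems = ss.length && typeof ss[0] === 'object' && !Array.isArray(ss[0])
      ? Object.keys(ss[0]).length
      : ss.length;
  } else if (ss && typeof ss === 'object') {
    schemaItems = Object.keys(ss).length;
  }

  const sl = so?.scrapeList;
  if (sl) {
    const lists = Array.isArray(sl) ? sl : Object.values(sl);
    for (const list of lists) {
      if (Array.isArray(list)) listItems += list.length;
    }
  }

  return { schemaItems, listItems, screenshots: Object.keys(binaryOutput ?? {}).length };
}
```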

@amhsirak added the Scope: Infra (All issues/PRs related to infrastructure) and Scope: Ext (Issues/PRs related to core extraction) labels on Sep 10, 2025
@amhsirak amhsirak merged commit d0e1c9b into develop Sep 10, 2025
1 check passed
@coderabbitai coderabbitai bot mentioned this pull request Dec 10, 2025