6
6
parsePacket ,
7
7
} from "@trigger.dev/core/v3" ;
8
8
import { BatchTaskRun , Prisma , TaskRunAttempt } from "@trigger.dev/database" ;
9
- import { $transaction , PrismaClientOrTransaction } from "~/db.server" ;
9
+ import { $transaction , prisma , PrismaClientOrTransaction } from "~/db.server" ;
10
10
import { env } from "~/env.server" ;
11
11
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server" ;
12
12
import { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
@@ -25,12 +25,10 @@ import { z } from "zod";
25
25
26
26
const PROCESSING_BATCH_SIZE = 50 ;
27
27
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20 ;
28
+ const MAX_ATTEMPTS = 10 ;
28
29
29
- const BatchProcessingStrategy = z . enum ( [ "sequential" , "parallel" ] ) ;
30
-
31
- type BatchProcessingStrategy = z . infer < typeof BatchProcessingStrategy > ;
32
-
33
- const CURRENT_STRATEGY : BatchProcessingStrategy = "parallel" ;
30
+ export const BatchProcessingStrategy = z . enum ( [ "sequential" , "parallel" ] ) ;
31
+ export type BatchProcessingStrategy = z . infer < typeof BatchProcessingStrategy > ;
34
32
35
33
export const BatchProcessingOptions = z . object ( {
36
34
batchId : z . string ( ) ,
@@ -52,6 +50,17 @@ export type BatchTriggerTaskServiceOptions = {
52
50
} ;
53
51
54
52
export class BatchTriggerV2Service extends BaseService {
53
+ private _batchProcessingStrategy : BatchProcessingStrategy ;
54
+
55
+ constructor (
56
+ batchProcessingStrategy ?: BatchProcessingStrategy ,
57
+ protected readonly _prisma : PrismaClientOrTransaction = prisma
58
+ ) {
59
+ super ( _prisma ) ;
60
+
61
+ this . _batchProcessingStrategy = batchProcessingStrategy ?? "parallel" ;
62
+ }
63
+
55
64
public async call (
56
65
environment : AuthenticatedEnvironment ,
57
66
body : BatchTriggerTaskV2RequestBody ,
@@ -452,14 +461,14 @@ export class BatchTriggerV2Service extends BaseService {
452
461
} ,
453
462
} ) ;
454
463
455
- switch ( CURRENT_STRATEGY ) {
464
+ switch ( this . _batchProcessingStrategy ) {
456
465
case "sequential" : {
457
466
await this . #enqueueBatchTaskRun( {
458
467
batchId : batch . id ,
459
468
processingId : batchId ,
460
469
range : { start : 0 , count : PROCESSING_BATCH_SIZE } ,
461
470
attemptCount : 0 ,
462
- strategy : CURRENT_STRATEGY ,
471
+ strategy : this . _batchProcessingStrategy ,
463
472
} ) ;
464
473
465
474
break ;
@@ -480,7 +489,7 @@ export class BatchTriggerV2Service extends BaseService {
480
489
processingId : `${ index } ` ,
481
490
range,
482
491
attemptCount : 0 ,
483
- strategy : CURRENT_STRATEGY ,
492
+ strategy : this . _batchProcessingStrategy ,
484
493
} ,
485
494
tx
486
495
)
@@ -539,6 +548,16 @@ export class BatchTriggerV2Service extends BaseService {
539
548
540
549
const $attemptCount = options . attemptCount + 1 ;
541
550
551
+ // Add early return if max attempts reached
552
+ if ( $attemptCount > MAX_ATTEMPTS ) {
553
+ logger . error ( "[BatchTriggerV2][processBatchTaskRun] Max attempts reached" , {
554
+ options,
555
+ attemptCount : $attemptCount ,
556
+ } ) ;
557
+ // You might want to update the batch status to failed here
558
+ return ;
559
+ }
560
+
542
561
const batch = await this . _prisma . batchTaskRun . findFirst ( {
543
562
where : { id : options . batchId } ,
544
563
include : {
0 commit comments