1
1
import byline from "byline" ;
2
2
import Dockerode from "dockerode" ;
3
+ import { setTimeout } from "timers/promises" ;
3
4
import { log } from "../common" ;
4
5
import { getContainerRuntimeClient } from "../container-runtime" ;
5
6
import { BoundPorts } from "../utils/bound-ports" ;
@@ -16,46 +17,37 @@ export class LogWaitStrategy extends AbstractWaitStrategy {
16
17
}
17
18
18
19
public async waitUntilReady ( container : Dockerode . Container , boundPorts : BoundPorts , startTime ?: Date ) : Promise < void > {
20
+ await Promise . race ( [ this . handleTimeout ( container . id ) , this . handleLogs ( container , startTime ) ] ) ;
21
+ }
22
+
23
+ async handleTimeout ( containerId : string ) : Promise < void > {
24
+ await setTimeout ( this . startupTimeout ) ;
25
+ this . throwError ( containerId , `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ) ;
26
+ }
27
+
28
+ async handleLogs ( container : Dockerode . Container , startTime ?: Date ) : Promise < void > {
19
29
log . debug ( `Waiting for log message "${ this . message } "...` , { containerId : container . id } ) ;
20
30
const client = await getContainerRuntimeClient ( ) ;
21
31
const stream = await client . container . logs ( container , { since : startTime ? startTime . getTime ( ) / 1000 : 0 } ) ;
22
- return new Promise ( ( resolve , reject ) => {
23
- const timeout = setTimeout ( ( ) => {
24
- const message = `Log message "${ this . message } " not received after ${ this . startupTimeout } ms` ;
25
- log . error ( message , { containerId : container . id } ) ;
26
- reject ( new Error ( message ) ) ;
27
- } , this . startupTimeout ) ;
28
-
29
- const comparisonFn : ( line : string ) => boolean = ( line : string ) => {
30
- if ( this . message instanceof RegExp ) {
31
- return this . message . test ( line ) ;
32
- } else {
33
- return line . includes ( this . message ) ;
34
- }
35
- } ;
36
-
37
- let count = 0 ;
38
- const lineProcessor = ( line : string ) => {
39
- if ( comparisonFn ( line ) ) {
40
- if ( ++ count === this . times ) {
41
- stream . destroy ( ) ;
42
- clearTimeout ( timeout ) ;
43
- log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
44
- resolve ( ) ;
45
- }
32
+
33
+ let matches = 0 ;
34
+ for await ( const line of byline ( stream ) ) {
35
+ if ( this . matches ( line ) ) {
36
+ if ( ++ matches === this . times ) {
37
+ return log . debug ( `Log wait strategy complete` , { containerId : container . id } ) ;
46
38
}
47
- } ;
48
-
49
- byline ( stream )
50
- . on ( "data" , lineProcessor )
51
- . on ( "err" , lineProcessor )
52
- . on ( "end" , ( ) => {
53
- stream . destroy ( ) ;
54
- clearTimeout ( timeout ) ;
55
- const message = `Log stream ended and message " ${ this . message } " was not received` ;
56
- log . error ( message , { containerId : container . id } ) ;
57
- reject ( new Error ( message ) ) ;
58
- } ) ;
59
- } ) ;
39
+ }
40
+ }
41
+
42
+ this . throwError ( container . id , `Log stream ended and message " ${ this . message } " was not received` ) ;
43
+ }
44
+
45
+ matches ( line : string ) : boolean {
46
+ return this . message instanceof RegExp ? this . message . test ( line ) : line . includes ( this . message ) ;
47
+ }
48
+
49
+ throwError ( containerId : string , message : string ) : void {
50
+ log . error ( message , { containerId } ) ;
51
+ throw new Error ( message ) ;
60
52
}
61
53
}
0 commit comments