@@ -7,7 +7,8 @@ import Replicate, {
77 parseProgressFromLogs ,
88} from "replicate" ;
99import nock from "nock" ;
10- import fetch from "cross-fetch" ;
10+ import { createReadableStream } from "./lib/stream" ;
11+ import { PassThrough } from "node:stream" ;
1112
1213let client : Replicate ;
1314const BASE_URL = "https://api.replicate.com/v1" ;
@@ -21,7 +22,6 @@ describe("Replicate client", () => {
2122
2223 beforeEach ( ( ) => {
2324 client = new Replicate ( { auth : "test-token" } ) ;
24- client . fetch = fetch ;
2525
2626 unmatched = [ ] ;
2727 nock . emitter . on ( "no match" , handleNoMatch ) ;
@@ -251,7 +251,7 @@ describe("Replicate client", () => {
251251 let actual : Record < string , any > | undefined ;
252252 nock ( BASE_URL )
253253 . post ( "/predictions" )
254- . reply ( 201 , ( uri : string , body : Record < string , any > ) => {
254+ . reply ( 201 , ( _uri : string , body : Record < string , any > ) => {
255255 actual = body ;
256256 return body ;
257257 } ) ;
@@ -1010,8 +1010,6 @@ describe("Replicate client", () => {
10101010 } ) ;
10111011
10121012 test ( "Calls the correct API routes for a model" , async ( ) => {
1013- const firstPollingRequest = true ;
1014-
10151013 nock ( BASE_URL )
10161014 . post ( "/models/replicate/hello-world/predictions" )
10171015 . reply ( 201 , {
@@ -1179,12 +1177,322 @@ describe("Replicate client", () => {
11791177 // This is a test secret and should not be used in production
11801178 const secret = "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw" ;
11811179
1182- const isValid = await validateWebhook ( request , secret ) ;
1180+ const isValid = validateWebhook ( request , secret ) ;
11831181 expect ( isValid ) . toBe ( true ) ;
11841182 } ) ;
11851183
11861184 // Add more tests for error handling, edge cases, etc.
11871185 } ) ;
11881186
11891187 // Continue with tests for other methods
1188+
1189+ describe ( "createReadableStream" , ( ) => {
1190+ function createStream ( body : string | NodeJS . ReadableStream , status = 200 ) {
1191+ const streamEndpoint = "https://stream.replicate.com" ;
1192+ nock ( streamEndpoint )
1193+ . get ( "/fake_stream" )
1194+ . matchHeader ( "Accept" , "text/event-stream" )
1195+ . reply ( status , body ) ;
1196+
1197+ return createReadableStream ( {
1198+ url : `${ streamEndpoint } /fake_stream` ,
1199+ fetch : fetch ,
1200+ } ) ;
1201+ }
1202+
1203+ test ( "consumes a server sent event stream" , async ( ) => {
1204+ const stream = createStream (
1205+ `
1206+ event: output
1207+ id: EVENT_1
1208+ data: hello world
1209+
1210+ event: done
1211+ id: EVENT_2
1212+ data: {}
1213+
1214+ ` . replace ( / ^ [ ] + / gm, "" )
1215+ ) ;
1216+
1217+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1218+ expect ( await iterator . next ( ) ) . toEqual ( {
1219+ done : false ,
1220+ value : { event : "output" , id : "EVENT_1" , data : "hello world" } ,
1221+ } ) ;
1222+ expect ( await iterator . next ( ) ) . toEqual ( {
1223+ done : false ,
1224+ value : { event : "done" , id : "EVENT_2" , data : "{}" } ,
1225+ } ) ;
1226+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1227+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1228+ } ) ;
1229+
1230+ test ( "consumes multiple events" , async ( ) => {
1231+ const stream = createStream (
1232+ `
1233+ event: output
1234+ id: EVENT_1
1235+ data: hello world
1236+
1237+ event: output
1238+ id: EVENT_2
1239+ data: hello dave
1240+
1241+ event: done
1242+ id: EVENT_3
1243+ data: {}
1244+
1245+ ` . replace ( / ^ [ ] + / gm, "" )
1246+ ) ;
1247+
1248+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1249+
1250+ expect ( await iterator . next ( ) ) . toEqual ( {
1251+ done : false ,
1252+ value : { event : "output" , id : "EVENT_1" , data : "hello world" } ,
1253+ } ) ;
1254+ expect ( await iterator . next ( ) ) . toEqual ( {
1255+ done : false ,
1256+ value : { event : "output" , id : "EVENT_2" , data : "hello dave" } ,
1257+ } ) ;
1258+ expect ( await iterator . next ( ) ) . toEqual ( {
1259+ done : false ,
1260+ value : { event : "done" , id : "EVENT_3" , data : "{}" } ,
1261+ } ) ;
1262+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1263+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1264+ } ) ;
1265+
1266+ test ( "ignores unexpected characters" , async ( ) => {
1267+ const stream = createStream (
1268+ `
1269+ : hi
1270+
1271+ event: output
1272+ id: EVENT_1
1273+ data: hello world
1274+
1275+ event: done
1276+ id: EVENT_2
1277+ data: {}
1278+
1279+ ` . replace ( / ^ [ ] + / gm, "" )
1280+ ) ;
1281+
1282+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1283+
1284+ expect ( await iterator . next ( ) ) . toEqual ( {
1285+ done : false ,
1286+ value : { event : "output" , id : "EVENT_1" , data : "hello world" } ,
1287+ } ) ;
1288+ expect ( await iterator . next ( ) ) . toEqual ( {
1289+ done : false ,
1290+ value : { event : "done" , id : "EVENT_2" , data : "{}" } ,
1291+ } ) ;
1292+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1293+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1294+ } ) ;
1295+
1296+ test ( "supports multiple lines of output in a single event" , async ( ) => {
1297+ const stream = createStream (
1298+ `
1299+ : hi
1300+
1301+ event: output
1302+ id: EVENT_1
1303+ data: hello,
1304+ data: this is a new line,
1305+ data: and this is a new line too
1306+
1307+ event: done
1308+ id: EVENT_2
1309+ data: {}
1310+
1311+ ` . replace ( / ^ [ ] + / gm, "" )
1312+ ) ;
1313+
1314+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1315+
1316+ expect ( await iterator . next ( ) ) . toEqual ( {
1317+ done : false ,
1318+ value : {
1319+ event : "output" ,
1320+ id : "EVENT_1" ,
1321+ data : "hello,\nthis is a new line,\nand this is a new line too" ,
1322+ } ,
1323+ } ) ;
1324+ expect ( await iterator . next ( ) ) . toEqual ( {
1325+ done : false ,
1326+ value : { event : "done" , id : "EVENT_2" , data : "{}" } ,
1327+ } ) ;
1328+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1329+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1330+ } ) ;
1331+
1332+ test ( "supports the server writing data lines in multiple chunks" , async ( ) => {
1333+ const body = new PassThrough ( ) ;
1334+ const stream = createStream ( body ) ;
1335+
1336+ // Create a stream of data chunks split on the pipe character for readability.
1337+ const data = `
1338+ event: output
1339+ id: EVENT_1
1340+ data: hello,|
1341+ data: this is a new line,|
1342+ data: and this is a new line too
1343+
1344+ event: done
1345+ id: EVENT_2
1346+ data: {}
1347+
1348+ ` . replace ( / ^ [ ] + / gm, "" ) ;
1349+
1350+ const chunks = data . split ( "|" ) ;
1351+
1352+ // Consume the iterator in parallel to writing it.
1353+ const reading = new Promise ( ( resolve , reject ) => {
1354+ ( async ( ) => {
1355+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1356+ expect ( await iterator . next ( ) ) . toEqual ( {
1357+ done : false ,
1358+ value : {
1359+ event : "output" ,
1360+ id : "EVENT_1" ,
1361+ data : "hello,\nthis is a new line,\nand this is a new line too" ,
1362+ } ,
1363+ } ) ;
1364+ expect ( await iterator . next ( ) ) . toEqual ( {
1365+ done : false ,
1366+ value : { event : "done" , id : "EVENT_2" , data : "{}" } ,
1367+ } ) ;
1368+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1369+ } ) ( ) . then ( resolve , reject ) ;
1370+ } ) ;
1371+
1372+ // Write the chunks to the stream at an interval.
1373+ const writing = new Promise ( ( resolve , reject ) => {
1374+ ( async ( ) => {
1375+ for await ( const chunk of chunks ) {
1376+ body . write ( chunk ) ;
1377+ await new Promise ( ( resolve ) => setTimeout ( resolve , 1 ) ) ;
1378+ }
1379+ body . end ( ) ;
1380+ resolve ( null ) ;
1381+ } ) ( ) . then ( resolve , reject ) ;
1382+ } ) ;
1383+
1384+ // Wait for both promises to resolve.
1385+ await Promise . all ( [ reading , writing ] ) ;
1386+ } ) ;
1387+
1388+ test ( "supports the server writing data in a complete mess" , async ( ) => {
1389+ const body = new PassThrough ( ) ;
1390+ const stream = createStream ( body ) ;
1391+
1392+ // Create a stream of data chunks split on the pipe character for readability.
1393+ const data = `
1394+ : hi
1395+
1396+ ev|ent: output
1397+ id: EVENT_1
1398+ data: hello,
1399+ data: this |is a new line,|
1400+ data: and this is |a new line too
1401+
1402+ event: d|one
1403+ id: EVENT|_2
1404+ data: {}
1405+
1406+ ` . replace ( / ^ [ ] + / gm, "" ) ;
1407+
1408+ const chunks = data . split ( "|" ) ;
1409+
1410+ // Consume the iterator in parallel to writing it.
1411+ const reading = new Promise ( ( resolve , reject ) => {
1412+ ( async ( ) => {
1413+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1414+ expect ( await iterator . next ( ) ) . toEqual ( {
1415+ done : false ,
1416+ value : {
1417+ event : "output" ,
1418+ id : "EVENT_1" ,
1419+ data : "hello,\nthis is a new line,\nand this is a new line too" ,
1420+ } ,
1421+ } ) ;
1422+ expect ( await iterator . next ( ) ) . toEqual ( {
1423+ done : false ,
1424+ value : { event : "done" , id : "EVENT_2" , data : "{}" } ,
1425+ } ) ;
1426+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1427+ } ) ( ) . then ( resolve , reject ) ;
1428+ } ) ;
1429+
1430+ // Write the chunks to the stream at an interval.
1431+ const writing = new Promise ( ( resolve , reject ) => {
1432+ ( async ( ) => {
1433+ for await ( const chunk of chunks ) {
1434+ body . write ( chunk ) ;
1435+ await new Promise ( ( resolve ) => setTimeout ( resolve , 1 ) ) ;
1436+ }
1437+ body . end ( ) ;
1438+ resolve ( null ) ;
1439+ } ) ( ) . then ( resolve , reject ) ;
1440+ } ) ;
1441+
1442+ // Wait for both promises to resolve.
1443+ await Promise . all ( [ reading , writing ] ) ;
1444+ } ) ;
1445+
1446+ test ( "supports ending without a done" , async ( ) => {
1447+ const stream = createStream (
1448+ `
1449+ event: output
1450+ id: EVENT_1
1451+ data: hello world
1452+
1453+ ` . replace ( / ^ [ ] + / gm, "" )
1454+ ) ;
1455+
1456+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1457+ expect ( await iterator . next ( ) ) . toEqual ( {
1458+ done : false ,
1459+ value : { event : "output" , id : "EVENT_1" , data : "hello world" } ,
1460+ } ) ;
1461+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1462+ } ) ;
1463+
1464+ test ( "an error event in the stream raises an exception" , async ( ) => {
1465+ const stream = createStream (
1466+ `
1467+ event: output
1468+ id: EVENT_1
1469+ data: hello world
1470+
1471+ event: error
1472+ id: EVENT_2
1473+ data: An unexpected error occurred
1474+
1475+ ` . replace ( / ^ [ ] + / gm, "" )
1476+ ) ;
1477+
1478+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1479+ expect ( await iterator . next ( ) ) . toEqual ( {
1480+ done : false ,
1481+ value : { event : "output" , id : "EVENT_1" , data : "hello world" } ,
1482+ } ) ;
1483+ await expect ( iterator . next ( ) ) . rejects . toThrowError (
1484+ "An unexpected error occurred"
1485+ ) ;
1486+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1487+ } ) ;
1488+
1489+ test ( "an error when fetching the stream raises an exception" , async ( ) => {
1490+ const stream = createStream ( "{}" , 500 ) ;
1491+ const iterator = stream [ Symbol . asyncIterator ] ( ) ;
1492+ await expect ( iterator . next ( ) ) . rejects . toThrowError (
1493+ "Request to https://stream.replicate.com/fake_stream failed with status 500"
1494+ ) ;
1495+ expect ( await iterator . next ( ) ) . toEqual ( { done : true } ) ;
1496+ } ) ;
1497+ } ) ;
11901498} ) ;
0 commit comments