diff --git a/index.d.ts b/index.d.ts index 6b56beb..0b59dcb 100644 --- a/index.d.ts +++ b/index.d.ts @@ -89,7 +89,8 @@ declare module 'replicate' { webhook?: string; webhook_events_filter?: WebhookEventType[]; signal?: AbortSignal; - } + }, + progress?: (Prediction) => void ): Promise; request(route: string | URL, options: { diff --git a/index.js b/index.js index b1803a1..b147ea5 100644 --- a/index.js +++ b/index.js @@ -86,10 +86,11 @@ class Replicate { * @param {string} [options.webhook] - An HTTPS URL for receiving a webhook when the prediction has new output * @param {string[]} [options.webhook_events_filter] - You can change which events trigger webhook requests by specifying webhook events (`start`|`output`|`logs`|`completed`) * @param {AbortSignal} [options.signal] - AbortSignal to cancel the prediction + * @param {Function} [progress] - Callback function that receives the prediction object as it's updated. The function is called when the prediction is created, each time its updated while polling for completion, and when it's completed. * @throws {Error} If the prediction failed * @returns {Promise} - Resolves with the output of running the model */ - async run(identifier, options) { + async run(identifier, options, progress) { const { wait, ...data } = options; // Define a pattern for owner and model names that allows @@ -117,17 +118,32 @@ class Replicate { version, }); + // Call progress callback with the initial prediction object + if (progress) { + progress(prediction); + } + const { signal } = options; - prediction = await this.wait(prediction, wait || {}, async ({ id }) => { + prediction = await this.wait(prediction, wait || {}, async (updatedPrediction) => { + // Call progress callback with the updated prediction object + if (progress) { + progress(updatedPrediction); + } + if (signal && signal.aborted) { - await this.predictions.cancel(id); + await this.predictions.cancel(updatedPrediction.id); return true; // stop polling } return false; // continue polling }); + // Call progress callback with the completed prediction object + if (progress) { + progress(prediction); + } + if (prediction.status === 'failed') { throw new Error(`Prediction failed: ${prediction.error}`); } @@ -252,8 +268,6 @@ class Replicate { return prediction; } - let updatedPrediction = await this.predictions.get(id); - // eslint-disable-next-line no-promise-executor-return const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); @@ -261,11 +275,18 @@ class Replicate { const interval = options.interval || 250; const max_attempts = options.max_attempts || null; + let updatedPrediction = await this.predictions.get(id); + while ( updatedPrediction.status !== 'succeeded' && updatedPrediction.status !== 'failed' && updatedPrediction.status !== 'canceled' ) { + /* eslint-disable no-await-in-loop */ + if (stop && await stop(updatedPrediction) === true) { + break; + } + attempts += 1; if (max_attempts && attempts > max_attempts) { throw new Error( @@ -273,12 +294,8 @@ class Replicate { ); } - /* eslint-disable no-await-in-loop */ await sleep(interval); updatedPrediction = await this.predictions.get(prediction.id); - if (stop && await stop(updatedPrediction) === true) { - break; - } /* eslint-enable no-await-in-loop */ } diff --git a/index.test.ts b/index.test.ts index 9147b81..52e7c25 100644 --- a/index.test.ts +++ b/index.test.ts @@ -517,26 +517,62 @@ describe('Replicate client', () => { describe('run', () => { test('Calls the correct API routes', async () => { + let firstPollingRequest = true; + nock(BASE_URL) .post('/predictions') + .reply(201, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'starting', + }) + .get('/predictions/ufawqhfynnddngldkgtslldrkq') + .twice() .reply(200, { id: 'ufawqhfynnddngldkgtslldrkq', status: 'processing', }) .get('/predictions/ufawqhfynnddngldkgtslldrkq') - .reply(201, { + .reply(200, { id: 'ufawqhfynnddngldkgtslldrkq', status: 'succeeded', - output: 'foobar', + output: 'Goodbye!', }); + const progress = jest.fn(); + const output = await client.run( 'owner/model:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa', { input: { text: 'Hello, world!' }, - } + wait: { interval: 1 } + }, + progress ); - expect(output).toBe('foobar'); + + expect(output).toBe('Goodbye!'); + + expect(progress).toHaveBeenNthCalledWith(1, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'starting', + }); + + expect(progress).toHaveBeenNthCalledWith(2, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'processing', + }); + + expect(progress).toHaveBeenNthCalledWith(3, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'processing', + }); + + expect(progress).toHaveBeenNthCalledWith(4, { + id: 'ufawqhfynnddngldkgtslldrkq', + status: 'succeeded', + output: 'Goodbye!', + }); + + expect(progress).toHaveBeenCalledTimes(4); }); test('Does not throw an error for identifier containing hyphen and full stop', async () => {