Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ declare module 'replicate' {
webhook?: string;
webhook_events_filter?: WebhookEventType[];
signal?: AbortSignal;
}
},
progress?: (Prediction) => void
): Promise<object>;

request(route: string | URL, options: {
Expand Down
35 changes: 26 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ class Replicate {
* @param {string} [options.webhook] - An HTTPS URL for receiving a webhook when the prediction has new output
* @param {string[]} [options.webhook_events_filter] - You can change which events trigger webhook requests by specifying webhook events (`start`|`output`|`logs`|`completed`)
* @param {AbortSignal} [options.signal] - AbortSignal to cancel the prediction
* @param {Function} [progress] - Callback function that receives the prediction object as it's updated. The function is called when the prediction is created, each time its updated while polling for completion, and when it's completed.
* @throws {Error} If the prediction failed
* @returns {Promise<object>} - Resolves with the output of running the model
*/
async run(identifier, options) {
async run(identifier, options, progress) {
const { wait, ...data } = options;

// Define a pattern for owner and model names that allows
Expand Down Expand Up @@ -117,17 +118,32 @@ class Replicate {
version,
});

// Call progress callback with the initial prediction object
if (progress) {
progress(prediction);
}

const { signal } = options;

prediction = await this.wait(prediction, wait || {}, async ({ id }) => {
prediction = await this.wait(prediction, wait || {}, async (updatedPrediction) => {
// Call progress callback with the updated prediction object
if (progress) {
progress(updatedPrediction);
}

if (signal && signal.aborted) {
await this.predictions.cancel(id);
await this.predictions.cancel(updatedPrediction.id);
return true; // stop polling
}

return false; // continue polling
});

// Call progress callback with the completed prediction object
if (progress) {
progress(prediction);
}

if (prediction.status === 'failed') {
throw new Error(`Prediction failed: ${prediction.error}`);
}
Expand Down Expand Up @@ -252,33 +268,34 @@ class Replicate {
return prediction;
}

let updatedPrediction = await this.predictions.get(id);

// eslint-disable-next-line no-promise-executor-return
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let attempts = 0;
const interval = options.interval || 250;
const max_attempts = options.max_attempts || null;

let updatedPrediction = await this.predictions.get(id);

while (
updatedPrediction.status !== 'succeeded' &&
updatedPrediction.status !== 'failed' &&
updatedPrediction.status !== 'canceled'
) {
/* eslint-disable no-await-in-loop */
if (stop && await stop(updatedPrediction) === true) {
break;
}

attempts += 1;
if (max_attempts && attempts > max_attempts) {
throw new Error(
`Prediction ${id} did not finish after ${max_attempts} attempts`
);
}

/* eslint-disable no-await-in-loop */
await sleep(interval);
updatedPrediction = await this.predictions.get(prediction.id);
if (stop && await stop(updatedPrediction) === true) {
break;
}
/* eslint-enable no-await-in-loop */
}

Expand Down
44 changes: 40 additions & 4 deletions index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,26 +517,62 @@ describe('Replicate client', () => {

describe('run', () => {
test('Calls the correct API routes', async () => {
let firstPollingRequest = true;

nock(BASE_URL)
.post('/predictions')
.reply(201, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'starting',
})
.get('/predictions/ufawqhfynnddngldkgtslldrkq')
.twice()
.reply(200, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'processing',
})
.get('/predictions/ufawqhfynnddngldkgtslldrkq')
.reply(201, {
.reply(200, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'succeeded',
output: 'foobar',
output: 'Goodbye!',
});

const progress = jest.fn();

const output = await client.run(
'owner/model:5c7d5dc6dd8bf75c1acaa8565735e7986bc5b66206b55cca93cb72c9bf15ccaa',
{
input: { text: 'Hello, world!' },
}
wait: { interval: 1 }
},
progress
);
expect(output).toBe('foobar');

expect(output).toBe('Goodbye!');

expect(progress).toHaveBeenNthCalledWith(1, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'starting',
});

expect(progress).toHaveBeenNthCalledWith(2, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'processing',
});

expect(progress).toHaveBeenNthCalledWith(3, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'processing',
});

expect(progress).toHaveBeenNthCalledWith(4, {
id: 'ufawqhfynnddngldkgtslldrkq',
status: 'succeeded',
output: 'Goodbye!',
});

expect(progress).toHaveBeenCalledTimes(4);
});

test('Does not throw an error for identifier containing hyphen and full stop', async () => {
Expand Down