From a31f0279b99d7efee42932ba4ac10e117c59ea21 Mon Sep 17 00:00:00 2001 From: sdip0971 Date: Tue, 15 Jul 2025 15:13:51 +0530 Subject: [PATCH 1/4] feat(mongodb-checkpoint): add optional TTL support via _createdAtForTTL field --- libs/checkpoint-mongodb/src/index.ts | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/libs/checkpoint-mongodb/src/index.ts b/libs/checkpoint-mongodb/src/index.ts index 279535086..ce28ab6f3 100644 --- a/libs/checkpoint-mongodb/src/index.ts +++ b/libs/checkpoint-mongodb/src/index.ts @@ -16,6 +16,11 @@ export type MongoDBSaverParams = { dbName?: string; checkpointCollectionName?: string; checkpointWritesCollectionName?: string; + /** + * Change 1 Made ttl optional + */ + enableTTL?:boolean + }; /** @@ -23,12 +28,18 @@ export type MongoDBSaverParams = { */ export class MongoDBSaver extends BaseCheckpointSaver { protected client: MongoClient; - + protected db: MongoDatabase; checkpointCollectionName = "checkpoints"; checkpointWritesCollectionName = "checkpoint_writes"; + /** Change 2: + * Conditionally Added _createdATForTTL if ttl is enabled + */ + protected enableTTL:boolean + + constructor( { @@ -36,11 +47,13 @@ export class MongoDBSaver extends BaseCheckpointSaver { dbName, checkpointCollectionName, checkpointWritesCollectionName, + enableTTL, }: MongoDBSaverParams, serde?: SerializerProtocol ) { super(serde); this.client = client; + this.enableTTL = enableTTL ?? false; this.db = this.client.db(dbName); this.checkpointCollectionName = checkpointCollectionName ?? this.checkpointCollectionName; @@ -84,6 +97,7 @@ export class MongoDBSaver extends BaseCheckpointSaver { thread_id, checkpoint_ns, checkpoint_id: doc.checkpoint_id, + }; const checkpoint = (await this.serde.loadsTyped( doc.type, @@ -229,12 +243,20 @@ export class MongoDBSaver extends BaseCheckpointSaver { if (checkpointType !== metadataType) { throw new Error("Mismatched checkpoint and metadata types."); } + const doc = { parent_checkpoint_id: config.configurable?.checkpoint_id, type: checkpointType, checkpoint: serializedCheckpoint, metadata: serializedMetadata, + /** Change 3: + * Conditionally Added _createdATForTTL if ttl is enabled + */ + ...(this.enableTTL ? { _createdAtForTTL: new Date() } : {}), }; + + + const upsertQuery = { thread_id, checkpoint_ns, From 591f098aa38a9418342b6c6600107460eeffa20d Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Tue, 2 Sep 2025 14:41:51 +0200 Subject: [PATCH 2/4] Cleanup TTL interface --- libs/checkpoint-mongodb/src/index.ts | 60 ++++++++++++++++++---------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/libs/checkpoint-mongodb/src/index.ts b/libs/checkpoint-mongodb/src/index.ts index ce28ab6f3..03b15d55f 100644 --- a/libs/checkpoint-mongodb/src/index.ts +++ b/libs/checkpoint-mongodb/src/index.ts @@ -16,11 +16,7 @@ export type MongoDBSaverParams = { dbName?: string; checkpointCollectionName?: string; checkpointWritesCollectionName?: string; - /** - * Change 1 Made ttl optional - */ - enableTTL?:boolean - + ttl?: { expireAfterSeconds: number }; }; /** @@ -28,18 +24,36 @@ export type MongoDBSaverParams = { */ export class MongoDBSaver extends BaseCheckpointSaver { protected client: MongoClient; - + protected db: MongoDatabase; + protected ttl: { expireAfterSeconds: number } | undefined; + + protected isSetup: boolean; + checkpointCollectionName = "checkpoints"; checkpointWritesCollectionName = "checkpoint_writes"; - /** Change 2: - * Conditionally Added _createdATForTTL if ttl is enabled - */ - protected enableTTL:boolean - + async setup(): Promise { + await this.db.createIndex( + this.checkpointCollectionName, + { _createdAtForTTL: 1 }, + { expireAfterSeconds: 60 * 60 } + ); + this.isSetup = true; + } + + protected assertSetup() { + // Skip setup check if TTL is not enabled + if (this.ttl == null) return; + + if (!this.isSetup) { + throw new Error( + "MongoDBSaver is not initialized. Please call `MongoDBSaver.setup()` first before using the checkpointer." + ); + } + } constructor( { @@ -47,14 +61,16 @@ export class MongoDBSaver extends BaseCheckpointSaver { dbName, checkpointCollectionName, checkpointWritesCollectionName, - enableTTL, + ttl, }: MongoDBSaverParams, serde?: SerializerProtocol ) { super(serde); this.client = client; - this.enableTTL = enableTTL ?? false; + this.ttl = ttl; this.db = this.client.db(dbName); + this.isSetup = false; + this.checkpointCollectionName = checkpointCollectionName ?? this.checkpointCollectionName; this.checkpointWritesCollectionName = @@ -68,6 +84,8 @@ export class MongoDBSaver extends BaseCheckpointSaver { * for the given thread ID is retrieved. */ async getTuple(config: RunnableConfig): Promise { + this.assertSetup(); + const { thread_id, checkpoint_ns = "", @@ -97,7 +115,6 @@ export class MongoDBSaver extends BaseCheckpointSaver { thread_id, checkpoint_ns, checkpoint_id: doc.checkpoint_id, - }; const checkpoint = (await this.serde.loadsTyped( doc.type, @@ -149,6 +166,8 @@ export class MongoDBSaver extends BaseCheckpointSaver { config: RunnableConfig, options?: CheckpointListOptions ): AsyncGenerator { + this.assertSetup(); + const { limit, before, filter } = options ?? {}; const query: Record = {}; @@ -224,6 +243,8 @@ export class MongoDBSaver extends BaseCheckpointSaver { checkpoint: Checkpoint, metadata: CheckpointMetadata ): Promise { + this.assertSetup(); + const thread_id = config.configurable?.thread_id; const checkpoint_ns = config.configurable?.checkpoint_ns ?? ""; const checkpoint_id = checkpoint.id; @@ -243,20 +264,13 @@ export class MongoDBSaver extends BaseCheckpointSaver { if (checkpointType !== metadataType) { throw new Error("Mismatched checkpoint and metadata types."); } - const doc = { parent_checkpoint_id: config.configurable?.checkpoint_id, type: checkpointType, checkpoint: serializedCheckpoint, metadata: serializedMetadata, - /** Change 3: - * Conditionally Added _createdATForTTL if ttl is enabled - */ - ...(this.enableTTL ? { _createdAtForTTL: new Date() } : {}), + ...(this.ttl ? { _createdAtForTTL: new Date() } : {}), }; - - - const upsertQuery = { thread_id, checkpoint_ns, @@ -283,6 +297,8 @@ export class MongoDBSaver extends BaseCheckpointSaver { writes: PendingWrite[], taskId: string ): Promise { + this.assertSetup(); + const thread_id = config.configurable?.thread_id; const checkpoint_ns = config.configurable?.checkpoint_ns; const checkpoint_id = config.configurable?.checkpoint_id; From 7aec52f87e27bbf3870eb81265cb03b11b3f1b97 Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Tue, 2 Sep 2025 14:47:17 +0200 Subject: [PATCH 3/4] Add TTL for collection writes as well --- libs/checkpoint-mongodb/src/index.ts | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/libs/checkpoint-mongodb/src/index.ts b/libs/checkpoint-mongodb/src/index.ts index 03b15d55f..b49080632 100644 --- a/libs/checkpoint-mongodb/src/index.ts +++ b/libs/checkpoint-mongodb/src/index.ts @@ -36,11 +36,22 @@ export class MongoDBSaver extends BaseCheckpointSaver { checkpointWritesCollectionName = "checkpoint_writes"; async setup(): Promise { - await this.db.createIndex( - this.checkpointCollectionName, - { _createdAtForTTL: 1 }, - { expireAfterSeconds: 60 * 60 } - ); + if (this.ttl != null) { + const { expireAfterSeconds } = this.ttl; + await Promise.all([ + this.db.createIndex( + this.checkpointCollectionName, + { _createdAtForTTL: 1 }, + { expireAfterSeconds } + ), + this.db.createIndex( + this.checkpointWritesCollectionName, + { _createdAtForTTL: 1 }, + { expireAfterSeconds } + ), + ]); + } + this.isSetup = true; } From 76daaf78b639adebdbbbcd75ceb887ff993854b3 Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Tue, 2 Sep 2025 14:58:34 +0200 Subject: [PATCH 4/4] Add TTL for writes as well --- libs/checkpoint-mongodb/src/index.ts | 25 +- .../src/tests/checkpoints.int.test.ts | 219 ++++++++++-------- 2 files changed, 134 insertions(+), 110 deletions(-) diff --git a/libs/checkpoint-mongodb/src/index.ts b/libs/checkpoint-mongodb/src/index.ts index b49080632..71051bbb7 100644 --- a/libs/checkpoint-mongodb/src/index.ts +++ b/libs/checkpoint-mongodb/src/index.ts @@ -39,16 +39,12 @@ export class MongoDBSaver extends BaseCheckpointSaver { if (this.ttl != null) { const { expireAfterSeconds } = this.ttl; await Promise.all([ - this.db.createIndex( - this.checkpointCollectionName, - { _createdAtForTTL: 1 }, - { expireAfterSeconds } - ), - this.db.createIndex( - this.checkpointWritesCollectionName, - { _createdAtForTTL: 1 }, - { expireAfterSeconds } - ), + this.db + .collection(this.checkpointCollectionName) + .createIndex({ _createdAtForTTL: 1 }, { expireAfterSeconds }), + this.db + .collection(this.checkpointWritesCollectionName) + .createIndex({ _createdAtForTTL: 1 }, { expireAfterSeconds }), ]); } @@ -338,7 +334,14 @@ export class MongoDBSaver extends BaseCheckpointSaver { return { updateOne: { filter: upsertQuery, - update: { $set: { channel, type, value: serializedValue } }, + update: { + $set: { + channel, + type, + value: serializedValue, + ...(this.ttl ? { _createdAtForTTL: new Date() } : {}), + }, + }, upsert: true, }, }; diff --git a/libs/checkpoint-mongodb/src/tests/checkpoints.int.test.ts b/libs/checkpoint-mongodb/src/tests/checkpoints.int.test.ts index 523bf9847..3793f2c61 100644 --- a/libs/checkpoint-mongodb/src/tests/checkpoints.int.test.ts +++ b/libs/checkpoint-mongodb/src/tests/checkpoints.int.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, afterAll } from "vitest"; +import { describe, it, expect, afterEach, afterAll } from "vitest"; import { MongoClient } from "mongodb"; import { Checkpoint, @@ -47,128 +47,149 @@ const client = new MongoClient(getEnvironmentVariable("MONGODB_URL")!, { auth: { username: "user", password: "password" }, }); -afterAll(async () => { +afterEach(async () => { const db = client.db(); await db.dropCollection("checkpoints"); await db.dropCollection("checkpoint_writes"); +}); + +afterAll(async () => { await client.close(); }); describe("MongoDBSaver", () => { - it("should save and retrieve checkpoints correctly", async () => { - const saver = new MongoDBSaver({ client }); - - // get undefined checkpoint - const undefinedCheckpoint = await saver.getTuple({ - configurable: { thread_id: "1" }, - }); - expect(undefinedCheckpoint).toBeUndefined(); - - // save first checkpoint - const runnableConfig = await saver.put( - { configurable: { thread_id: "1" } }, - checkpoint1, - { source: "update", step: -1, parents: {} } - ); - expect(runnableConfig).toEqual({ - configurable: { - thread_id: "1", - checkpoint_ns: "", - checkpoint_id: checkpoint1.id, - }, - }); - - // add some writes - await saver.putWrites( - { + it.each([{ ttl: undefined }, { ttl: { expireAfterSeconds: 60 * 60 } }])( + "should save and retrieve checkpoints correctly (%s)", + async ({ ttl }) => { + const saver = new MongoDBSaver({ client, ttl }); + await saver.setup(); + + const threadId = crypto.randomUUID(); + + // get undefined checkpoint + const undefinedCheckpoint = await saver.getTuple({ + configurable: { thread_id: threadId }, + }); + expect(undefinedCheckpoint).toBeUndefined(); + + // save first checkpoint + const runnableConfig = await saver.put( + { configurable: { thread_id: threadId } }, + checkpoint1, + { source: "update", step: -1, parents: {} } + ); + expect(runnableConfig).toEqual({ configurable: { + thread_id: threadId, + checkpoint_ns: "", checkpoint_id: checkpoint1.id, + }, + }); + + // add some writes + await saver.putWrites( + { + configurable: { + checkpoint_id: checkpoint1.id, + checkpoint_ns: "", + thread_id: threadId, + }, + }, + [["bar", "baz"]], + "foo" + ); + + // get first checkpoint tuple + const firstCheckpointTuple = await saver.getTuple({ + configurable: { thread_id: threadId }, + }); + expect(firstCheckpointTuple?.config).toEqual({ + configurable: { + thread_id: threadId, checkpoint_ns: "", - thread_id: "1", + checkpoint_id: checkpoint1.id, }, - }, - [["bar", "baz"]], - "foo" - ); - - // get first checkpoint tuple - const firstCheckpointTuple = await saver.getTuple({ - configurable: { thread_id: "1" }, - }); - expect(firstCheckpointTuple?.config).toEqual({ - configurable: { - thread_id: "1", - checkpoint_ns: "", - checkpoint_id: checkpoint1.id, - }, - }); - expect(firstCheckpointTuple?.checkpoint).toEqual(checkpoint1); - expect(firstCheckpointTuple?.parentConfig).toBeUndefined(); - expect(firstCheckpointTuple?.pendingWrites).toEqual([ - ["foo", "bar", "baz"], - ]); - - // save second checkpoint - await saver.put( - { + }); + expect(firstCheckpointTuple?.checkpoint).toEqual(checkpoint1); + expect(firstCheckpointTuple?.parentConfig).toBeUndefined(); + expect(firstCheckpointTuple?.pendingWrites).toEqual([ + ["foo", "bar", "baz"], + ]); + + // save second checkpoint + await saver.put( + { + configurable: { + thread_id: threadId, + checkpoint_id: "2024-04-18T17:19:07.952Z", + }, + }, + checkpoint2, + { source: "update", step: -1, parents: {} } + ); + + // verify that parentTs is set and retrieved correctly for second checkpoint + const secondCheckpointTuple = await saver.getTuple({ + configurable: { thread_id: threadId }, + }); + expect(secondCheckpointTuple?.parentConfig).toEqual({ configurable: { - thread_id: "1", + thread_id: threadId, + checkpoint_ns: "", checkpoint_id: "2024-04-18T17:19:07.952Z", }, - }, - checkpoint2, - { source: "update", step: -1, parents: {} } - ); - - // verify that parentTs is set and retrieved correctly for second checkpoint - const secondCheckpointTuple = await saver.getTuple({ - configurable: { thread_id: "1" }, - }); - expect(secondCheckpointTuple?.parentConfig).toEqual({ - configurable: { - thread_id: "1", - checkpoint_ns: "", - checkpoint_id: "2024-04-18T17:19:07.952Z", - }, - }); - - // list checkpoints - const checkpointTupleGenerator = saver.list({ - configurable: { thread_id: "1" }, - }); - const checkpointTuples: CheckpointTuple[] = []; - for await (const checkpoint of checkpointTupleGenerator) { - checkpointTuples.push(checkpoint); + }); + + // list checkpoints + const checkpointTupleGenerator = saver.list({ + configurable: { thread_id: threadId }, + }); + const checkpointTuples: CheckpointTuple[] = []; + for await (const checkpoint of checkpointTupleGenerator) { + checkpointTuples.push(checkpoint); + } + expect(checkpointTuples.length).toBe(2); + + const checkpointTuple1 = checkpointTuples[0]; + const checkpointTuple2 = checkpointTuples[1]; + expect(checkpointTuple1.checkpoint.ts).toBe("2024-04-20T17:19:07.952Z"); + expect(checkpointTuple2.checkpoint.ts).toBe("2024-04-19T17:19:07.952Z"); } - expect(checkpointTuples.length).toBe(2); - - const checkpointTuple1 = checkpointTuples[0]; - const checkpointTuple2 = checkpointTuples[1]; - expect(checkpointTuple1.checkpoint.ts).toBe("2024-04-20T17:19:07.952Z"); - expect(checkpointTuple2.checkpoint.ts).toBe("2024-04-19T17:19:07.952Z"); - }); + ); it("should delete thread", async () => { + const threadId1 = crypto.randomUUID(); + const threadId2 = crypto.randomUUID(); + const saver = new MongoDBSaver({ client }); - await saver.put({ configurable: { thread_id: "1" } }, emptyCheckpoint(), { - source: "update", - step: -1, - parents: {}, - }); - await saver.put({ configurable: { thread_id: "2" } }, emptyCheckpoint(), { - source: "update", - step: -1, - parents: {}, - }); + await saver.put( + { configurable: { thread_id: threadId1 } }, + emptyCheckpoint(), + { + source: "update", + step: -1, + parents: {}, + } + ); + + await saver.put( + { configurable: { thread_id: threadId2 } }, + emptyCheckpoint(), + { + source: "update", + step: -1, + parents: {}, + } + ); - await saver.deleteThread("1"); + await saver.deleteThread(threadId1); expect( - await saver.getTuple({ configurable: { thread_id: "1" } }) + await saver.getTuple({ configurable: { thread_id: threadId1 } }) ).toBeUndefined(); expect( - await saver.getTuple({ configurable: { thread_id: "2" } }) + await saver.getTuple({ configurable: { thread_id: threadId2 } }) ).toBeDefined(); }); });