From cd7617ea0a6cbadb549fa90b1b14f68666a6eea0 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 5 Mar 2024 15:17:58 +0000 Subject: [PATCH 01/14] node: change NodeFileSystem types for streams, add createReadStream --- packages/node/src/storage/FsNodeFileSystem.ts | 13 ++++---- .../src/storage/interfaces/NodeFileSystem.ts | 10 ++----- packages/node/tests/_mocks/fileSystem.ts | 30 ++++++++++++++----- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/packages/node/src/storage/FsNodeFileSystem.ts b/packages/node/src/storage/FsNodeFileSystem.ts index ae3e48d3..b1f05e0b 100644 --- a/packages/node/src/storage/FsNodeFileSystem.ts +++ b/packages/node/src/storage/FsNodeFileSystem.ts @@ -1,7 +1,8 @@ import { BacktraceAttachment } from '@backtrace/sdk-core'; import fs from 'fs'; +import { Readable, Writable } from 'stream'; import { BacktraceFileAttachment } from '../attachment/index.js'; -import { NodeFileSystem, WritableStream } from './interfaces/NodeFileSystem.js'; +import { NodeFileSystem } from './interfaces/NodeFileSystem.js'; export class FsNodeFileSystem implements NodeFileSystem { public readDir(dir: string): Promise { @@ -52,10 +53,12 @@ export class FsNodeFileSystem implements NodeFileSystem { fs.renameSync(oldPath, newPath); } - public createWriteStream(path: string): WritableStream { - const stream = fs.createWriteStream(path, 'utf-8'); - (stream as Partial).writeSync = (chunk) => stream.write(chunk); - return stream as unknown as WritableStream; + public createWriteStream(path: string): Writable { + return fs.createWriteStream(path, 'utf-8'); + } + + public createReadStream(path: string): Readable { + return fs.createReadStream(path, 'utf-8'); } public async exists(path: string): Promise { diff --git a/packages/node/src/storage/interfaces/NodeFileSystem.ts b/packages/node/src/storage/interfaces/NodeFileSystem.ts index e3595d23..109a1bd1 100644 --- a/packages/node/src/storage/interfaces/NodeFileSystem.ts +++ b/packages/node/src/storage/interfaces/NodeFileSystem.ts @@ -1,13 +1,9 @@ import { FileSystem } from '@backtrace/sdk-core'; - -export interface WritableStream { - write(chunk: string, callback?: (err?: Error | null) => void): void; - writeSync(chunk: string): void; - close(): void; -} +import { Readable, Writable } from 'stream'; export interface NodeFileSystem extends FileSystem { - createWriteStream(path: string): WritableStream; + createReadStream(path: string): Readable; + createWriteStream(path: string): Writable; rename(oldPath: string, newPath: string): Promise; renameSync(oldPath: string, newPath: string): void; } diff --git a/packages/node/tests/_mocks/fileSystem.ts b/packages/node/tests/_mocks/fileSystem.ts index 434b0c64..f2413645 100644 --- a/packages/node/tests/_mocks/fileSystem.ts +++ b/packages/node/tests/_mocks/fileSystem.ts @@ -1,7 +1,7 @@ -import { MockedFileSystem, mockFileSystem } from '@backtrace/sdk-core/tests/_mocks/fileSystem'; +import { MockedFileSystem, mockFileSystem } from '@backtrace/sdk-core/tests/_mocks/fileSystem.js'; import path from 'path'; -import { Writable } from 'stream'; -import { NodeFileSystem, WritableStream } from '../../src/storage/interfaces/NodeFileSystem.js'; +import { Readable, Writable } from 'stream'; +import { NodeFileSystem } from '../../src/storage/interfaces/NodeFileSystem.js'; export function mockStreamFileSystem(files?: Record): MockedFileSystem { const fs = mockFileSystem(files); @@ -22,7 +22,7 @@ export function mockStreamFileSystem(files?: Record): MockedFile }), createWriteStream: 
jest.fn().mockImplementation((p: string) => { - const writable = new Writable({ + return new Writable({ write(chunk, encoding, callback) { const str = Buffer.isBuffer(chunk) ? chunk.toString('utf-8') @@ -40,11 +40,27 @@ export function mockStreamFileSystem(files?: Record): MockedFile callback && callback(); }, }); + }), - (writable as Partial).close = () => writable.end(); - (writable as Partial).writeSync = (chunk) => writable.write(chunk); + createReadStream: jest.fn().mockImplementation((p: string) => { + const fullPath = path.resolve(p); + const file = fs.files[fullPath]; + if (!file) { + throw new Error(`File ${p} does not exist`); + } - return writable; + let position = 0; + return new Readable({ + read(size) { + const chunk = file.substring(position, position + size); + if (!chunk) { + this.push(null); + } else { + this.push(Buffer.from(chunk)); + position += size; + } + }, + }); }), }; } From 935eb37ebc63e4d810ec2bec0e0325706c32bd1e Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 5 Mar 2024 15:18:14 +0000 Subject: [PATCH 02/14] node: use fileSystem in BacktraceFileAttachment --- packages/node/src/attachment/BacktraceFileAttachment.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/node/src/attachment/BacktraceFileAttachment.ts b/packages/node/src/attachment/BacktraceFileAttachment.ts index e1cc04ac..df581d74 100644 --- a/packages/node/src/attachment/BacktraceFileAttachment.ts +++ b/packages/node/src/attachment/BacktraceFileAttachment.ts @@ -2,6 +2,7 @@ import { BacktraceFileAttachment as CoreBacktraceFileAttachment } from '@backtra import fs from 'fs'; import path from 'path'; import { Readable } from 'stream'; +import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem'; export class BacktraceFileAttachment implements CoreBacktraceFileAttachment { public readonly name: string; @@ -9,14 +10,15 @@ export class BacktraceFileAttachment implements CoreBacktraceFileAttachment Date: Tue, 5 Mar 2024 15:19:05 +0000 Subject: [PATCH 03/14] node: rewrite AlternatingFileWriter with react-native logic --- .../node/src/common/AlternatingFileWriter.ts | 146 ++++++++++++++---- .../node/tests/common/alternatingFile.spec.ts | 20 +-- 2 files changed, 122 insertions(+), 44 deletions(-) diff --git a/packages/node/src/common/AlternatingFileWriter.ts b/packages/node/src/common/AlternatingFileWriter.ts index ea60396b..eebebccd 100644 --- a/packages/node/src/common/AlternatingFileWriter.ts +++ b/packages/node/src/common/AlternatingFileWriter.ts @@ -1,67 +1,137 @@ -import { NodeFileSystem, WritableStream } from '../storage/interfaces/NodeFileSystem.js'; +import { Writable } from 'stream'; +import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; export class AlternatingFileWriter { - private _fileStream?: WritableStream; + private _fileStream?: Writable; private _count = 0; + private _size = 0; private _disposed = false; + private readonly _logQueue: string[] = []; + private _currentAppendedLog?: string; + constructor( + private readonly _fileSystem: NodeFileSystem, private readonly _mainFile: string, private readonly _fallbackFile: string, - private readonly _fileCapacity: number, - private readonly _fileSystem: NodeFileSystem, + private readonly _maxLines: number, + private readonly _maxSize?: number, ) {} - public async writeLine(value: string): Promise { - if (this._fileCapacity <= 0) { - return this; - } - + public async writeLine(value: string): Promise { if (this._disposed) { throw new Error('This instance has been disposed.'); } 
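+        // Note: writes are queued; when no append is in progress, process() drains the
+        // queue, batching as many queued lines as fit within the line/size limits
+        // before switching to the fallback file.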
+ this._logQueue.push(value); + if (!this._currentAppendedLog) { + return await this.process(); + } + } + + private async process(): Promise { + this._currentAppendedLog = this._logQueue.shift(); + + if (!this._currentAppendedLog) { + return; + } + + const appendLength = this._currentAppendedLog.length + 1; + this.prepareBreadcrumbStream(appendLength); + if (!this._fileStream) { - const stream = this.safeCreateStream(this._mainFile); - if (!stream) { - return this; + this._logQueue.unshift(this._currentAppendedLog); + this._currentAppendedLog = undefined; + return; + } + + // if the queue is full and we can save more item in a batch + // try to save as much as possible to speed up potential native operations + this._count += 1; + this._size += appendLength; + + const logsToAppend = [this._currentAppendedLog]; + + let logsToTake = 0; + let currentCount = this._count; + let currentSize = this._size; + + for (let i = 0; i < this._logQueue.length; i++) { + const log = this._logQueue[i]; + if (!log) { + continue; } - this._fileStream = stream; - } else if (this._count >= this._fileCapacity) { - this._fileStream.close(); - this.safeMoveMainToFallback(); - this._count = 0; + const logLength = log.length + 1; - const stream = this.safeCreateStream(this._mainFile); - if (!stream) { - return this; + if (currentCount + 1 > this._maxLines) { + break; } - this._fileStream = stream; + if (this._maxSize && currentSize + logLength >= this._maxSize) { + break; + } + + logsToTake++; + currentCount++; + currentSize += logLength; } - await this.safeWriteAsync(this._fileStream, value + '\n'); - this._count++; + const restAppendingLogs = this._logQueue.splice(0, logsToTake); + this._count = this._count + restAppendingLogs.length; + this._size += restAppendingLogs.reduce((sum, l) => sum + l.length + 1, 0); + + logsToAppend.push(...restAppendingLogs); - return this; + return await this.writeAsync(this._fileStream, logsToAppend.join('\n') + '\n') + .catch(() => { + // handle potential issues with appending logs. + // we can't do really too much here other than retry + // logging the error might also cause a breadcrumb loop, that we should try to avoid + this._logQueue.unshift(...logsToAppend); + }) + .finally(() => { + if (this._logQueue.length !== 0) { + return this.process(); + } else { + this._currentAppendedLog = undefined; + } + }); } - private safeWriteAsync(fs: WritableStream, data: string) { - return new Promise((resolve) => fs.write(data, (err) => (err ? resolve(false) : resolve(true)))); + private writeAsync(fs: Writable, data: string) { + return new Promise((resolve, reject) => fs.write(data, (err) => (err ? 
reject(err) : resolve()))); } - public dispose() { - this._fileStream?.close(); - this._disposed = true; + private prepareBreadcrumbStream(newSize: number) { + if (!this._fileStream) { + this._fileStream = this.safeCreateStream(this._mainFile); + } else if (this._count >= this._maxLines || (this._maxSize && this._size + newSize >= this._maxSize)) { + this.switchFile(); + } } - private safeCreateStream(path: string) { - try { - return this._fileSystem.createWriteStream(path); - } catch { - return undefined; + private switchFile() { + if (this._fileStream) { + this._fileStream.destroy(); } + + this._fileStream = undefined; + + const renameResult = this.safeMoveMainToFallback(); + if (!renameResult) { + return; + } + + this._fileStream = this.safeCreateStream(this._mainFile); + + this._count = 0; + this._size = 0; + } + + public dispose() { + this._fileStream?.destroy(); + this._disposed = true; } private safeMoveMainToFallback() { @@ -72,4 +142,12 @@ export class AlternatingFileWriter { return false; } } + + private safeCreateStream(path: string) { + try { + return this._fileSystem.createWriteStream(path); + } catch { + return undefined; + } + } } diff --git a/packages/node/tests/common/alternatingFile.spec.ts b/packages/node/tests/common/alternatingFile.spec.ts index 9dfdac5f..0690d08c 100644 --- a/packages/node/tests/common/alternatingFile.spec.ts +++ b/packages/node/tests/common/alternatingFile.spec.ts @@ -31,7 +31,7 @@ describe('AlternatingFileWriter', () => { }); it('should add line to the main file', async () => { - const writer = new AlternatingFileWriter(file1, file2, 10, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10); await writer.writeLine('value'); writer.dispose(); @@ -41,7 +41,7 @@ describe('AlternatingFileWriter', () => { it('should not move main file to fallback file before adding with fileCapacity reached', async () => { const count = 5; - const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); for (let i = 0; i < count; i++) { await writer.writeLine(`value-${i}`); } @@ -52,7 +52,7 @@ describe('AlternatingFileWriter', () => { it('should move main file to fallback file after adding with fileCapacity reached', async () => { const count = 5; - const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); for (let i = 0; i < count; i++) { await writer.writeLine(`value-${i}`); } @@ -67,7 +67,7 @@ describe('AlternatingFileWriter', () => { it('should add line to the main file after adding with fileCapacity reached', async () => { const count = 5; - const writer = new AlternatingFileWriter(file1, file2, count, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); for (let i = 0; i < count; i++) { await writer.writeLine(`value-${i}`); } @@ -80,13 +80,13 @@ describe('AlternatingFileWriter', () => { }); it('should throw after adding line when disposed', async () => { - const writer = new AlternatingFileWriter(file1, file2, 10, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10); writer.dispose(); await expect(writer.writeLine('value-x')).rejects.toThrowError('This instance has been disposed.'); }); it('should not write when fileCapacity is 0', () => { - const writer = new 
AlternatingFileWriter(file1, file2, 0, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 0); writer.writeLine('abc'); writer.dispose(); @@ -95,7 +95,7 @@ describe('AlternatingFileWriter', () => { }); it('should not write fileCapacity is less than 0', () => { - const writer = new AlternatingFileWriter(file1, file2, -1, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, -1); writer.writeLine('abc'); writer.dispose(); @@ -105,7 +105,7 @@ describe('AlternatingFileWriter', () => { describe('stress test', () => { it('should not throw', async () => { - const writer = new AlternatingFileWriter(file1, file2, 1, new FsNodeFileSystem()); + const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 1); const write = async (count: number, entry: string) => { for (let i = 0; i < count; i++) { @@ -117,7 +117,7 @@ describe('AlternatingFileWriter', () => { const writeCount = 100; const promises = [...new Array(writerCount)].map(() => write(writeCount, 'text')); await expect(Promise.all(promises)).resolves.not.toThrow(); - }); + }, 10000); it('should not skip text', async () => { const fs = mockStreamFileSystem(); @@ -130,7 +130,7 @@ describe('AlternatingFileWriter', () => { return renameSync(oldPath, newPath); }); - const writer = new AlternatingFileWriter(file1, file2, 1, fs); + const writer = new AlternatingFileWriter(fs, file1, file2, 1); const write = async (count: number, entry: string) => { for (let i = 0; i < count; i++) { From af0192275008ee5744985d7a6f2f9163b8529cef Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 5 Mar 2024 15:20:01 +0000 Subject: [PATCH 04/14] node: use new AlternatingFileWriter with limits, add and use factory for FileBreadcrumbsStorage --- packages/node/src/BacktraceClient.ts | 8 +- .../src/breadcrumbs/FileBreadcrumbsStorage.ts | 36 +- .../FileBreadcrumbsStorage.spec.ts | 326 ++++++++++++++++++ 3 files changed, 352 insertions(+), 18 deletions(-) create mode 100644 packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts diff --git a/packages/node/src/BacktraceClient.ts b/packages/node/src/BacktraceClient.ts index 45384c35..709e569d 100644 --- a/packages/node/src/BacktraceClient.ts +++ b/packages/node/src/BacktraceClient.ts @@ -44,13 +44,7 @@ export class BacktraceClient extends BacktraceCoreClient const breadcrumbsManager = this.modules.get(BreadcrumbsManager); if (breadcrumbsManager && this.sessionFiles) { - breadcrumbsManager.setStorage( - FileBreadcrumbsStorage.create( - this.sessionFiles, - fileSystem, - (clientSetup.options.breadcrumbs?.maximumBreadcrumbs ?? 
100) || 100, - ), - ); + breadcrumbsManager.setStorage(FileBreadcrumbsStorage.factory(this.sessionFiles, fileSystem)); } if (this.sessionFiles && clientSetup.options.database?.captureNativeCrashes) { diff --git a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts index 9ab64429..d3c75edb 100644 --- a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts +++ b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts @@ -4,6 +4,8 @@ import { Breadcrumb, BreadcrumbLogLevel, BreadcrumbsStorage, + BreadcrumbsStorageFactory, + BreadcrumbsStorageLimits, BreadcrumbType, jsonEscaper, RawBreadcrumb, @@ -11,6 +13,7 @@ import { TimeHelper, } from '@backtrace/sdk-core'; import path from 'path'; +import { Readable } from 'stream'; import { BacktraceFileAttachment } from '../attachment/index.js'; import { AlternatingFileWriter } from '../common/AlternatingFileWriter.js'; import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; @@ -29,14 +32,15 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { constructor( private readonly _mainFile: string, private readonly _fallbackFile: string, - fileSystem: NodeFileSystem, - maximumBreadcrumbs: number, + private readonly _fileSystem: NodeFileSystem, + private readonly _limits: BreadcrumbsStorageLimits, ) { this._writer = new AlternatingFileWriter( + _fileSystem, _mainFile, _fallbackFile, - Math.floor(maximumBreadcrumbs / 2), - fileSystem, + Math.floor((this._limits.maximumBreadcrumbs ?? 100) / 2), + this._limits.maximumTotalBreadcrumbsSize, ); } @@ -49,16 +53,18 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { return files.map((file) => new BacktraceFileAttachment(file, path.basename(file))); } - public static create(session: SessionFiles, fileSystem: NodeFileSystem, maximumBreadcrumbs: number) { - const file1 = session.getFileName(this.getFileName(0)); - const file2 = session.getFileName(this.getFileName(1)); - return new FileBreadcrumbsStorage(file1, file2, fileSystem, maximumBreadcrumbs); + public static factory(session: SessionFiles, fileSystem: NodeFileSystem): BreadcrumbsStorageFactory { + return ({ limits }) => { + const file1 = session.getFileName(this.getFileName(0)); + const file2 = session.getFileName(this.getFileName(1)); + return new FileBreadcrumbsStorage(file1, file2, fileSystem, limits); + }; } - public getAttachments(): BacktraceAttachment[] { + public getAttachments(): [BacktraceAttachment, BacktraceAttachment] { return [ - new BacktraceFileAttachment(this._mainFile, 'bt-breadcrumbs-0'), - new BacktraceFileAttachment(this._fallbackFile, 'bt-breadcrumbs-1'), + new BacktraceFileAttachment(this._mainFile, 'bt-breadcrumbs-0', this._fileSystem), + new BacktraceFileAttachment(this._fallbackFile, 'bt-breadcrumbs-1', this._fileSystem), ]; } @@ -88,6 +94,14 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { }; const breadcrumbJson = JSON.stringify(breadcrumb, jsonEscaper()); + const jsonLength = breadcrumbJson.length + 1; // newline + const sizeLimit = this._limits.maximumTotalBreadcrumbsSize; + if (sizeLimit !== undefined) { + if (jsonLength > sizeLimit) { + return id; + } + } + this._writer.writeLine(breadcrumbJson); return id; diff --git a/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts new file mode 100644 index 00000000..68edc451 --- /dev/null +++ b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts @@ -0,0 +1,326 @@ 
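+// Note: FileBreadcrumbsStorage splits `maximumBreadcrumbs` across the main and fallback
+// files; the underlying AlternatingFileWriter keeps at most Math.floor(maximumBreadcrumbs / 2)
+// breadcrumbs per file before switching, which the expectations below rely on.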
+import { Breadcrumb, BreadcrumbLogLevel, BreadcrumbType, RawBreadcrumb } from '@backtrace/sdk-core'; +import assert from 'assert'; +import { Readable } from 'stream'; +import { promisify } from 'util'; +import { FileBreadcrumbsStorage } from '../../src/breadcrumbs/FileBreadcrumbsStorage'; +import { mockStreamFileSystem } from '../_mocks/fileSystem'; + +async function readToEnd(readable: Readable) { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + + readable.on('error', reject); + readable.on('data', (chunk) => chunks.push(chunk)); + readable.on('end', () => resolve(Buffer.concat(chunks))); + }); +} + +async function loadBreadcrumbs(readable: Readable): Promise { + return (await readToEnd(readable)) + .toString('utf-8') + .split('\n') + .filter((n) => !!n) + .map((x) => { + try { + return JSON.parse(x); + } catch (err) { + throw new Error(`failed to parse "${x}": ${err}`); + } + }); +} + +const nextTick = promisify(process.nextTick); + +describe('FileBreadcrumbsStorage', () => { + it('should return added breadcrumbs', async () => { + const fs = mockStreamFileSystem(); + + const breadcrumbs: RawBreadcrumb[] = [ + { + level: BreadcrumbLogLevel.Info, + message: 'a', + type: BreadcrumbType.Manual, + attributes: { + foo: 'bar', + }, + }, + { + level: BreadcrumbLogLevel.Debug, + message: 'b', + type: BreadcrumbType.Http, + }, + { + level: BreadcrumbLogLevel.Warning, + message: 'c', + type: BreadcrumbType.Navigation, + attributes: {}, + }, + ]; + + const expectedMain: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'info', + message: 'a', + timestamp: expect.any(Number), + type: 'manual', + attributes: { + foo: 'bar', + }, + }, + { + id: expect.any(Number), + level: 'debug', + message: 'b', + timestamp: expect.any(Number), + type: 'http', + }, + { + id: expect.any(Number), + level: 'warning', + message: 'c', + timestamp: expect.any(Number), + type: 'navigation', + attributes: {}, + }, + ]; + + const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + maximumBreadcrumbs: 100, + }); + + for (const breadcrumb of breadcrumbs) { + storage.add(breadcrumb); + } + + // FileBreadcrumbsStorage is asynchronous in nature + await nextTick(); + + const [mainAttachment] = storage.getAttachments(); + + const mainStream = mainAttachment.get(); + assert(mainStream); + + const actualMain = await loadBreadcrumbs(mainStream); + expect(actualMain).toEqual(expectedMain); + }); + + it('should return added breadcrumbs in two attachments', async () => { + const fs = mockStreamFileSystem(); + + const breadcrumbs: RawBreadcrumb[] = [ + { + level: BreadcrumbLogLevel.Info, + message: 'a', + type: BreadcrumbType.Manual, + attributes: { + foo: 'bar', + }, + }, + { + level: BreadcrumbLogLevel.Debug, + message: 'b', + type: BreadcrumbType.Http, + }, + { + level: BreadcrumbLogLevel.Warning, + message: 'c', + type: BreadcrumbType.Navigation, + attributes: {}, + }, + ]; + + const expectedMain: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'warning', + message: 'c', + timestamp: expect.any(Number), + type: 'navigation', + attributes: {}, + }, + ]; + + const expectedFallback: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'info', + message: 'a', + timestamp: expect.any(Number), + type: 'manual', + attributes: { + foo: 'bar', + }, + }, + { + id: expect.any(Number), + level: 'debug', + message: 'b', + timestamp: expect.any(Number), + type: 'http', + }, + ]; + + const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + 
maximumBreadcrumbs: 4, + }); + + for (const breadcrumb of breadcrumbs) { + storage.add(breadcrumb); + await nextTick(); + } + + // FileBreadcrumbsStorage is asynchronous in nature + await nextTick(); + + const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + + const mainStream = mainAttachment.get(); + const fallbackStream = fallbackAttachment.get(); + assert(mainStream); + assert(fallbackStream); + + const actualMain = await loadBreadcrumbs(mainStream); + const actualFallback = await loadBreadcrumbs(fallbackStream); + expect(actualMain).toEqual(expectedMain); + expect(actualFallback).toEqual(expectedFallback); + }); + + it('should return no more than maximumBreadcrumbs breadcrumbs', async () => { + const fs = mockStreamFileSystem(); + + const breadcrumbs: RawBreadcrumb[] = [ + { + level: BreadcrumbLogLevel.Info, + message: 'a', + type: BreadcrumbType.Manual, + attributes: { + foo: 'bar', + }, + }, + { + level: BreadcrumbLogLevel.Debug, + message: 'b', + type: BreadcrumbType.Http, + }, + { + level: BreadcrumbLogLevel.Warning, + message: 'c', + type: BreadcrumbType.Navigation, + attributes: {}, + }, + ]; + + const expectedMain: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'warning', + message: 'c', + timestamp: expect.any(Number), + type: 'navigation', + attributes: {}, + }, + ]; + + const expectedFallback: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'debug', + message: 'b', + timestamp: expect.any(Number), + type: 'http', + }, + ]; + + const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + maximumBreadcrumbs: 2, + }); + + for (const breadcrumb of breadcrumbs) { + storage.add(breadcrumb); + await nextTick(); + } + + // FileBreadcrumbsStorage is asynchronous in nature + await nextTick(); + + const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + + const mainStream = mainAttachment.get(); + const fallbackStream = fallbackAttachment.get(); + assert(mainStream); + assert(fallbackStream); + + const actualMain = await loadBreadcrumbs(mainStream); + const actualFallback = await loadBreadcrumbs(fallbackStream); + expect(actualMain).toEqual(expectedMain); + expect(actualFallback).toEqual(expectedFallback); + }); + + it('should return breadcrumbs up to the json size', async () => { + const fs = mockStreamFileSystem(); + + const breadcrumbs: RawBreadcrumb[] = [ + { + level: BreadcrumbLogLevel.Debug, + message: 'a', + type: BreadcrumbType.Http, + }, + { + level: BreadcrumbLogLevel.Debug, + message: 'b', + type: BreadcrumbType.Http, + }, + { + level: BreadcrumbLogLevel.Debug, + message: 'c', + type: BreadcrumbType.Http, + }, + ]; + + const expectedMain: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'debug', + message: 'c', + timestamp: expect.any(Number), + type: 'http', + }, + ]; + + const expectedFallback: Breadcrumb[] = [ + { + id: expect.any(Number), + level: 'debug', + message: 'b', + timestamp: expect.any(Number), + type: 'http', + }, + ]; + + const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + maximumBreadcrumbs: 100, + maximumBreadcrumbsSize: JSON.stringify(expectedMain[0]).length + 10, + }); + + for (const breadcrumb of breadcrumbs) { + storage.add(breadcrumb); + await nextTick(); + } + + // FileBreadcrumbsStorage is asynchronous in nature + await nextTick(); + + const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + + const mainStream = mainAttachment.get(); + const fallbackStream = fallbackAttachment.get(); + assert(mainStream); + 
assert(fallbackStream); + + const actualMain = await loadBreadcrumbs(mainStream); + const actualFallback = await loadBreadcrumbs(fallbackStream); + expect(actualMain).toEqual(expectedMain); + expect(actualFallback).toEqual(expectedFallback); + }); +}); From eeabf9a720d2bdecab542087a62d086127c3e560 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 5 Mar 2024 15:46:57 +0000 Subject: [PATCH 05/14] node: allow for unlimited breadcrumbs in FileBreadcrumbsStorage --- packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts | 2 +- packages/node/src/common/AlternatingFileWriter.ts | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts index d3c75edb..a6b19997 100644 --- a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts +++ b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts @@ -39,7 +39,7 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { _fileSystem, _mainFile, _fallbackFile, - Math.floor((this._limits.maximumBreadcrumbs ?? 100) / 2), + this._limits.maximumBreadcrumbs ? Math.floor(this._limits.maximumBreadcrumbs / 2) : undefined, this._limits.maximumTotalBreadcrumbsSize, ); } diff --git a/packages/node/src/common/AlternatingFileWriter.ts b/packages/node/src/common/AlternatingFileWriter.ts index eebebccd..bb62c239 100644 --- a/packages/node/src/common/AlternatingFileWriter.ts +++ b/packages/node/src/common/AlternatingFileWriter.ts @@ -14,7 +14,7 @@ export class AlternatingFileWriter { private readonly _fileSystem: NodeFileSystem, private readonly _mainFile: string, private readonly _fallbackFile: string, - private readonly _maxLines: number, + private readonly _maxLines?: number, private readonly _maxSize?: number, ) {} @@ -64,11 +64,11 @@ export class AlternatingFileWriter { const logLength = log.length + 1; - if (currentCount + 1 > this._maxLines) { + if (currentCount + 1 > (this._maxLines ?? Infinity)) { break; } - if (this._maxSize && currentSize + logLength >= this._maxSize) { + if (currentSize + logLength >= (this._maxSize ?? Infinity)) { break; } @@ -106,7 +106,7 @@ export class AlternatingFileWriter { private prepareBreadcrumbStream(newSize: number) { if (!this._fileStream) { this._fileStream = this.safeCreateStream(this._mainFile); - } else if (this._count >= this._maxLines || (this._maxSize && this._size + newSize >= this._maxSize)) { + } else if (this._count >= (this._maxLines ?? Infinity) || this._size + newSize >= (this._maxSize ?? 
Infinity)) { this.switchFile(); } } From 0cd738f9a3dd98f65fa0749970310a5139d6d68e Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Mon, 22 Jul 2024 13:37:22 +0200 Subject: [PATCH 06/14] node: add stream chunkifier --- packages/node/src/storage/FsNodeFileSystem.ts | 5 +- .../src/storage/interfaces/NodeFileSystem.ts | 6 +- packages/node/src/streams/chunkifier.ts | 121 +++++++++ .../node/src/streams/combinedChunkSplitter.ts | 30 +++ packages/node/src/streams/fileChunkSink.ts | 101 ++++++++ .../node/src/streams/lengthChunkSplitter.ts | 51 ++++ .../node/src/streams/lineChunkSplitter.ts | 36 +++ .../node/tests/_helpers/blackholeChunkSink.ts | 12 + packages/node/tests/_helpers/chunks.ts | 29 +++ packages/node/tests/_helpers/events.ts | 18 ++ packages/node/tests/_helpers/generators.ts | 94 +++++++ .../node/tests/_helpers/memoryChunkSink.ts | 24 ++ packages/node/tests/_mocks/fileSystem.ts | 9 +- .../node/tests/streams/chunkifier.spec.ts | 233 ++++++++++++++++++ .../node/tests/streams/fileChunkSink.spec.ts | 60 +++++ .../tests/streams/lengthChunkSplitter.spec.ts | 87 +++++++ .../tests/streams/lineChunkSplitter.spec.ts | 80 ++++++ 17 files changed, 988 insertions(+), 8 deletions(-) create mode 100644 packages/node/src/streams/chunkifier.ts create mode 100644 packages/node/src/streams/combinedChunkSplitter.ts create mode 100644 packages/node/src/streams/fileChunkSink.ts create mode 100644 packages/node/src/streams/lengthChunkSplitter.ts create mode 100644 packages/node/src/streams/lineChunkSplitter.ts create mode 100644 packages/node/tests/_helpers/blackholeChunkSink.ts create mode 100644 packages/node/tests/_helpers/chunks.ts create mode 100644 packages/node/tests/_helpers/events.ts create mode 100644 packages/node/tests/_helpers/generators.ts create mode 100644 packages/node/tests/_helpers/memoryChunkSink.ts create mode 100644 packages/node/tests/streams/chunkifier.spec.ts create mode 100644 packages/node/tests/streams/fileChunkSink.spec.ts create mode 100644 packages/node/tests/streams/lengthChunkSplitter.spec.ts create mode 100644 packages/node/tests/streams/lineChunkSplitter.spec.ts diff --git a/packages/node/src/storage/FsNodeFileSystem.ts b/packages/node/src/storage/FsNodeFileSystem.ts index b1f05e0b..56772f00 100644 --- a/packages/node/src/storage/FsNodeFileSystem.ts +++ b/packages/node/src/storage/FsNodeFileSystem.ts @@ -1,6 +1,5 @@ import { BacktraceAttachment } from '@backtrace/sdk-core'; import fs from 'fs'; -import { Readable, Writable } from 'stream'; import { BacktraceFileAttachment } from '../attachment/index.js'; import { NodeFileSystem } from './interfaces/NodeFileSystem.js'; @@ -53,11 +52,11 @@ export class FsNodeFileSystem implements NodeFileSystem { fs.renameSync(oldPath, newPath); } - public createWriteStream(path: string): Writable { + public createWriteStream(path: string): fs.WriteStream { return fs.createWriteStream(path, 'utf-8'); } - public createReadStream(path: string): Readable { + public createReadStream(path: string): fs.ReadStream { return fs.createReadStream(path, 'utf-8'); } diff --git a/packages/node/src/storage/interfaces/NodeFileSystem.ts b/packages/node/src/storage/interfaces/NodeFileSystem.ts index 109a1bd1..75b1db66 100644 --- a/packages/node/src/storage/interfaces/NodeFileSystem.ts +++ b/packages/node/src/storage/interfaces/NodeFileSystem.ts @@ -1,9 +1,9 @@ import { FileSystem } from '@backtrace/sdk-core'; -import { Readable, Writable } from 'stream'; +import { ReadStream, WriteStream } from 'fs'; export interface NodeFileSystem extends FileSystem { - 
createReadStream(path: string): Readable; - createWriteStream(path: string): Writable; + createReadStream(path: string): ReadStream; + createWriteStream(path: string): WriteStream; rename(oldPath: string, newPath: string): Promise; renameSync(oldPath: string, newPath: string): void; } diff --git a/packages/node/src/streams/chunkifier.ts b/packages/node/src/streams/chunkifier.ts new file mode 100644 index 00000000..521bd920 --- /dev/null +++ b/packages/node/src/streams/chunkifier.ts @@ -0,0 +1,121 @@ +import { EventEmitter, Writable, WritableOptions } from 'stream'; + +export type ChunkSplitterFactory = () => (chunk: Buffer, encoding: BufferEncoding) => [Buffer, Buffer?]; + +/** + * Implementation of splitter should return either one or two `Buffer`s. + * + * The first `Buffer` will be written to the current chunk. + * If the second `Buffer` is returned, `chunkifier` will create a new chunk and write the + * second buffer to the new chunk. + */ +export type ChunkSplitter = (chunk: Buffer, encoding: BufferEncoding) => [Buffer, Buffer?]; + +/** + * Implementation of chunk sink should return each time a new writable stream. + * + * `n` determines which stream it is in sequence. + */ +export type ChunkSink = (n: number) => S; + +export interface ChunkifierOptions extends WritableOptions { + /** + * Chunk splitter factory. The factory will be called when creating a new chunk. + */ + readonly splitter: ChunkSplitterFactory; + + /** + * Chunk sink. The sink will be called when creating a new chunk. + */ + readonly sink: ChunkSink; + + readonly allowEmptyChunks?: boolean; +} + +interface StreamContext { + readonly stream: Writable; + readonly disposeEvents: () => void; + isEmptyChunk: boolean; +} + +/** + * Splits incoming data into chunks, writing them to the sink. + */ +export function chunkifier({ sink: streamFactory, ...options }: ChunkifierOptions) { + let chunkCount = 0; + + function createStreamContext(): StreamContext { + const stream = streamFactory(chunkCount++); + + // We need to forward the 'drain' event, in case the sink stream resumes writing. + const disposeEvents = forwardEvents(stream, writable, 'drain'); + + return { stream, disposeEvents, isEmptyChunk: true }; + } + + let context: StreamContext | undefined; + let splitter: ChunkSplitter | undefined; + + const writable = new Writable({ + ...options, + write(data: Buffer, encoding, callback) { + // If data is empty from the start, forward the write directly to current stream + if (!data.length) { + return (context ??= createStreamContext()).stream.write(data, encoding, callback); + } + + while (data) { + if (!data.length) { + break; + } + + splitter ??= options.splitter(); + const [currentChunk, nextChunk] = splitter(data, encoding); + if (!nextChunk) { + const current = (context ??= createStreamContext()); + if (currentChunk.length) { + current.isEmptyChunk = false; + } + + return current.stream.write(currentChunk, encoding, callback); + } + + data = nextChunk; + if (context ? context.isEmptyChunk : !currentChunk.length && !options.allowEmptyChunks) { + continue; + } + + const current = (context ??= createStreamContext()); + current.disposeEvents(); + current.stream.write(currentChunk, encoding, (err) => { + current.stream.destroy(err ?? 
undefined); + }); + + // On next loop iteration, or write, create new stream again + context = undefined; + splitter = undefined; + } + callback(); + return true; + }, + }); + return writable; +} + +function forwardEvents(from: E, to: E, ...events: string[]) { + const fwd = + (event: string) => + (...args: any[]) => + to.emit(event as string, ...args, to); + + const forwards: [string, ReturnType][] = []; + for (const event of events) { + const fn = fwd(event); + from.on(event, fn); + forwards.push([event, fn]); + } + + // Return a dispose function - when called, event callbacks will be detached + const off = () => forwards.forEach(([event, fn]) => from.off(event, fn)); + return off; +} diff --git a/packages/node/src/streams/combinedChunkSplitter.ts b/packages/node/src/streams/combinedChunkSplitter.ts new file mode 100644 index 00000000..5f818e3b --- /dev/null +++ b/packages/node/src/streams/combinedChunkSplitter.ts @@ -0,0 +1,30 @@ +import { ChunkSplitter } from './chunkifier'; + +/** + * Combines several splitters into one. + * + * Each splitter is checked, in order that they are passed. + * Splitters receive always the first chunk. + * + * If more than one splitter returns splitted chunks, the second + * chunks are concatenated and treated as one chunk. + * @param splitters + * @returns + */ +export function combinedChunkSplitter(...splitters: ChunkSplitter[]): ChunkSplitter { + return (chunk, encoding) => { + const rest: Buffer[] = []; + + for (const splitter of splitters) { + const [c1, c2] = splitter(chunk, encoding); + chunk = c1; + if (c2) { + // Prepend second chunk to the rest + rest.unshift(c2); + } + } + + // If any chunks are in rest, concatenate them and pass as the second chunk + return [chunk, rest.length ? Buffer.concat(rest) : undefined]; + }; +} diff --git a/packages/node/src/streams/fileChunkSink.ts b/packages/node/src/streams/fileChunkSink.ts new file mode 100644 index 00000000..f9c724b7 --- /dev/null +++ b/packages/node/src/streams/fileChunkSink.ts @@ -0,0 +1,101 @@ +import EventEmitter from 'events'; +import fs from 'fs'; +import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem'; +import { ChunkSink } from './chunkifier'; + +interface FileChunkSinkOptions { + /** + * Maximum number of files. + */ + readonly maxFiles: number; + + /** + * Full path to the chunk file. + */ + readonly file: (n: number) => string; + + /** + * File system implementation to use. + */ + readonly fs?: NodeFileSystem; +} + +/** + * Chunk sink which writes data to disk. + * + * Each time a new chunk is created, a new stream is created with path provided from options. + */ +export class FileChunkSink extends EventEmitter { + private readonly _streamTracker: LimitedFifo; + + /** + * Returns all files that have been written to and are not deleted. + */ + public get files() { + return this._streamTracker.elements; + } + + constructor(private readonly _options: FileChunkSinkOptions) { + super(); + + // Track files using a FIFO queue + this._streamTracker = limitedFifo(_options.maxFiles, (file) => { + // On file removal, emit delete or delete the file + // If file is not yet destroyed (pending writes), wait on 'close' + if (file.destroyed) { + this.emitDeleteOrDelete(file); + } else { + file.once('close', () => this.emitDeleteOrDelete(file)); + } + }); + } + + /** + * Returns `ChunkSink`. Pass this to `chunkifier`. 
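+     *
+     * @example
+     * // minimal sketch of the intended wiring; limits and file names are illustrative
+     * const sink = new FileChunkSink({ maxFiles: 2, file: (n) => `breadcrumbs-${n}` });
+     * const stream = chunkifier({ sink: sink.getSink(), splitter: () => lineChunkSplitter(100) });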
+ */ + public getSink(): ChunkSink { + return (n) => { + const stream = this.createStream(n); + this._streamTracker.push(stream); + this.emit('create', stream); + return stream; + }; + } + + private createStream(n: number) { + const path = this._options.file(n); + return (this._options.fs ?? fs).createWriteStream(path); + } + + private emitDeleteOrDelete(file: fs.WriteStream) { + // If 'delete' event is not handled, delete the file + if (!this.emit('delete', file)) { + (this._options.fs ?? fs).unlink(file.path.toString('utf-8'), () => { + // Do nothing on error + }); + } + } +} + +/** + * Limited FIFO queue. Each time the capacity is exceeded, the first element is removed + * and `onShift` is called with the removed element. + * @param capacity Maximum capacity. + */ +function limitedFifo(capacity: number, onShift: (t: T) => void) { + const elements: T[] = []; + + function push(element: T) { + elements.push(element); + if (elements.length > capacity) { + const first = elements.shift(); + if (first) { + onShift(first); + } + } + } + + return { elements: elements as readonly T[], push }; +} + +type LimitedFifo = ReturnType>; diff --git a/packages/node/src/streams/lengthChunkSplitter.ts b/packages/node/src/streams/lengthChunkSplitter.ts new file mode 100644 index 00000000..a39fca62 --- /dev/null +++ b/packages/node/src/streams/lengthChunkSplitter.ts @@ -0,0 +1,51 @@ +import { ChunkSplitter } from './chunkifier'; + +/** + * Splits data into chunks with maximum length. + * @param maxLength Maximum length of one chunk. + * @param wholeLines If `true`, will split chunks before newlines, so whole lines are passed to the chunk. + */ +export function lengthChunkSplitter(maxLength: number, wholeLines: 'skip' | 'break' | false = false): ChunkSplitter { + let seen = 0; + + const emptyBuffer = Buffer.of(); + + return function lengthChunkSplitter(data) { + const remainingLength = maxLength - seen; + if (data.length <= remainingLength) { + seen += data.length; + return [data]; + } + + seen = 0; + if (!wholeLines) { + return [data.subarray(0, remainingLength), data.subarray(remainingLength)]; + } + + // Check last newline before first chunk end + const lastLineIndex = data.subarray(0, remainingLength).lastIndexOf('\n'); + + // If there is no newline, pass empty buffer as the first chunk + // and write all data into the second + if (lastLineIndex === -1) { + if (remainingLength !== maxLength) { + return [emptyBuffer, data]; + } + + if (wholeLines === 'break') { + // Break the line into two chunks + return [data.subarray(0, remainingLength), data.subarray(remainingLength)]; + } else { + const firstNewLine = data.indexOf('\n', remainingLength); + if (firstNewLine === -1) { + return [emptyBuffer]; + } + + return [emptyBuffer, data.subarray(firstNewLine + 1)]; + } + } + + // +1 - include trailing newline in first chunk, skip in second + return [data.subarray(0, lastLineIndex + 1), data.subarray(lastLineIndex + 1)]; + }; +} diff --git a/packages/node/src/streams/lineChunkSplitter.ts b/packages/node/src/streams/lineChunkSplitter.ts new file mode 100644 index 00000000..711a9514 --- /dev/null +++ b/packages/node/src/streams/lineChunkSplitter.ts @@ -0,0 +1,36 @@ +import { ChunkSplitter } from './chunkifier'; + +/** + * Splits data into chunks with maximum lines. + * @param maxLines Maximum lines in one chunk. 
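+ *
+ * @example
+ * // with a two-line limit, the split lands right after the second newline
+ * const split = lineChunkSplitter(2);
+ * split(Buffer.from('a\nb\nc\n'), 'utf-8'); // => ['a\nb\n', 'c\n'] (as Buffers)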
+ */ +export function lineChunkSplitter(maxLines: number): ChunkSplitter { + let seen = 0; + + function findNthLine(data: Buffer, remaining: number): [number, number] { + let lastIndex = -1; + let count = 0; + while (true) { + lastIndex = data.indexOf('\n', lastIndex + 1); + if (lastIndex === -1) { + return [-1, count]; + } + + if (remaining === ++count) { + return [lastIndex + 1, count]; + } + } + } + + return function lineChunkSplitter(data) { + const remainingLines = maxLines - seen; + const [index, count] = findNthLine(data, remainingLines); + if (index === -1) { + seen += count; + return [data]; + } + + seen = 0; + return [data.subarray(0, index), data.subarray(index)]; + }; +} diff --git a/packages/node/tests/_helpers/blackholeChunkSink.ts b/packages/node/tests/_helpers/blackholeChunkSink.ts new file mode 100644 index 00000000..dee01f05 --- /dev/null +++ b/packages/node/tests/_helpers/blackholeChunkSink.ts @@ -0,0 +1,12 @@ +import { Writable } from 'stream'; +import { ChunkSink } from '../../src/streams/chunkifier'; + +export function blackholeChunkSink(): ChunkSink { + return () => { + return new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }); + }; +} diff --git a/packages/node/tests/_helpers/chunks.ts b/packages/node/tests/_helpers/chunks.ts new file mode 100644 index 00000000..e2c7f903 --- /dev/null +++ b/packages/node/tests/_helpers/chunks.ts @@ -0,0 +1,29 @@ +import { Readable } from 'stream'; +import { ChunkSplitter } from '../../src/streams/chunkifier'; + +export async function splitToEnd(readable: Readable, splitter: ChunkSplitter) { + const results: Buffer[][] = [[]]; + + for await (let chunk of readable) { + while (chunk) { + const [c1, c2] = splitter(chunk, readable.readableEncoding ?? 'utf-8'); + results[results.length - 1].push(c1); + if (c2) { + chunk = c2; + results.push([]); + } else { + break; + } + } + } + + return results.map((b) => Buffer.concat(b)); +} + +export function* chunkify(chunk: Buffer, length: number) { + let i = 0; + do { + yield chunk.subarray(i * length, (i + 1) * length); + i++; + } while (i * length < chunk.length); +} diff --git a/packages/node/tests/_helpers/events.ts b/packages/node/tests/_helpers/events.ts new file mode 100644 index 00000000..a903a4f0 --- /dev/null +++ b/packages/node/tests/_helpers/events.ts @@ -0,0 +1,18 @@ +import { EventEmitter } from 'stream'; + +export function forwardEvents(from: E, to: E, ...events: string[]) { + const fwd = + (event: string) => + (...args: any[]) => + to.emit(event as string, ...args, to); + + const forwards: [string, ReturnType][] = []; + for (const event of events) { + const fn = fwd(event); + from.on(event, fn); + forwards.push([event, fn]); + } + + const off = () => forwards.forEach(([event, fn]) => from.off(event, fn)); + return off; +} diff --git a/packages/node/tests/_helpers/generators.ts b/packages/node/tests/_helpers/generators.ts new file mode 100644 index 00000000..48e04e31 --- /dev/null +++ b/packages/node/tests/_helpers/generators.ts @@ -0,0 +1,94 @@ +import crypto from 'crypto'; +import readline from 'readline'; +import { Readable, Transform } from 'stream'; + +export function randomLines(minLineLength: number, maxLineLength: number) { + const str = randomString(); + + return new Readable({ + read(size) { + let buffer = ''; + while (buffer.length < size) { + const lineLength = Math.floor(Math.random() * (maxLineLength - minLineLength)) + minLineLength; + const line = str.read(lineLength) + '\n'; + buffer += line; + } + this.push(buffer.substring(0, size)); + }, + 
}); +} + +export function lines(minLineLength: number, maxLineLength: number) { + let counter = 0; + + return new Readable({ + read(size) { + let buffer = ''; + while (buffer.length < size) { + const lineLength = Math.floor(Math.random() * (maxLineLength - minLineLength)) + minLineLength; + const line = counter + '_'.repeat(lineLength - 2) + counter + '\n'; + buffer += line; + counter = (counter + 1) % 10; + } + this.push(buffer.substring(0, size)); + }, + }); +} + +export function limit(count: number) { + let seen = 0; + + return new Transform({ + transform(chunk: Buffer, _, callback) { + const remaining = count - seen; + if (remaining <= 0) { + this.push(null); + return; + } + seen += chunk.length; + this.push(chunk.subarray(0, remaining)); + callback(); + }, + }); +} + +export function limitLines(count: number) { + let seen = 0; + + return new Transform({ + transform(chunk: Buffer, encoding) { + const remaining = count - seen; + if (chunk.length >= remaining) { + this.push(null); + return; + } + seen += chunk.length; + this.push(chunk.subarray(0, remaining)); + }, + }); +} + +export async function readLines(readable: Readable, lines: number) { + const rl = readline.createInterface(readable); + const result: string[] = []; + + let count = 0; + for await (const line of rl) { + result.push(line); + if (++count === lines) { + break; + } + } + + rl.close(); + return result; +} + +export function randomString() { + const a = 'A'.charCodeAt(0); + return new Readable({ + read(size) { + this.push(crypto.randomBytes(size).map((c) => (c % 26) + a)); + }, + }); +} diff --git a/packages/node/tests/_helpers/memoryChunkSink.ts b/packages/node/tests/_helpers/memoryChunkSink.ts new file mode 100644 index 00000000..75623d3b --- /dev/null +++ b/packages/node/tests/_helpers/memoryChunkSink.ts @@ -0,0 +1,24 @@ +import { Writable } from 'stream'; +import { ChunkSink } from '../../src/streams/chunkifier'; + +export function memoryChunkSink() { + const results: Buffer[][] = []; + + const sink: ChunkSink = () => { + let index = results.length; + results.push([]); + + return new Writable({ + write(chunk, encoding, callback) { + results[index].push(chunk); + callback(); + }, + }); + }; + + const getResults = () => { + return results.map((chunks) => Buffer.concat(chunks)); + }; + + return { sink, getResults }; +} diff --git a/packages/node/tests/_mocks/fileSystem.ts b/packages/node/tests/_mocks/fileSystem.ts index f2413645..e3c175bf 100644 --- a/packages/node/tests/_mocks/fileSystem.ts +++ b/packages/node/tests/_mocks/fileSystem.ts @@ -1,4 +1,5 @@ import { MockedFileSystem, mockFileSystem } from '@backtrace/sdk-core/tests/_mocks/fileSystem.js'; +import { ReadStream, WriteStream } from 'fs'; import path from 'path'; import { Readable, Writable } from 'stream'; import { NodeFileSystem } from '../../src/storage/interfaces/NodeFileSystem.js'; @@ -22,7 +23,7 @@ export function mockStreamFileSystem(files?: Record): MockedFile }), createWriteStream: jest.fn().mockImplementation((p: string) => { - return new Writable({ + const writable = new Writable({ write(chunk, encoding, callback) { const str = Buffer.isBuffer(chunk) ? 
chunk.toString('utf-8') @@ -40,6 +41,8 @@ export function mockStreamFileSystem(files?: Record): MockedFile callback && callback(); }, }); + (writable as WriteStream).path = p; + return writable; }), createReadStream: jest.fn().mockImplementation((p: string) => { @@ -50,7 +53,7 @@ export function mockStreamFileSystem(files?: Record): MockedFile } let position = 0; - return new Readable({ + const readable = new Readable({ read(size) { const chunk = file.substring(position, position + size); if (!chunk) { @@ -61,6 +64,8 @@ export function mockStreamFileSystem(files?: Record): MockedFile } }, }); + (readable as ReadStream).path = p; + return readable; }), }; } diff --git a/packages/node/tests/streams/chunkifier.spec.ts b/packages/node/tests/streams/chunkifier.spec.ts new file mode 100644 index 00000000..4a04aa1e --- /dev/null +++ b/packages/node/tests/streams/chunkifier.spec.ts @@ -0,0 +1,233 @@ +import { Readable, Writable } from 'stream'; +import { chunkifier, ChunkSplitter } from '../../src/streams/chunkifier'; +import { blackholeChunkSink } from '../_helpers/blackholeChunkSink'; +import { splitToEnd } from '../_helpers/chunks'; +import { limit, randomString } from '../_helpers/generators'; +import { memoryChunkSink } from '../_helpers/memoryChunkSink'; + +function charSplitter(char: string): ChunkSplitter { + return (chunk) => { + const index = chunk.indexOf(char); + if (index === -1) { + return [chunk]; + } + return [chunk.subarray(0, index), chunk.subarray(index + 1)]; + }; +} + +function noopSplitter(): ChunkSplitter { + return (c: Buffer) => [c]; +} + +describe('chunkifier', () => { + it('should call splitter function with every chunk', (done) => { + const data = randomString().pipe(limit(16384 * 10)); + const splitter = jest.fn(noopSplitter()); + + const instance = chunkifier({ + sink: blackholeChunkSink(), + splitter: () => splitter, + }); + + instance.on('finish', () => { + expect(splitter).toHaveBeenCalledTimes(10); + done(); + }); + + data.pipe(instance); + }); + + it('should call splitter factory with every new chunk', (done) => { + const data = randomString().pipe(limit(500)); + + const splitCount = 10; + let split = 0; + const splitterFactory = jest.fn( + (): ChunkSplitter => (chunk) => { + if (split < splitCount) { + split++; + return [chunk.subarray(0, 10), chunk.subarray(10)]; + } + return [chunk]; + }, + ); + + const instance = chunkifier({ + sink: blackholeChunkSink(), + splitter: splitterFactory, + }); + + instance.on('finish', () => { + expect(splitterFactory).toHaveBeenCalledTimes(splitCount + 1); + done(); + }); + + data.pipe(instance); + }); + + it('should not call sink on chunkifier creation', () => { + const splitter = noopSplitter(); + const sink = jest.fn(blackholeChunkSink()); + + chunkifier({ + sink, + splitter: () => splitter, + }); + + expect(sink).not.toHaveBeenCalled(); + }); + + it('should call sink on chunk split', (done) => { + const data = Readable.from('bAb'); + const splitter = charSplitter('A'); + const sink = jest.fn(blackholeChunkSink()); + + const instance = chunkifier({ + sink, + splitter: () => splitter, + }); + + data.pipe(instance).on('finish', () => { + expect(sink).toHaveBeenCalledTimes(2); + done(); + }); + }); + + it('should forward drain event from sink stream', (done) => { + const stream = new Writable({ + write(chunk, encoding, callback) { + callback(); + }, + }); + + const instance = chunkifier({ + sink: () => stream, + splitter: noopSplitter, + }); + + instance.on('drain', () => { + done(); + }); + + // make sure to write something to 
ensure that the sink has been used + Readable.from('a') + .pipe(instance) + .on('finish', () => { + stream.emit('drain'); + }); + }); + + it('should split data into chunks', async () => { + const dataStr = randomString().read(5000); + const data = Readable.from(dataStr); + const splitter = charSplitter(dataStr[3]); + const { sink, getResults } = memoryChunkSink(); + + const expected = await splitToEnd(Readable.from(dataStr), splitter); + + const instance = chunkifier({ + sink, + splitter: () => splitter, + allowEmptyChunks: true, + }); + + await new Promise((resolve, reject) => { + data.pipe(instance) + .on('finish', () => { + try { + const results = getResults(); + expect(results).toEqual(expected); + } catch (err) { + reject(err); + } + resolve(); + }) + .on('error', reject); + }); + }); + + it('should split data into non-empty chunks', async () => { + const dataStr = randomString().read(5000); + const data = Readable.from(dataStr); + const splitter = charSplitter(dataStr[3]); + const { sink, getResults } = memoryChunkSink(); + + const expected = (await splitToEnd(Readable.from(dataStr), splitter)).filter((b) => b.length); + + const instance = chunkifier({ + sink, + splitter: () => splitter, + allowEmptyChunks: false, + }); + + await new Promise((resolve, reject) => { + data.pipe(instance) + .on('finish', () => { + try { + const results = getResults(); + expect(results).toEqual(expected); + } catch (err) { + reject(err); + } + resolve(); + }) + .on('error', reject); + }); + }); + + it('should create empty chunks if allowEmptyChunks is true', async () => { + const testData = 'abcaaaabcdaaa'; + const splitter = charSplitter('a'); + const expected = ['', 'bc', '', '', '', 'bcd', '', '']; + const { sink, getResults } = memoryChunkSink(); + + const instance = chunkifier({ + sink, + splitter: () => splitter, + allowEmptyChunks: true, + }); + + await new Promise((resolve, reject) => { + Readable.from(testData) + .pipe(instance) + .on('finish', () => { + try { + const results = getResults().map((b) => b.toString('utf-8')); + expect(results).toEqual(expected); + } catch (err) { + reject(err); + } + resolve(); + }) + .on('error', reject); + }); + }); + + it('should not create empty chunks if allowEmptyChunks is false', async () => { + const testData = 'abcaaaabcdaaa'; + const splitter = charSplitter('a'); + const expected = ['bc', 'bcd']; + const { sink, getResults } = memoryChunkSink(); + + const instance = chunkifier({ + sink, + splitter: () => splitter, + allowEmptyChunks: false, + }); + + await new Promise((resolve, reject) => { + Readable.from(testData) + .pipe(instance) + .on('finish', () => { + try { + const results = getResults().map((b) => b.toString('utf-8')); + expect(results).toEqual(expected); + } catch (err) { + reject(err); + } + resolve(); + }) + .on('error', reject); + }); + }); +}); diff --git a/packages/node/tests/streams/fileChunkSink.spec.ts b/packages/node/tests/streams/fileChunkSink.spec.ts new file mode 100644 index 00000000..ecd0c145 --- /dev/null +++ b/packages/node/tests/streams/fileChunkSink.spec.ts @@ -0,0 +1,60 @@ +import path from 'path'; +import { Writable } from 'stream'; +import { FileChunkSink } from '../../src/streams/fileChunkSink'; +import { mockStreamFileSystem } from '../_mocks/fileSystem'; + +function writeAndClose(stream: Writable, value: string) { + return new Promise((resolve, reject) => { + stream.on('error', reject); + stream.write(value, () => { + stream.end(resolve); + }); + }); +} + +function sortString(a: string, b: string) { + return 
a.localeCompare(b); +} + +describe('fileChunkSink', () => { + it('should create a filestream with name from filename', async () => { + const fs = mockStreamFileSystem(); + const filename = 'abc'; + const sink = new FileChunkSink({ file: () => filename, maxFiles: Infinity, fs }); + + const stream = sink.getSink()(0); + expect(stream.path).toEqual(filename); + }); + + it('should create a filestream each time it is called', async () => { + const fs = mockStreamFileSystem(); + const dir = 'test'; + const sink = new FileChunkSink({ file: (n) => path.join(dir, n.toString()), maxFiles: Infinity, fs }); + const expected = [0, 2, 5]; + + for (const n of expected) { + const stream = sink.getSink()(n); + await writeAndClose(stream, 'a'); + } + + const actual = await fs.readDir(dir); + expect(actual.sort(sortString)).toEqual(expected.map((e) => e.toString()).sort(sortString)); + }); + + it('should remove previous files if count exceeds maxFiles', async () => { + const fs = mockStreamFileSystem(); + const dir = 'test'; + const maxFiles = 3; + const sink = new FileChunkSink({ file: (n) => path.join(dir, n.toString()), maxFiles, fs }); + const files = [0, 2, 5, 6, 79, 81, 38, -1, 3]; + const expected = files.slice(-maxFiles); + + for (const n of files) { + const stream = sink.getSink()(n); + await writeAndClose(stream, 'a'); + } + + const actual = await fs.readDir(dir); + expect(actual.sort(sortString)).toEqual(expected.map((e) => e.toString()).sort(sortString)); + }); +}); diff --git a/packages/node/tests/streams/lengthChunkSplitter.spec.ts b/packages/node/tests/streams/lengthChunkSplitter.spec.ts new file mode 100644 index 00000000..fcef8934 --- /dev/null +++ b/packages/node/tests/streams/lengthChunkSplitter.spec.ts @@ -0,0 +1,87 @@ +import { Readable } from 'stream'; +import { lengthChunkSplitter } from '../../src/streams/lengthChunkSplitter'; +import { chunkify, splitToEnd } from '../_helpers/chunks'; +import { randomString } from '../_helpers/generators'; + +describe('lengthChunkSplitter', () => { + it('should split chunk if it is larger than maxLength', () => { + const maxLength = 10; + const chunk = randomString().read(30); + const splitter = lengthChunkSplitter(maxLength); + + const [c1, c2] = splitter(chunk, 'utf-8'); + expect(c1.length).toEqual(maxLength); + expect(c2?.length).toEqual(30 - maxLength); + }); + + it('should split chunk if total seen length is larger than maxLength', () => { + const maxLength = 100; + const chunk = randomString().read(30); + const splitter = lengthChunkSplitter(maxLength); + + splitter(chunk, 'utf-8'); + splitter(chunk, 'utf-8'); + splitter(chunk, 'utf-8'); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(c1.length).toEqual(100 - 30 * 3); + expect(c2?.length).toEqual(20); + }); + + it('should not split chunk if it is smaller than maxLength', () => { + const maxLength = 100; + const chunk = randomString().read(30); + const splitter = lengthChunkSplitter(maxLength); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(c1.length).toEqual(30); + expect(c2).toBeUndefined(); + }); + + it('should not split chunk if it is equal to maxLength', () => { + const maxLength = 100; + const chunk = randomString().read(maxLength); + const splitter = lengthChunkSplitter(maxLength); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(c1.length).toEqual(maxLength); + expect(c2).toBeUndefined(); + }); + + it('should split chunk by length', async () => { + const maxLength = 123; + const data: Buffer = randomString().read(1000); + const splitter = 
lengthChunkSplitter(maxLength); + const actual = await splitToEnd(Readable.from(chunkify(data, 100)), splitter); + + for (let i = 0; i < actual.length; i++) { + const chunk = actual[i]; + expect(chunk.length).toBeLessThanOrEqual(maxLength); + expect(chunk).toEqual(data.subarray(i * maxLength, (i + 1) * maxLength)); + } + }); + + describe('whole lines', () => { + it('should split chunk on length with whole lines and break longer lines', async () => { + const data = Buffer.from('a\nb\ncde\nfghijklmno\npqrs\ntuv\nwxyz'); + const maxLength = 4; + const expected = ['a\nb\n', 'cde\n', 'fghi', 'jklm', 'no\n', 'pqrs', '\n', 'tuv\n', 'wxyz']; + + const splitter = lengthChunkSplitter(maxLength, 'break'); + const actual = (await splitToEnd(Readable.from(data), splitter)).map((b) => b.toString('utf-8')); + + expect(actual).toEqual(expected); + }); + + it('should split chunk on length with whole lines and skip longer lines', async () => { + const data = Buffer.from('a\nb\ncde\nfghijklmno\npqrs\ntuv\nwxyz'); + const maxLength = 4; + const expected = ['a\nb\n', 'cde\n', '', '', 'tuv\n', 'wxyz']; + + const splitter = lengthChunkSplitter(maxLength, 'skip'); + const actual = (await splitToEnd(Readable.from(data), splitter)).map((b) => b.toString('utf-8')); + + expect(actual).toEqual(expected); + }); + }); +}); diff --git a/packages/node/tests/streams/lineChunkSplitter.spec.ts b/packages/node/tests/streams/lineChunkSplitter.spec.ts new file mode 100644 index 00000000..de824b6d --- /dev/null +++ b/packages/node/tests/streams/lineChunkSplitter.spec.ts @@ -0,0 +1,80 @@ +import { Readable } from 'stream'; +import { lineChunkSplitter } from '../../src/streams/lineChunkSplitter'; +import { chunkify, splitToEnd } from '../_helpers/chunks'; +import { randomLines, readLines } from '../_helpers/generators'; + +function countNewlines(buffer: Buffer) { + return buffer.reduce((sum, c) => (c === '\n'.charCodeAt(0) ? 
sum + 1 : sum), 0); +} + +async function getData(lines: number) { + return Buffer.from((await readLines(randomLines(10, 20), lines)).join('\n') + '\n'); +} + +describe('lineChunkSplitter', () => { + it('should split chunk if it has more lines than maxLines', async () => { + const maxLines = 10; + const chunk = await getData(30); + const splitter = lineChunkSplitter(maxLines); + + const [c1, c2] = splitter(chunk, 'utf-8'); + expect(countNewlines(c1)).toEqual(maxLines); // include trailing newline + expect(c2 && countNewlines(c2)).toEqual(30 - maxLines); + }); + + it('should split chunk if total seen lines is more than maxLines', async () => { + const maxLines = 100; + const chunk = await getData(30); + const splitter = lineChunkSplitter(maxLines); + + splitter(chunk, 'utf-8'); + splitter(chunk, 'utf-8'); + splitter(chunk, 'utf-8'); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(countNewlines(c1)).toEqual(100 - 30 * 3); + expect(c2 && countNewlines(c2)).toEqual(20); + }); + + it('should not split chunk if it has less lines than maxLines', async () => { + const maxLines = 100; + const chunk = await getData(30); + const splitter = lineChunkSplitter(maxLines); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(countNewlines(c1)).toEqual(30); + expect(c2).toBeUndefined(); + }); + + it('should not split chunk if it has maxLines lines', async () => { + const maxLines = 100; + const chunk = await getData(maxLines); + const splitter = lineChunkSplitter(maxLines); + const [c1, c2] = splitter(chunk, 'utf-8'); + + expect(countNewlines(c1)).toEqual(maxLines); + expect(c2?.length).toEqual(0); + }); + + it('should split chunk by lines', async () => { + const maxLines = 123; + const data = await getData(1000); + const splitter = lineChunkSplitter(maxLines); + const actual = await splitToEnd(Readable.from(chunkify(data, 100)), splitter); + + let seen = 0; + for (let i = 0; i < actual.length; i++) { + const chunk = actual[i]; + const start = seen; + const end = seen + chunk.length; + expect(chunk).toEqual(data.subarray(start, end)); + seen += chunk.length; + + if (i === actual.length - 1) { + expect(countNewlines(chunk)).toBeLessThanOrEqual(maxLines); + } else { + expect(countNewlines(chunk)).toEqual(maxLines); + } + } + }); +}); From bcd0402a60ffa4b72b219a40d75efbbda1ac827d Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 4 Sep 2024 13:59:52 +0200 Subject: [PATCH 07/14] sdk-core: allow returning zero, one, or more attachments from BacktraceAttachmentProvider --- .../src/modules/attachments/AttachmentManager.ts | 11 ++++++++++- .../attachments/BacktraceAttachmentProvider.ts | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/sdk-core/src/modules/attachments/AttachmentManager.ts b/packages/sdk-core/src/modules/attachments/AttachmentManager.ts index 17d98834..0c3e07da 100644 --- a/packages/sdk-core/src/modules/attachments/AttachmentManager.ts +++ b/packages/sdk-core/src/modules/attachments/AttachmentManager.ts @@ -68,7 +68,16 @@ export class AttachmentManager { continue; } - result.push(provider.get()); + const attachment = provider.get(); + if (!attachment) { + continue; + } + + if (Array.isArray(attachment)) { + result.push(...attachment); + } else { + result.push(attachment); + } } return result; } diff --git a/packages/sdk-core/src/modules/attachments/BacktraceAttachmentProvider.ts b/packages/sdk-core/src/modules/attachments/BacktraceAttachmentProvider.ts index 531ba9f4..d19ed9cb 100644 --- 
a/packages/sdk-core/src/modules/attachments/BacktraceAttachmentProvider.ts +++ b/packages/sdk-core/src/modules/attachments/BacktraceAttachmentProvider.ts @@ -13,5 +13,5 @@ export interface BacktraceAttachmentProvider { /** * Generate provider attributes */ - get(): BacktraceAttachment; + get(): BacktraceAttachment | BacktraceAttachment[] | undefined; } From 57600512dc03725a8830fd49925a415bd417e997 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 4 Sep 2024 14:02:02 +0200 Subject: [PATCH 08/14] node: fix invalid imports --- packages/node/src/attachment/BacktraceFileAttachment.ts | 2 +- packages/node/src/streams/combinedChunkSplitter.ts | 2 +- packages/node/src/streams/fileChunkSink.ts | 4 ++-- packages/node/src/streams/lengthChunkSplitter.ts | 2 +- packages/node/src/streams/lineChunkSplitter.ts | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/node/src/attachment/BacktraceFileAttachment.ts b/packages/node/src/attachment/BacktraceFileAttachment.ts index df581d74..c507eff6 100644 --- a/packages/node/src/attachment/BacktraceFileAttachment.ts +++ b/packages/node/src/attachment/BacktraceFileAttachment.ts @@ -2,7 +2,7 @@ import { BacktraceFileAttachment as CoreBacktraceFileAttachment } from '@backtra import fs from 'fs'; import path from 'path'; import { Readable } from 'stream'; -import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem'; +import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; export class BacktraceFileAttachment implements CoreBacktraceFileAttachment { public readonly name: string; diff --git a/packages/node/src/streams/combinedChunkSplitter.ts b/packages/node/src/streams/combinedChunkSplitter.ts index 5f818e3b..19ef9e56 100644 --- a/packages/node/src/streams/combinedChunkSplitter.ts +++ b/packages/node/src/streams/combinedChunkSplitter.ts @@ -1,4 +1,4 @@ -import { ChunkSplitter } from './chunkifier'; +import { ChunkSplitter } from './chunkifier.js'; /** * Combines several splitters into one. diff --git a/packages/node/src/streams/fileChunkSink.ts b/packages/node/src/streams/fileChunkSink.ts index f9c724b7..004b3609 100644 --- a/packages/node/src/streams/fileChunkSink.ts +++ b/packages/node/src/streams/fileChunkSink.ts @@ -1,7 +1,7 @@ import EventEmitter from 'events'; import fs from 'fs'; -import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem'; -import { ChunkSink } from './chunkifier'; +import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; +import { ChunkSink } from './chunkifier.js'; interface FileChunkSinkOptions { /** diff --git a/packages/node/src/streams/lengthChunkSplitter.ts b/packages/node/src/streams/lengthChunkSplitter.ts index a39fca62..dacb0d63 100644 --- a/packages/node/src/streams/lengthChunkSplitter.ts +++ b/packages/node/src/streams/lengthChunkSplitter.ts @@ -1,4 +1,4 @@ -import { ChunkSplitter } from './chunkifier'; +import { ChunkSplitter } from './chunkifier.js'; /** * Splits data into chunks with maximum length. diff --git a/packages/node/src/streams/lineChunkSplitter.ts b/packages/node/src/streams/lineChunkSplitter.ts index 711a9514..68726bd5 100644 --- a/packages/node/src/streams/lineChunkSplitter.ts +++ b/packages/node/src/streams/lineChunkSplitter.ts @@ -1,4 +1,4 @@ -import { ChunkSplitter } from './chunkifier'; +import { ChunkSplitter } from './chunkifier.js'; /** * Splits data into chunks with maximum lines. 
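
A note on how the streams introduced above fit together: chunkifier() returns a Writable; it forwards written data into the current chunk, uses the splitter to decide where that chunk ends, and asks the sink for a new destination stream whenever a new chunk starts. FileChunkSink provides those destination streams as files and deletes the oldest ones once more than maxFiles chunks exist. A minimal sketch of that wiring follows; the import paths, the output file name, and the limits are assumptions made for illustration, while the option shapes follow the tests above and the FileBreadcrumbsStorage changes in the next patch.

    import { Writable } from 'stream';
    import { chunkifier } from './src/streams/chunkifier.js';
    import { combinedChunkSplitter } from './src/streams/combinedChunkSplitter.js';
    import { FileChunkSink } from './src/streams/fileChunkSink.js';
    import { lengthChunkSplitter } from './src/streams/lengthChunkSplitter.js';
    import { lineChunkSplitter } from './src/streams/lineChunkSplitter.js';
    import { FsNodeFileSystem } from './src/storage/FsNodeFileSystem.js';

    // One file per chunk; only the two most recent chunk files are kept on disk.
    const sink = new FileChunkSink({
        maxFiles: 2,
        fs: new FsNodeFileSystem(),
        file: (n) => `/tmp/bt-example-chunk-${n}`, // hypothetical output path
    });

    // Splitters are passed as factories. A splitter call returns
    // [dataForCurrentChunk, remainderForNextChunk?]; combining a line-based and a
    // length-based splitter starts a new chunk after 100 lines or 8192 bytes,
    // whichever is reached first.
    const dest: Writable = chunkifier({
        sink: sink.getSink(),
        splitter: () => combinedChunkSplitter(lineChunkSplitter(100), lengthChunkSplitter(8192, 'skip')),
    });

    dest.write(JSON.stringify({ message: 'example breadcrumb' }) + '\n');

The next patch applies this pattern inside FileBreadcrumbsStorage: lineChunkSplitter is sized to half of maximumBreadcrumbs, lengthChunkSplitter is capped at maximumTotalBreadcrumbsSize with the 'skip' mode, and the resulting Writable replaces the previous AlternatingFileWriter writeLine() calls.
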
From d067a121024d06592a4f810679eac198f44bbce0 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 4 Sep 2024 14:02:14 +0200 Subject: [PATCH 09/14] node: use chunkifier in FileBreadcrumbsStorage --- packages/node/src/BacktraceClient.ts | 4 +- .../src/breadcrumbs/FileBreadcrumbsStorage.ts | 81 ++++++++++++------- .../FileBreadcrumbsStorage.spec.ts | 26 +++--- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/packages/node/src/BacktraceClient.ts b/packages/node/src/BacktraceClient.ts index 709e569d..e1b223dc 100644 --- a/packages/node/src/BacktraceClient.ts +++ b/packages/node/src/BacktraceClient.ts @@ -295,7 +295,9 @@ export class BacktraceClient extends BacktraceCoreClient for (const [recordPath, report, session] of reports) { try { if (session) { - report.attachments.push(...FileBreadcrumbsStorage.getSessionAttachments(session)); + report.attachments.push( + ...FileBreadcrumbsStorage.getSessionAttachments(session, this.nodeFileSystem), + ); const fileAttributes = FileAttributeManager.createFromSession(session, this.nodeFileSystem); Object.assign(report.attributes, await fileAttributes.get()); diff --git a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts index a6b19997..7bfc1308 100644 --- a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts +++ b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts @@ -13,10 +13,14 @@ import { TimeHelper, } from '@backtrace/sdk-core'; import path from 'path'; -import { Readable } from 'stream'; +import { Readable, Writable } from 'stream'; import { BacktraceFileAttachment } from '../attachment/index.js'; -import { AlternatingFileWriter } from '../common/AlternatingFileWriter.js'; import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; +import { chunkifier, ChunkSplitterFactory } from '../streams/chunkifier.js'; +import { combinedChunkSplitter } from '../streams/combinedChunkSplitter.js'; +import { FileChunkSink } from '../streams/fileChunkSink.js'; +import { lengthChunkSplitter } from '../streams/lengthChunkSplitter.js'; +import { lineChunkSplitter } from '../streams/lineChunkSplitter.js'; const FILE_PREFIX = 'bt-breadcrumbs'; @@ -27,55 +31,71 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { private _lastBreadcrumbId: number = TimeHelper.toTimestampInSec(TimeHelper.now()); - private readonly _writer: AlternatingFileWriter; + private readonly _dest: Writable; + private readonly _sink: FileChunkSink; constructor( - private readonly _mainFile: string, - private readonly _fallbackFile: string, + session: SessionFiles, private readonly _fileSystem: NodeFileSystem, private readonly _limits: BreadcrumbsStorageLimits, ) { - this._writer = new AlternatingFileWriter( - _fileSystem, - _mainFile, - _fallbackFile, - this._limits.maximumBreadcrumbs ? 
Math.floor(this._limits.maximumBreadcrumbs / 2) : undefined, - this._limits.maximumTotalBreadcrumbsSize, - ); + const splitters: ChunkSplitterFactory[] = []; + const maximumBreadcrumbs = this._limits.maximumBreadcrumbs; + if (maximumBreadcrumbs !== undefined) { + splitters.push(() => lineChunkSplitter(Math.ceil(maximumBreadcrumbs / 2))); + } + + const maximumTotalBreadcrumbsSize = this._limits.maximumTotalBreadcrumbsSize; + if (maximumTotalBreadcrumbsSize !== undefined) { + splitters.push(() => lengthChunkSplitter(Math.ceil(maximumTotalBreadcrumbsSize), 'skip')); + } + + this._sink = new FileChunkSink({ + maxFiles: 2, + fs: this._fileSystem, + file: (n) => session.getFileName(FileBreadcrumbsStorage.getFileName(n)), + }); + + if (!splitters.length) { + this._dest = this._sink.getSink()(0); + } else { + this._dest = chunkifier({ + sink: this._sink.getSink(), + splitter: + splitters.length === 1 ? splitters[0] : () => combinedChunkSplitter(...splitters.map((s) => s())), + }); + } + + this._dest.on('error', () => { + // Do nothing on error + }); } - public static getSessionAttachments(session: SessionFiles) { + public static getSessionAttachments(session: SessionFiles, fileSystem: NodeFileSystem) { const files = session .getSessionFiles() .filter((f) => path.basename(f).startsWith(FILE_PREFIX)) .slice(0, 2); - return files.map((file) => new BacktraceFileAttachment(file, path.basename(file))); + return files.map((file) => new BacktraceFileAttachment(file, path.basename(file), fileSystem)); } public static factory(session: SessionFiles, fileSystem: NodeFileSystem): BreadcrumbsStorageFactory { - return ({ limits }) => { - const file1 = session.getFileName(this.getFileName(0)); - const file2 = session.getFileName(this.getFileName(1)); - return new FileBreadcrumbsStorage(file1, file2, fileSystem, limits); - }; + return ({ limits }) => new FileBreadcrumbsStorage(session, fileSystem, limits); } - public getAttachments(): [BacktraceAttachment, BacktraceAttachment] { - return [ - new BacktraceFileAttachment(this._mainFile, 'bt-breadcrumbs-0', this._fileSystem), - new BacktraceFileAttachment(this._fallbackFile, 'bt-breadcrumbs-1', this._fileSystem), - ]; + public getAttachments(): BacktraceAttachment[] { + const files = [...this._sink.files].map((f) => f.path.toString('utf-8')); + return files.map((f) => new BacktraceFileAttachment(f, f, this._fileSystem)); } public getAttachmentProviders(): BacktraceAttachmentProvider[] { return [ { - get: () => new BacktraceFileAttachment(this._mainFile, 'bt-breadcrumbs-0'), - type: 'dynamic', - }, - { - get: () => new BacktraceFileAttachment(this._fallbackFile, 'bt-breadcrumbs-1'), + get: () => { + const files = [...this._sink.files].map((f) => f.path.toString('utf-8')); + return files.map((f) => new BacktraceFileAttachment(f, f, this._fileSystem)); + }, type: 'dynamic', }, ]; @@ -102,8 +122,7 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { } } - this._writer.writeLine(breadcrumbJson); - + this._dest.write(breadcrumbJson + '\n'); return id; } diff --git a/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts index 68edc451..a5fa30cd 100644 --- a/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts +++ b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts @@ -1,4 +1,4 @@ -import { Breadcrumb, BreadcrumbLogLevel, BreadcrumbType, RawBreadcrumb } from '@backtrace/sdk-core'; +import { Breadcrumb, BreadcrumbLogLevel, BreadcrumbType, RawBreadcrumb, 
SessionFiles } from '@backtrace/sdk-core'; import assert from 'assert'; import { Readable } from 'stream'; import { promisify } from 'util'; @@ -34,6 +34,7 @@ const nextTick = promisify(process.nextTick); describe('FileBreadcrumbsStorage', () => { it('should return added breadcrumbs', async () => { const fs = mockStreamFileSystem(); + const session = new SessionFiles(fs, '.', 'sessionId'); const breadcrumbs: RawBreadcrumb[] = [ { @@ -85,7 +86,7 @@ describe('FileBreadcrumbsStorage', () => { }, ]; - const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + const storage = new FileBreadcrumbsStorage(session, fs, { maximumBreadcrumbs: 100, }); @@ -107,6 +108,7 @@ describe('FileBreadcrumbsStorage', () => { it('should return added breadcrumbs in two attachments', async () => { const fs = mockStreamFileSystem(); + const session = new SessionFiles(fs, '.', 'sessionId'); const breadcrumbs: RawBreadcrumb[] = [ { @@ -161,7 +163,7 @@ describe('FileBreadcrumbsStorage', () => { }, ]; - const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + const storage = new FileBreadcrumbsStorage(session, fs, { maximumBreadcrumbs: 4, }); @@ -173,7 +175,7 @@ describe('FileBreadcrumbsStorage', () => { // FileBreadcrumbsStorage is asynchronous in nature await nextTick(); - const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + const [fallbackAttachment, mainAttachment] = storage.getAttachments(); const mainStream = mainAttachment.get(); const fallbackStream = fallbackAttachment.get(); @@ -188,6 +190,7 @@ describe('FileBreadcrumbsStorage', () => { it('should return no more than maximumBreadcrumbs breadcrumbs', async () => { const fs = mockStreamFileSystem(); + const session = new SessionFiles(fs, '.', 'sessionId'); const breadcrumbs: RawBreadcrumb[] = [ { @@ -232,7 +235,7 @@ describe('FileBreadcrumbsStorage', () => { }, ]; - const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + const storage = new FileBreadcrumbsStorage(session, fs, { maximumBreadcrumbs: 2, }); @@ -244,7 +247,7 @@ describe('FileBreadcrumbsStorage', () => { // FileBreadcrumbsStorage is asynchronous in nature await nextTick(); - const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + const [fallbackAttachment, mainAttachment] = storage.getAttachments(); const mainStream = mainAttachment.get(); const fallbackStream = fallbackAttachment.get(); @@ -259,6 +262,7 @@ describe('FileBreadcrumbsStorage', () => { it('should return breadcrumbs up to the json size', async () => { const fs = mockStreamFileSystem(); + const session = new SessionFiles(fs, '.', 'sessionId'); const breadcrumbs: RawBreadcrumb[] = [ { @@ -298,9 +302,9 @@ describe('FileBreadcrumbsStorage', () => { }, ]; - const storage = new FileBreadcrumbsStorage('breadcrumbs-1', 'breadcrumbs-2', fs, { + const storage = new FileBreadcrumbsStorage(session, fs, { maximumBreadcrumbs: 100, - maximumBreadcrumbsSize: JSON.stringify(expectedMain[0]).length + 10, + maximumTotalBreadcrumbsSize: JSON.stringify(expectedMain[0]).length + 10, }); for (const breadcrumb of breadcrumbs) { @@ -311,10 +315,10 @@ describe('FileBreadcrumbsStorage', () => { // FileBreadcrumbsStorage is asynchronous in nature await nextTick(); - const [mainAttachment, fallbackAttachment] = storage.getAttachments(); + const [fallbackAttachment, mainAttachment] = storage.getAttachments(); - const mainStream = mainAttachment.get(); - const fallbackStream = fallbackAttachment.get(); + const mainStream = mainAttachment?.get(); + 
const fallbackStream = fallbackAttachment?.get(); assert(mainStream); assert(fallbackStream); From 9726a849de543d171f26118e7a7bc99464c27b15 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Mon, 22 Jul 2024 13:38:16 +0200 Subject: [PATCH 10/14] node: remove AlternatingFileWriter --- .../node/src/common/AlternatingFileWriter.ts | 153 ----------------- .../node/tests/common/alternatingFile.spec.ts | 154 ------------------ 2 files changed, 307 deletions(-) delete mode 100644 packages/node/src/common/AlternatingFileWriter.ts delete mode 100644 packages/node/tests/common/alternatingFile.spec.ts diff --git a/packages/node/src/common/AlternatingFileWriter.ts b/packages/node/src/common/AlternatingFileWriter.ts deleted file mode 100644 index bb62c239..00000000 --- a/packages/node/src/common/AlternatingFileWriter.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { Writable } from 'stream'; -import { NodeFileSystem } from '../storage/interfaces/NodeFileSystem.js'; - -export class AlternatingFileWriter { - private _fileStream?: Writable; - private _count = 0; - private _size = 0; - private _disposed = false; - - private readonly _logQueue: string[] = []; - private _currentAppendedLog?: string; - - constructor( - private readonly _fileSystem: NodeFileSystem, - private readonly _mainFile: string, - private readonly _fallbackFile: string, - private readonly _maxLines?: number, - private readonly _maxSize?: number, - ) {} - - public async writeLine(value: string): Promise { - if (this._disposed) { - throw new Error('This instance has been disposed.'); - } - - this._logQueue.push(value); - if (!this._currentAppendedLog) { - return await this.process(); - } - } - - private async process(): Promise { - this._currentAppendedLog = this._logQueue.shift(); - - if (!this._currentAppendedLog) { - return; - } - - const appendLength = this._currentAppendedLog.length + 1; - this.prepareBreadcrumbStream(appendLength); - - if (!this._fileStream) { - this._logQueue.unshift(this._currentAppendedLog); - this._currentAppendedLog = undefined; - return; - } - - // if the queue is full and we can save more item in a batch - // try to save as much as possible to speed up potential native operations - this._count += 1; - this._size += appendLength; - - const logsToAppend = [this._currentAppendedLog]; - - let logsToTake = 0; - let currentCount = this._count; - let currentSize = this._size; - - for (let i = 0; i < this._logQueue.length; i++) { - const log = this._logQueue[i]; - if (!log) { - continue; - } - - const logLength = log.length + 1; - - if (currentCount + 1 > (this._maxLines ?? Infinity)) { - break; - } - - if (currentSize + logLength >= (this._maxSize ?? Infinity)) { - break; - } - - logsToTake++; - currentCount++; - currentSize += logLength; - } - - const restAppendingLogs = this._logQueue.splice(0, logsToTake); - this._count = this._count + restAppendingLogs.length; - this._size += restAppendingLogs.reduce((sum, l) => sum + l.length + 1, 0); - - logsToAppend.push(...restAppendingLogs); - - return await this.writeAsync(this._fileStream, logsToAppend.join('\n') + '\n') - .catch(() => { - // handle potential issues with appending logs. 
- // we can't do really too much here other than retry - // logging the error might also cause a breadcrumb loop, that we should try to avoid - this._logQueue.unshift(...logsToAppend); - }) - .finally(() => { - if (this._logQueue.length !== 0) { - return this.process(); - } else { - this._currentAppendedLog = undefined; - } - }); - } - - private writeAsync(fs: Writable, data: string) { - return new Promise((resolve, reject) => fs.write(data, (err) => (err ? reject(err) : resolve()))); - } - - private prepareBreadcrumbStream(newSize: number) { - if (!this._fileStream) { - this._fileStream = this.safeCreateStream(this._mainFile); - } else if (this._count >= (this._maxLines ?? Infinity) || this._size + newSize >= (this._maxSize ?? Infinity)) { - this.switchFile(); - } - } - - private switchFile() { - if (this._fileStream) { - this._fileStream.destroy(); - } - - this._fileStream = undefined; - - const renameResult = this.safeMoveMainToFallback(); - if (!renameResult) { - return; - } - - this._fileStream = this.safeCreateStream(this._mainFile); - - this._count = 0; - this._size = 0; - } - - public dispose() { - this._fileStream?.destroy(); - this._disposed = true; - } - - private safeMoveMainToFallback() { - try { - this._fileSystem.renameSync(this._mainFile, this._fallbackFile); - return true; - } catch { - return false; - } - } - - private safeCreateStream(path: string) { - try { - return this._fileSystem.createWriteStream(path); - } catch { - return undefined; - } - } -} diff --git a/packages/node/tests/common/alternatingFile.spec.ts b/packages/node/tests/common/alternatingFile.spec.ts deleted file mode 100644 index 0690d08c..00000000 --- a/packages/node/tests/common/alternatingFile.spec.ts +++ /dev/null @@ -1,154 +0,0 @@ -import fs from 'fs'; -import path from 'path'; -import { fileURLToPath } from 'url'; -import { AlternatingFileWriter } from '../../src/common/AlternatingFileWriter.js'; -import { FsNodeFileSystem } from '../../src/storage/FsNodeFileSystem.js'; -import { mockStreamFileSystem } from '../_mocks/fileSystem.js'; - -function unlinkSafe(file: string) { - try { - fs.unlinkSync(file); - } catch { - // Do nothing - } -} - -describe('AlternatingFileWriter', () => { - const dir = path.join(path.dirname(fileURLToPath(import.meta.url)), '../_testOutput'); - const file1 = path.join(dir, 'alternating_file1'); - const file2 = path.join(dir, 'alternating_file2'); - - beforeAll(() => { - fs.mkdirSync(dir, { recursive: true }); - - unlinkSafe(file1); - unlinkSafe(file2); - }); - - afterEach(() => { - unlinkSafe(file1); - unlinkSafe(file2); - }); - - it('should add line to the main file', async () => { - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10); - await writer.writeLine('value'); - writer.dispose(); - - const mainFile = await fs.promises.readFile(file1, 'utf-8'); - expect(mainFile).toEqual('value\n'); - }); - - it('should not move main file to fallback file before adding with fileCapacity reached', async () => { - const count = 5; - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); - for (let i = 0; i < count; i++) { - await writer.writeLine(`value-${i}`); - } - writer.dispose(); - - await expect(fs.promises.stat(file2)).rejects.toThrowError(); - }); - - it('should move main file to fallback file after adding with fileCapacity reached', async () => { - const count = 5; - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); - for (let i = 0; i < count; i++) { - await 
writer.writeLine(`value-${i}`); - } - - const mainFile = await fs.promises.readFile(file1, 'utf-8'); - await writer.writeLine('value-x'); - writer.dispose(); - - const fallbackFile = await fs.promises.readFile(file2, 'utf-8'); - expect(fallbackFile).toEqual(mainFile); - }); - - it('should add line to the main file after adding with fileCapacity reached', async () => { - const count = 5; - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, count); - for (let i = 0; i < count; i++) { - await writer.writeLine(`value-${i}`); - } - - await writer.writeLine('value-x'); - writer.dispose(); - - const mainFile = await fs.promises.readFile(file1, 'utf-8'); - expect(mainFile).toEqual('value-x\n'); - }); - - it('should throw after adding line when disposed', async () => { - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 10); - writer.dispose(); - await expect(writer.writeLine('value-x')).rejects.toThrowError('This instance has been disposed.'); - }); - - it('should not write when fileCapacity is 0', () => { - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 0); - writer.writeLine('abc'); - writer.dispose(); - - expect(fs.existsSync(file1)).toEqual(false); - expect(fs.existsSync(file2)).toEqual(false); - }); - - it('should not write fileCapacity is less than 0', () => { - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, -1); - writer.writeLine('abc'); - writer.dispose(); - - expect(fs.existsSync(file1)).toEqual(false); - expect(fs.existsSync(file2)).toEqual(false); - }); - - describe('stress test', () => { - it('should not throw', async () => { - const writer = new AlternatingFileWriter(new FsNodeFileSystem(), file1, file2, 1); - - const write = async (count: number, entry: string) => { - for (let i = 0; i < count; i++) { - await writer.writeLine(entry); - } - }; - - const writerCount = 100; - const writeCount = 100; - const promises = [...new Array(writerCount)].map(() => write(writeCount, 'text')); - await expect(Promise.all(promises)).resolves.not.toThrow(); - }, 10000); - - it('should not skip text', async () => { - const fs = mockStreamFileSystem(); - const renameSync = fs.renameSync; - - let fallbackText = ''; - - fs.renameSync = jest.fn((oldPath, newPath) => { - fallbackText += fs.readFileSync(oldPath); - return renameSync(oldPath, newPath); - }); - - const writer = new AlternatingFileWriter(fs, file1, file2, 1); - - const write = async (count: number, entry: string) => { - for (let i = 0; i < count; i++) { - await writer.writeLine(entry); - } - }; - - // TODO: Current implementation is kinda bad with more writers, and loses some data in fact. 
- // Trying to fix this though can take a lot of work, so leaving this for now - const writerCount = 1; - const writeCount = 100; - const promises = [...new Array(writerCount)].map(() => write(writeCount, 'text')); - await Promise.all(promises); - - const expected = [...new Array(writerCount * writeCount - 1)].map(() => 'text\n').join(''); - - expect(renameSync).toBeCalledTimes(writerCount * writeCount - 1); - expect(fallbackText.length).toEqual(expected.length); - }); - }); -}); From f48ef2a9efe7436b494ce611dd98434279a4d6be Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 4 Sep 2024 14:08:36 +0200 Subject: [PATCH 11/14] node: make fileSystem optional in FileBreadcrumbsStorage.getSessionAttachments --- packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts index 7bfc1308..3b4bc5ea 100644 --- a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts +++ b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts @@ -71,7 +71,7 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { }); } - public static getSessionAttachments(session: SessionFiles, fileSystem: NodeFileSystem) { + public static getSessionAttachments(session: SessionFiles, fileSystem?: NodeFileSystem) { const files = session .getSessionFiles() .filter((f) => path.basename(f).startsWith(FILE_PREFIX)) From 786a84c05d23b6fd66263ee3dcabe48a1f228d3d Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Wed, 4 Sep 2024 14:17:33 +0200 Subject: [PATCH 12/14] node: fix lint and test issues --- packages/node/src/streams/chunkifier.ts | 2 +- packages/node/src/streams/lineChunkSplitter.ts | 1 + packages/node/tests/_helpers/blackholeChunkSink.ts | 2 +- packages/node/tests/_helpers/chunks.ts | 2 +- packages/node/tests/_helpers/events.ts | 2 +- packages/node/tests/_helpers/generators.ts | 2 +- packages/node/tests/_helpers/memoryChunkSink.ts | 4 ++-- packages/node/tests/_mocks/fileSystem.ts | 4 +++- .../node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts | 4 ++-- 9 files changed, 13 insertions(+), 10 deletions(-) diff --git a/packages/node/src/streams/chunkifier.ts b/packages/node/src/streams/chunkifier.ts index 521bd920..13d75aa8 100644 --- a/packages/node/src/streams/chunkifier.ts +++ b/packages/node/src/streams/chunkifier.ts @@ -105,7 +105,7 @@ export function chunkifier({ sink: streamFactory, ...options }: ChunkifierOption function forwardEvents(from: E, to: E, ...events: string[]) { const fwd = (event: string) => - (...args: any[]) => + (...args: unknown[]) => to.emit(event as string, ...args, to); const forwards: [string, ReturnType][] = []; diff --git a/packages/node/src/streams/lineChunkSplitter.ts b/packages/node/src/streams/lineChunkSplitter.ts index 68726bd5..324d2fb6 100644 --- a/packages/node/src/streams/lineChunkSplitter.ts +++ b/packages/node/src/streams/lineChunkSplitter.ts @@ -10,6 +10,7 @@ export function lineChunkSplitter(maxLines: number): ChunkSplitter { function findNthLine(data: Buffer, remaining: number): [number, number] { let lastIndex = -1; let count = 0; + // eslint-disable-next-line no-constant-condition while (true) { lastIndex = data.indexOf('\n', lastIndex + 1); if (lastIndex === -1) { diff --git a/packages/node/tests/_helpers/blackholeChunkSink.ts b/packages/node/tests/_helpers/blackholeChunkSink.ts index dee01f05..cd73d45c 100644 --- a/packages/node/tests/_helpers/blackholeChunkSink.ts +++ 
b/packages/node/tests/_helpers/blackholeChunkSink.ts @@ -1,5 +1,5 @@ import { Writable } from 'stream'; -import { ChunkSink } from '../../src/streams/chunkifier'; +import { ChunkSink } from '../../src/streams/chunkifier.js'; export function blackholeChunkSink(): ChunkSink { return () => { diff --git a/packages/node/tests/_helpers/chunks.ts b/packages/node/tests/_helpers/chunks.ts index e2c7f903..b3101f61 100644 --- a/packages/node/tests/_helpers/chunks.ts +++ b/packages/node/tests/_helpers/chunks.ts @@ -1,5 +1,5 @@ import { Readable } from 'stream'; -import { ChunkSplitter } from '../../src/streams/chunkifier'; +import { ChunkSplitter } from '../../src/streams/chunkifier.js'; export async function splitToEnd(readable: Readable, splitter: ChunkSplitter) { const results: Buffer[][] = [[]]; diff --git a/packages/node/tests/_helpers/events.ts b/packages/node/tests/_helpers/events.ts index a903a4f0..a9846dd7 100644 --- a/packages/node/tests/_helpers/events.ts +++ b/packages/node/tests/_helpers/events.ts @@ -3,7 +3,7 @@ import { EventEmitter } from 'stream'; export function forwardEvents(from: E, to: E, ...events: string[]) { const fwd = (event: string) => - (...args: any[]) => + (...args: unknown[]) => to.emit(event as string, ...args, to); const forwards: [string, ReturnType][] = []; diff --git a/packages/node/tests/_helpers/generators.ts b/packages/node/tests/_helpers/generators.ts index 48e04e31..d532f8e2 100644 --- a/packages/node/tests/_helpers/generators.ts +++ b/packages/node/tests/_helpers/generators.ts @@ -56,7 +56,7 @@ export function limitLines(count: number) { let seen = 0; return new Transform({ - transform(chunk: Buffer, encoding) { + transform(chunk: Buffer) { const remaining = count - seen; if (chunk.length >= remaining) { this.push(null); diff --git a/packages/node/tests/_helpers/memoryChunkSink.ts b/packages/node/tests/_helpers/memoryChunkSink.ts index 75623d3b..e3dd8b6b 100644 --- a/packages/node/tests/_helpers/memoryChunkSink.ts +++ b/packages/node/tests/_helpers/memoryChunkSink.ts @@ -1,11 +1,11 @@ import { Writable } from 'stream'; -import { ChunkSink } from '../../src/streams/chunkifier'; +import { ChunkSink } from '../../src/streams/chunkifier.js'; export function memoryChunkSink() { const results: Buffer[][] = []; const sink: ChunkSink = () => { - let index = results.length; + const index = results.length; results.push([]); return new Writable({ diff --git a/packages/node/tests/_mocks/fileSystem.ts b/packages/node/tests/_mocks/fileSystem.ts index e3c175bf..51e3a9a3 100644 --- a/packages/node/tests/_mocks/fileSystem.ts +++ b/packages/node/tests/_mocks/fileSystem.ts @@ -1,4 +1,6 @@ -import { MockedFileSystem, mockFileSystem } from '@backtrace/sdk-core/tests/_mocks/fileSystem.js'; +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore The following import fails due to missing extension, but it cannot have one (it imports a .ts file) +import { MockedFileSystem, mockFileSystem } from '@backtrace/sdk-core/tests/_mocks/fileSystem'; import { ReadStream, WriteStream } from 'fs'; import path from 'path'; import { Readable, Writable } from 'stream'; diff --git a/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts index a5fa30cd..a5b7d2cf 100644 --- a/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts +++ b/packages/node/tests/breadcrumbs/FileBreadcrumbsStorage.spec.ts @@ -2,8 +2,8 @@ import { Breadcrumb, BreadcrumbLogLevel, BreadcrumbType, RawBreadcrumb, SessionF import 
assert from 'assert'; import { Readable } from 'stream'; import { promisify } from 'util'; -import { FileBreadcrumbsStorage } from '../../src/breadcrumbs/FileBreadcrumbsStorage'; -import { mockStreamFileSystem } from '../_mocks/fileSystem'; +import { FileBreadcrumbsStorage } from '../../src/breadcrumbs/FileBreadcrumbsStorage.js'; +import { mockStreamFileSystem } from '../_mocks/fileSystem.js'; async function readToEnd(readable: Readable) { return new Promise((resolve, reject) => { From 7144b7abca2d43d12476f3d2cd02d37af6264c9c Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 24 Sep 2024 17:24:54 +0200 Subject: [PATCH 13/14] node: update docs for lengthChunkSplitter --- packages/node/src/streams/lengthChunkSplitter.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/node/src/streams/lengthChunkSplitter.ts b/packages/node/src/streams/lengthChunkSplitter.ts index dacb0d63..5b753abe 100644 --- a/packages/node/src/streams/lengthChunkSplitter.ts +++ b/packages/node/src/streams/lengthChunkSplitter.ts @@ -3,7 +3,10 @@ import { ChunkSplitter } from './chunkifier.js'; /** * Splits data into chunks with maximum length. * @param maxLength Maximum length of one chunk. - * @param wholeLines If `true`, will split chunks before newlines, so whole lines are passed to the chunk. + * @param wholeLines Can be one of: + * * `"skip"` - if last line does not fit in the chunk, it will be skipped entirely + * * `"break"` - if last line does not fit in the chunk, it will be broken into two new chunks + * * `false` - last line will be always broken into old and new chunk */ export function lengthChunkSplitter(maxLength: number, wholeLines: 'skip' | 'break' | false = false): ChunkSplitter { let seen = 0; From 4f54ec254b1e28a5a35ef848d576bb0728ed4196 Mon Sep 17 00:00:00 2001 From: Sebastian Alex Date: Tue, 24 Sep 2024 17:29:47 +0200 Subject: [PATCH 14/14] node: return undefined from FileBreadcrumbsStorage.add if size is too large --- packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts index 3b4bc5ea..07f8f865 100644 --- a/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts +++ b/packages/node/src/breadcrumbs/FileBreadcrumbsStorage.ts @@ -101,7 +101,7 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { ]; } - public add(rawBreadcrumb: RawBreadcrumb): number { + public add(rawBreadcrumb: RawBreadcrumb) { this._lastBreadcrumbId++; const id = this._lastBreadcrumbId; const breadcrumb: Breadcrumb = { @@ -118,7 +118,7 @@ export class FileBreadcrumbsStorage implements BreadcrumbsStorage { const sizeLimit = this._limits.maximumTotalBreadcrumbsSize; if (sizeLimit !== undefined) { if (jsonLength > sizeLimit) { - return id; + return undefined; } }