feat: Implement IPC RecordBatch body buffer compression #14
Conversation
src/ipc/reader.ts
Outdated
const combined = new Uint8Array(totalSize);

for (const [i, decompressedBuffer] of decompressedBuffers.entries()) {
    combined.set(decompressedBuffer, newBufferRegions[i].offset);
We should be able to implement this without copying the inflated data back into a single contiguous ArrayBuffer.
I think it's possible to implement a VirtualUint8Array class that takes an array of Uint8Array chunks and implements the necessary methods to behave like a contiguous Uint8Array. I'm going to experiment with that approach soon.
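For illustration, a minimal version of such a wrapper could look like the sketch below (hypothetical code that never became part of the PR; only the byteLength/subarray surface the reader needs is shown):

class VirtualUint8Array {
    readonly byteLength: number;
    constructor(private chunks: Uint8Array[]) {
        this.byteLength = chunks.reduce((total, chunk) => total + chunk.byteLength, 0);
    }
    // Zero-copy when the requested range falls inside a single chunk, which is the
    // common case here since each BufferRegion maps to one decompressed buffer.
    subarray(begin: number, end: number = this.byteLength): Uint8Array {
        let chunkStart = 0;
        for (const chunk of this.chunks) {
            const chunkEnd = chunkStart + chunk.byteLength;
            if (begin >= chunkStart && end <= chunkEnd) {
                return chunk.subarray(begin - chunkStart, end - chunkStart);
            }
            chunkStart = chunkEnd;
        }
        // A full implementation would stitch together ranges that span multiple chunks.
        throw new RangeError(`range [${begin}, ${end}) spans multiple chunks`);
    }
}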
I think that might be more complicated than necessary.
IIUC, the new logic loops through all buffers, decompresses them, and collects them into a list. Then it packs all the decompressed buffers into a contiguous ArrayBuffer that matches the equivalent IPC format without compression.
In order to avoid the last step of re-packing into an ArrayBuffer, we'd need to return the list of uncompressed buffers and use a VectorLoader instance that accepts the list and selects the buffers by index (vs. the current behavior which accepts the contiguous ArrayBuffer and slices from it). Luckily, that's exactly what the JSONVectorLoader does!
I don't think you can use the JSONVectorLoader directly, since it assumes the list of buffers are JSON-encoded representations of the values, but you could implement a new CompressedVectorLoader class that closely follows its structure but doesn't call methods like packBools() and binaryDataFromJSON().
The logic in your function here would need to also return a list of BufferRegion instances whose offset field corresponds to the Array index of each decompressed buffer (rather than the byteOffset of each buffer in the contiguous ArrayBuffer).
Something like this:
export class CompressedVectorLoader extends VectorLoader {
    private sources: any[][];
    constructor(sources: Uint8Array[][], nodes: FieldNode[], buffers: BufferRegion[], dictionaries: Map<number, Vector<any>>, metadataVersion: MetadataVersion) {
        super(new Uint8Array(0), nodes, buffers, dictionaries, metadataVersion);
        this.sources = sources;
    }
    protected readNullBitmap<T extends DataType>(_type: T, nullCount: number, { offset } = this.nextBufferRange()) {
        return nullCount <= 0 ? new Uint8Array(0) : this.sources[offset];
    }
    protected readOffsets<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readTypeIds<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readData<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
}
I ended up solving this issue without implementing a VirtualUint8Array. Instead, I modified the body parameter signature in _loadVectors and the VectorLoader constructor to accept Uint8Array | Uint8Array[].
It worked out nicely because the class already has a buffersIndex parameter that points to the correct buffer, and in my case, the decompression order matches the BufferRegion[] sequence. This approach required minimal changes, and thanks to the type signature, TypeScript will prevent errors in future modifications to VectorLoader.
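Roughly, each read helper then either slices the contiguous body or picks an already-decompressed chunk by index. A simplified, hypothetical sketch (the helper name and the region shape are illustrative, not the actual diff):

function selectBuffer(body: Uint8Array | Uint8Array[], bufferIndex: number, region: { offset: number; length: number }): Uint8Array {
    return Array.isArray(body)
        ? body[bufferIndex]                                            // one decompressed chunk per BufferRegion
        : body.subarray(region.offset, region.offset + region.length); // slice of the contiguous IPC body
}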
Your suggested approach (with CompressedVectorLoader) is also interesting—it would help isolate the logic for compressed buffers. If you think it’s the better solution, I can refactor the code to use it instead.
Could you help me figure out how to properly test this functionality?
Do you intend to add compression support to the writer? Typically that's how we'd test this sort of behavior, since the reader and writer are duals.
No problem—I’ll refactor this to use the CompressedVectorLoader class instead of overloading the type signatures.
As for testing, I do plan to add compression support to the writer, though likely not for another month (depending on my project’s needs). Initially, I assumed the tests should be entirely independent, but I agree that aligning them with the writer’s behavior makes more sense and will be more maintainable long-term.
I can take a look at adding it to the writer. I wouldn't want to merge this PR without at least a limited set of tests, and verifying we can read what we write is the easiest way to do that.
Okay, I'll try to find time this weekend to implement compression support in the writer.
Hi, @trxcllnt!
I've implemented compression support for the writer and done some minor refactoring to improve the structure. Here are the key changes:
- Added compression support for the writer (debugged and tested)
- Successfully verified LZ4 writer locally - it works correctly
- Small refactoring to streamline the code
- Introduced codec validators to prevent potential library mismatch issues
The main motivation for validators came from realizing that the current CompressionRegistry approach might cause problems for users when trying to match compression/decompression libraries across different environments.
Could you please review my changes, especially the validation logic? Maybe you can suggest something about ZSTD validation?
class Lz4FrameValidator implements CompressionValidator {
    private readonly LZ4_FRAME_MAGIC = new Uint8Array([4, 34, 77, 24]);
    private readonly MIN_HEADER_LENGTH = 7; // 4 (magic) + 2 (FLG + BD) + 1 (header checksum) = 7 min bytes

    isValidCodecEncode(codec: Codec): boolean {
Since many libraries use the raw LZ4 format instead of the framed format, I decided to add validation for the encode function. This ensures that Arrow files compressed with LZ4 can be correctly read in other languages. Initially, I considered comparing compressed and decompressed buffers, but due to optional metadata flags, this might not be reliable. Instead, I validate that the encode function generates a correct metadata header. I'm unsure if similar validation is needed for decode since users should notice if their data decompresses incorrectly.
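The essence of the check, as a sketch (assuming a codec whose encode function takes and returns a Uint8Array; not the exact PR code):

const LZ4_FRAME_MAGIC = [0x04, 0x22, 0x4d, 0x18]; // 4, 34, 77, 24
const MIN_HEADER_LENGTH = 7;                      // magic + FLG + BD + header checksum

function encodesLz4Frames(encode: (data: Uint8Array) => Uint8Array): boolean {
    // Compress a small sample and verify it starts with the LZ4 frame magic number.
    const sample = encode(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]));
    return sample.byteLength >= MIN_HEADER_LENGTH
        && LZ4_FRAME_MAGIC.every((byte, i) => sample[i] === byte);
}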
class ZstdValidator implements CompressionValidator {
    // private readonly ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]);
    isValidCodecEncode(_: Codec): boolean {
        console.warn('ZSTD encode validator is not implemented yet.');
        return true;
    }
}
For ZSTD, I need to research how its metadata is structured and whether different formats exist (similar to LZ4's raw vs. framed formats). This will help determine if additional validation is necessary.
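For reference, a ZSTD frame also begins with a fixed little-endian magic number (0xFD2FB528, i.e. the bytes 40, 181, 47, 253 from the commented-out constant above), so an analogous encode check is possible; whether ZSTD has a raw-vs-framed split comparable to LZ4's is the open question. A sketch of the magic check only:

const ZSTD_MAGIC = [0x28, 0xb5, 0x2f, 0xfd]; // 40, 181, 47, 253

function encodesZstdFrames(encode: (data: Uint8Array) => Uint8Array): boolean {
    const sample = encode(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]));
    return sample.byteLength >= 4 && ZSTD_MAGIC.every((byte, i) => sample[i] === byte);
}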
@trxcllnt Before finalizing the PR, I'd like to add proper tests. Could you advise whether there are specific test patterns or files I should follow as a reference?

@Djjanks Since compression mode is an option on the reader and writer, the easiest way to integrate is probably to add/update the existing stream-writer and file-writer tests. It looks like the RecordBatchFileWriter tests need to be updated to accept writer options, and both will need to also pass the compression option to the reader, but that should be straightforward.
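For illustration, a round-trip test could look roughly like this (a sketch: the compressionType writer option, the CompressionType export location, and codec registration happening beforehand are all assumptions about this branch):

import { tableFromArrays, tableFromIPC, RecordBatchStreamWriter, CompressionType } from 'apache-arrow';

test('round-trips an LZ4-compressed stream', async () => {
    const source = tableFromArrays({ ids: Int32Array.from([1, 2, 3]), names: ['a', 'b', 'c'] });
    // Write with compression enabled (the option name is an assumption), then read it back.
    const writer = RecordBatchStreamWriter.writeAll(source, { compressionType: CompressionType.LZ4_FRAME });
    const result = tableFromIPC(await writer.toUint8Array());
    expect(result.numRows).toBe(source.numRows);
});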
Djjanks left a comment:
@trxcllnt Thanks for the testing guidance. I've implemented the stream-writer and file-writer tests successfully, but I've run into some problems with:
- the dictionary batch compression logic
- importing the zstd libraries
Can you give your advice on these points?
src/ipc/writer.ts
Outdated
if ((padding = ((size + 7) & ~7) - size) > 0) {
    this._writePadding(padding);
}
protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") {
According to the Arrow format documentation, only record batches are compressed, not dictionary batches. Is this correct? I added the batchType attribute to avoid the bufGroupSize compression logic for dictionary batches. Have I understood this correctly?
I don't think that's correct; where did you read that in the documentation?
The compression section of the documentation focuses on record batches, but it doesn't specifically mention that dictionary batches should also be compressed. However, I agree that, logically, dictionary compression should be included.
import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import * as lz4js from 'lz4js';

export async function registerCompressionCodecs(): Promise<void> {
I've implemented ZSTD compression with async initialization, since most popular libraries require WASM/Node.js. I used a dynamic import for ZSTD to avoid bundling issues. I duplicated the registration logic from the stream writer because separating it into a shared module caused Jest import errors. Maybe somebody knows a more elegant way to import zstd?
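For reference, the general shape with a dynamic import might be something like this (a sketch; 'some-zstd-wasm-lib' and its init/compress/decompress API are placeholders rather than a specific package, and the CompressionType import path is an assumption):

import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import { CompressionType } from 'apache-arrow';

export async function registerZstdCodec(): Promise<void> {
    // Dynamic import keeps the WASM-backed dependency out of the main bundle.
    const zstd = await import('some-zstd-wasm-lib'); // placeholder module name
    await zstd.init();                               // many WASM builds require an async init step
    const codec: Codec = {
        encode: (data: Uint8Array) => zstd.compress(data),
        decode: (data: Uint8Array) => zstd.decompress(data)
    };
    compressionRegistry.set(CompressionType.ZSTD, codec);
}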
This method worked for us as well - if it helps, I needed to use @oneidentity/zstd-js instead since zstd-codec wouldn't bundle correctly in our code base
We would love to see lz4 support in arrow-js. @westonpace @trxcllnt any chance you could give this another review?
}
protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: any) {

protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) {
I've been trying this PR at my company using zstd encoding, and can confirm that dictionary batches are in fact compressed as well.
I needed to add the following lines for the zstd decompression to work fully for dictionary vectors:
let data: Data<any>[];
if (header.data.compression != null) {
    const codec = compressionRegistry.get(header.data.compression.type);
    if (codec?.decode && typeof codec.decode === 'function') {
        const { decommpressedBody, buffers } = this._decompressBuffers(header.data, body, codec);
        data = this._loadCompressedVectors(header.data, decommpressedBody, [type]);
        header = new metadata.DictionaryBatch(new metadata.RecordBatch(
            header.data.length,
            header.data.nodes,
            buffers,
            null
        ), id, isDelta);
    } else {
        throw new Error('Dictionary batch is compressed but codec not found');
    }
} else {
    data = this._loadVectors(header.data, body, [type]);
}
otherwise this PR has been working great as-is at scale for us!
Thank you! I have made a new commit with the correct compression and decompression dictionary functionality. Previously, compression did not work on dictionary batches. Could you please try it in your project?
Works great, thanks so much! Note that I'm only able to test the decompression, since we do not use the compression paths in our codebase (we use https://arrow.apache.org/java/18.2.0/reference/org.apache.arrow.vector/org/apache/arrow/vector/ipc/ArrowStreamWriter.html to send compressed data to the frontend)
OK, now compression works too. Thank you for testing! At my work we only use LZ4 decompression.
How can I use this?
I’m using this in a web application. Here’s how you can try it out in your own project:
1. Clone the fork and switch to the feature branch:
git clone https://github.com/Djjanks/arrow-js.git
cd arrow-js
git checkout feature/arrow-compression
2. Build the package (Linux or WSL recommended):
yarn install
yarn build
3. Link the library locally (assuming you use npm):
cd targets/apache-arrow/
npm link
4. Link it in your project:
npm uninstall apache-arrow
npm link apache-arrow
5. Register the codec you want to use.
For example, with LZ4 (see tests or the PR for more examples):
import { Codec, compressionRegistry, CompressionType } from 'apache-arrow';
import * as lz4js from 'lz4js';

const lz4Codec: Codec = {
    encode(data: Uint8Array): Uint8Array { return lz4js.compress(data); },
    decode(data: Uint8Array): Uint8Array { return lz4js.decompress(data); }
};
compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);

It's not the most convenient setup, but it's enough to experiment with compression support right now.
For a cleaner workflow, it’s better to wait until the PR is merged into the main repo.
@trxcllnt
    Uint32,
    Vector
} from 'apache-arrow';
import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
Are these not exported in the top-level export? It seems like they should be, since they'd allow users to register their own implementations?
Good catch, thanks! Codec wasn’t exported at the top level before. I’ve fixed that so now both compressionRegistry and Codec are available directly from the main package exports.
Super exciting! Any chance we could see an arrow-js release to make this more widely available?
Let's discuss it in #283.
I think I understand this after reading through the code, but it would be great to have some documentation included in the repo about how to use this feature. Documentation is not a strength of Arrow JS.
Could you open a new issue for documentation?
Rationale for this change
This change introduces support for reading compressed Arrow IPC streams in JavaScript. The primary motivation is the need to read Arrow IPC streams in the browser when they are transmitted over the network in a compressed format to reduce network load.
Several reasons support this enhancement:
What changes are included in this PR?
Additional notes:
Not all JavaScript LZ4 libraries are compatible with the Arrow IPC format. For example, some popular packages emit the raw LZ4 block format rather than the LZ4 frame format that Arrow expects.
This can result in silent or cryptic errors. To improve the developer experience, we could validate registered codecs (for example, by checking that the encoder produces a valid LZ4 frame header) and fail fast with a clear error.
After decompressing the buffers, new BufferRegion entries are calculated to match the uncompressed data layout. A new metadata.RecordBatch is constructed with the updated buffer regions and passed into _loadVectors().
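As an illustration of that layout recalculation (a sketch only; the helper name is hypothetical), each decompressed buffer is assigned a fresh 8-byte-aligned offset, mirroring how an uncompressed IPC body is laid out:

function rebuildRegions(decompressed: Uint8Array[]): { offset: number; length: number }[] {
    let offset = 0;
    return decompressed.map((buffer) => {
        const region = { offset, length: buffer.byteLength };
        offset += (buffer.byteLength + 7) & ~7; // advance to the next 8-byte boundary
        return region;
    });
}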
This introduces a mutation-like pattern that may break assumptions in the current design. However, it's necessary because:
When reconstructing the metadata, the compression field is explicitly set to null, since the data is already decompressed in memory.
This decision is somewhat debatable — feedback is welcome on whether it's better to retain the original compression metadata or to reflect the current state of the buffer (uncompressed). The current implementation assumes the latter.
Are these changes tested?
Are there any user-facing changes?
Yes, Arrow JS users can now read compressed IPC streams, assuming they register an appropriate codec using compressionRegistry.set().
Example:
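A minimal sketch (assuming the lz4js package and the top-level exports discussed above; the fetch URL is illustrative):

import { Codec, compressionRegistry, CompressionType, tableFromIPC } from 'apache-arrow';
import * as lz4js from 'lz4js';

// Register an LZ4 frame codec once at startup...
const lz4Codec: Codec = {
    encode: (data: Uint8Array) => lz4js.compress(data),
    decode: (data: Uint8Array) => lz4js.decompress(data)
};
compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);

// ...then read a compressed IPC stream as usual.
const bytes = new Uint8Array(await fetch('/data.arrows').then((res) => res.arrayBuffer()));
const table = tableFromIPC(bytes);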
This change does not affect writing or serialization.
This PR includes breaking changes to public APIs.
No. The change adds functionality but does not modify any existing API behavior.
This PR contains a "Critical Fix".
No. This is a new feature, not a critical fix.
Checklist
- Tests pass (yarn test)
- Build succeeds (yarn build)

Closes #109.