From 4ec20253c24d43903f334b32fa000b017a1fd011 Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Tue, 28 Jan 2014 19:35:51 +0200 Subject: [PATCH 1/6] zlib: add sync versions for convenience methods --- lib/zlib.js | 75 ++++++++++++++++++-- src/node_zlib.cc | 53 +++++++++++++- test/simple/test-zlib-convenience-methods.js | 16 ++++- 3 files changed, 135 insertions(+), 9 deletions(-) diff --git a/lib/zlib.js b/lib/zlib.js index 3341588e45b9..4b0e6cca6068 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -112,7 +112,7 @@ exports.deflate = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new Deflate(opts), buffer, callback); + return zlibBuffer(new Deflate(opts), buffer, callback); }; exports.gzip = function(buffer, opts, callback) { @@ -120,7 +120,7 @@ exports.gzip = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new Gzip(opts), buffer, callback); + return zlibBuffer(new Gzip(opts), buffer, callback); }; exports.deflateRaw = function(buffer, opts, callback) { @@ -128,7 +128,7 @@ exports.deflateRaw = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new DeflateRaw(opts), buffer, callback); + return zlibBuffer(new DeflateRaw(opts), buffer, callback); }; exports.unzip = function(buffer, opts, callback) { @@ -136,7 +136,7 @@ exports.unzip = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new Unzip(opts), buffer, callback); + return zlibBuffer(new Unzip(opts), buffer, callback); }; exports.inflate = function(buffer, opts, callback) { @@ -144,7 +144,7 @@ exports.inflate = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new Inflate(opts), buffer, callback); + return zlibBuffer(new Inflate(opts), buffer, callback); }; exports.gunzip = function(buffer, opts, callback) { @@ -152,7 +152,7 @@ exports.gunzip = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new Gunzip(opts), buffer, callback); + return zlibBuffer(new Gunzip(opts), buffer, callback); }; exports.inflateRaw = function(buffer, opts, callback) { @@ -160,13 +160,74 @@ exports.inflateRaw = function(buffer, opts, callback) { callback = opts; opts = {}; } - zlibBuffer(new InflateRaw(opts), buffer, callback); + return zlibBuffer(new InflateRaw(opts), buffer, callback); }; function zlibBuffer(engine, buffer, callback) { var buffers = []; var nread = 0; + if (!util.isFunction(callback)) { + if (util.isString(buffer)) + buffer = new Buffer(buffer); + if (!util.isBuffer(buffer)) + throw new TypeError('Not a string or buffer'); + + var flushFlag = binding.Z_FINISH; + + var availInBefore = buffer && buffer.length; + var availOutBefore = engine._chunkSize - engine._offset; + var inOff = 0; + + var error; + engine.on('error', function(er) { + error = er; + }); + + do { + var result = engine._binding.writeSync(flushFlag, + buffer, // in + inOff, // in_off + availInBefore, // in_len + engine._buffer, // out + engine._offset, //out_off + availOutBefore); // out_len + + if (engine._hadError) { + throw error; + } + + var availInAfter = result[0]; + var availOutAfter = result[1]; + + var have = availOutBefore - availOutAfter; + assert(have >= 0, 'have should not go down'); + + if (have > 0) { + var out = engine._buffer.slice(engine._offset, engine._offset + have); + engine._offset += have; + buffers.push(out); + nread += out.length; + } + + // exhausted the output buffer, or used all the input create a new one. + if (availOutAfter === 0 || engine._offset >= engine._chunkSize) { + availOutBefore = engine._chunkSize; + engine._offset = 0; + engine._buffer = new Buffer(engine._chunkSize); + } + + inOff += (availInBefore - availInAfter); + availInBefore = availInAfter; + } while (availOutAfter === 0); + + var buf = Buffer.concat(buffers, nread); + buffers = []; + engine.close(); + + return buf; + } + engine.on('error', onError); engine.on('end', onEnd); diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 16372bd8118c..6f53176df579 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -39,6 +39,7 @@ namespace node { +using v8::Array; using v8::Context; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; @@ -125,6 +126,7 @@ class ZCtx : public AsyncWrap { // write(flush, in, in_off, in_len, out, out_off, out_len) + template static void Write(const FunctionCallbackInfo& args) { HandleScope scope(node_isolate); assert(args.Length() == 7); @@ -190,6 +192,54 @@ class ZCtx : public AsyncWrap { // set this so that later on, I can easily tell how much was written. ctx->chunk_size_ = out_len; + if (!async) { + // sync version + + ZCtx::Process(work_req); + + Environment* env = ctx->env(); + + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + // Acceptable error states depend on the type of zlib stream. + switch (ctx->err_) { + case Z_OK: + case Z_STREAM_END: + case Z_BUF_ERROR: + // normal statuses, not fatal + break; + case Z_NEED_DICT: + if (ctx->dictionary_ == NULL) { + ZCtx::Error(ctx, "Missing dictionary"); + } + else { + ZCtx::Error(ctx, "Bad dictionary"); + } + return; + default: + // something else. + ZCtx::Error(ctx, "Zlib error"); + return; + } + + Local avail_out = Integer::New(ctx->strm_.avail_out, node_isolate); + Local avail_in = Integer::New(ctx->strm_.avail_in, node_isolate); + + ctx->write_in_progress_ = false; + + Local result = Array::New(2); + result->Set(0, avail_in); + result->Set(1, avail_out); + args.GetReturnValue().Set(result); + + ctx->Unref(); + + return; + } + + // async version + uv_queue_work(ctx->env()->event_loop(), work_req, ZCtx::Process, @@ -556,7 +606,8 @@ void InitZlib(Handle target, z->InstanceTemplate()->SetInternalFieldCount(1); - NODE_SET_PROTOTYPE_METHOD(z, "write", ZCtx::Write); + NODE_SET_PROTOTYPE_METHOD(z, "write", ZCtx::Write); + NODE_SET_PROTOTYPE_METHOD(z, "writeSync", ZCtx::Write); NODE_SET_PROTOTYPE_METHOD(z, "init", ZCtx::Init); NODE_SET_PROTOTYPE_METHOD(z, "close", ZCtx::Close); NODE_SET_PROTOTYPE_METHOD(z, "params", ZCtx::Params); diff --git a/test/simple/test-zlib-convenience-methods.js b/test/simple/test-zlib-convenience-methods.js index 43794fbdd8c6..43b68d15ba6d 100644 --- a/test/simple/test-zlib-convenience-methods.js +++ b/test/simple/test-zlib-convenience-methods.js @@ -58,8 +58,22 @@ var opts = { }); }); + var result = zlib[method[0]](expect, opts); + result = zlib[method[1]](result, opts); + assert.equal(result, expect, + 'Should get original string after ' + + method[0] + '/' + method[1] + ' with options.'); + hadRun++; + + result = zlib[method[0]](expect); + result = zlib[method[1]](result); + assert.equal(result, expect, + 'Should get original string after ' + + method[0] + '/' + method[1] + ' without options.'); + hadRun++; + }); process.on('exit', function() { - assert.equal(hadRun, 8, 'expect 8 compressions'); + assert.equal(hadRun, 16, 'expect 16 compressions'); }); From 3f9747ba8325c0ec26b73f4f3dbbd557eee23ad7 Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Wed, 29 Jan 2014 16:07:21 +0200 Subject: [PATCH 2/6] De-dupe After and make a separate method --- src/node_zlib.cc | 103 ++++++++++++++++++++--------------------------- 1 file changed, 44 insertions(+), 59 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 6f53176df579..5afb7d62ddf6 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -195,46 +195,9 @@ class ZCtx : public AsyncWrap { if (!async) { // sync version - ZCtx::Process(work_req); - - Environment* env = ctx->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // Acceptable error states depend on the type of zlib stream. - switch (ctx->err_) { - case Z_OK: - case Z_STREAM_END: - case Z_BUF_ERROR: - // normal statuses, not fatal - break; - case Z_NEED_DICT: - if (ctx->dictionary_ == NULL) { - ZCtx::Error(ctx, "Missing dictionary"); - } - else { - ZCtx::Error(ctx, "Bad dictionary"); - } - return; - default: - // something else. - ZCtx::Error(ctx, "Zlib error"); - return; - } - - Local avail_out = Integer::New(ctx->strm_.avail_out, node_isolate); - Local avail_in = Integer::New(ctx->strm_.avail_in, node_isolate); - - ctx->write_in_progress_ = false; - - Local result = Array::New(2); - result->Set(0, avail_in); - result->Set(1, avail_out); - args.GetReturnValue().Set(result); - - ctx->Unref(); - + Process(work_req); + if (IsGood(ctx)) + AfterSync(ctx, args); return; } @@ -248,6 +211,20 @@ class ZCtx : public AsyncWrap { args.GetReturnValue().Set(ctx->object()); } + static void AfterSync(ZCtx* ctx, const FunctionCallbackInfo& args) { + Local avail_out = Integer::New(ctx->strm_.avail_out, node_isolate); + Local avail_in = Integer::New(ctx->strm_.avail_in, node_isolate); + + ctx->write_in_progress_ = false; + + Local result = Array::New(2); + result->Set(0, avail_in); + result->Set(1, avail_out); + args.GetReturnValue().Set(result); + + ctx->Unref(); + } + // thread pool! // This function may be called multiple times on the uv_work pool @@ -299,6 +276,31 @@ class ZCtx : public AsyncWrap { // or shift the queue and call Process. } + static bool IsGood(ZCtx* ctx) { + // Acceptable error states depend on the type of zlib stream. + switch (ctx->err_) { + case Z_OK: + case Z_STREAM_END: + case Z_BUF_ERROR: + // normal statuses, not fatal + break; + case Z_NEED_DICT: + if (ctx->dictionary_ == NULL) { + ZCtx::Error(ctx, "Missing dictionary"); + } + else { + ZCtx::Error(ctx, "Bad dictionary"); + } + return false; + default: + // something else. + ZCtx::Error(ctx, "Zlib error"); + return false; + } + + return true; + } + // v8 land! static void After(uv_work_t* work_req, int status) { assert(status == 0); @@ -309,25 +311,8 @@ class ZCtx : public AsyncWrap { HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - // Acceptable error states depend on the type of zlib stream. - switch (ctx->err_) { - case Z_OK: - case Z_STREAM_END: - case Z_BUF_ERROR: - // normal statuses, not fatal - break; - case Z_NEED_DICT: - if (ctx->dictionary_ == NULL) { - ZCtx::Error(ctx, "Missing dictionary"); - } else { - ZCtx::Error(ctx, "Bad dictionary"); - } - return; - default: - // something else. - ZCtx::Error(ctx, "Zlib error"); - return; - } + if (!IsGood(ctx)) + return; Local avail_out = Integer::New(ctx->strm_.avail_out, node_isolate); Local avail_in = Integer::New(ctx->strm_.avail_in, node_isolate); From 55563175d1abf2bf90346ecbb4f5fa19674b5bca Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Wed, 29 Jan 2014 16:16:21 +0100 Subject: [PATCH 3/6] Fix docs --- doc/api/zlib.markdown | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/doc/api/zlib.markdown b/doc/api/zlib.markdown index 59d73554d5dc..8ade5a8205cd 100644 --- a/doc/api/zlib.markdown +++ b/doc/api/zlib.markdown @@ -195,34 +195,36 @@ the header. All of these take a string or buffer as the first argument, an optional second -argument to supply options to the zlib classes and will call the supplied -callback with `callback(error, result)`. +argument to supply options to the zlib classes and an optional callback. If a +callback is supplied, they will call it asynchronously with +`callback(error, result)`, otherwise they will return the result or throw the +error synchronously. -## zlib.deflate(buf, [options], callback) +## zlib.deflate(buf, [options], [callback]) Compress a string with Deflate. -## zlib.deflateRaw(buf, [options], callback) +## zlib.deflateRaw(buf, [options], [callback]) Compress a string with DeflateRaw. -## zlib.gzip(buf, [options], callback) +## zlib.gzip(buf, [options], [callback]) Compress a string with Gzip. -## zlib.gunzip(buf, [options], callback) +## zlib.gunzip(buf, [options], [callback]) Decompress a raw Buffer with Gunzip. -## zlib.inflate(buf, [options], callback) +## zlib.inflate(buf, [options], [callback]) Decompress a raw Buffer with Inflate. -## zlib.inflateRaw(buf, [options], callback) +## zlib.inflateRaw(buf, [options], [callback]) Decompress a raw Buffer with InflateRaw. -## zlib.unzip(buf, [options], callback) +## zlib.unzip(buf, [options], [callback]) Decompress a raw Buffer with Unzip. From c027af3885dac58912fc0b2a8245dff8db0ecbc7 Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Wed, 29 Jan 2014 18:38:53 +0200 Subject: [PATCH 4/6] Correct last nits --- src/node_zlib.cc | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 5afb7d62ddf6..16dc2c025d2a 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -194,15 +194,13 @@ class ZCtx : public AsyncWrap { if (!async) { // sync version - Process(work_req); - if (IsGood(ctx)) + if (CheckError(ctx)) AfterSync(ctx, args); return; } // async version - uv_queue_work(ctx->env()->event_loop(), work_req, ZCtx::Process, @@ -276,7 +274,7 @@ class ZCtx : public AsyncWrap { // or shift the queue and call Process. } - static bool IsGood(ZCtx* ctx) { + static bool CheckError(ZCtx* ctx) { // Acceptable error states depend on the type of zlib stream. switch (ctx->err_) { case Z_OK: @@ -285,12 +283,10 @@ class ZCtx : public AsyncWrap { // normal statuses, not fatal break; case Z_NEED_DICT: - if (ctx->dictionary_ == NULL) { + if (ctx->dictionary_ == NULL) ZCtx::Error(ctx, "Missing dictionary"); - } - else { + else ZCtx::Error(ctx, "Bad dictionary"); - } return false; default: // something else. @@ -311,7 +307,7 @@ class ZCtx : public AsyncWrap { HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); - if (!IsGood(ctx)) + if (!CheckError(ctx)) return; Local avail_out = Integer::New(ctx->strm_.avail_out, node_isolate); From 713123bdcb694db58859f67faeb820e1a9063d31 Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Thu, 30 Jan 2014 01:16:34 +0100 Subject: [PATCH 5/6] de-dupe writing function --- lib/zlib.js | 144 +++++++++++++++++++++++----------------------------- 1 file changed, 63 insertions(+), 81 deletions(-) diff --git a/lib/zlib.js b/lib/zlib.js index 4b0e6cca6068..f1435e2ef44f 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -164,70 +164,13 @@ exports.inflateRaw = function(buffer, opts, callback) { }; function zlibBuffer(engine, buffer, callback) { - var buffers = []; - var nread = 0; - if (!util.isFunction(callback)) { - if (util.isString(buffer)) - buffer = new Buffer(buffer); - if (!util.isBuffer(buffer)) - throw new TypeError('Not a string or buffer'); - - var flushFlag = binding.Z_FINISH; - - var availInBefore = buffer && buffer.length; - var availOutBefore = engine._chunkSize - engine._offset; - var inOff = 0; - - var error; - engine.on('error', function(er) { - error = er; - }); - - do { - var result = engine._binding.writeSync(flushFlag, - buffer, // in - inOff, // in_off - availInBefore, // in_len - engine._buffer, // out - engine._offset, //out_off - availOutBefore); // out_len - - if (engine._hadError) { - throw error; - } - - var availInAfter = result[0]; - var availOutAfter = result[1]; - - var have = availOutBefore - availOutAfter; - assert(have >= 0, 'have should not go down'); - - if (have > 0) { - var out = engine._buffer.slice(engine._offset, engine._offset + have); - engine._offset += have; - buffers.push(out); - nread += out.length; - } - - // exhausted the output buffer, or used all the input create a new one. - if (availOutAfter === 0 || engine._offset >= engine._chunkSize) { - availOutBefore = engine._chunkSize; - engine._offset = 0; - engine._buffer = new Buffer(engine._chunkSize); - } - - inOff += (availInBefore - availInAfter); - availInBefore = availInAfter; - } while (availOutAfter === 0); - - var buf = Buffer.concat(buffers, nread); - buffers = []; - engine.close(); - - return buf; + return zlibBufferSync(engine, buffer, callback); } + var buffers = []; + var nread = 0; + engine.on('error', onError); engine.on('end', onEnd); @@ -257,6 +200,39 @@ function zlibBuffer(engine, buffer, callback) { } } +function zlibBufferSync(engine, buffer, callback) { + var buffers = []; + var nread = 0; + + if (util.isString(buffer)) + buffer = new Buffer(buffer); + if (!util.isBuffer(buffer)) + throw new TypeError('Not a string or buffer'); + + var flushFlag = binding.Z_FINISH; + + var error; + engine.on('error', function(er) { + error = er; + }); + + engine._processChunk(buffer, function(binding, args, callback) { + callback.apply(null, binding.writeSync.apply(binding, args)); + }, function(out) { + buffers.push(out); + nread += out.length; + }, flushFlag, function() {}); + + if (engine._hadError) { + throw error; + } + + var buf = Buffer.concat(buffers, nread); + buffers = []; + engine.close(); + + return buf; +} // generic zlib // minimal 2-byte header @@ -514,22 +490,29 @@ Zlib.prototype._transform = function(chunk, encoding, cb) { } } + var self = this; + this._processChunk(chunk, function(binding, args, callback) { + var req = binding.write.apply(binding, args); + req.callback = callback; + }, self.push.bind(self), flushFlag, cb); +}; + +Zlib.prototype._processChunk = function(chunk, write, push, flushFlag, cb) { var availInBefore = chunk && chunk.length; var availOutBefore = this._chunkSize - this._offset; var inOff = 0; - var req = this._binding.write(flushFlag, - chunk, // in - inOff, // in_off - availInBefore, // in_len - this._buffer, // out - this._offset, //out_off - availOutBefore); // out_len + var self = this; - req.buffer = chunk; - req.callback = callback; + write(this._binding, [flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + this._buffer, // out + this._offset, //out_off + availOutBefore], // out_len + callback); - var self = this; function callback(availInAfter, availOutAfter) { if (self._hadError) return; @@ -541,7 +524,7 @@ Zlib.prototype._transform = function(chunk, encoding, cb) { var out = self._buffer.slice(self._offset, self._offset + have); self._offset += have; // serve some output to the consumer. - self.push(out); + push(out); } // exhausted the output buffer, or used all the input create a new one. @@ -559,15 +542,14 @@ Zlib.prototype._transform = function(chunk, encoding, cb) { inOff += (availInBefore - availInAfter); availInBefore = availInAfter; - var newReq = self._binding.write(flushFlag, - chunk, - inOff, - availInBefore, - self._buffer, - self._offset, - self._chunkSize); - newReq.callback = callback; // this same function - newReq.buffer = chunk; + write(self._binding, [flushFlag, + chunk, + inOff, + availInBefore, + self._buffer, + self._offset, + self._chunkSize], + callback); // this same function return; } From 168d7ec89fb50760a30fc3327422629b390cd761 Mon Sep 17 00:00:00 2001 From: Nikolai Vavilov Date: Fri, 31 Jan 2014 23:20:34 +0100 Subject: [PATCH 6/6] remove callbacks --- lib/zlib.js | 111 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 46 deletions(-) diff --git a/lib/zlib.js b/lib/zlib.js index f1435e2ef44f..68f3d59675b7 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -201,9 +201,6 @@ function zlibBuffer(engine, buffer, callback) { } function zlibBufferSync(engine, buffer, callback) { - var buffers = []; - var nread = 0; - if (util.isString(buffer)) buffer = new Buffer(buffer); if (!util.isBuffer(buffer)) @@ -211,27 +208,7 @@ function zlibBufferSync(engine, buffer, callback) { var flushFlag = binding.Z_FINISH; - var error; - engine.on('error', function(er) { - error = er; - }); - - engine._processChunk(buffer, function(binding, args, callback) { - callback.apply(null, binding.writeSync.apply(binding, args)); - }, function(out) { - buffers.push(out); - nread += out.length; - }, flushFlag, function() {}); - - if (engine._hadError) { - throw error; - } - - var buf = Buffer.concat(buffers, nread); - buffers = []; - engine.close(); - - return buf; + return engine._processChunk(buffer, flushFlag); } // generic zlib @@ -491,27 +468,57 @@ Zlib.prototype._transform = function(chunk, encoding, cb) { } var self = this; - this._processChunk(chunk, function(binding, args, callback) { - var req = binding.write.apply(binding, args); - req.callback = callback; - }, self.push.bind(self), flushFlag, cb); + this._processChunk(chunk, flushFlag, cb); }; -Zlib.prototype._processChunk = function(chunk, write, push, flushFlag, cb) { +Zlib.prototype._processChunk = function(chunk, flushFlag, cb) { var availInBefore = chunk && chunk.length; var availOutBefore = this._chunkSize - this._offset; var inOff = 0; var self = this; - write(this._binding, [flushFlag, - chunk, // in - inOff, // in_off - availInBefore, // in_len - this._buffer, // out - this._offset, //out_off - availOutBefore], // out_len - callback); + var async = util.isFunction(cb); + + if (!async) { + var buffers = []; + var nread = 0; + + var error; + this.on('error', function(er) { + error = er; + }); + + do { + var res = this._binding.writeSync(flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + this._buffer, // out + this._offset, //out_off + availOutBefore); // out_len + } while (!this._hadError && callback(res[0], res[1])); + + if (this._hadError) { + throw error; + } + + var buf = Buffer.concat(buffers, nread); + this.close(); + + return buf; + } + + var req = this._binding.write(flushFlag, + chunk, // in + inOff, // in_off + availInBefore, // in_len + this._buffer, // out + this._offset, //out_off + availOutBefore); // out_len + + req.buffer = chunk; + req.callback = callback; function callback(availInAfter, availOutAfter) { if (self._hadError) @@ -524,7 +531,12 @@ Zlib.prototype._processChunk = function(chunk, write, push, flushFlag, cb) { var out = self._buffer.slice(self._offset, self._offset + have); self._offset += have; // serve some output to the consumer. - push(out); + if (async) + self.push(out); + else { + buffers.push(out); + nread += out.length; + } } // exhausted the output buffer, or used all the input create a new one. @@ -542,17 +554,24 @@ Zlib.prototype._processChunk = function(chunk, write, push, flushFlag, cb) { inOff += (availInBefore - availInAfter); availInBefore = availInAfter; - write(self._binding, [flushFlag, - chunk, - inOff, - availInBefore, - self._buffer, - self._offset, - self._chunkSize], - callback); // this same function + if (!async) + return true; + + var newReq = self._binding.write(flushFlag, + chunk, + inOff, + availInBefore, + self._buffer, + self._offset, + self._chunkSize); + newReq.callback = callback; // this same function + newReq.buffer = chunk; return; } + if (!async) + return false; + // finished with the chunk. cb(); }