Skip to content

Commit c324c8e

Browse files
committed
Merge pull request #755 from stephenplusplus/spp--datastore-get
datastore: support streams with request.get
2 parents b18e5e2 + 3896bd6 commit c324c8e

File tree

3 files changed

+317
-80
lines changed

3 files changed

+317
-80
lines changed

lib/datastore/request.js

Lines changed: 78 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
'use strict';
2222

23+
var isStreamEnded = require('is-stream-ended');
2324
var request = require('request').defaults({
2425
pool: {
2526
maxSockets: Infinity
2627
}
2728
});
29+
var through = require('through2');
2830

2931
/**
3032
* @type {module:datastore/entity}
@@ -92,11 +94,13 @@ function DatastoreRequest() {}
9294
* transaction. Get operations require a valid key to retrieve the
9395
* key-identified entity from Datastore.
9496
*
97+
* @throws {Error} If at least one Key object is not provided.
98+
*
9599
* @param {Key|Key[]} keys - Datastore key object(s).
96100
* @param {function} callback - The callback function.
97101
* @param {?error} callback.err - An error returned while making this request
98102
* @param {module:datastore/entity|module:datastore/entity[]} callback.entity -
99-
* Will return either a single Entity or a list of Entities
103+
* Will return either a single Entity or a list of Entities.
100104
* @param {object} callback.apiResponse - The full API response.
101105
*
102106
* @example
@@ -105,57 +109,102 @@ function DatastoreRequest() {}
105109
* // your use, whether that be a Dataset or Transaction object.
106110
* //-
107111
*
112+
* //-
108113
* // Get a single entity.
114+
* //-
109115
* var key = dataset.key(['Company', 123]);
116+
*
110117
* transaction.get(key, function(err, entity, apiResponse) {});
111118
*
112-
* // Get multiple entities at once.
113-
* transaction.get([
119+
* //-
120+
* // Get multiple entities at once with a callback.
121+
* //-
122+
* var keys = [
114123
* dataset.key(['Company', 123]),
115124
* dataset.key(['Product', 'Computer'])
116-
* ], function(err, entities, apiResponse) {});
125+
* ];
126+
*
127+
* transaction.get(keys, function(err, entities, apiResponse) {});
128+
*
129+
* //-
130+
* // Or, get the entities as a readable object stream.
131+
* //-
132+
* transaction.get(keys)
133+
* .on('error', function(err, apiResponse) {})
134+
* .on('data', function(entity) {
135+
* // entity is an entity object.
136+
* })
137+
* .on('end', function() {
138+
* // All entities retrieved.
139+
* });
117140
*/
118141
DatastoreRequest.prototype.get = function(keys, callback) {
119-
var that = this;
142+
var self = this;
120143

121-
var isMultipleRequest = Array.isArray(keys);
122-
keys = isMultipleRequest ? keys : [keys];
144+
var isStreamMode = !callback;
145+
var stream;
123146

124-
callback = callback || util.noop;
147+
if (isStreamMode) {
148+
stream = through.obj();
149+
}
125150

126-
var req = {
127-
key: keys.map(entity.keyToKeyProto)
151+
var isSingleLookup = !util.is(keys, 'array');
152+
keys = util.arrayize(keys).map(entity.keyToKeyProto);
153+
154+
if (keys.length === 0) {
155+
throw new Error('At least one Key object is required.');
156+
}
157+
158+
var request = {
159+
key: keys
128160
};
129161

130-
this.makeReq_('lookup', req, function(err, resp) {
162+
var entities = [];
163+
this.makeReq_('lookup', request, onApiResponse);
164+
165+
function onApiResponse(err, resp) {
131166
if (err) {
132-
callback(err, null, resp);
167+
if (isStreamMode) {
168+
stream.emit('error', err, resp);
169+
stream.end();
170+
} else {
171+
callback(err, null, resp);
172+
}
133173
return;
134174
}
135175

136-
var found = entity.formatArray(resp.found);
137-
138-
if (isMultipleRequest && resp.deferred && resp.deferred.length) {
139-
// There may be more results. Call `.get` again, and append the results.
140-
that.get(
141-
resp.deferred.map(entity.keyFromKeyProto), function(err, entities) {
142-
if (err) {
143-
callback(err, null, resp);
144-
return;
145-
}
176+
var results = entity.formatArray(resp.found);
177+
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);
146178

147-
if (resp) {
148-
found = (found || []).concat(entities);
149-
}
179+
if (isStreamMode) {
180+
var result;
181+
while ((result = results.shift()) && !isStreamEnded(stream)) {
182+
stream.push(result);
183+
}
184+
} else {
185+
entities = entities.concat(results);
186+
}
150187

151-
callback(null, found, resp);
152-
});
188+
if (isStreamMode && isStreamEnded(stream)) {
189+
return;
190+
}
153191

192+
if (nextKeys.length > 0) {
193+
self.get(nextKeys, onApiResponse);
154194
return;
155195
}
156196

157-
callback(null, isMultipleRequest ? found : found[0], resp);
158-
});
197+
if (isStreamMode) {
198+
stream.push(null);
199+
stream.end();
200+
} else {
201+
callback(null, isSingleLookup ? entities[0] : entities, resp);
202+
}
203+
}
204+
205+
if (isStreamMode) {
206+
return stream;
207+
}
159208
};
160209

161210
/**

system-test/datastore.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,33 @@ describe('datastore', function() {
183183
});
184184
});
185185

186+
it('should get multiple entities in a stream', function(done) {
187+
var key1 = ds.key('Post');
188+
var key2 = ds.key('Post');
189+
190+
ds.save([
191+
{ key: key1, data: post },
192+
{ key: key2, data: post }
193+
], function(err) {
194+
assert.ifError(err);
195+
196+
var firstKey = ds.key(['Post', key1.path[1]]);
197+
var secondKey = ds.key(['Post', key2.path[1]]);
198+
199+
var numEntitiesEmitted = 0;
200+
201+
ds.get([firstKey, secondKey])
202+
.on('error', done)
203+
.on('data', function() {
204+
numEntitiesEmitted++;
205+
})
206+
.on('end', function() {
207+
assert.strictEqual(numEntitiesEmitted, 2);
208+
209+
ds.delete([firstKey, secondKey], done);
210+
});
211+
});
212+
});
186213
});
187214

188215
it('should save keys as a part of entity and query by key', function(done) {

0 commit comments

Comments
 (0)