Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 67 additions & 15 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
'use strict';

var https = require('https');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:datastore/entity
Expand Down Expand Up @@ -295,11 +297,20 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
/**
* Datastore allows you to query entities by kind, filter them by property
* filters, and sort them by a property name. Projection and pagination are also
* supported. If more results are available, a query to retrieve the next page
* is provided to the callback function.
* supported.
*
* If you provide a callback, the query is run, and the results are returned as
* the second argument to your callback. A third argument will also exist, which
* is the `endCursor` of the previously-run query. You can use this to extend
* the query you just ran to see if more results exist.
*
* You may also omit the callback to this function to trigger streaming mode.
*
* See below for examples of both approaches.
*
* @param {module:datastore/query} q - Query object.
* @param {function} callback - The callback function.
* @param {function=} callback - The callback function. If omitted, a readable
* stream instance is returned.
*
* @example
* //-
Expand All @@ -313,9 +324,18 @@ DatastoreRequest.prototype.delete = function(keys, callback) {
* var nextQuery = queryObject.start(endCursor);
* transaction.runQuery(nextQuery, function(err, entities, endCursor) {});
* });
*
* //-
* // If you omit the callback, runQuery will automatically call subsequent
* // queries until no results remain. Entity objects will be pushed as they are
* // found.
* //-
* transaction.runQuery(queryObject)
* .on('data', function (entity) {});
*/

This comment was marked as spam.

DatastoreRequest.prototype.runQuery = function(q, callback) {
callback = callback || util.noop;
var that = this;
var stream;

var req = {
read_options: {},
Expand All @@ -328,19 +348,51 @@ DatastoreRequest.prototype.runQuery = function(q, callback) {
};
}

this.makeReq_('runQuery', req, function(err, resp) {
if (err || !resp.batch || !resp.batch.entity_result) {
callback(err);
return;
}
if (!util.is(callback, 'function')) {
stream = streamEvents(through.obj());
stream.once('reading', runQuery);
return stream;
} else {
callback = callback || util.noop;

This comment was marked as spam.

This comment was marked as spam.

runQuery();
}

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}
function runQuery() {
that.makeReq_('runQuery', req, function(err, resp) {
if (err) {
if (stream) {
stream.emit('error', err);
stream.end();
} else {
callback(err);
}
return;
}

callback(null, entity.formatArray(resp.batch.entity_result), cursor);
});
var entities = entity.formatArray(resp.batch.entity_result);

var cursor = '';
if (resp.batch.end_cursor) {
cursor = resp.batch.end_cursor.toBase64();
}

if (stream) {
if (cursor && entities.length > 0) {
entities.forEach(function (entity) {
stream.push(entity);
});

req.query = entity.queryToQueryProto(q.start(cursor).offset(0));

runQuery();
} else {
stream.end();
}
} else {
callback(null, entities, cursor);
}
});
}
};

/**
Expand Down
15 changes: 15 additions & 0 deletions regression/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,21 @@ describe('datastore', function() {
});
});

it('should run a query as a stream', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.limit(5);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


var resultsReturned = 0;

ds.runQuery(q)
.on('error', done)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, characters.length);
done();
});
});

it('should filter queries with simple indexes', function(done) {
var q = ds.createQuery('Character').hasAncestor(ancestor)
.filter('appearances >=', 20);
Expand Down
76 changes: 62 additions & 14 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var https = require('https');
var mockRespGet = require('../testdata/response_get.json');
var pb = require('../../lib/datastore/pb.js');
var Query = require('../../lib/datastore/query.js');
var Stream = require('stream');
var util = require('../../lib/common/util.js');

var httpsRequestOverride = util.noop;
Expand Down Expand Up @@ -278,8 +279,7 @@ describe('Request', function() {
entity_result: mockRespGet.found,
end_cursor: new ByteBuffer().writeIString('cursor').flip()
}
},
withoutResults: mockRespGet
}

This comment was marked as spam.

};

beforeEach(function() {
Expand All @@ -298,18 +298,6 @@ describe('Request', function() {
assert.equal(err, error);
});
});

it('should handle missing results error', function() {
request.makeReq_ = function(method, req, callback) {
assert.equal(method, 'runQuery');
callback(null, mockResponse.withoutResults);
};

request.runQuery(query, function(err, entities) {
assert.strictEqual(err, null);
assert.strictEqual(entities, undefined);
});
});
});

it('should execute callback with results', function() {
Expand Down Expand Up @@ -354,6 +342,66 @@ describe('Request', function() {
done();
});
});

describe('streams', function() {
it('should be a stream if a callback is omitted', function() {
assert(request.runQuery(query) instanceof Stream);
});

it('should run the query after being read from', function(done) {
request.makeReq_ = function() {
done();
};

request.runQuery(query).emit('reading');
});

it('should continuosly run until there are no results', function(done) {
var run = 0;
var timesToRun = 2;

request.makeReq_ = function(method, req, callback) {
run++;

if (run < timesToRun) {
callback(null, mockResponse.withResultsAndEndCursor);
} else {
var lastEndCursor =
mockResponse.withResultsAndEndCursor.batch.end_cursor.toBase64();
lastEndCursor = new Buffer(lastEndCursor, 'base64').toString();

assert.equal(String(req.query.start_cursor), lastEndCursor);
assert.strictEqual(req.query.offset, undefined);

callback(null, mockResponse.withResults);
}
};

var resultsReturned = 0;

request.runQuery(query)
.on('data', function() { resultsReturned++; })
.on('end', function() {
assert.equal(resultsReturned, mockRespGet.found.length);
done();
});
});

it('should emit an error', function(done) {
var error = new Error('Error.');

request.makeReq_ = function(method, req, callback) {
callback(error);
};

request.runQuery(query)
.on('error', function(err) {
assert.equal(err, error);
done();
})
.emit('reading');
});
});
});

describe('allocateIds', function() {
Expand Down