Skip to content

Commit 6d66b69

Browse files
dplewisflovilmart
authored andcommitted
Support for Aggregate Queries (parse-community#4207)
* Support for Aggregate Queries * improve pg and coverage * Mongo 3.4 aggregates and tests * replace _id with objectId * improve tests for objectId * project with group query * typo
1 parent 6d3d66f commit 6d66b69

File tree

8 files changed

+675
-1
lines changed

8 files changed

+675
-1
lines changed

spec/ParseQuery.Aggregate.spec.js

+412
Large diffs are not rendered by default.

src/Adapters/Storage/Mongo/MongoCollection.js

+8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ export default class MongoCollection {
6060
return countOperation;
6161
}
6262

63+
distinct(field, query) {
64+
return this._mongoCollection.distinct(field, query);
65+
}
66+
67+
aggregate(pipeline, { maxTimeMS, readPreference } = {}) {
68+
return this._mongoCollection.aggregate(pipeline, { maxTimeMS, readPreference }).toArray();
69+
}
70+
6371
insertOne(object) {
6472
return this._mongoCollection.insertOne(object);
6573
}

src/Adapters/Storage/Mongo/MongoStorageAdapter.js

+21
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,27 @@ export class MongoStorageAdapter {
405405
}));
406406
}
407407

408+
distinct(className, schema, query, fieldName) {
409+
schema = convertParseSchemaToMongoSchema(schema);
410+
return this._adaptiveCollection(className)
411+
.then(collection => collection.distinct(fieldName, transformWhere(className, query, schema)));
412+
}
413+
414+
aggregate(className, pipeline, readPreference) {
415+
readPreference = this._parseReadPreference(readPreference);
416+
return this._adaptiveCollection(className)
417+
.then(collection => collection.aggregate(pipeline, { readPreference, maxTimeMS: this._maxTimeMS }))
418+
.then(results => {
419+
results.forEach(result => {
420+
if (result.hasOwnProperty('_id')) {
421+
result.objectId = result._id;
422+
delete result._id;
423+
}
424+
});
425+
return results;
426+
});
427+
}
428+
408429
_parseReadPreference(readPreference) {
409430
if (readPreference) {
410431
switch (readPreference) {

src/Adapters/Storage/Postgres/PostgresStorageAdapter.js

+138
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ const transformDotField = (fieldName) => {
165165
return name;
166166
}
167167

168+
const transformAggregateField = (fieldName) => {
169+
return fieldName.substr(1);
170+
}
171+
168172
const validateKeys = (object) => {
169173
if (typeof object == 'object') {
170174
for (const key in object) {
@@ -1366,6 +1370,140 @@ export class PostgresStorageAdapter {
13661370
});
13671371
}
13681372

1373+
distinct(className, schema, query, fieldName) {
1374+
debug('distinct', className, query);
1375+
let field = fieldName;
1376+
let column = fieldName;
1377+
if (fieldName.indexOf('.') >= 0) {
1378+
field = transformDotFieldToComponents(fieldName).join('->');
1379+
column = fieldName.split('.')[0];
1380+
}
1381+
const isArrayField = schema.fields
1382+
&& schema.fields[fieldName]
1383+
&& schema.fields[fieldName].type === 'Array';
1384+
const values = [field, column, className];
1385+
const where = buildWhereClause({ schema, query, index: 4 });
1386+
values.push(...where.values);
1387+
1388+
const wherePattern = where.pattern.length > 0 ? `WHERE ${where.pattern}` : '';
1389+
let qs = `SELECT DISTINCT ON ($1:raw) $2:raw FROM $3:name ${wherePattern}`;
1390+
if (isArrayField) {
1391+
qs = `SELECT distinct jsonb_array_elements($1:raw) as $2:raw FROM $3:name ${wherePattern}`;
1392+
}
1393+
debug(qs, values);
1394+
return this._client.any(qs, values)
1395+
.catch(() => [])
1396+
.then((results) => {
1397+
if (fieldName.indexOf('.') === -1) {
1398+
return results.map(object => object[field]);
1399+
}
1400+
const child = fieldName.split('.')[1];
1401+
return results.map(object => object[column][child]);
1402+
});
1403+
}
1404+
1405+
aggregate(className, pipeline) {
1406+
debug('aggregate', className, pipeline);
1407+
const values = [className];
1408+
let columns = [];
1409+
let countField = null;
1410+
let wherePattern = '';
1411+
let limitPattern = '';
1412+
let skipPattern = '';
1413+
let sortPattern = '';
1414+
let groupPattern = '';
1415+
for (let i = 0; i < pipeline.length; i += 1) {
1416+
const stage = pipeline[i];
1417+
if (stage.$group) {
1418+
for (const field in stage.$group) {
1419+
const value = stage.$group[field];
1420+
if (value === null || value === undefined) {
1421+
continue;
1422+
}
1423+
if (field === '_id') {
1424+
columns.push(`${transformAggregateField(value)} AS "objectId"`);
1425+
groupPattern = `GROUP BY ${transformAggregateField(value)}`;
1426+
continue;
1427+
}
1428+
if (value.$sum) {
1429+
if (typeof value.$sum === 'string') {
1430+
columns.push(`SUM(${transformAggregateField(value.$sum)}) AS "${field}"`);
1431+
} else {
1432+
countField = field;
1433+
columns.push(`COUNT(*) AS "${field}"`);
1434+
}
1435+
}
1436+
if (value.$max) {
1437+
columns.push(`MAX(${transformAggregateField(value.$max)}) AS "${field}"`);
1438+
}
1439+
if (value.$min) {
1440+
columns.push(`MIN(${transformAggregateField(value.$min)}) AS "${field}"`);
1441+
}
1442+
if (value.$avg) {
1443+
columns.push(`AVG(${transformAggregateField(value.$avg)}) AS "${field}"`);
1444+
}
1445+
}
1446+
columns.join(',');
1447+
} else {
1448+
columns.push('*');
1449+
}
1450+
if (stage.$project) {
1451+
if (columns.includes('*')) {
1452+
columns = [];
1453+
}
1454+
for (const field in stage.$project) {
1455+
const value = stage.$project[field];
1456+
if ((value === 1 || value === true)) {
1457+
columns.push(field);
1458+
}
1459+
}
1460+
}
1461+
if (stage.$match) {
1462+
const patterns = [];
1463+
for (const field in stage.$match) {
1464+
const value = stage.$match[field];
1465+
Object.keys(ParseToPosgresComparator).forEach(cmp => {
1466+
if (value[cmp]) {
1467+
const pgComparator = ParseToPosgresComparator[cmp];
1468+
patterns.push(`${field} ${pgComparator} ${value[cmp]}`);
1469+
}
1470+
});
1471+
}
1472+
wherePattern = patterns.length > 0 ? `WHERE ${patterns.join(' ')}` : '';
1473+
}
1474+
if (stage.$limit) {
1475+
limitPattern = `LIMIT ${stage.$limit}`;
1476+
}
1477+
if (stage.$skip) {
1478+
skipPattern = `OFFSET ${stage.$skip}`;
1479+
}
1480+
if (stage.$sort) {
1481+
const sort = stage.$sort;
1482+
const sorting = Object.keys(sort).map((key) => {
1483+
if (sort[key] === 1) {
1484+
return `"${key}" ASC`;
1485+
}
1486+
return `"${key}" DESC`;
1487+
}).join(',');
1488+
sortPattern = sort !== undefined && Object.keys(sort).length > 0 ? `ORDER BY ${sorting}` : '';
1489+
}
1490+
}
1491+
1492+
const qs = `SELECT ${columns} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern} ${groupPattern}`;
1493+
debug(qs, values);
1494+
return this._client.any(qs, values).then(results => {
1495+
if (countField) {
1496+
results[0][countField] = parseInt(results[0][countField], 10);
1497+
}
1498+
results.forEach(result => {
1499+
if (!result.hasOwnProperty('objectId')) {
1500+
result.objectId = null;
1501+
}
1502+
});
1503+
return results;
1504+
});
1505+
}
1506+
13691507
performInitialization({ VolatileClassesSchemas }) {
13701508
debug('performInitialization');
13711509
const promises = VolatileClassesSchemas.map((schema) => {

src/Controllers/DatabaseController.js

+14
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,8 @@ DatabaseController.prototype.find = function(className, query, {
785785
count,
786786
keys,
787787
op,
788+
distinct,
789+
pipeline,
788790
readPreference
789791
} = {}) {
790792
const isMaster = acl === undefined;
@@ -853,6 +855,18 @@ DatabaseController.prototype.find = function(className, query, {
853855
} else {
854856
return this.adapter.count(className, schema, query, readPreference);
855857
}
858+
} else if (distinct) {
859+
if (!classExists) {
860+
return [];
861+
} else {
862+
return this.adapter.distinct(className, schema, query, distinct);
863+
}
864+
} else if (pipeline) {
865+
if (!classExists) {
866+
return [];
867+
} else {
868+
return this.adapter.aggregate(className, pipeline, readPreference);
869+
}
856870
} else {
857871
if (!classExists) {
858872
return [];

src/ParseServer.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { SessionsRouter } from './Routers/SessionsRouter';
3434
import { UsersRouter } from './Routers/UsersRouter';
3535
import { PurgeRouter } from './Routers/PurgeRouter';
3636
import { AudiencesRouter } from './Routers/AudiencesRouter';
37+
import { AggregateRouter } from './Routers/AggregateRouter';
3738

3839
import { ParseServerRESTController } from './ParseServerRESTController';
3940
import * as controllers from './Controllers';
@@ -197,7 +198,8 @@ class ParseServer {
197198
new PurgeRouter(),
198199
new HooksRouter(),
199200
new CloudCodeRouter(),
200-
new AudiencesRouter()
201+
new AudiencesRouter(),
202+
new AggregateRouter()
201203
];
202204

203205
const routes = routers.reduce((memo, router) => {

src/RestQuery.js

+2
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ function RestQuery(config, auth, className, restWhere = {}, restOptions = {}, cl
8686
case 'count':
8787
this.doCount = true;
8888
break;
89+
case 'distinct':
90+
case 'pipeline':
8991
case 'skip':
9092
case 'limit':
9193
case 'readPreference':

src/Routers/AggregateRouter.js

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import ClassesRouter from './ClassesRouter';
2+
import rest from '../rest';
3+
import * as middleware from '../middlewares';
4+
import Parse from 'parse/node';
5+
6+
const ALLOWED_KEYS = [
7+
'where',
8+
'distinct',
9+
'project',
10+
'match',
11+
'redact',
12+
'limit',
13+
'skip',
14+
'unwind',
15+
'group',
16+
'sample',
17+
'sort',
18+
'geoNear',
19+
'lookup',
20+
'out',
21+
'indexStats',
22+
'facet',
23+
'bucket',
24+
'bucketAuto',
25+
'sortByCount',
26+
'addFields',
27+
'replaceRoot',
28+
'count',
29+
'graphLookup',
30+
];
31+
32+
export class AggregateRouter extends ClassesRouter {
33+
34+
handleFind(req) {
35+
const body = Object.assign(req.body, ClassesRouter.JSONFromQuery(req.query));
36+
const options = {};
37+
const pipeline = [];
38+
39+
for (const key in body) {
40+
if (ALLOWED_KEYS.indexOf(key) === -1) {
41+
throw new Parse.Error(Parse.Error.INVALID_QUERY, `Invalid parameter for query: ${key}`);
42+
}
43+
if (key === 'group') {
44+
if (body[key].hasOwnProperty('_id')) {
45+
throw new Parse.Error(
46+
Parse.Error.INVALID_QUERY,
47+
`Invalid parameter for query: group. Please use objectId instead of _id`
48+
);
49+
}
50+
if (!body[key].hasOwnProperty('objectId')) {
51+
throw new Parse.Error(
52+
Parse.Error.INVALID_QUERY,
53+
`Invalid parameter for query: group. objectId is required`
54+
);
55+
}
56+
body[key]._id = body[key].objectId;
57+
delete body[key].objectId;
58+
}
59+
pipeline.push({ [`$${key}`]: body[key] });
60+
}
61+
if (body.distinct) {
62+
options.distinct = String(body.distinct);
63+
}
64+
options.pipeline = pipeline;
65+
if (typeof body.where === 'string') {
66+
body.where = JSON.parse(body.where);
67+
}
68+
return rest.find(req.config, req.auth, this.className(req), body.where, options, req.info.clientSDK)
69+
.then((response) => { return { response }; });
70+
}
71+
72+
mountRoutes() {
73+
this.route('GET','/aggregate/:className', middleware.promiseEnforceMasterKeyAccess, req => { return this.handleFind(req); });
74+
}
75+
}
76+
77+
export default AggregateRouter;

0 commit comments

Comments
 (0)