Skip to content

Commit 7926ea4

Browse files
ciaranschutteCiaran Schutte
andcommitted
Accumulator pipeline (#893)
* change resolver code based on new gql type * comments and cleanup * break up main entry point into two modules, one for querying and one for field processing * rename network config type, remove supported aggs from type * type cleanup * only send network queries with original query fields * move gql health check into module * cleanup, renaming types, removing redundant code * add type * fix merged conflict * adds full stop. fixes comment typo * fix pipeline code, move accumulator into class * handle unavailable node * check undefined in Success * add note * rename poorly named field * add comment: * rename file * http response success status as const * renamed AggAccumulator file * clean up types, change loose object to use record, fix any typings --------- Co-authored-by: Ciaran Schutte <[email protected]>
1 parent 4c4dc68 commit 7926ea4

File tree

6 files changed

+126
-125
lines changed

6 files changed

+126
-125
lines changed

modules/server/src/network/aggregations/index.ts renamed to modules/server/src/network/aggregations/AggregationAccumulator.ts

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,30 @@
11
import { SupportedAggregation, SUPPORTED_AGGREGATIONS } from '../common';
2-
import { Aggregations, NetworkAggregation, NumericAggregations, RemoteAggregation } from '../types';
2+
import { Aggregations, Bucket, NumericAggregations } from '../types/aggregations';
3+
import { RemoteAggregation } from '../types/types';
4+
import { RequestedFieldsMap } from '../util';
35

4-
type NetworkResult = {
5-
[key: string]: RemoteAggregation;
6-
};
6+
type NetworkResult = Partial<Record<string, RemoteAggregation>>;
77

88
type ResolveAggregationInput = {
99
networkResult: NetworkResult;
1010
requestedAggregationFields: string[];
11-
accumulator: any;
11+
accumulator: RemoteAggregation;
1212
};
1313

1414
/**
1515
* Resolves returned aggregations from network queries into single accumulated aggregation
1616
*
1717
* @param
18-
* @returns number - Total bucket count for node
1918
*/
20-
export const resolveAggregations = ({
19+
const resolveAggregations = ({
2120
networkResult,
2221
requestedAggregationFields,
2322
accumulator,
24-
}: ResolveAggregationInput): number => {
23+
}: ResolveAggregationInput) => {
2524
const documentName = Object.keys(networkResult)[0];
2625

27-
const nodeBucketCount = requestedAggregationFields.reduce((bucketCountAcc, fieldName) => {
26+
requestedAggregationFields.forEach((fieldName) => {
2827
const fieldAggregations = networkResult[documentName][fieldName];
29-
const fieldBucketCount = fieldAggregations.bucket_count;
3028
const aggregationType = fieldAggregations.__typename;
3129

3230
const accumulatedFieldAggregations = accumulator[fieldName];
@@ -37,11 +35,9 @@ export const resolveAggregations = ({
3735

3836
// mutation - updates accumulator
3937
accumulator[fieldName] = resolvedAggregation;
40-
// returns total bucket count for node
41-
return bucketCountAcc + fieldBucketCount;
42-
}, 0);
38+
});
4339

44-
return nodeBucketCount;
40+
return accumulator;
4541
};
4642

4743
/**
@@ -53,7 +49,7 @@ export const resolveAggregations = ({
5349
export const resolveToNetworkAggregation = (
5450
type: SupportedAggregation,
5551
aggregations: Aggregations[],
56-
): NetworkAggregation | undefined => {
52+
): RemoteAggregation | undefined => {
5753
if (type === SUPPORTED_AGGREGATIONS.Aggregations) {
5854
return resolveAggregation(aggregations);
5955
} else if (type === SUPPORTED_AGGREGATIONS.NumericAggregations) {
@@ -71,7 +67,7 @@ export const resolveToNetworkAggregation = (
7167
* @param bucket - Bucket being processed
7268
* @param computedBuckets - Existing buckets
7369
*/
74-
const updateComputedBuckets = (bucket, computedBuckets) => {
70+
const updateComputedBuckets = (bucket: Bucket, computedBuckets: Bucket[]) => {
7571
/*
7672
* Unable to use lookup key eg. buckets[key]
7773
* "buckets": [
@@ -158,7 +154,7 @@ const updateComputedBuckets = (bucket, computedBuckets) => {
158154
* }
159155
* ```
160156
*/
161-
export const resolveAggregation = (aggregations: Aggregations[]): NetworkAggregation => {
157+
export const resolveAggregation = (aggregations: Aggregations[]): Aggregations => {
162158
const resolvedAggregation = aggregations.reduce((resolvedAggregation, agg) => {
163159
const computedBuckets = resolvedAggregation.buckets;
164160
agg.buckets.forEach((bucket) => updateComputedBuckets(bucket, computedBuckets));
@@ -172,3 +168,34 @@ const resolveNumericAggregation = (aggregations: NumericAggregations) => {
172168
// TODO: implement
173169
throw Error('Not implemented');
174170
};
171+
172+
const emptyAggregation: Aggregations = { bucket_count: 0, buckets: [] };
173+
174+
export class AggregationAccumulator {
175+
requestedFields: string[];
176+
totalAgg: RemoteAggregation;
177+
178+
constructor(requestedFieldsMap: RequestedFieldsMap) {
179+
const requestedFields = Object.keys(requestedFieldsMap);
180+
this.requestedFields = requestedFields;
181+
/*
182+
* seed accumulator with the requested field keys
183+
* this will make it easier to add to using key lookup instead of Array.find
184+
*/
185+
this.totalAgg = requestedFields.reduce<RemoteAggregation>((accumulator: any, field: any) => {
186+
return { ...accumulator, [field]: emptyAggregation };
187+
}, {});
188+
}
189+
190+
resolve(data: NetworkResult) {
191+
resolveAggregations({
192+
accumulator: this.totalAgg,
193+
networkResult: data,
194+
requestedAggregationFields: this.requestedFields,
195+
});
196+
}
197+
198+
result() {
199+
return this.totalAgg;
200+
}
201+
}

modules/server/src/network/httpResponses.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
const RESULT_STATUS = {
2+
SUCCESS: 'SUCCESS',
3+
} as const;
4+
15
// Success and Failure types
2-
export type Success<T> = { status: 'SUCCESS'; data: T };
6+
export type Success<T> = { status: typeof RESULT_STATUS.SUCCESS; data: T };
37
export type Failure<FailureStatus extends string, T = void> = {
48
status: FailureStatus;
59
message: string;
@@ -28,15 +32,15 @@ export type Result<T, FailureStatus extends string, FailureData = void> =
2832
export function isSuccess<T, FailureStatus extends string, FailureData>(
2933
result: Result<T, FailureStatus, FailureData>,
3034
): result is Success<T> {
31-
return result.status === 'SUCCESS';
35+
return result.status === RESULT_STATUS.SUCCESS;
3236
}
3337

3438
/**
3539
* Create a successful response for a Result or Either type, with data of the success type
3640
* @param {T} data
3741
* @returns {Success<T>} `{status: 'SUCCESS', data}`
3842
*/
39-
export const success = <T>(data: T): Success<T> => ({ status: 'SUCCESS', data });
43+
export const success = <T>(data: T): Success<T> => ({ status: RESULT_STATUS.SUCCESS, data });
4044

4145
/**
4246
* Create a response indicating a failure with a status naming the reason and message describing the failure.
Lines changed: 69 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { gql } from 'apollo-server-core';
22
import axios from 'axios';
33
import { DocumentNode } from 'graphql';
4-
import { resolveAggregations } from '../aggregations';
4+
import { AggregationAccumulator } from '../aggregations/AggregationAccumulator';
55
import { fetchGql } from '../gql';
6-
import { failure, isSuccess, success } from '../httpResponses';
7-
import { NetworkAggregation } from '../types/types';
8-
import { ASTtoString } from '../util';
6+
import { failure, isSuccess, Result, success } from '../httpResponses';
7+
import { NetworkConfig } from '../types/setup';
8+
import { ASTtoString, RequestedFieldsMap } from '../util';
99
import { CONNECTION_STATUS, NetworkNode } from './networkNode';
1010

1111
/**
@@ -14,7 +14,9 @@ import { CONNECTION_STATUS, NetworkNode } from './networkNode';
1414
* @param query
1515
* @returns
1616
*/
17-
const fetchData = async (query: NetworkQuery) => {
17+
const fetchData = async (
18+
query: NetworkQuery,
19+
): Promise<Result<unknown, typeof CONNECTION_STATUS.error>> => {
1820
const { url, gqlQuery } = query;
1921
console.log(`Fetch data starting for ${url}`);
2022
try {
@@ -33,77 +35,18 @@ const fetchData = async (query: NetworkQuery) => {
3335
return failure(CONNECTION_STATUS.ERROR, `Request cancelled: ${url}`);
3436
}
3537

36-
const responseStatus = error.response.status;
37-
if (responseStatus === 404) {
38-
console.error(`Network failure: ${url}`);
39-
return failure(CONNECTION_STATUS.ERROR, `Network failure: ${url}`);
40-
} else {
41-
return failure(CONNECTION_STATUS.ERROR, `Unknown error`);
38+
if (axios.isAxiosError(error)) {
39+
if (error.code === 'ECONNREFUSED') {
40+
console.error(`Network failure: ${url}`);
41+
return failure(CONNECTION_STATUS.ERROR, `Network failure: ${url}`);
42+
}
4243
}
44+
return failure(CONNECTION_STATUS.ERROR, `Unknown error`);
4345
} finally {
4446
console.log(`Fetch data completing for ${query.url}`);
4547
}
4648
};
4749

48-
/**
49-
* Query each remote connection
50-
*
51-
* @param queries - Query for each remote connection
52-
* @param requestedAggregationFields
53-
* @returns
54-
*/
55-
export const aggregationPipeline = async (
56-
queries: NetworkQuery[],
57-
requestedAggregationFields: any,
58-
) => {
59-
/*
60-
* seed accumulator with the requested field keys
61-
* this will make it easier to add to using key lookup instead of Array.find
62-
*/
63-
const emptyAggregation: NetworkAggregation = { bucket_count: 0, buckets: [] };
64-
const aggregationAccumulator = requestedAggregationFields.reduce((accumulator, field) => {
65-
return { ...accumulator, [field]: emptyAggregation };
66-
}, {});
67-
68-
const nodeInfo: NetworkNode[] = [];
69-
70-
const aggregationResultPromises = queries.map<
71-
Promise<{
72-
aggregations: any;
73-
remoteConnection: NetworkNode;
74-
}>
75-
>(async (query) => {
76-
const name = query.url; // TODO: use readable name not url
77-
const response = await fetchData(query);
78-
79-
if (response && isSuccess(response)) {
80-
const nodeBucketCount = resolveAggregations({
81-
networkResult: response.data,
82-
requestedAggregationFields,
83-
accumulator: aggregationAccumulator,
84-
});
85-
86-
nodeInfo.push({
87-
name,
88-
count: nodeBucketCount,
89-
status: CONNECTION_STATUS.OK,
90-
errors: '',
91-
});
92-
} else {
93-
nodeInfo.push({
94-
name,
95-
count: 0,
96-
status: CONNECTION_STATUS.ERROR,
97-
errors: response?.message || 'Error',
98-
});
99-
}
100-
});
101-
102-
// return accumulated results
103-
await Promise.allSettled(aggregationResultPromises);
104-
return { aggregationResults: aggregationAccumulator, nodeInfo };
105-
};
106-
10750
type NetworkQuery = {
10851
url: string;
10952
gqlQuery: DocumentNode;
@@ -135,7 +78,7 @@ type NetworkQuery = {
13578
* `
13679
* ```
13780
*/
138-
const createGqlFieldsString = (requestedFields: {}, documentName: string) => {
81+
const createGqlFieldsString = (requestedFields: RequestedFieldsMap, documentName: string) => {
13982
const gqlFieldsString = JSON.stringify(requestedFields)
14083
.replaceAll('"', '')
14184
.replaceAll(':', '')
@@ -148,36 +91,66 @@ const createGqlFieldsString = (requestedFields: {}, documentName: string) => {
14891
return gqlString;
14992
};
15093

94+
const createNetworkQuery = (
95+
config: NetworkConfig,
96+
requestedFields: RequestedFieldsMap,
97+
): DocumentNode => {
98+
const gqlString = createGqlFieldsString(requestedFields, config.documentName);
99+
100+
/*
101+
* convert string to AST object to use as query
102+
* not needed if gqlString is formatted correctly but this acts as a validity check
103+
*/
104+
try {
105+
const gqlQuery = gql`
106+
${gqlString}
107+
`;
108+
return gqlQuery;
109+
} catch (err) {
110+
console.error('invalid gql', err);
111+
}
112+
};
113+
151114
/**
152-
* Create queries for remote nodes based on requested fields
115+
* Query each remote connection
153116
*
154-
* @param configs
155-
* @param requestedFields
117+
* @param queries - Query for each remote connection
118+
* @param requestedAggregationFields
156119
* @returns
157120
*/
158-
export const createNetworkQueries = (
121+
export const aggregationPipeline = async (
159122
configs: NetworkConfig[],
160-
requestedFields: {},
161-
): NetworkQuery[] => {
162-
const queries = configs
163-
.map((config) => {
164-
const gqlString = createGqlFieldsString(requestedFields, config.documentName);
123+
requestedAggregationFields: RequestedFieldsMap,
124+
) => {
125+
const nodeInfo: NetworkNode[] = [];
165126

166-
/*
167-
* convert string to AST object to use as query
168-
* not needed if gqlString is formatted correctly but this acts as a validity check
169-
*/
170-
try {
171-
const gqlQuery = gql`
172-
${gqlString}
173-
`;
174-
return { url: config.graphqlUrl, gqlQuery };
175-
} catch (err) {
176-
console.error('invalid gql', err);
177-
return false;
178-
}
179-
})
180-
.filter(Boolean);
127+
const totalAgg = new AggregationAccumulator(requestedAggregationFields);
181128

182-
return queries;
129+
const aggregationResultPromises = configs.map(async (config) => {
130+
const gqlQuery = createNetworkQuery(config, requestedAggregationFields);
131+
const response = await fetchData({ url: config.graphqlUrl, gqlQuery });
132+
133+
const nodeName = config.displayName;
134+
135+
if (isSuccess(response)) {
136+
totalAgg.resolve(response.data);
137+
nodeInfo.push({
138+
name: nodeName,
139+
count: 1, // TODO total { hit } in query,
140+
status: CONNECTION_STATUS.OK,
141+
errors: '',
142+
});
143+
} else {
144+
nodeInfo.push({
145+
name: nodeName,
146+
count: 0,
147+
status: CONNECTION_STATUS.ERROR,
148+
errors: response?.message || 'Error',
149+
});
150+
}
151+
});
152+
153+
// return accumulated results
154+
await Promise.allSettled(aggregationResultPromises);
155+
return { aggregationResults: totalAgg.result(), nodeInfo };
183156
};

modules/server/src/network/resolvers/index.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { type GraphQLResolveInfo } from 'graphql';
22
import { NetworkConfig } from '../types/setup';
33
import { resolveInfoToMap } from '../util';
4-
import { aggregationPipeline, createNetworkQueries } from './aggregations';
4+
import { aggregationPipeline } from './aggregations';
55
import { NetworkNode } from './networkNode';
66
import { createResponse } from './response';
77

@@ -30,16 +30,12 @@ export const createResolvers = (configs: NetworkConfig[]) => {
3030
info: GraphQLResolveInfo,
3131
) => {
3232
const requestedFieldsMap = resolveInfoToMap(info, 'aggregations');
33-
const networkQueries = createNetworkQueries(configs, requestedFieldsMap);
3433

35-
// Query remote connections and aggregate results
36-
const requestedFields = Object.keys(requestedFieldsMap);
3734
const { aggregationResults, nodeInfo } = await aggregationPipeline(
38-
networkQueries,
39-
requestedFields,
35+
configs,
36+
requestedFieldsMap,
4037
);
4138
const response = createResponse({ aggregationResults, nodeInfo });
42-
4339
return response;
4440
},
4541
},

modules/server/src/network/types/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export type UnsupportedAggregations = NetworkFieldType<string>[];
2626

2727
export type ConnectionStatus = ObjectValues<typeof CONNECTION_STATUS>;
2828

29-
export type RemoteAggregation = { [key: string]: Aggregations | NumericAggregations };
29+
export type RemoteAggregation = Partial<Record<string, Aggregations | NumericAggregations>>;
3030

3131
export type NetworkAggregation = {
3232
bucket_count: number;

0 commit comments

Comments
 (0)