Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ EXPRESS_PORT=8081
###############################
# Uncomment next line if you want to enable statement handling priority
#ENABLE_QUEUE_PRIORITY=true
# Event provider(redis|sqs). Redis by default
#EVENTS_REPO=redis
# Queue namespace
#QUEUE_NAMESPACE=DEV

##########
# Misc #
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.100.0",
"@aws-sdk/client-sqs": "^3.282.0",
"@aws-sdk/lib-storage": "^3.100.0",
"@azure/storage-blob": "^10.3.0",
"@google-cloud/storage": "^5.8.1",
Expand Down Expand Up @@ -208,6 +209,8 @@
"functional/prefer-type-literal": "off",
"functional/no-throw-statement": "off",
"functional/no-try-statement": "off",
"functional/no-let": "off",
"functional/no-loop-statement": "off",
"functional/prefer-readonly-type": [
"error",
{
Expand Down
6 changes: 6 additions & 0 deletions src/apps/AppConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Redis } from 'ioredis';
import Tracker from 'jscommons/dist/tracker/Tracker';
import { Db } from 'mongodb';
import { LoggerInstance } from 'winston';
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface AppConfig {
readonly repo: {
Expand Down Expand Up @@ -46,6 +47,11 @@ export default interface AppConfig {
readonly client: () => Promise<Redis>;
readonly isQueuePriorityEnabled: boolean;
};
readonly sqs: {
readonly prefix: string;
readonly client: () => Promise<SQSClient>;
readonly isQueuePriorityEnabled: boolean;
};
};
readonly service: {
readonly statements: {
Expand Down
1 change: 1 addition & 0 deletions src/apps/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export default (appConfig: AppConfig): Router => {
local: appConfig.repo.local,
mongo: appConfig.repo.mongo,
redis: appConfig.repo.redis,
sqs: appConfig.repo.sqs,
s3: appConfig.repo.s3,
storageSubFolder: appConfig.repo.storageSubFolders.statements,
},
Expand Down
6 changes: 6 additions & 0 deletions src/apps/statements/AppConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Redis } from 'ioredis';
import Tracker from 'jscommons/dist/tracker/Tracker';
import { Db } from 'mongodb';
import { LoggerInstance } from 'winston';
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface AppConfig {
readonly logger: LoggerInstance;
Expand Down Expand Up @@ -62,5 +63,10 @@ export default interface AppConfig {
readonly client: () => Promise<Redis>;
readonly isQueuePriorityEnabled: boolean;
};
readonly sqs: {
readonly prefix: string;
readonly client: () => Promise<SQSClient>;
readonly isQueuePriorityEnabled: boolean;
};
};
}
5 changes: 5 additions & 0 deletions src/apps/statements/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ export default (appConfig: AppConfig): Result => {
prefix: appConfig.repo.redis.prefix,
isQueuePriorityEnabled: appConfig.repo.redis.isQueuePriorityEnabled,
},
sqs: {
client: appConfig.repo.sqs.client,
prefix: appConfig.repo.sqs.prefix,
isQueuePriorityEnabled: appConfig.repo.sqs.isQueuePriorityEnabled,
},
},
models: {
facade: appConfig.repo.factory.modelsRepoName,
Expand Down
2 changes: 2 additions & 0 deletions src/apps/statements/repo/eventsRepo/FactoryConfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import RedisFactoryConfig from './utils/redisEvents/FactoryConfig';
import SQSFactoryConfig from './utils/sqsEvents/FactoryConfig';

export default interface FactoryConfig {
readonly facade?: string;
readonly redis?: RedisFactoryConfig;
readonly sqs?: SQSFactoryConfig;
}
70 changes: 70 additions & 0 deletions src/apps/statements/repo/eventsRepo/emitNewStatements/sqs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import {
GetQueueUrlCommand,
SQSClient,
SendMessageBatchCommand,
SendMessageBatchRequestEntry,
} from '@aws-sdk/client-sqs';
import { v4 } from 'uuid';
import { getPrefixWithProcessingPriority } from '../utils/getPrefixWithProcessingPriority';
import { StatementProcessingPriority } from '../../../enums/statementProcessingPriority.enum';
import FacadeConfig from '../utils/sqsEvents/FacadeConfig';
import { STATEMENT_QUEUE } from '../utils/constants';
import Signature from './Signature';

const MAX_BATCH_SIZE = 10;

let queueUrl: string | undefined;

const publishMessages = async (sqsClient: SQSClient, statementProperties: string[]) => {
const statementPropertiesBatchRequest = statementProperties.map(
(statementProperty): SendMessageBatchRequestEntry => ({
Id: v4(),
MessageBody: statementProperty,
}),
);

for (let index = 0; index < statementPropertiesBatchRequest.length; index += MAX_BATCH_SIZE) {
await sqsClient.send(
new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: statementPropertiesBatchRequest.slice(index, index + MAX_BATCH_SIZE),
}),
);
}
};

const getQueueUrl = async (
sqsClient: SQSClient,
prefix: string,
priority: StatementProcessingPriority,
isQueuePriorityEnabled: boolean,
) => {
if (queueUrl) {
return queueUrl;
}

const prefixWithPriority = getPrefixWithProcessingPriority(
prefix,
priority,
isQueuePriorityEnabled,
);

const getQueueUrlCommand = new GetQueueUrlCommand({
QueueName: `${prefixWithPriority}_${STATEMENT_QUEUE}`,
});

const commandResult = await sqsClient.send(getQueueUrlCommand);

queueUrl = commandResult.QueueUrl;

return queueUrl;
};

export default (config: FacadeConfig): Signature => {
return async ({ statementProperties, priority }) => {
const sqsClient = await config.client();

await getQueueUrl(sqsClient, config.prefix, priority, config.isQueuePriorityEnabled);
await publishMessages(sqsClient, statementProperties);
};
};
3 changes: 3 additions & 0 deletions src/apps/statements/repo/eventsRepo/factory.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import Facade from './Facade';
import FactoryConfig from './FactoryConfig';
import redisFactory from './utils/redisEvents/factory';
import sqsFactory from './utils/sqsEvents/factory';

export default (config: FactoryConfig): Facade => {
switch (config.facade) {
default:
case 'redis':
return redisFactory(config.redis);
case 'sqs':
return sqsFactory(config.sqs);
}
};
1 change: 1 addition & 0 deletions src/apps/statements/repo/eventsRepo/utils/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const EVENT_NAME = 'statement.new';
export const CHANNEL_NAME = 'statement.notify';
export const STATEMENT_QUEUE = 'STATEMENT_QUEUE';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface FacadeConfig {
readonly client: () => Promise<SQSClient>;
readonly prefix: string;
readonly isQueuePriorityEnabled: boolean;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { SQSClient } from '@aws-sdk/client-sqs';

export default interface FactoryConfig {
readonly client?: () => Promise<SQSClient>;
readonly prefix?: string;
readonly isQueuePriorityEnabled?: boolean;
}
26 changes: 26 additions & 0 deletions src/apps/statements/repo/eventsRepo/utils/sqsEvents/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { defaultTo } from 'lodash';
import emitNewStatements from '../../emitNewStatements/sqs';
import Facade from '../../Facade';
import connectToSQS from "../../../../../../utils/connectToSQS";
import FacadeConfig from './FacadeConfig';
import FactoryConfig from './FactoryConfig';

export default (factoryConfig: FactoryConfig = {}): Facade => {
const facadeConfig: FacadeConfig = {
client: defaultTo(factoryConfig.client, connectToSQS()),
prefix: defaultTo(factoryConfig.prefix, 'xapistatements'),
isQueuePriorityEnabled: defaultTo(factoryConfig.isQueuePriorityEnabled, false),
};
return {
emitNewStatements: emitNewStatements(facadeConfig),
clearRepo: async () => {
// Do nothing.
},
migrate: async () => {
// Do nothing.
},
rollback: async () => {
// Do nothing.
},
};
};
8 changes: 8 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export default {
prefix: getStringOption(process.env.REDIS_PREFIX, 'LEARNINGLOCKER'),
url: getStringOption(process.env.REDIS_URL, 'redis://127.0.0.1:6379/0'),
},
aws: {
region: globalAwsRegion,
accessKeyId: globalAwsIamAccessKeyId,
secretAccessKey: globalAwsIamAccessKeySecret,
},
sqs: {
prefix: getStringOption(process.env.QUEUE_NAMESPACE, 'DEV'),
},
repoFactory: {
authRepoName: getStringOption(process.env.AUTH_REPO, 'mongo'),
eventsRepoName: getStringOption(process.env.EVENTS_REPO, 'redis'),
Expand Down
6 changes: 6 additions & 0 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import config from './config';
import logger from './logger';
import connectToMongoDb from './utils/connectToMongoDb';
import connectToRedis from './utils/connectToRedis';
import connectToSQS from './utils/connectToSQS';

const expressApp = express();

Expand All @@ -31,6 +32,11 @@ expressApp.use(
prefix: config.redis.prefix,
isQueuePriorityEnabled: config.isQueuePriorityEnabled,
},
sqs: {
client: connectToSQS(),
prefix: config.sqs.prefix,
isQueuePriorityEnabled: config.isQueuePriorityEnabled,
},
repoFactory: config.repoFactory,
s3: config.s3StorageRepo,
storageSubFolders: config.storageSubFolders,
Expand Down
22 changes: 22 additions & 0 deletions src/utils/connectToSQS.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { SQSClient } from '@aws-sdk/client-sqs';
import { once } from 'lodash';
import config from '../config';
import logger from '../logger';

export default once((): (() => Promise<SQSClient>) => {
return once(async () => {
Comment on lines +6 to +7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PrinceWaune , double once? 👀

logger.info('Creating SQS connection');

return new SQSClient({
...(config.aws.region ? { region: config.aws.region } : null),
...(config.aws.accessKeyId && config.aws.secretAccessKey
? {
credentials: {
accessKeyId: config.aws.accessKeyId as string,
secretAccessKey: config.aws.secretAccessKey as string,
},
}
: null),
});
});
});
Loading