Skip to content

feat: add emitterFactory and friends #342

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
566 changes: 463 additions & 103 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@
"@types/axios": "^0.14.0",
"@types/chai": "^4.2.11",
"@types/cucumber": "^6.0.1",
"@types/got": "^9.6.11",
"@types/mocha": "^7.0.2",
"@types/node": "^13.13.9",
"@types/superagent": "^4.1.10",
"@types/uuid": "^8.0.0",
"@typescript-eslint/eslint-plugin": "^3.4.0",
"@typescript-eslint/parser": "^3.4.0",
Expand All @@ -123,12 +125,14 @@
"eslint-plugin-import": "^2.20.2",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^3.1.4",
"got": "^11.7.0",
"http-parser-js": "^0.5.2",
"mocha": "~7.1.1",
"nock": "~12.0.3",
"nyc": "~15.0.0",
"prettier": "^2.0.5",
"standard-version": "^9.0.0",
"superagent": "^6.1.0",
"ts-node": "^8.10.2",
"typedoc": "^0.18.0",
"typedoc-clarity-theme": "~1.1.0",
Expand Down
13 changes: 12 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ import { CloudEvent, Version } from "./event/cloudevent";
import { ValidationError } from "./event/validation";
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";

import { Emitter, TransportOptions } from "./transport/emitter";
import {
Emitter,
TransportOptions,
Options,
TransportFunction,
EmitterFunction,
emitterFor,
} from "./transport/emitter";
import { Receiver } from "./transport/receiver";
import { Protocol } from "./transport/protocols";
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer, headersFor } from "./message";
Expand Down Expand Up @@ -32,6 +39,10 @@ export {
Receiver, // TODO: Deprecated. Remove for 4.0
Protocol, // TODO: Deprecated. Remove for 4.0
TransportOptions, // TODO: Deprecated. Remove for 4.0
TransportFunction,
EmitterFunction,
emitterFor,
Options,
// From Constants
CONSTANTS,
};
10 changes: 7 additions & 3 deletions src/message/http/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import { Base64Parser, JSONParser, MappedParser, Parser, parserByContentType } f
// implements Serializer
export function binary(event: CloudEvent): Message {
const contentType: Headers = { [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CONTENT_TYPE };
const headers: Headers = headersFor(event);
const headers: Headers = { ...contentType, ...headersFor(event) };
let body = asData(event.data, event.datacontenttype as string);
if (typeof body === "object") {
body = JSON.stringify(body);
}
return {
headers: { ...contentType, ...headers },
body: asData(event.data, event.datacontenttype as string),
headers,
body,
};
}

Expand Down
85 changes: 71 additions & 14 deletions src/transport/emitter.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { CloudEvent } from "../event/cloudevent";
import { emitBinary, emitStructured } from "./http";
import { axiosEmitter } from "./http";
import { Protocol } from "./protocols";
import { AxiosResponse } from "axios";
import { Agent } from "http";
import { HTTP, Message, Mode } from "../message";

/**
* Options supplied to the Emitter when sending an event.
* In addition to url and protocol, TransportOptions may
* also accept custom options that will be passed to the
* Node.js http functions.
* @deprecated will be removed in 4.0.0
*/
export interface TransportOptions {
/**
Expand All @@ -26,8 +27,62 @@ export interface TransportOptions {
[key: string]: string | Record<string, unknown> | Protocol | Agent | undefined;
}

interface EmitterFunction {
(event: CloudEvent, options: TransportOptions): Promise<AxiosResponse>;
/**
* Options is an additional, optional dictionary of options that may
* be passed to an EmitterFunction and TransportFunction
*/
export interface Options {
[key: string]: string | Record<string, unknown> | unknown;
}

/**
* EmitterFunction is an invokable interface returned by the emitterFactory
* function. Invoke an EmitterFunction with a CloudEvent and optional transport
* options to send the event as a Message across supported transports.
*/
export interface EmitterFunction {
(event: CloudEvent, options?: Options): Promise<unknown>;
}

/**
* TransportFunction is an invokable interface provided to the emitterFactory.
* A TransportFunction's responsiblity is to send a JSON encoded event Message
* across the wire.
*/
export interface TransportFunction {
(message: Message, options?: Options): Promise<unknown>;
}

/**
* emitterFactory creates and returns an EmitterFunction using the supplied
* TransportFunction. The returned EmitterFunction will invoke the Binding's
* `binary` or `structured` function to convert a CloudEvent into a JSON
* Message based on the Mode provided, and invoke the TransportFunction with
* the Message and any supplied options.
*
* @param {TransportFunction} fn a TransportFunction that can accept an event Message
* @param { {Binding, Mode} } options network binding and message serialization options
* @param {Binding} options.binding a transport binding, e.g. HTTP
* @param {Mode} options.mode the encoding mode (Mode.BINARY or Mode.STRUCTURED)
* @returns {EmitterFunction} an EmitterFunction to send events with
*/
export function emitterFor(fn: TransportFunction, options = { binding: HTTP, mode: Mode.BINARY }): EmitterFunction {
if (!fn) {
throw new TypeError("A TransportFunction is required");
}
const { binding, mode } = options;
return function emit(event: CloudEvent, options?: Options): Promise<unknown> {
options = options || {};

switch (mode) {
case Mode.BINARY:
return fn(binding.binary(event), options);
case Mode.STRUCTURED:
return fn(binding.structured(event), options);
default:
throw new TypeError(`Unexpected transport mode: ${mode}`);
}
};
}

/**
Expand All @@ -36,19 +91,21 @@ interface EmitterFunction {
*
* @see https://github.com/cloudevents/spec/blob/v1.0/http-protocol-binding.md
* @see https://github.com/cloudevents/spec/blob/v1.0/http-protocol-binding.md#13-content-modes
* @deprecated Will be removed in 4.0.0. Consider using the emitterFactory
*
*/
export class Emitter {
url?: string;
protocol: Protocol;
emitter: EmitterFunction;
binaryEmitter: EmitterFunction;
structuredEmitter: EmitterFunction;

constructor(options: TransportOptions = { protocol: Protocol.HTTPBinary }) {
this.protocol = options.protocol as Protocol;
this.url = options.url;
this.emitter = emitBinary;
if (this.protocol === Protocol.HTTPStructured) {
this.emitter = emitStructured;
}

this.binaryEmitter = emitterFor(axiosEmitter(this.url as string));
this.structuredEmitter = emitterFor(axiosEmitter(this.url as string), { binding: HTTP, mode: Mode.STRUCTURED });
}

/**
Expand All @@ -63,15 +120,15 @@ export class Emitter {
* In that case, it will be used as the recipient endpoint. The endpoint can
* be overridden by providing a URL here.
* @returns {Promise} Promise with an eventual response from the receiver
* @deprecated Will be removed in 4.0.0. Consider using the Message interface with HTTP.[binary|structured](event)
* @deprecated Will be removed in 4.0.0. Consider using the emitterFactory
*/
send(event: CloudEvent, options?: TransportOptions): Promise<AxiosResponse> {
send(event: CloudEvent, options?: TransportOptions): Promise<unknown> {
options = options || {};
options.url = options.url || this.url;
if (options.protocol != this.protocol) {
if (this.protocol === Protocol.HTTPBinary) return emitBinary(event, options);
return emitStructured(event, options);
if (this.protocol === Protocol.HTTPBinary) return this.binaryEmitter(event, options);
return this.structuredEmitter(event, options);
}
return this.emitter(event, options);
return this.binaryEmitter(event, options);
}
}
32 changes: 0 additions & 32 deletions src/transport/http/binary_emitter.ts

This file was deleted.

19 changes: 17 additions & 2 deletions src/transport/http/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,17 @@
export * from "./binary_emitter";
export * from "./structured_emitter";
import { Message, Options } from "../..";
import axios from "axios";

export function axiosEmitter(sink: string) {
return function (message: Message, options?: Options): Promise<unknown> {
options = { ...options };
const headers = {
...message.headers,
...(options.headers as Record<string, string>),
};
delete options.headers;
return axios.post(sink, message.body, {
headers: headers,
...options,
});
};
}
20 changes: 0 additions & 20 deletions src/transport/http/structured_emitter.ts

This file was deleted.

Loading