Skip to content

Emitter #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
}],
"valid-jsdoc": "warn",
"semi": ["error", "always"],
"quotes": ["error", "double", { "allowTemplateLiterals": true }]
"quotes": ["error", "double", { "allowTemplateLiterals": true }],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying ESlint rules is great, but really out of scope for a PR about emitters.

I would imagine 2 PRs that are separate:

  • One PR the rules for quotes/any.
  • One PR that changes the emitter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would imagine 2 PRs that are separate:

I think the point of this change is that the existing linter rules was causing this PR to generate warnings during the build. See #366 (comment). I'm not sure of the benefit returned by extracting this small change into a separate PR, landing that first, then landing this. In fact, I recall recently that you were not happy about the number of commits in this repo - this seems counter to that complaint! 😉

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I'd quickly approve a change/fix to the eslint rules, but I don't want to approve a change that includes while (msg === undefined) {. The eslint change is a different decision than emitter functionality and would like to be merged even if any emitter PRs were not merge. Does that make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The eslint change is a different decision than emitter functionality and would like to be merged even if any emitter PRs were not merge. Does that make sense?

Sure. It just seems pedantic to demand this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lance, I'm not demanding. This is just a review suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal was to merge the PR in a timely manner so at that time we thought it would make sense to have only one PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a PR reviewed in a timely manner is fair. In general, it would be easier to not go back and forth and review faster with a GitHub issue and small PR. There have been discussions about removing this before, and we just talked about splitting this repo into different modules, so it's difficult to approve this PR right away.

That said, this repo is likely going to change a bit with the split, and I don't want to block you if Lance approves.

"@typescript-eslint/no-explicit-any": "off"
}
}
86 changes: 58 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# JavaScript SDK for CloudEvents

[![Codacy Badge](https://api.codacy.com/project/badge/Grade/bd66e7c52002481993cd6d610534b0f7)](https://www.codacy.com/app/fabiojose/sdk-javascript?utm_source=github.com&utm_medium=referral&utm_content=cloudevents/sdk-javascript&utm_campaign=Badge_Grade)
[![Codacy Badge](https://api.codacy.com/project/badge/Coverage/bd66e7c52002481993cd6d610534b0f7)](https://www.codacy.com/app/fabiojose/sdk-javascript?utm_source=github.com&utm_medium=referral&utm_content=cloudevents/sdk-javascript&utm_campaign=Badge_Coverage)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/bd66e7c52002481993cd6d610534b0f7)](https://www.codacy.com/app/fabiojose/sdk-javascript?utm_source=github.com&utm_medium=referral&utm_content=cloudevents/sdk-javascript&utm_campaign=Badge_Grade)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Modifying the URLs and list markdown isn't really in scope for this Emitter PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with this, but don't find it objectionable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The markdown linter should be also set to avoid this in the future

[![Codacy Badge](https://api.codacy.com/project/badge/Coverage/bd66e7c52002481993cd6d610534b0f7)](https://www.codacy.com/app/fabiojose/sdk-javascript?utm_source=github.com&utm_medium=referral&utm_content=cloudevents/sdk-javascript&utm_campaign=Badge_Coverage)
![Node.js CI](https://github.com/cloudevents/sdk-javascript/workflows/Node.js%20CI/badge.svg)
[![npm version](https://img.shields.io/npm/v/cloudevents.svg)](https://www.npmjs.com/package/cloudevents)
[![vulnerabilities](https://snyk.io/test/github/cloudevents/sdk-javascript/badge.svg)](https://snyk.io/test/github/cloudevents/sdk-javascript)
Expand All @@ -10,9 +10,9 @@ The CloudEvents SDK for JavaScript.

## Features

* Represent CloudEvents in memory
* Serialize and deserialize CloudEvents in different [event formats](https://github.com/cloudevents/spec/blob/v1.0/spec.md#event-format).
* Send and recieve CloudEvents with via different [protocol bindings](https://github.com/cloudevents/spec/blob/v1.0/spec.md#protocol-binding).
- Represent CloudEvents in memory
- Serialize and deserialize CloudEvents in different [event formats](https://github.com/cloudevents/spec/blob/v1.0/spec.md#event-format).
- Send and recieve CloudEvents with via different [protocol bindings](https://github.com/cloudevents/spec/blob/v1.0/spec.md#protocol-binding).

_Note:_ Supports CloudEvent versions 0.3, 1.0

Expand Down Expand Up @@ -51,16 +51,15 @@ using the `HTTP` binding to create a `Message` which has properties
for `headers` and `body`.

```js
const axios = require('axios').default;
const axios = require("axios").default;
const { HTTP } = require("cloudevents");


const ce = new CloudEvent({ type, source, data })
const ce = new CloudEvent({ type, source, data });
const message = HTTP.binary(ce); // Or HTTP.structured(ce)

axios({
method: 'post',
url: '...',
method: "post",
url: "...",
data: message.body,
headers: message.headers,
});
Expand All @@ -69,16 +68,16 @@ axios({
You may also use the `emitterFor()` function as a convenience.

```js
const axios = require('axios').default;
const axios = require("axios").default;
const { emitterFor, Mode } = require("cloudevents");

function sendWithAxios(message) {
// Do what you need with the message headers
// and body in this function, then send the
// event
axios({
method: 'post',
url: '...',
method: "post",
url: "...",
data: message.body,
headers: message.headers,
});
Expand All @@ -88,9 +87,38 @@ const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
emit(new CloudEvent({ type, source, data }));
```

You may also use the `Emitter` singleton

```js
const axios = require("axios").default;
const { emitterFor, Mode, CloudEvent, Emitter } = require("cloudevents");

function sendWithAxios(message) {
// Do what you need with the message headers
// and body in this function, then send the
// event
axios({
method: "post",
url: "...",
data: message.body,
headers: message.headers,
});
}

const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
// Set the emit
Emitter.on("cloudevent", emit);

...
// In any part of the code will send the event
new CloudEvent({ type, source, data }).emit();

// You can also have several listener to send the event to several endpoint
```

## CloudEvent Objects

All created `CloudEvent` objects are read-only. If you need to update a property or add a new extension to an existing cloud event object, you can use the `cloneWith` method. This will return a new `CloudEvent` with any update or new properties. For example:
All created `CloudEvent` objects are read-only. If you need to update a property or add a new extension to an existing cloud event object, you can use the `cloneWith` method. This will return a new `CloudEvent` with any update or new properties. For example:

```js
const {
Expand All @@ -112,24 +140,26 @@ There you will find Express.js, TypeScript and Websocket examples.

## Supported specification features

| Core Specification | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/spec.md) | [v1.0](https://github.com/cloudevents/spec/blob/v1.0/spec.md) |
| ----------------------------- | --- | --- |
| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: |
| Core Specification | [v0.3](https://github.com/cloudevents/spec/blob/v0.3/spec.md) | [v1.0](https://github.com/cloudevents/spec/blob/v1.0/spec.md) |
| ------------------ | ------------------------------------------------------------- | ------------------------------------------------------------- |
| CloudEvents Core | :heavy_check_mark: | :heavy_check_mark: |

---

| Event Formats | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| ----------------------------- | --- | --- |
| AVRO Event Format | :x: | :x: |
| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: |
| Event Formats | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| ----------------- | ----------------------------------------------------- | ----------------------------------------------------- |
| AVRO Event Format | :x: | :x: |
| JSON Event Format | :heavy_check_mark: | :heavy_check_mark: |

---

| Transport Protocols | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| ----------------------------- | --- | --- |
| AMQP Protocol Binding | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |
| Transport Protocols | [v0.3](https://github.com/cloudevents/spec/tree/v0.3) | [v1.0](https://github.com/cloudevents/spec/tree/v1.0) |
| ---------------------- | ----------------------------------------------------- | ----------------------------------------------------- |
| AMQP Protocol Binding | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :x: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |

## Community

Expand Down
12 changes: 12 additions & 0 deletions src/event/cloudevent.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { v4 as uuidv4 } from "uuid";
import { Emitter } from "..";

import {
CloudEventV03,
Expand Down Expand Up @@ -167,6 +168,17 @@ export class CloudEvent implements CloudEventV1, CloudEventV03 {
}
}

/**
* Emit this CloudEvent through the application
*
* @param {boolean} ensureDelivery fail the promise if one listener fail
* @return {Promise<CloudEvent>} this
*/
public async emit(ensureDelivery = true): Promise<this> {
await Emitter.emitEvent(this, ensureDelivery);
return this;
}

/**
* Clone a CloudEvent with new/update attributes
* @param {object} options attributes to augment the CloudEvent with
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CloudEvent, Version } from "./event/cloudevent";
import { ValidationError } from "./event/validation";
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";

import { Options, TransportFunction, EmitterFunction, emitterFor } from "./transport/emitter";
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";

import CONSTANTS from "./constants";
Expand All @@ -28,6 +28,7 @@ export {
TransportFunction,
EmitterFunction,
emitterFor,
Emitter,
Options,
// From Constants
CONSTANTS,
Expand Down
64 changes: 64 additions & 0 deletions src/transport/emitter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { CloudEvent } from "../event/cloudevent";
import { HTTP, Message, Mode } from "../message";
import { EventEmitter } from "events";

/**
* Options is an additional, optional dictionary of options that may
Expand Down Expand Up @@ -58,3 +59,66 @@ export function emitterFor(fn: TransportFunction, options = { binding: HTTP, mod
}
};
}

/**
* A static class to emit CloudEvents within an application
*/
export class Emitter extends EventEmitter {
/**
* Singleton store
*/
static instance: Emitter | undefined = undefined;

/**
* Create an Emitter
* On v4.0.0 this class will only remains as Singleton to allow using the
* EventEmitter of NodeJS
*/
private constructor() {
super();
}

/**
* Return or create the Emitter singleton
*
* @return {Emitter} return Emitter singleton
*/
static getInstance(): Emitter {
if (!Emitter.instance) {
Emitter.instance = new Emitter();
}
return Emitter.instance;
}

/**
* Add a listener for eventing
*
* @param {string} event type to listen to
* @param {Function} listener to call on event
* @return {void}
*/
static on(event: "cloudevent" | "newListener" | "removeListener", listener: (...args: any[]) => void): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we creating an event listening system for an event emitter?

I do not expect an eventing system like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a shortcut to avoid the getSingleton that you did not like, I started to reimplement the EventEmitter interface, but found it overkill, as there are several other methods that can be useful. So instead of reinventing the wheel, I just added a shortcut method to streamline the process

Copy link
Member

@lance lance Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, the example added to the README is very illustrative. This PR makes it so that you can set up your emitter function using emitterFor(), then have that respond to all cloudevent events in the Node event system by actually sending the CloudEvent. To me that seems quite valuable. Imagine that you have a large system that is pushing out CloudEvents, perhaps even to multiple different endpoints. You could add multiple emitter functions at startup that each send to a different endpoint. Then by simply calling new CloudEvent({...}).emit(); the CloudEvents is sent - with that single call - to all endpoints. I like it a lot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this was the exact purpose, thanks for summarizing it so well

Copy link
Member

@grant grant Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, the README example is great.

const axios = require("axios").default;
const { emitterFor, Mode, CloudEvent, Emitter } = require("cloudevents");
function sendWithAxios(message) {
  // Do what you need with the message headers
  // and body in this function, then send the
  // event
  axios({
    method: "post",
    url: "...",
    data: message.body,
    headers: message.headers,
  });
}
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
// Set the emit
Emitter.on("cloudevent", emit);
...
// In any part of the code will send the event
new CloudEvent({ type, source, data }).emit();

I think the fundamental question is why are we adding back the emit function? And adding a ensureDelivery param?

A CloudEvent object and a CloudEvent emitter/receiver seem like separate concepts. So I wouldn't imagine a CloudEvent has a send/receive function.

const axios = require("axios").default;
const { emitterFor, Mode, CloudEvent, Emitter } = require("cloudevents");
function sendWithAxios(message) {
  // Do what you need with the message headers
  // and body in this function, then send the
  // event
  axios({
    method: "post",
    url: "...",
    data: message.body,
    headers: message.headers,
  });
}
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });

// In any part of the code will send the event
const ce = new CloudEvent({ type, source, data });
emit(ce);

Folks can add an eventing layer on-top of the primitives here. Maybe point out an error in the above alternative code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The emitEvent function was adding because EventEmitter is not able to "wait" for all listeners to finish async jobs.
You might by design consider that you need to fail the whole process if you were not able to send the event, or you can decide to let it go depending on how critical to your business the CloudEvent is.
This is the SDK so I think it should be able to answer both needs instead of limiting to one interpretation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I wouldn't imagine a CloudEvent has a send/receive function.

Just to be clear, we are talking about two separate concepts. It's easy to get things mixed up when all of the terms are so similar. The two separate things as I see it are:

  1. Sending a CloudEvent over some network transport using the emitterFor() factory to create a user defined function that is capable of converting a CloudEvent into a Message and sending that over the wire with a network transport (typically HTTP).
  2. Using the Node.js built in EventEmitter capabilities to notify a listener that a CloudEvent should be sent using the method described in 1) point above. The listener in this case is the transport function created with emitterFor(). The new CloudEvent#emit() function just facilitates sending these using the Emitter.emitEvent() function.

This pull request is about using what is described in 2) to facilitate and orchestrate 1) in an efficient way across a large system that may need to send CloudEvents over network transport to multiple endpoints and do so efficiently by making use of built in Node.js EventEmitter capabilities.

Maybe point out an error in the above alternative code?

This pull request is not an alternative to that code. It builds upon that code to add a very useful feature, in my opinion, with very low overhead using Node.js built in, idiomatic capabilities.

this.getInstance().on(event, listener);
}

/**
* Emit an event inside this application
*
* @param {CloudEvent} event to emit
* @param {boolean} ensureDelivery fail the promise if one listener fail
* @return {void}
*/
static async emitEvent(event: CloudEvent, ensureDelivery = true): Promise<void> {
if (!ensureDelivery) {
// Ensure delivery is disabled so we don't wait for Promise
Emitter.getInstance().emit("cloudevent", event);
} else {
// Execute all listeners and wrap them in a Promise
await Promise.all(
Emitter.getInstance()
.listeners("cloudevent")
.map(async (l) => l(event)),
);
}
}
}
18 changes: 15 additions & 3 deletions test/integration/emitter_factory_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const data = {
lunchBreak: "noon",
};

const fixture = new CloudEvent({
export const fixture = new CloudEvent({
source,
type,
data,
Expand Down Expand Up @@ -107,7 +107,13 @@ function testEmitter(fn: TransportFunction, bodyAttr: string) {
});
}

function assertBinary(response: Record<string, string>) {
/**
* Verify the received binary answer compare to the original fixture message
*
* @param {Record<string, Record<string, string>>} response received to compare to fixture
* @return {void} void
*/
export function assertBinary(response: Record<string, string>): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this is not in scope with the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The export was added to be able to share some logics and avoid duplicated code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The export was added to be able to share some logics and avoid duplicated code

Makes sense to me.

expect(response.lunchBreak).to.equal(data.lunchBreak);
expect(response["ce-type"]).to.equal(type);
expect(response["ce-source"]).to.equal(source);
Expand All @@ -116,7 +122,13 @@ function assertBinary(response: Record<string, string>) {
expect(response[`ce-${ext3Name}`]).to.deep.equal(ext3Value);
}

function assertStructured(response: Record<string, Record<string, string>>) {
/**
* Verify the received structured answer compare to the original fixture message
*
* @param {Record<string, Record<string, string>>} response received to compare to fixture
* @return {void} void
*/
export function assertStructured(response: Record<string, Record<string, string>>): void {
expect(response.data.lunchBreak).to.equal(data.lunchBreak);
expect(response.type).to.equal(type);
expect(response.source).to.equal(source);
Expand Down
55 changes: 55 additions & 0 deletions test/integration/emitter_singleton_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import "mocha";

import { emitterFor, HTTP, Mode, Message, Emitter } from "../../src";

import { fixture, assertStructured } from "./emitter_factory_test";

import { rejects, doesNotReject } from "assert";

describe("Emitter Singleton", () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The casts used in this test seem very fishy (casting to unknown, then an interface).

Additionally, there are no comments, making the tests hard to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is why we removed the eslint rule according with @lance

it("emit a Node.js 'cloudevent' event as an EventEmitter", async () => {
const msg: Message | unknown = await new Promise((resolve) => {
const fn = async (message: Message) => {
resolve(message);
};
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
Emitter.on("cloudevent", emitter);

fixture.emit(false);
});
let body: unknown = (<Message>(<unknown>msg)).body;
if (typeof body === "string") {
body = JSON.parse(body);
}
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
});

it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery", async () => {
let msg: Message | unknown = undefined;
const fn = async (message: Message) => {
msg = message;
};
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
Emitter.on("cloudevent", emitter);
await fixture.emit(true);
let body: any = (<Message>msg).body;
if (typeof body === "string") {
body = JSON.parse(body);
}
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
});

it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery Error", async () => {
const emitter = async () => {
throw new Error("Not sent");
};
Emitter.on("cloudevent", emitter);
// Should fail with emitWithEnsureDelivery
await rejects(() => fixture.emit(true));
// Should not fail with emitWithEnsureDelivery
// Work locally but not on Github Actions
if (!process.env.GITHUB_WORKFLOW) {
await doesNotReject(() => fixture.emit(false));
}
});
});