Commit 3f91da0

fixup: incorporate PR feedback on how to handle partition keys
Also incorporates all of the changes from cloudevents#457

Signed-off-by: Lance Ball <[email protected]>
1 parent ebd9284 commit 3f91da0
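
The net effect of the partition key change: instead of synthesizing a Kafka key from `source`/`id` (the old `createKey` helper), the binding now maps a CloudEvent's `partitionkey` extension attribute directly onto `KafkaMessage#key`, and restores it on deserialization. A minimal usage sketch, assuming the package is consumed as `cloudevents` and that the exported `Kafka` binding exposes `binary`/`structured` serializers through the SDK's `Binding` interface:

    import { CloudEvent, Kafka, KafkaMessage } from "cloudevents";

    // The partitionkey extension attribute is mapped to the Kafka record key
    // per the Kafka protocol binding spec.
    const event = new CloudEvent({
      type: "com.example.someevent",   // hypothetical event type
      source: "/example/source",       // hypothetical source
      partitionkey: "order-1234",      // becomes KafkaMessage#key
      data: { hello: "world" },
    });

    // Serialize in structured mode; the record key carries the partitionkey.
    const message = Kafka.structured(event) as KafkaMessage;
    console.log(message.key); // "order-1234"

Binary mode applies the same mapping (see `toBinaryKafkaMessage` below).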

4 files changed: 147 additions & 132 deletions

src/index.ts

Lines changed: 6 additions & 1 deletion
@@ -8,7 +8,9 @@ import { ValidationError } from "./event/validation";
 import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";

 import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
-import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";
+import {
+  Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
+  Serializer, Deserializer } from "./message";

 import CONSTANTS from "./constants";

@@ -27,6 +29,9 @@ export {
   Deserializer,
   Serializer,
   HTTP,
+  Kafka,
+  KafkaEvent,
+  KafkaMessage,
   // From transport
   TransportFunction,
   EmitterFunction,

src/message/kafka/headers.ts

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@
  SPDX-License-Identifier: Apache-2.0
 */

-import { CloudEvent, CONSTANTS, Headers } from "../..";
+import { CloudEventV1, CONSTANTS, Headers } from "../..";

 type KafkaHeaders = Readonly<{
   ID: string;
@@ -55,7 +55,7 @@ export const HEADER_MAP: { [key: string]: string } = {
  * @param {CloudEvent} event a CloudEvent object
  * @returns {Headers} the CloudEvent attribute as Kafka headers
  */
-export function headersFor<T>(event: CloudEvent<T>): Headers {
+export function headersFor<T>(event: CloudEventV1<T>): Headers {
   const headers: Headers = {};

   Object.getOwnPropertyNames(event).forEach((property) => {

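The signature change above loosens `headersFor` to accept any object conforming to `CloudEventV1`, not just concrete `CloudEvent` instances. An illustrative sketch, written as if it lived alongside this module (`headersFor` is internal, not part of the public API):

    import { CloudEventV1, Headers } from "../..";
    import { headersFor } from "./headers";

    // A plain object satisfying CloudEventV1 is now sufficient; no CloudEvent
    // instance is required to compute the Kafka headers.
    const event: CloudEventV1<unknown> = {
      id: "1234",
      specversion: "1.0",
      type: "com.example.someevent",   // hypothetical event type
      source: "/example/source",       // hypothetical source
    };

    // Attributes come back as ce_-prefixed Kafka headers per the protocol binding.
    const headers: Headers = headersFor(event);
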
src/message/kafka/index.ts

Lines changed: 44 additions & 44 deletions
@@ -8,6 +8,13 @@ import { Message, Headers, Binding } from "..";
 import { headersFor, HEADER_MAP, KAFKA_CE_HEADERS } from "./headers";
 import { sanitize } from "../http/headers";

+// Export the binding implementation and message interface
+export {
+  Kafka,
+  KafkaMessage,
+  KafkaEvent
+};
+
 /**
  * Bindings for Kafka transport
  * @implements {@linkcode Binding}
@@ -19,45 +26,39 @@ import { sanitize } from "../http/headers";
   isEvent: isKafkaEvent,
 };

+type Key = string | Buffer;
+
 /**
  * Extends the base Message type to include
  * Kafka-specific fields
  */
 interface KafkaMessage<T = string> extends Message {
-  key: string | Buffer
+  key: Key
   value: T | string | Buffer | unknown
   timestamp?: string
 }

 /**
- * Extends the base CloudEvent type to include
- * the Kafka-specific `partitionkey` field, which
- * is explicitly mapped to `KafkaMessage#key`.
+ * Extends the base CloudEventV1 interface to include a `partitionkey` field
+ * which is explicitly mapped to KafkaMessage#key
  */
 interface KafkaEvent<T> extends CloudEventV1<T> {
   /**
    * Maps to KafkaMessage#key per
    * https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
    */
-  partitionkey: string | Buffer
+  partitionkey: Key
 }

-// Export the binding implementation and message interface
-export {
-  Kafka,
-  KafkaMessage,
-  KafkaEvent
-};
-
 /**
  * Serialize a CloudEvent for Kafka in binary mode
  * @implements {Serializer}
  * @see https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#32-binary-content-mode
  *
- * @param {CloudEvent<T>} event The event to serialize
+ * @param {KafkaEvent<T>} event The event to serialize
  * @returns {KafkaMessage<T>} a KafkaMessage instance
  */
-function toBinaryKafkaMessage<T>(event: CloudEvent<T>): KafkaMessage<T> {
+function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
   // 3.2.1. Content Type
   // For the binary mode, the header content-type property MUST be mapped directly
   // to the CloudEvents datacontenttype attribute.
@@ -67,7 +68,7 @@ function toBinaryKafkaMessage<T>(event: CloudEvent<T>): KafkaMessage<T> {
   };
   return {
     headers,
-    key: createKey(event),
+    key: event.partitionkey as Key,
     value: event.data,
     body: event.data,
     timestamp: timestamp(event.time)
@@ -82,14 +83,17 @@ function toBinaryKafkaMessage<T>(event: CloudEvent<T>): KafkaMessage<T> {
  * @param {CloudEvent<T>} event the CloudEvent to be serialized
  * @returns {KafkaMessage<T>} a KafkaMessage instance
  */
-function toStructuredKafkaMessage<T>(event: CloudEvent<T>): KafkaMessage<T> {
-  if (event.data_base64) {
+function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
+  if (event.data_base64 && event instanceof CloudEvent) {
     // The event's data is binary - delete it
     event = event.cloneWith({ data: undefined });
   }
   const value = event.toString();
   return {
-    key: createKey(event),
+    // All events may not have a partitionkey set, but if they do,
+    // use it for the KafkaMessage#key per
+    // https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
+    key: event.partitionkey as Key,
     value,
     headers: {
       [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CE_CONTENT_TYPE,
@@ -104,27 +108,28 @@ function toBinaryKafkaMessage<T>(event: CloudEvent<T>): KafkaMessage<T> {
  * @implements {Deserializer}
  *
  * @param {Message} message the incoming message
- * @return {CloudEvent} A new {CloudEvent} instance
+ * @return {KafkaEvent} A new {KafkaEvent} instance
  */
-function deserializeKafkaMessage<T>(message: Message): CloudEventV1<T> | CloudEvent<T> | CloudEvent<T>[] {
+function deserializeKafkaMessage<T>(message: Message): CloudEvent<T> | CloudEvent<T>[] {
   if (!isKafkaEvent(message)) {
     throw new ValidationError("No CloudEvent detected");
   }
-  if (!(message as KafkaMessage<T>).value) {
+  const m = message as KafkaMessage<T>;
+  if (!m.value) {
     throw new ValidationError("Value is null or undefined");
   }
-  if (!message.headers) {
+  if (!m.headers) {
     throw new ValidationError("Headers are null or undefined");
   }
-  const cleanHeaders: Headers = sanitize(message.headers);
+  const cleanHeaders: Headers = sanitize(m.headers);
   const mode: Mode = getMode(cleanHeaders);
   switch (mode) {
     case Mode.BINARY:
-      return parseBinary(message as KafkaMessage<T>) as CloudEventV1<T>;
+      return parseBinary(m);
     case Mode.STRUCTURED:
-      return parseStructured(message as KafkaMessage<T>);
+      return parseStructured(m);
     case Mode.BATCH:
-      return parseBatched(message as KafkaMessage<T>);
+      return parseBatched(m);
     default:
       throw new ValidationError("Unknown Message mode");
   }
@@ -137,22 +142,13 @@ function deserializeKafkaMessage<T>(message: Message): CloudEventV1<T> | CloudEv
  * @param {Message} message an incoming Message object
  * @returns {boolean} true if this Message is a CloudEvent
  */
-function isKafkaEvent<T>(message: Message<T>): boolean {
+function isKafkaEvent(message: Message): boolean {
   const headers = sanitize(message.headers);
   return !!headers[KAFKA_CE_HEADERS.ID] || // A binary mode event
     headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE) as boolean || // A structured mode event
     headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_BATCH) as boolean; // A batch of events
 }

-/**
- * Creates a Kafka key for the event provided
- * @param {CloudEvent} e an event
- * @returns {string} the Kafka key
- */
-function createKey<T>(e: CloudEvent<T>): string {
-  return `${e.source}/${e.id}`;
-}
-
 /**
  * Determines what content mode a Kafka message is in given the provided headers
  * @param {Headers} headers the headers
@@ -175,7 +171,7 @@ function getMode(headers: Headers): Mode {
  * @param {KafkaMessage} message the message
  * @returns {CloudEvent<T>} a CloudEvent<T>
  */
-function parseBinary<T>(message: KafkaMessage<T>): KafkaEvent<T> {
+function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
   const eventObj: { [key: string ]: unknown } = {};
   const headers = { ...message.headers };

@@ -196,7 +192,7 @@ function parseBinary<T>(message: KafkaMessage<T>): KafkaEvent<T> {
     }
   }

-  return new KafkaEvent({
+  return new CloudEvent({
     ...eventObj,
     data: extractBinaryData(message),
     partitionkey: message.key,
@@ -205,8 +201,8 @@

 /**
  * Parses a structured kafka CE message and returns a CloudEvent
- * @param {KafkaMessage} message the message
- * @returns {CloudEvent<T>} a CloudEvent<T>
+ * @param {KafkaMessage<T>} message the message
+ * @returns {CloudEvent<T>} a KafkaEvent<T>
  */
 function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
   // Although the format of a structured encoded event could be something
@@ -215,13 +211,17 @@ function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
   if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
     throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
   }
-  return new CloudEvent(JSON.parse(message.value as string), false);
+
+  return new CloudEvent({
+    ...JSON.parse(message.value as string),
+    partitionkey: message.key,
+  }, false);
 }

 /**
  * Parses a batch kafka CE message and returns a CloudEvent[]
- * @param {KafkaMessage} message the message
- * @returns {CloudEvent<T>} a CloudEvent<T>[]
+ * @param {KafkaMessage<T>} message the message
+ * @returns {CloudEvent<T>[]} an array of KafkaEvent<T>
  */
 function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
   // Although the format of batch encoded events could be something
@@ -231,7 +231,7 @@ function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
     throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
   }
   const events = JSON.parse(message.value as string) as Record<string, unknown>[];
-  return events.map((e) => new CloudEvent(e, false));
+  return events.map((e) => new CloudEvent({ ...e, partitionkey: message.key }, false));
 }

 /**

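On the receive side, all three parse paths (binary, structured, batch) now copy `KafkaMessage#key` back onto the resulting event as `partitionkey`, so the key survives a round trip. A hedged sketch of structured-mode deserialization, assuming `deserializeKafkaMessage` is wired up as the binding's `toEvent` deserializer in the same way the HTTP binding is:

    import { Kafka, KafkaEvent, KafkaMessage } from "cloudevents";

    // A structured-mode record as it might arrive from a Kafka consumer
    // (all values here are illustrative).
    const payload = JSON.stringify({
      id: "1234",
      specversion: "1.0",
      type: "com.example.someevent",
      source: "/example/source",
    });

    const incoming: KafkaMessage = {
      key: "order-1234",
      value: payload,
      body: payload,
      headers: { "content-type": "application/cloudevents+json; charset=utf-8" },
    };

    // Assumed entry point: the binding's deserializer restores partitionkey
    // from the record key. The double cast is only to keep this sketch
    // type-checking against the generic deserializer return type.
    const event = Kafka.toEvent(incoming) as unknown as KafkaEvent<unknown>;
    console.log(event.partitionkey); // "order-1234"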