Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/ten-taxis-deliver.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@smithy/types": patch
"@smithy/core": patch
---

schema serde: http binding and cbor serializer refactoring
98 changes: 70 additions & 28 deletions packages/core/src/submodules/cbor/CborCodec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NormalizedSchema } from "@smithy/core/schema";
import { copyDocumentWithTransform, parseEpochTimestamp } from "@smithy/core/serde";
import { Codec, Schema, SchemaRef, SerdeFunctions, ShapeDeserializer, ShapeSerializer } from "@smithy/types";
import { parseEpochTimestamp } from "@smithy/core/serde";
import { Codec, Schema, SerdeFunctions, ShapeDeserializer, ShapeSerializer } from "@smithy/types";

import { cbor } from "./cbor";
import { dateToTag } from "./parseCborBody";
Expand Down Expand Up @@ -40,38 +40,80 @@ export class CborShapeSerializer implements ShapeSerializer {
}

public write(schema: Schema, value: unknown): void {
this.value = copyDocumentWithTransform(value, schema, (_: any, schemaRef: SchemaRef) => {
if (_ instanceof Date) {
return dateToTag(_);
}
if (_ instanceof Uint8Array) {
return _;
}
this.value = this.serialize(schema, value);
}

const ns = NormalizedSchema.of(schemaRef);
const sparse = !!ns.getMergedTraits().sparse;
/**
* Recursive serializer transform that copies and prepares the user input object
* for CBOR serialization.
*/
public serialize(schema: Schema, source: unknown): any {
const ns = NormalizedSchema.of(schema);

if (ns.isListSchema() && Array.isArray(_)) {
if (!sparse) {
return _.filter((item) => item != null);
switch (typeof source) {
case "undefined":
return null;
case "boolean":
case "number":
case "string":
case "bigint":
case "symbol":
return source;
case "function":
case "object":
if (source === null) {
return null;
}
} else if (_ && typeof _ === "object") {
const members = ns.getMemberSchemas();
const isStruct = ns.isStructSchema();
if (!sparse || isStruct) {
for (const [k, v] of Object.entries(_)) {
const filteredOutByNonSparse = !sparse && v == null;
const filteredOutByUnrecognizedMember = isStruct && !(k in members);
if (filteredOutByNonSparse || filteredOutByUnrecognizedMember) {
delete _[k];

const sourceObject = source as Record<string, unknown>;
const sparse = !!ns.getMergedTraits().sparse;

if (ns.isListSchema() && Array.isArray(sourceObject)) {
const newArray = [];
let i = 0;
for (const item of sourceObject) {
const value = this.serialize(ns.getValueSchema(), item);
if (value != null || sparse) {
newArray[i++] = value;
}
}
return _;
return newArray;
}
}

return _;
});
if (sourceObject instanceof Uint8Array) {
const newBytes = new Uint8Array(sourceObject.byteLength);
newBytes.set(sourceObject, 0);
return newBytes;
}
if (sourceObject instanceof Date) {
return dateToTag(sourceObject);
}
const newObject = {} as any;
if (ns.isMapSchema()) {
for (const key of Object.keys(sourceObject)) {
const value = this.serialize(ns.getValueSchema(), sourceObject[key]);
if (value != null || sparse) {
newObject[key] = value;
}
}
} else if (ns.isStructSchema()) {
for (const [key, memberSchema] of ns.structIterator()) {
const value = this.serialize(memberSchema, sourceObject[key]);
if (value != null) {
newObject[key] = value;
}
}
} else if (ns.isDocumentSchema()) {
for (const key of Object.keys(sourceObject)) {
const value = this.serialize(ns.getValueSchema(), sourceObject[key]);
if (value != null) {
newObject[key] = value;
}
}
}
return newObject;
default:
return source;
}
}

public flush(): Uint8Array {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ describe(SmithyRpcV2CborProtocol.name, () => {
0,
["mySparseList", "myRegularList", "mySparseMap", "myRegularMap"],
[
[() => list("", "MyList", { sparse: 1 }, SCHEMA.NUMERIC), {}],
[() => list("", "MySparseList", { sparse: 1 }, SCHEMA.NUMERIC), {}],
[() => list("", "MyList", {}, SCHEMA.NUMERIC), {}],
[() => map("", "MyMap", { sparse: 1 }, SCHEMA.STRING, SCHEMA.NUMERIC), {}],
[() => map("", "MySparseMap", { sparse: 1 }, SCHEMA.STRING, SCHEMA.NUMERIC), {}],
[() => map("", "MyMap", {}, SCHEMA.STRING, SCHEMA.NUMERIC), {}],
]
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { op, SCHEMA, struct } from "@smithy/core/schema";
import { map, op, SCHEMA, struct } from "@smithy/core/schema";
import { HttpResponse } from "@smithy/protocol-http";
import {
Codec,
Expand All @@ -8,6 +8,8 @@ import {
MetadataBearer,
OperationSchema,
ResponseMetadata,
Schema,
SerdeFunctions,
ShapeDeserializer,
ShapeSerializer,
} from "@smithy/types";
Expand Down Expand Up @@ -41,9 +43,11 @@ describe(HttpBindingProtocol.name, () => {
public getShapeId(): string {
throw new Error("Method not implemented.");
}

public getPayloadCodec(): Codec<any, any> {
throw new Error("Method not implemented.");
}

protected handleError(
operationSchema: OperationSchema,
context: HandlerExecutionContext,
Expand Down Expand Up @@ -157,4 +161,53 @@ describe(HttpBindingProtocol.name, () => {
);
expect(request.path).toEqual("/custom/Operation");
});

it("can deserialize a prefix header binding and header binding from the same header", async () => {
type TestSignature = (
schema: Schema,
context: HandlerExecutionContext & SerdeFunctions,
response: IHttpResponse,
dataObject: any
) => Promise<string[]>;
const deserializeHttpMessage = ((StringRestProtocol.prototype as any).deserializeHttpMessage as TestSignature).bind(
{
deserializer: new FromStringShapeDeserializer({
httpBindings: true,
timestampFormat: {
useTrait: true,
default: SCHEMA.TIMESTAMP_EPOCH_SECONDS,
},
}),
}
);
const httpResponse: IHttpResponse = {
statusCode: 200,
headers: {
"my-header": "header-value",
},
};

const dataObject = {};
await deserializeHttpMessage(
struct(
"",
"Struct",
0,
["prefixHeaders", "header"],
[
[map("", "Map", 0, 0, 0), { httpPrefixHeaders: "my-" }],
[0, { httpHeader: "my-header" }],
]
),
{} as any,
httpResponse,
dataObject
);
expect(dataObject).toEqual({
prefixHeaders: {
header: "header-value",
},
header: "header-value",
});
});
});
127 changes: 127 additions & 0 deletions packages/core/src/submodules/protocols/HttpBindingProtocol.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { NormalizedSchema, SCHEMA } from "@smithy/core/schema";
import { splitEvery, splitHeader } from "@smithy/core/serde";
import { HttpRequest } from "@smithy/protocol-http";
import {
Endpoint,
EndpointBearer,
EventStreamSerdeContext,
HandlerExecutionContext,
HttpRequest as IHttpRequest,
HttpResponse as IHttpResponse,
MetadataBearer,
OperationSchema,
Schema,
SerdeFunctions,
} from "@smithy/types";
import { sdkStreamMixin } from "@smithy/util-stream";

import { collectBody } from "./collect-stream-body";
import { extendedEncodeURIComponent } from "./extended-encode-uri-component";
Expand Down Expand Up @@ -226,4 +230,127 @@ export abstract class HttpBindingProtocol extends HttpProtocol {

return output;
}

/**
* The base method ignores HTTP bindings.
*
* @deprecated (only this signature) use signature without headerBindings.
* @override
*/
protected async deserializeHttpMessage(
schema: Schema,
context: HandlerExecutionContext & SerdeFunctions,
response: IHttpResponse,
headerBindings: Set<string>,
dataObject: any
): Promise<string[]>;
protected async deserializeHttpMessage(
schema: Schema,
context: HandlerExecutionContext & SerdeFunctions,
response: IHttpResponse,
dataObject: any
): Promise<string[]>;
protected async deserializeHttpMessage(
schema: Schema,
context: HandlerExecutionContext & SerdeFunctions,
response: IHttpResponse,
arg4: unknown,
arg5?: unknown
): Promise<string[]> {
let dataObject: any;
if (arg4 instanceof Set) {
dataObject = arg5;
} else {
dataObject = arg4;
}

const deserializer = this.deserializer;
const ns = NormalizedSchema.of(schema);
const nonHttpBindingMembers = [] as string[];

for (const [memberName, memberSchema] of ns.structIterator()) {
const memberTraits = memberSchema.getMemberTraits();

if (memberTraits.httpPayload) {
const isStreaming = memberSchema.isStreaming();
if (isStreaming) {
const isEventStream = memberSchema.isStructSchema();
if (isEventStream) {
// streaming event stream (union)
const context = this.serdeContext as unknown as EventStreamSerdeContext;
if (!context.eventStreamMarshaller) {
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
}
const memberSchemas = memberSchema.getMemberSchemas();
dataObject[memberName] = context.eventStreamMarshaller.deserialize(response.body, async (event) => {
const unionMember =
Object.keys(event).find((key) => {
return key !== "__type";
}) ?? "";
if (unionMember in memberSchemas) {
const eventStreamSchema = memberSchemas[unionMember];
return {
[unionMember]: await deserializer.read(eventStreamSchema, event[unionMember].body),
};
} else {
// todo(schema): This union convention is ignored by the event stream marshaller.
// todo(schema): This should be returned to the user instead.
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
return {
$unknown: event,
};
}
});
} else {
// streaming blob body
dataObject[memberName] = sdkStreamMixin(response.body);
}
} else if (response.body) {
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
if (bytes.byteLength > 0) {
dataObject[memberName] = await deserializer.read(memberSchema, bytes);
}
}
} else if (memberTraits.httpHeader) {
const key = String(memberTraits.httpHeader).toLowerCase();
const value = response.headers[key];
if (null != value) {
if (memberSchema.isListSchema()) {
const headerListValueSchema = memberSchema.getValueSchema();
let sections: string[];
if (
headerListValueSchema.isTimestampSchema() &&
headerListValueSchema.getSchema() === SCHEMA.TIMESTAMP_DEFAULT
) {
sections = splitEvery(value, ",", 2);
} else {
sections = splitHeader(value);
}
const list = [];
for (const section of sections) {
list.push(await deserializer.read([headerListValueSchema, { httpHeader: key }], section.trim()));
}
dataObject[memberName] = list;
} else {
dataObject[memberName] = await deserializer.read(memberSchema, value);
}
}
} else if (memberTraits.httpPrefixHeaders !== undefined) {
dataObject[memberName] = {};
for (const [header, value] of Object.entries(response.headers)) {
if (header.startsWith(memberTraits.httpPrefixHeaders)) {
dataObject[memberName][header.slice(memberTraits.httpPrefixHeaders.length)] = await deserializer.read(
[memberSchema.getValueSchema(), { httpHeader: header }],
value
);
}
}
} else if (memberTraits.httpResponseCode) {
dataObject[memberName] = response.statusCode;
} else {
nonHttpBindingMembers.push(memberName);
}
}
return nonHttpBindingMembers;
}
}
7 changes: 2 additions & 5 deletions packages/core/src/submodules/protocols/HttpProtocol.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { HttpProtocol } from "./HttpProtocol";
import { FromStringShapeDeserializer } from "./serde/FromStringShapeDeserializer";

describe(HttpProtocol.name, () => {
it("can deserialize a prefix header binding and header binding from the same header", async () => {
it("ignores http bindings (only HttpBindingProtocol uses them)", async () => {
type TestSignature = (
schema: Schema,
context: HandlerExecutionContext & SerdeFunctions,
Expand Down Expand Up @@ -46,10 +46,7 @@ describe(HttpProtocol.name, () => {
dataObject
);
expect(dataObject).toEqual({
prefixHeaders: {
header: "header-value",
},
header: "header-value",
// headers were ignored
});
});
});
Loading
Loading