Skip to content

Commit 8763931

Browse files
rschmittdagnir
authored andcommitted
Add support for event stream requests over RPC
This commit adds support for RPC services that use client->server or bidirectional event streaming. In the RPC formats (i.e. AWS JSON), the marshalled request and response objects are sent as the first event in the request and response stream respectively. This is different from the REST formats, such as REST JSON, which bind the request and response fields to non-payload positions (headers, URI, method, status code, query string) when event streaming is used. The implementation of this feature is mainly performed by a new global interceptor, `EventStreamInitialRequestInterceptor`. The interceptor detects when an `initial-request` event needs to be sent and transforms the async response body by constructing the `initial-request` message and prepending it to the event stream. Due to the high precedence of global interceptors, any subsequent service or override interceptors that run on the body will see the transformed version. This also applies to the `EventStreamAws4Signer`, which will produce a signature over the `initial-request` event.
1 parent ea869a8 commit 8763931

File tree

27 files changed

+2319
-27
lines changed

27 files changed

+2319
-27
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/specs/JsonProtocolSpec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
232232
.add(".withMarshaller($L)\n", asyncMarshaller(model, opModel, marshaller, protocolFactory))
233233
.add(asyncRequestBody(opModel))
234234
.add(fullDuplex(opModel))
235+
.add(hasInitialRequestEvent(opModel, isRestJson))
235236
.add(".withResponseHandler($L)\n", responseHandlerName(opModel, isRestJson))
236237
.add(".withErrorResponseHandler(errorResponseHandler)\n")
237238
.add(".withMetricCollector(apiCallMetricCollector)\n")
@@ -271,6 +272,11 @@ private CodeBlock fullDuplex(OperationModel opModel) {
271272
: CodeBlock.of("");
272273
}
273274

275+
private CodeBlock hasInitialRequestEvent(OperationModel opModel, boolean isRestJson) {
276+
return opModel.hasEventStreamInput() && !isRestJson ? CodeBlock.of(".withInitialRequestEvent(true)")
277+
: CodeBlock.of("");
278+
}
279+
274280
private CodeBlock asyncRequestBody(OperationModel opModel) {
275281
return opModel.hasEventStreamInput() ? CodeBlock.of(".withAsyncRequestBody($T.fromPublisher(adapted))",
276282
AsyncRequestBody.class)

codegen/src/main/java/software/amazon/awssdk/codegen/poet/transform/protocols/EventStreamJsonMarshallerSpec.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
*/
3333
public final class EventStreamJsonMarshallerSpec extends JsonMarshallerSpec {
3434

35-
private static final String JSON_CONTENT_TYPE = "application/json";
36-
3735
public EventStreamJsonMarshallerSpec(IntermediateModel model, ShapeModel shapeModel) {
3836
super(shapeModel);
3937
}
@@ -51,7 +49,7 @@ public CodeBlock marshalCodeBlock(ClassName requestClassName) {
5149

5250
// Add :content-type header only if payload is present
5351
if (!shapeModel.hasNoEventPayload()) {
54-
builder.add(".putHeader(\":content-type\", \"$L\")", determinePayloadContentType());
52+
builder.add(".putHeader(\":content-type\", $L)", determinePayloadContentType());
5553
}
5654

5755
builder.add(".build();");
@@ -85,12 +83,12 @@ private String determinePayloadContentType() {
8583
return getPayloadContentType(explicitEventPayload);
8684
}
8785

88-
return JSON_CONTENT_TYPE;
86+
return "protocolFactory.getContentType()";
8987
}
9088

9189
private String getPayloadContentType(MemberModel memberModel) {
92-
String blobContentType = "application/octet-stream";
93-
String stringContentType = "text/plain";
90+
String blobContentType = "\"application/octet-stream\"";
91+
String stringContentType = "\"text/plain\"";
9492
String variableType = memberModel.getVariable().getVariableType();
9593

9694
if ("software.amazon.awssdk.core.SdkBytes".equals(variableType)) {
@@ -99,6 +97,6 @@ private String getPayloadContentType(MemberModel memberModel) {
9997
return stringContentType;
10098
}
10199

102-
return JSON_CONTENT_TYPE;
100+
return "protocolFactory.getContentType()";
103101
}
104102
}

codegen/src/test/java/software/amazon/awssdk/codegen/poet/ClientTestModels.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package software.amazon.awssdk.codegen.poet;
1717

1818
import java.io.File;
19+
20+
import org.eclipse.core.runtime.internal.adaptor.IModel;
1921
import software.amazon.awssdk.codegen.C2jModels;
2022
import software.amazon.awssdk.codegen.IntermediateModelBuilder;
2123
import software.amazon.awssdk.codegen.model.config.customization.CustomizationConfig;
@@ -31,7 +33,20 @@
3133
public class ClientTestModels {
3234
private ClientTestModels() {}
3335

34-
public static IntermediateModel jsonServiceModels() {
36+
public static IntermediateModel awsJsonServiceModels() {
37+
File serviceModel = new File(ClientTestModels.class.getResource("client/c2j/json/service-2.json").getFile());
38+
File customizationModel = new File(ClientTestModels.class.getResource("client/c2j/json/customization.config").getFile());
39+
File paginatorsModel = new File(ClientTestModels.class.getResource("client/c2j/json/paginators.json").getFile());
40+
C2jModels models = C2jModels.builder()
41+
.serviceModel(getServiceModel(serviceModel))
42+
.customizationConfig(getCustomizationConfig(customizationModel))
43+
.paginatorsModel(getPaginatorsModel(paginatorsModel))
44+
.build();
45+
46+
return new IntermediateModelBuilder(models).build();
47+
}
48+
49+
public static IntermediateModel restJsonServiceModels() {
3550
File serviceModel = new File(ClientTestModels.class.getResource("client/c2j/rest-json/service-2.json").getFile());
3651
File customizationModel = new File(ClientTestModels.class.getResource("client/c2j/rest-json/customization.config").getFile());
3752
File paginatorsModel = new File(ClientTestModels.class.getResource("client/c2j/rest-json/paginators.json").getFile());

codegen/src/test/java/software/amazon/awssdk/codegen/poet/builder/BuilderClassTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void asyncClientBuilderClass() throws Exception {
6969
}
7070

7171
private void validateGeneration(Function<IntermediateModel, ClassSpec> generatorConstructor, String expectedClassName) {
72-
assertThat(generatorConstructor.apply(ClientTestModels.jsonServiceModels()), generatesTo(expectedClassName));
72+
assertThat(generatorConstructor.apply(ClientTestModels.restJsonServiceModels()), generatesTo(expectedClassName));
7373
}
7474

7575
private void validateQueryGeneration(Function<IntermediateModel, ClassSpec> generatorConstructor, String expectedClassName) {

codegen/src/test/java/software/amazon/awssdk/codegen/poet/client/PoetClientFunctionalTests.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,26 @@ public class PoetClientFunctionalTests {
2828

2929
@Test
3030
public void asyncClientClass() throws Exception {
31-
AsyncClientClass asyncClientClass = new AsyncClientClass(
32-
GeneratorTaskParams.create(ClientTestModels.jsonServiceModels(), "sources/", "tests/"));
31+
AsyncClientClass asyncClientClass = createAsyncClientClass(ClientTestModels.restJsonServiceModels());
3332
assertThat(asyncClientClass, generatesTo("test-async-client-class.java"));
3433
}
3534

3635
@Test
3736
public void asyncClientInterface() throws Exception {
38-
ClassSpec asyncClientInterface = new AsyncClientInterface(ClientTestModels.jsonServiceModels());
37+
ClassSpec asyncClientInterface = new AsyncClientInterface(ClientTestModels.restJsonServiceModels());
3938
assertThat(asyncClientInterface, generatesTo("test-json-async-client-interface.java"));
4039
}
4140

4241
@Test
4342
public void simpleMethodsIntegClass() throws Exception {
4443
ClientSimpleMethodsIntegrationTests simpleMethodsClass = new ClientSimpleMethodsIntegrationTests(
45-
ClientTestModels.jsonServiceModels());
44+
ClientTestModels.restJsonServiceModels());
4645
assertThat(simpleMethodsClass, generatesTo("test-simple-methods-integ-class.java"));
4746
}
4847

4948
@Test
50-
public void syncClientClassJson() throws Exception {
51-
SyncClientClass syncClientClass = createSyncClientClass(ClientTestModels.jsonServiceModels());
49+
public void syncClientClassRestJson() throws Exception {
50+
SyncClientClass syncClientClass = createSyncClientClass(ClientTestModels.restJsonServiceModels());
5251
assertThat(syncClientClass, generatesTo("test-json-client-class.java"));
5352
}
5453

@@ -58,6 +57,11 @@ public void syncClientClassQuery() throws Exception {
5857
assertThat(syncClientClass, generatesTo("test-query-client-class.java"));
5958
}
6059

60+
@Test
61+
public void asyncClientClassAwsJson() throws Exception {
62+
AsyncClientClass asyncClientClass = createAsyncClientClass(ClientTestModels.awsJsonServiceModels());
63+
assertThat(asyncClientClass, generatesTo("test-aws-json-async-client-class.java"));
64+
}
6165

6266
@Test
6367
public void asyncClientClassQuery() throws Exception {
@@ -71,7 +75,6 @@ public void syncClientClassXml() throws Exception {
7175
assertThat(syncClientClass, generatesTo("test-xml-client-class.java"));
7276
}
7377

74-
7578
@Test
7679
public void asyncClientClassXml() throws Exception {
7780
AsyncClientClass syncClientClass = createAsyncClientClass(ClientTestModels.xmlServiceModels());
@@ -88,7 +91,7 @@ private AsyncClientClass createAsyncClientClass(IntermediateModel model) {
8891

8992
@Test
9093
public void syncClientInterface() throws Exception {
91-
ClassSpec syncClientInterface = new SyncClientInterface(ClientTestModels.jsonServiceModels());
94+
ClassSpec syncClientInterface = new SyncClientInterface(ClientTestModels.restJsonServiceModels());
9295
assertThat(syncClientInterface, generatesTo("test-json-client-interface.java"));
9396
}
9497

codegen/src/test/java/software/amazon/awssdk/codegen/poet/eventstream/EventStreamFunctionalTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void responseHandlerBuilder() throws Exception {
4545

4646
private void runTest(BiFunction<GeneratorTaskParams, OperationModel, ClassSpec> specFactory,
4747
String expectedTestFile) {
48-
IntermediateModel model = ClientTestModels.jsonServiceModels();
48+
IntermediateModel model = ClientTestModels.restJsonServiceModels();
4949
GeneratorTaskParams dependencies = GeneratorTaskParams.create(model, "sources/", "tests/");
5050
ClassSpec classSpec = specFactory.apply(dependencies, model.getOperation("EventStreamOperation"));
5151
assertThat(classSpec, generatesTo(expectedTestFile));
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"authPolicyActions" : {
3+
"skip" : true
4+
},
5+
"presignersFqcn": "software.amazon.awssdk.services.acm.presign.AcmClientPresigners",
6+
"serviceSpecificHttpConfig": "software.amazon.MyServiceHttpConfig",
7+
"serviceSpecificClientConfigClass": "ServiceConfiguration",
8+
"customRetryPolicy": "software.amazon.MyServiceRetryPolicy",
9+
"verifiedSimpleMethods" : ["paginatedOperationWithResultKey"],
10+
"blacklistedSimpleMethods" : [
11+
"eventStreamOperation"
12+
],
13+
"utilitiesMethod": {
14+
"returnType": "software.amazon.awssdk.services.json.JsonUtilities",
15+
"createMethodParams": ["param1", "param2", "param3"]
16+
},
17+
"useLegacyEventGenerationScheme": {
18+
"EventStream": ["EventOne", "event-two", "eventThree"]
19+
}
20+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"pagination": {
3+
"PaginatedOperationWithResultKey": {
4+
"input_token": "NextToken",
5+
"output_token": "NextToken",
6+
"limit_key": "MaxResults",
7+
"result_key": "Items"
8+
},
9+
"PaginatedOperationWithoutResultKey": {
10+
"input_token": "NextToken",
11+
"output_token": "NextToken",
12+
"limit_key": "MaxResults"
13+
}
14+
}
15+
}

0 commit comments

Comments
 (0)