diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 2ff94b7804a..f3faa92d4a0 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -140,9 +140,15 @@ public final ClientStream newStream( ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { - return transport.newStream( + ClientStream stream = transport.newStream( args.getMethodDescriptor(), args.getHeaders(), callOptions, tracers); + // User code provided authority takes precedence over the LB provided one; this will be + // overwritten by ClientCallImpl if the application sets an authority override + if (pickResult.getAuthorityOverride() != null) { + stream.setAuthority(pickResult.getAuthorityOverride()); + } + return stream; } } // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible @@ -287,10 +293,6 @@ final void reprocess(@Nullable SubchannelPicker picker) { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); CallOptions callOptions = stream.args.getCallOptions(); - // User code provided authority takes precedence over the LB provided one. - if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { - stream.setAuthority(pickResult.getAuthorityOverride()); - } final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { @@ -301,7 +303,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { if (callOptions.getExecutor() != null) { executor = callOptions.getExecutor(); } - Runnable runnable = stream.createRealStream(transport); + Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride()); if (runnable != null) { executor.execute(runnable); } @@ -354,7 +356,7 @@ private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { } /** Runnable may be null. */ - private Runnable createRealStream(ClientTransport transport) { + private Runnable createRealStream(ClientTransport transport, String authorityOverride) { ClientStream realStream; Context origContext = context.attach(); try { @@ -364,6 +366,13 @@ private Runnable createRealStream(ClientTransport transport) { } finally { context.detach(origContext); } + if (authorityOverride != null) { + // User code provided authority takes precedence over the LB provided one; this will be + // overwritten by an enqueud call from ClientCallImpl if the application sets an authority + // override. We must call the real stream directly because stream.start() has likely already + // been called on the delayed stream. + realStream.setAuthority(authorityOverride); + } return setStream(realStream); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index a5160552a9e..f65e6abcf1b 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -503,26 +503,11 @@ public void uncaughtException(Thread t, Throwable e) { } @Test - public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFromLbIsIgnored() { - DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream( - method, headers, callOptions, tracers); - delayedStream.start(mock(ClientStreamListener.class)); - SubchannelPicker picker = mock(SubchannelPicker.class); - PickResult pickResult = PickResult.withSubchannel( - mockSubchannel, null, "authority-override-hostname-from-lb"); - when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult); - - delayedTransport.reprocess(picker); - fakeExecutor.runDueTasks(); - - verify(mockRealStream, never()).setAuthority("authority-override-hostname-from-lb"); - } - - @Test - public void - reprocess_authorityOverrideNotInCallOptions_authorityOverrideFromLbIsSetIntoStream() { + public void reprocess_authorityOverrideFromLb() { + InOrder inOrder = inOrder(mockRealStream); DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream( method, headers, callOptions.withAuthority(null), tracers); + delayedStream.setAuthority("authority-override-from-calloptions"); delayedStream.start(mock(ClientStreamListener.class)); SubchannelPicker picker = mock(SubchannelPicker.class); PickResult pickResult = PickResult.withSubchannel( @@ -536,7 +521,10 @@ public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFro delayedTransport.reprocess(picker); fakeExecutor.runDueTasks(); - verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + // Must be set before start(), and may be overwritten + inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions"); + inOrder.verify(mockRealStream).start(any(ClientStreamListener.class)); } @Test @@ -563,28 +551,26 @@ public void reprocess_NoPendingStream() { } @Test - public void newStream_assignsTransport_authorityFromCallOptionsSupersedesAuthorityFromLB() { + public void newStream_authorityOverrideFromLb() { + InOrder inOrder = inOrder(mockRealStream); SubchannelPicker picker = mock(SubchannelPicker.class); - AbstractSubchannel subchannel = mock(AbstractSubchannel.class); - when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); PickResult pickResult = PickResult.withSubchannel( - subchannel, null, "authority-override-hostname-from-lb"); + mockSubchannel, null, "authority-override-hostname-from-lb"); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult); - ArgumentCaptor callOptionsArgumentCaptor = - ArgumentCaptor.forClass(CallOptions.class); when(mockRealTransport.newStream( - any(MethodDescriptor.class), any(Metadata.class), callOptionsArgumentCaptor.capture(), - ArgumentMatchers.any())) + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any())) .thenReturn(mockRealStream); delayedTransport.reprocess(picker); - verifyNoMoreInteractions(picker); - verifyNoMoreInteractions(transportListener); - CallOptions callOptions = - CallOptions.DEFAULT.withAuthority("authority-override-hosstname-from-calloptions"); - delayedTransport.newStream(method, headers, callOptions, tracers); - assertThat(callOptionsArgumentCaptor.getValue().getAuthority()).isEqualTo( - "authority-override-hosstname-from-calloptions"); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers); + assertThat(stream).isSameInstanceAs(mockRealStream); + stream.setAuthority("authority-override-from-calloptions"); + stream.start(mock(ClientStreamListener.class)); + + // Must be set before start(), and may be overwritten + inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions"); + inOrder.verify(mockRealStream).start(any(ClientStreamListener.class)); } @Test