@@ -32,38 +32,44 @@ private[http2] object HeaderCompression extends GraphStage[FlowShape[FrameEvent,
32
32
val encoder = new akka.http.shaded.com.twitter.hpack.Encoder (Http2Protocol .InitialMaxHeaderTableSize )
33
33
val os = new ByteArrayOutputStream (128 )
34
34
35
+ <<<<<<< HEAD
35
36
def onPull (): Unit = pull(eventsIn)
36
37
def onPush (): Unit = grab(eventsIn) match {
37
38
case ack @ SettingsAckFrame (s) =>
38
39
applySettings(s)
39
40
push(eventsOut, ack)
40
41
case ParsedHeadersFrame (streamId, endStream, kvs, prioInfo) =>
41
- kvs.foreach {
42
- case (key, value : String ) =>
43
- encoder.encodeHeader(os, key, value, false )
44
- case (key, value) =>
45
- throw new IllegalStateException (s " Didn't expect key-value-pair [ $key] -> [ $value]( ${value.getClass}) here. " )
46
- }
47
- val result = ByteString .fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy
48
- os.reset()
49
- if (result.size <= currentMaxFrameSize) push(eventsOut, HeadersFrame (streamId, endStream, endHeaders = true , result, prioInfo))
42
+ // When ending the stream without any payload, use a DATA frame rather than
43
+ // a HEADERS frame to work around https://github.com/golang/go/issues/47851.
44
+ if (endStream && kvs.isEmpty) push(eventsOut, DataFrame (streamId, endStream, ByteString .empty))
50
45
else {
51
- val first = HeadersFrame (streamId, endStream, endHeaders = false , result.take(currentMaxFrameSize), prioInfo)
46
+ kvs.foreach {
47
+ case (key, value : String ) =>
48
+ encoder.encodeHeader(os, key, value, false )
49
+ case (key, value) =>
50
+ throw new IllegalStateException (s " Didn't expect key-value-pair [ $key] -> [ $value]( ${value.getClass}) here. " )
51
+ }
52
+ val result = ByteString .fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy
53
+ os.reset()
54
+ if (result.size <= currentMaxFrameSize) push(eventsOut, HeadersFrame (streamId, endStream, endHeaders = true , result, prioInfo))
55
+ else {
56
+ val first = HeadersFrame (streamId, endStream, endHeaders = false , result.take(currentMaxFrameSize), prioInfo)
52
57
53
- push(eventsOut, first)
54
- setHandler(eventsOut, new OutHandler {
55
- private var remainingData = result.drop(currentMaxFrameSize)
58
+ push(eventsOut, first)
59
+ setHandler(eventsOut, new OutHandler {
60
+ private var remainingData = result.drop(currentMaxFrameSize)
56
61
57
- def onPull (): Unit = {
58
- val thisFragment = remainingData.take(currentMaxFrameSize)
59
- val rest = remainingData.drop(currentMaxFrameSize)
60
- val last = rest.isEmpty
62
+ def onPull (): Unit = {
63
+ val thisFragment = remainingData.take(currentMaxFrameSize)
64
+ val rest = remainingData.drop(currentMaxFrameSize)
65
+ val last = rest.isEmpty
61
66
62
- push(eventsOut, ContinuationFrame (streamId, endHeaders = last, thisFragment))
63
- if (last) setHandler(eventsOut, logic)
64
- else remainingData = rest
65
- }
66
- })
67
+ push(eventsOut, ContinuationFrame (streamId, endHeaders = last, thisFragment))
68
+ if (last) setHandler(eventsOut, logic)
69
+ else remainingData = rest
70
+ }
71
+ })
72
+ }
67
73
}
68
74
case x => push(eventsOut, x)
69
75
}
0 commit comments