Skip to content

Commit f43d3b9

Browse files
committed
implement http2bridge
1 parent bc03420 commit f43d3b9

File tree

4 files changed

+689
-0
lines changed

4 files changed

+689
-0
lines changed

internal/transport/grpchttp2/errors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,9 @@ func (err ErrCode) String() string {
7070
}
7171
return fmt.Sprintf("unknown error code %#x", uint32(err))
7272
}
73+
74+
type connError ErrCode
75+
76+
func (err connError) Error() string {
77+
return fmt.Sprintf("connection error: %s", ErrCode(err))
78+
}

internal/transport/grpchttp2/framer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ const (
5353
FlagContinuationEndHeaders Flag = 0x4
5454
)
5555

56+
func (f Flag) Has(flag Flag) bool {
57+
return f&flag != 0
58+
}
59+
5660
// Setting represents the id and value pair of an HTTP/2 setting.
5761
// See [Setting Format].
5862
//
@@ -242,6 +246,8 @@ func (f *WindowUpdateFrame) Header() *FrameHeader {
242246
return f.hdr
243247
}
244248

249+
func (f *WindowUpdateFrame) Free() {}
250+
245251
// ContinuationFrame is the representation of a [CONTINUATION Frame]. The
246252
// CONTINUATION frame is used to continue a sequence of header block fragments.
247253
//
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package grpchttp2
20+
21+
import (
22+
"io"
23+
24+
"golang.org/x/net/http2"
25+
"golang.org/x/net/http2/hpack"
26+
"google.golang.org/grpc"
27+
)
28+
29+
const (
30+
initHeaderTableSize = 4096
31+
maxFrameLen = 16384
32+
)
33+
34+
type HTTP2FramerBridge struct {
35+
framer *http2.Framer
36+
buf [maxFrameLen]byte
37+
pool grpc.SharedBufferPool
38+
}
39+
40+
func NewHTTP2FramerBridge(w io.Writer, r io.Reader, maxHeaderListSize uint32) *HTTP2FramerBridge {
41+
fr := &HTTP2FramerBridge{
42+
framer: http2.NewFramer(w, r),
43+
pool: grpc.NewSharedBufferPool(),
44+
}
45+
46+
fr.framer.SetReuseFrames()
47+
fr.framer.MaxHeaderListSize = maxHeaderListSize
48+
fr.framer.ReadMetaHeaders = (hpack.NewDecoder(initHeaderTableSize, nil))
49+
50+
return fr
51+
}
52+
53+
func (fr *HTTP2FramerBridge) ReadFrame() (Frame, error) {
54+
f, err := fr.framer.ReadFrame()
55+
if err != nil {
56+
return nil, err
57+
}
58+
59+
h := f.Header()
60+
hdr := &FrameHeader{
61+
Size: h.Length,
62+
Type: FrameType(h.Type),
63+
Flags: Flag(h.Flags),
64+
StreamID: h.StreamID,
65+
}
66+
67+
switch f := f.(type) {
68+
case *http2.DataFrame:
69+
buf := fr.pool.Get(int(hdr.Size))
70+
copy(buf, f.Data())
71+
df := &DataFrame{
72+
hdr: hdr,
73+
Data: buf,
74+
}
75+
df.free = func() {
76+
fr.pool.Put(&buf)
77+
df.Data = nil
78+
}
79+
return df, nil
80+
case *http2.HeadersFrame:
81+
buf := fr.pool.Get(int(hdr.Size))
82+
copy(buf, f.HeaderBlockFragment())
83+
hf := &HeadersFrame{
84+
hdr: hdr,
85+
HdrBlock: buf,
86+
}
87+
hf.free = func() {
88+
fr.pool.Put(&buf)
89+
hf.HdrBlock = nil
90+
}
91+
return hf, nil
92+
case *http2.RSTStreamFrame:
93+
return &RSTStreamFrame{
94+
hdr: hdr,
95+
Code: ErrCode(f.ErrCode),
96+
}, nil
97+
case *http2.SettingsFrame:
98+
buf := make([]Setting, 0, f.NumSettings())
99+
f.ForeachSetting(func(s http2.Setting) error {
100+
buf = append(buf, Setting{
101+
ID: SettingID(s.ID),
102+
Value: s.Val,
103+
})
104+
return nil
105+
})
106+
sf := &SettingsFrame{
107+
hdr: hdr,
108+
Settings: buf,
109+
}
110+
return sf, nil
111+
case *http2.PingFrame:
112+
buf := fr.pool.Get(int(hdr.Size))
113+
copy(buf, f.Data[:])
114+
pf := &PingFrame{
115+
hdr: hdr,
116+
Data: buf,
117+
}
118+
pf.free = func() {
119+
fr.pool.Put(&buf)
120+
pf.Data = nil
121+
}
122+
return pf, nil
123+
case *http2.GoAwayFrame:
124+
buf := fr.pool.Get(int(hdr.Size - 8))
125+
copy(buf, f.DebugData())
126+
gf := &GoAwayFrame{
127+
hdr: hdr,
128+
DebugData: buf,
129+
Code: ErrCode(f.ErrCode),
130+
LastStreamID: f.LastStreamID,
131+
}
132+
gf.free = func() {
133+
fr.pool.Put(&buf)
134+
gf.DebugData = nil
135+
}
136+
return gf, nil
137+
case *http2.WindowUpdateFrame:
138+
return &WindowUpdateFrame{
139+
hdr: hdr,
140+
Inc: f.Increment,
141+
}, nil
142+
case *http2.ContinuationFrame:
143+
buf := fr.pool.Get(int(hdr.Size))
144+
copy(buf, f.HeaderBlockFragment())
145+
return &ContinuationFrame{
146+
hdr: hdr,
147+
HdrBlock: buf,
148+
}, nil
149+
case *http2.MetaHeadersFrame:
150+
return &MetaHeadersFrame{
151+
hdr: hdr,
152+
Fields: f.Fields,
153+
}, nil
154+
}
155+
156+
return nil, connError(ErrCodeProtocol)
157+
}
158+
159+
func (fr *HTTP2FramerBridge) WriteData(streamID uint32, endStream bool, data ...[]byte) error {
160+
off := 0
161+
162+
for _, s := range data {
163+
off += copy(fr.buf[off:], s)
164+
}
165+
166+
return fr.framer.WriteData(streamID, endStream, fr.buf[:off])
167+
}
168+
169+
func (fr *HTTP2FramerBridge) WriteHeaders(streamID uint32, endStream, endHeaders bool, headerBlock []byte) error {
170+
p := http2.HeadersFrameParam{
171+
StreamID: streamID,
172+
EndStream: endStream,
173+
EndHeaders: endHeaders,
174+
BlockFragment: headerBlock,
175+
}
176+
177+
return fr.framer.WriteHeaders(p)
178+
}
179+
180+
func (fr *HTTP2FramerBridge) WriteRSTStream(streamID uint32, code ErrCode) error {
181+
return fr.framer.WriteRSTStream(streamID, http2.ErrCode(code))
182+
}
183+
184+
func (fr *HTTP2FramerBridge) WriteSettings(settings ...Setting) error {
185+
ss := make([]http2.Setting, 0, len(settings))
186+
for _, s := range settings {
187+
ss = append(ss, http2.Setting{
188+
ID: http2.SettingID(s.ID),
189+
Val: s.Value,
190+
})
191+
}
192+
193+
return fr.framer.WriteSettings(ss...)
194+
}
195+
196+
func (fr *HTTP2FramerBridge) WriteSettingsAck() error {
197+
return fr.framer.WriteSettingsAck()
198+
}
199+
200+
func (fr *HTTP2FramerBridge) WritePing(ack bool, data [8]byte) error {
201+
return fr.framer.WritePing(ack, data)
202+
}
203+
204+
func (fr *HTTP2FramerBridge) WriteGoAway(maxStreamID uint32, code ErrCode, debugData []byte) error {
205+
return fr.framer.WriteGoAway(maxStreamID, http2.ErrCode(code), debugData)
206+
}
207+
208+
func (fr *HTTP2FramerBridge) WriteWindowUpdate(streamID, inc uint32) error {
209+
return fr.framer.WriteWindowUpdate(streamID, inc)
210+
}
211+
212+
func (fr *HTTP2FramerBridge) WriteContinuation(streamID uint32, endHeaders bool, headerBlock []byte) error {
213+
return fr.framer.WriteContinuation(streamID, endHeaders, headerBlock)
214+
}

0 commit comments

Comments
 (0)