Skip to content

Commit c5e4a5d

Browse files
committed
simplify data structures
1 parent 11dc92b commit c5e4a5d

File tree

4 files changed

+138
-138
lines changed

4 files changed

+138
-138
lines changed

connection.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type Connection struct {
2020
mutex *sync.Mutex
2121
requestId uint32
2222
Greeting *Greeting
23-
requests map[uint32]*responseAndError
23+
requests map[uint32]*Future
2424
packets chan []byte
2525
control chan struct{}
2626
opts Opts
@@ -45,7 +45,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
4545
mutex: &sync.Mutex{},
4646
requestId: 0,
4747
Greeting: &Greeting{},
48-
requests: make(map[uint32]*responseAndError),
48+
requests: make(map[uint32]*Future),
4949
packets: make(chan []byte, 64),
5050
control: make(chan struct{}),
5151
opts: opts,
@@ -117,9 +117,9 @@ func (conn *Connection) closeConnection(neterr error) (err error) {
117117
conn.connection = nil
118118
conn.r = nil
119119
conn.w = nil
120-
for rid, resp := range conn.requests {
121-
resp.r.Error = neterr
122-
close(resp.c)
120+
for rid, fut := range conn.requests {
121+
fut.err = neterr
122+
close(fut.c)
123123
delete(conn.requests, rid)
124124
}
125125
return
@@ -171,18 +171,16 @@ func (conn *Connection) reader() {
171171
conn.closeConnection(err)
172172
continue
173173
}
174-
var resp Response
175-
resp_bytes = resp.fill(resp_bytes)
176-
if resp.Error != nil {
177-
conn.closeConnection(resp.Error)
174+
resp, err := newResponse(resp_bytes)
175+
if err != nil {
176+
conn.closeConnection(err)
178177
continue
179178
}
180179
conn.mutex.Lock()
181-
if r, ok := conn.requests[resp.RequestId]; ok {
180+
if fut, ok := conn.requests[resp.RequestId]; ok {
182181
delete(conn.requests, resp.RequestId)
183-
r.r = resp
184-
r.b = resp_bytes
185-
close(r.c)
182+
fut.resp = resp
183+
close(rae.c)
186184
conn.mutex.Unlock()
187185
} else {
188186
conn.mutex.Unlock()

request.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ type Request struct {
1616
type Future struct {
1717
conn *Connection
1818
id uint32
19-
r responseAndError
19+
resp *Response
20+
err error
21+
c chan struct{}
2022
t *time.Timer
2123
tc <-chan time.Time
2224
}
@@ -194,22 +196,22 @@ func (req *Request) future() (f *Future) {
194196
f = &Future{
195197
conn: req.conn,
196198
id: req.requestId,
197-
r: responseAndError{c: make(chan struct{})},
199+
c: make(chan struct{}),
198200
}
199201
var packet []byte
200-
if packet, f.r.r.Error = req.pack(); f.r.r.Error != nil {
201-
close(f.r.c)
202+
if packet, f.err = req.pack(); f.err != nil {
203+
close(f.c)
202204
return
203205
}
204206

205207
req.conn.mutex.Lock()
206208
if req.conn.closed {
207209
req.conn.mutex.Unlock()
208-
f.r.r.Error = errors.New("using closed connection")
209-
close(f.r.c)
210+
f.err = errors.New("using closed connection")
211+
close(f.c)
210212
return
211213
}
212-
req.conn.requests[req.requestId] = &f.r
214+
req.conn.requests[req.requestId] = &f
213215
req.conn.mutex.Unlock()
214216
req.conn.packets <- (packet)
215217

@@ -222,16 +224,16 @@ func (req *Request) future() (f *Future) {
222224

223225
func (f *Future) wait() {
224226
select {
225-
case <-f.r.c:
227+
case <-f.c:
226228
default:
227229
select {
228-
case <-f.r.c:
230+
case <-f.c:
229231
case <-f.tc:
230232
f.conn.mutex.Lock()
231233
if _, ok := f.conn.requests[f.id]; ok {
232234
delete(f.conn.requests, f.id)
233-
close(f.r.c)
234-
f.r.r.Error = errors.New("client timeout")
235+
close(f.c)
236+
f.err = errors.New("client timeout")
235237
}
236238
f.conn.mutex.Unlock()
237239
}
@@ -245,10 +247,18 @@ func (f *Future) wait() {
245247

246248
func (f *Future) Get() (*Response, error) {
247249
f.wait()
248-
return f.r.get()
250+
if f.err != nil {
251+
return f.resp, f.err
252+
}
253+
f.err = f.resp.decodeBody()
254+
return f.resp, f.err
249255
}
250256

251-
func (f *Future) GetTyped(r interface{}) error {
257+
func (f *Future) GetTyped(r interface{}) (error) {
252258
f.wait()
253-
return f.r.getTyped(r)
259+
if f.err != nil {
260+
return f.err
261+
}
262+
f.err = f.resp.decodeBody(r)
263+
return f.err
254264
}

response.go

Lines changed: 51 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,172 +1,112 @@
11
package tarantool
22

33
import (
4-
"errors"
54
"fmt"
65
"gopkg.in/vmihailenco/msgpack.v2"
7-
"io"
86
)
97

108
type Response struct {
119
RequestId uint32
1210
Code uint32
13-
Error error
11+
Error string
1412
Data []interface{}
13+
buf smallBuf
1514
}
1615

17-
type responseAndError struct {
18-
c chan struct{}
19-
b []byte
20-
r Response
16+
func (resp *Response) fill(b []byte) {
17+
resp.buf.b = b
2118
}
2219

23-
type smallBuf struct {
24-
b []byte
25-
p int
20+
func newResponse(b []byte) (resp *Response, err error) {
21+
resp = &Response{ buf: smallBuf{b: b} }
22+
err = resp.decodeHeader()
23+
return
2624
}
2725

28-
func (s *smallBuf) Read(d []byte) (l int, err error) {
29-
l = len(s.b) - s.p
30-
if l == 0 && len(d) > 0 {
31-
return 0, io.EOF
32-
}
33-
if l > len(d) {
34-
l = len(d)
35-
}
36-
copy(d, s.b[s.p:])
37-
s.p += l
38-
return l, nil
39-
}
40-
41-
func (s *smallBuf) ReadByte() (b byte, err error) {
42-
if s.p == len(s.b) {
43-
return 0, io.EOF
44-
}
45-
b = s.b[s.p]
46-
s.p++
47-
return b, nil
48-
}
49-
50-
func (s *smallBuf) UnreadByte() error {
51-
if s.p == 0 {
52-
return errors.New("Could not unread")
53-
}
54-
s.p--
55-
return nil
56-
}
57-
58-
func (s *smallBuf) Len() int {
59-
return len(s.b) - s.p
60-
}
61-
62-
func (s *smallBuf) Bytes() []byte {
63-
if len(s.b) > s.p {
64-
return s.b[s.p:]
65-
}
66-
return nil
67-
}
68-
69-
func (r *Response) fill(b []byte) []byte {
26+
func (resp *Response) decodeHeader() (err error) {
7027
var l int
71-
s := smallBuf{b: b}
72-
d := msgpack.NewDecoder(&s)
73-
if l, r.Error = d.DecodeMapLen(); r.Error != nil {
74-
return nil
28+
d := msgpack.NewDecoder(&resp.buf)
29+
if l, err = d.DecodeMapLen(); err != nil {
30+
return
7531
}
7632
for ; l > 0; l-- {
7733
var cd int
78-
if cd, r.Error = d.DecodeInt(); r.Error != nil {
79-
return nil
34+
if cd, err = d.DecodeInt(); err != nil {
35+
return
8036
}
8137
switch cd {
8238
case KeySync:
83-
if r.RequestId, r.Error = d.DecodeUint32(); r.Error != nil {
84-
return nil
39+
if resp.RequestId, err = d.DecodeUint32(); err != nil {
40+
return
8541
}
8642
case KeyCode:
87-
if r.Code, r.Error = d.DecodeUint32(); r.Error != nil {
88-
return nil
43+
if resp.Code, err = d.DecodeUint32(); err != nil {
44+
return
8945
}
9046
}
9147
}
92-
return s.Bytes()
93-
}
94-
95-
func (resp *Response) String() (str string) {
96-
if resp.Code == OkCode {
97-
return fmt.Sprintf("<%d OK %v>", resp.RequestId, resp.Data)
98-
} else {
99-
return fmt.Sprintf("<%d ERR 0x%x %s>", resp.RequestId, resp.Code, resp.Error)
100-
}
48+
return nil
10149
}
10250

103-
func (r *responseAndError) get() (*Response, error) {
104-
if r.r.Error != nil {
105-
return &r.r, r.r.Error
106-
}
107-
if len(r.b) > 0 {
51+
func (resp *Response) decodeBody() (err error) {
52+
if len(resp.buf.Len()) > 2 {
10853
var body map[int]interface{}
109-
d := msgpack.NewDecoder(&smallBuf{b: r.b})
54+
d := msgpack.NewDecoder(&resp.buf)
11055

111-
if r.r.Error = d.Decode(&body); r.r.Error != nil {
112-
r.b = nil
113-
return nil, r.r.Error
56+
if err = d.Decode(&body); err != nil {
57+
return nil, err
11458
}
11559

11660
if body[KeyData] != nil {
11761
data := body[KeyData].([]interface{})
118-
r.r.Data = make([]interface{}, len(data))
62+
resp.Data = make([]interface{}, len(data))
11963
for i, v := range data {
120-
r.r.Data[i] = v.([]interface{})
64+
resp.Data[i] = v.([]interface{})
12165
}
12266
}
12367

124-
if r.r.Code != OkCode {
125-
r.r.Error = Error{r.r.Code, body[KeyError].(string)}
68+
if resp.Code != OkCode {
69+
err = Error{resp.Code, body[KeyError].(string)}
12670
}
127-
r.b = nil
12871
}
129-
130-
return &r.r, r.r.Error
13172
}
13273

133-
func (r *responseAndError) getTyped(res interface{}) error {
134-
if r.r.Error != nil {
135-
return r.r.Error
136-
}
137-
if len(r.b) > 0 {
74+
func (resp *Response) decodeBodyTyped(res interface{}) (err error) {
75+
if len(resp.buf.Len()) > 0 {
13876
var l int
139-
d := msgpack.NewDecoder(&smallBuf{b: r.b})
140-
if l, r.r.Error = d.DecodeMapLen(); r.r.Error != nil {
141-
r.b = nil
142-
return r.r.Error
77+
d := msgpack.NewDecoder(&resp.buf)
78+
if l, err = d.DecodeMapLen(); err != nil {
79+
return err
14380
}
14481

14582
for ; l > 0; l-- {
14683
var cd int
147-
if cd, r.r.Error = d.DecodeInt(); r.r.Error != nil {
148-
r.b = nil
149-
return r.r.Error
84+
if cd, err = d.DecodeInt(); err != nil {
85+
return err
15086
}
15187
switch cd {
15288
case KeyData:
153-
if r.r.Error = d.Decode(res); r.r.Error != nil {
154-
r.b = nil
155-
return r.r.Error
89+
if err = d.Decode(res); err != nil {
90+
return err
15691
}
15792
case KeyError:
158-
var str string
159-
if str, r.r.Error = d.DecodeString(); r.r.Error == nil {
160-
r.r.Error = Error{
161-
r.r.Code,
162-
str,
163-
}
93+
if resp.Error, err = d.DecodeString(); err != nil {
94+
return err
16495
}
165-
r.b = nil
166-
return r.r.Error
16796
}
16897
}
98+
99+
if resp.Code != OkCode {
100+
err = Error{resp.Code, body[KeyError].(string)}
101+
}
169102
}
103+
return
104+
}
170105

171-
return r.r.Error
106+
func (resp *Response) String() (str string) {
107+
if resp.Code == OkCode {
108+
return fmt.Sprintf("<%d OK %v>", resp.RequestId, resp.Data)
109+
} else {
110+
return fmt.Sprintf("<%d ERR 0x%x %s>", resp.RequestId, resp.Code, resp.Error)
111+
}
172112
}

0 commit comments

Comments
 (0)