@@ -23,6 +23,9 @@ const (
23
23
writeWait = 10 * time .Second
24
24
pongWait = 60 * time .Second
25
25
pingPeriod = (pongWait * 9 ) / 10
26
+
27
+ maximumMessageSize = 2048
28
+ readMessageChanSize = 20 // <- I've put 20 here because it seems like a reasonable buffer but it may to increase
26
29
)
27
30
28
31
type readMessage struct {
@@ -89,38 +92,8 @@ func Websocket(ctx *context.Context) {
89
92
}
90
93
defer unregister ()
91
94
92
- readChan := make (chan readMessage , 20 )
93
- go func () {
94
- defer conn .Close ()
95
- conn .SetReadLimit (2048 )
96
- if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
97
- log .Error ("unable to SetReadDeadline: %v" , err )
98
- return
99
- }
100
- conn .SetPongHandler (func (string ) error {
101
- if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
102
- log .Error ("unable to SetReadDeadline: %v" , err )
103
- }
104
- return nil
105
- })
106
-
107
- for {
108
- messageType , message , err := conn .ReadMessage ()
109
- readChan <- readMessage {
110
- messageType : messageType ,
111
- message : message ,
112
- err : err ,
113
- }
114
- if err != nil {
115
- close (readChan )
116
- return
117
- }
118
- if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
119
- log .Error ("unable to SetReadDeadline: %v" , err )
120
- return
121
- }
122
- }
123
- }()
95
+ readMessageChan := make (chan readMessage , readMessageChanSize )
96
+ go readMessagesFromConnToChan (conn , readMessageChan )
124
97
125
98
pingTicker := time .NewTicker (pingPeriod )
126
99
@@ -139,6 +112,7 @@ func Websocket(ctx *context.Context) {
139
112
return
140
113
default :
141
114
}
115
+
142
116
if err := conn .SetWriteDeadline (time .Now ().Add (writeWait )); err != nil {
143
117
log .Error ("unable to SetWriteDeadline: %v" , err )
144
118
return
@@ -147,10 +121,11 @@ func Websocket(ctx *context.Context) {
147
121
log .Error ("unable to send PingMessage: %v" , err )
148
122
return
149
123
}
150
- case message , ok := <- readChan :
124
+ case message , ok := <- readMessageChan :
151
125
if ! ok {
152
126
break
153
127
}
128
+
154
129
// ensure that we're not already cancelled
155
130
select {
156
131
case <- notify :
@@ -159,11 +134,14 @@ func Websocket(ctx *context.Context) {
159
134
return
160
135
default :
161
136
}
137
+
138
+ // FIXME: HANDLE MESSAGES
162
139
log .Info ("Got Message: %d:%s:%v" , message .messageType , message .message , message .err )
163
140
case event , ok := <- eventChan :
164
141
if ! ok {
165
142
break
166
143
}
144
+
167
145
// ensure that we're not already cancelled
168
146
select {
169
147
case <- notify :
@@ -172,6 +150,8 @@ func Websocket(ctx *context.Context) {
172
150
return
173
151
default :
174
152
}
153
+
154
+ // Handle events
175
155
if event .Name == "logout" {
176
156
if ctx .Session .ID () == event .Data {
177
157
event = & eventsource.Event {
@@ -196,14 +176,50 @@ func Websocket(ctx *context.Context) {
196
176
}
197
177
}
198
178
179
+ func readMessagesFromConnToChan (conn * websocket.Conn , messageChan chan readMessage ) {
180
+ defer func () {
181
+ close (messageChan ) // Please note: this has to be within a wrapping anonymous func otherwise it will be evaluated when creating the defer
182
+ _ = conn .Close ()
183
+ }()
184
+ conn .SetReadLimit (maximumMessageSize )
185
+ if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
186
+ log .Error ("unable to SetReadDeadline: %v" , err )
187
+ return
188
+ }
189
+ conn .SetPongHandler (func (string ) error {
190
+ if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
191
+ log .Error ("unable to SetReadDeadline: %v" , err )
192
+ }
193
+ return nil
194
+ })
195
+
196
+ for {
197
+ messageType , message , err := conn .ReadMessage ()
198
+ messageChan <- readMessage {
199
+ messageType : messageType ,
200
+ message : message ,
201
+ err : err ,
202
+ }
203
+ if err != nil {
204
+ // don't need to handle the error here as it is passed down the channel
205
+ return
206
+ }
207
+ if err := conn .SetReadDeadline (time .Now ().Add (pongWait )); err != nil {
208
+ log .Error ("unable to SetReadDeadline: %v" , err )
209
+ return
210
+ }
211
+ }
212
+ }
213
+
199
214
func writeEvent (conn * websocket.Conn , event * eventsource.Event ) error {
200
215
if err := conn .SetWriteDeadline (time .Now ().Add (writeWait )); err != nil {
201
216
log .Error ("unable to SetWriteDeadline: %v" , err )
202
217
return err
203
218
}
219
+
204
220
w , err := conn .NextWriter (websocket .TextMessage )
205
221
if err != nil {
206
- log .Warn ("Unable to get writer for websocket %v" , err )
222
+ log .Error ("Unable to get writer for websocket %v" , err )
207
223
return err
208
224
}
209
225
0 commit comments