Skip to content

Commit a8d0f91

Browse files
committed
partial
Signed-off-by: Andrew Thornton <[email protected]>
1 parent 7b5b616 commit a8d0f91

File tree

3 files changed

+119
-33
lines changed

3 files changed

+119
-33
lines changed

routers/web/events/websocket.go

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package events
77
import (
88
"net/http"
99
"net/url"
10+
"time"
1011

1112
"code.gitea.io/gitea/modules/context"
1213
"code.gitea.io/gitea/modules/eventsource"
@@ -18,6 +19,12 @@ import (
1819
"github.com/gorilla/websocket"
1920
)
2021

22+
const (
23+
writeWait = 10 * time.Second
24+
pongWait = 60 * time.Second
25+
pingPeriod = (pongWait * 9) / 10
26+
)
27+
2128
type readMessage struct {
2229
messageType int
2330
message []byte
@@ -84,6 +91,19 @@ func Websocket(ctx *context.Context) {
8491

8592
readChan := make(chan readMessage, 20)
8693
go func() {
94+
defer conn.Close()
95+
conn.SetReadLimit(2048)
96+
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
97+
log.Error("unable to SetReadDeadline: %v", err)
98+
return
99+
}
100+
conn.SetPongHandler(func(string) error {
101+
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
102+
log.Error("unable to SetReadDeadline: %v", err)
103+
}
104+
return nil
105+
})
106+
87107
for {
88108
messageType, message, err := conn.ReadMessage()
89109
readChan <- readMessage{
@@ -95,30 +115,70 @@ func Websocket(ctx *context.Context) {
95115
close(readChan)
96116
return
97117
}
118+
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
119+
log.Error("unable to SetReadDeadline: %v", err)
120+
return
121+
}
98122
}
99123
}()
100124

125+
pingTicker := time.NewTicker(pingPeriod)
126+
101127
for {
102128
select {
103129
case <-notify:
104130
return
105131
case <-shutdownCtx.Done():
106132
return
107-
case _, ok := <-readChan:
133+
case <-pingTicker.C:
134+
// ensure that we're not already cancelled
135+
select {
136+
case <-notify:
137+
return
138+
case <-shutdownCtx.Done():
139+
return
140+
default:
141+
}
142+
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
143+
log.Error("unable to SetWriteDeadline: %v", err)
144+
return
145+
}
146+
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
147+
log.Error("unable to send PingMessage: %v", err)
148+
return
149+
}
150+
case message, ok := <-readChan:
108151
if !ok {
109152
break
110153
}
154+
// ensure that we're not already cancelled
155+
select {
156+
case <-notify:
157+
return
158+
case <-shutdownCtx.Done():
159+
return
160+
default:
161+
}
162+
log.Info("Got Message: %d:%s:%v", message.messageType, message.message, message.err)
111163
case event, ok := <-eventChan:
112164
if !ok {
113165
break
114166
}
167+
// ensure that we're not already cancelled
168+
select {
169+
case <-notify:
170+
return
171+
case <-shutdownCtx.Done():
172+
return
173+
default:
174+
}
115175
if event.Name == "logout" {
116176
if ctx.Session.ID() == event.Data {
117-
_, _ = (&eventsource.Event{
177+
event = &eventsource.Event{
118178
Name: "logout",
119179
Data: "here",
120-
}).WriteTo(ctx.Resp)
121-
ctx.Resp.Flush()
180+
}
181+
_ = writeEvent(conn, event)
122182
go unregister()
123183
auth.HandleSignOut(ctx)
124184
break
@@ -129,22 +189,31 @@ func Websocket(ctx *context.Context) {
129189
Data: "elsewhere",
130190
}
131191
}
132-
133-
w, err := conn.NextWriter(websocket.TextMessage)
134-
if err != nil {
135-
log.Warn("Unable to get writer for websocket %v", err)
192+
if err := writeEvent(conn, event); err != nil {
136193
return
137194
}
195+
}
196+
}
197+
}
138198

139-
if err := json.NewEncoder(w).Encode(event); err != nil {
140-
log.Error("Unable to create encoder for %v %v", event, err)
141-
return
142-
}
143-
if err := w.Close(); err != nil {
144-
log.Warn("Unable to close writer for websocket %v", err)
145-
return
146-
}
199+
func writeEvent(conn *websocket.Conn, event *eventsource.Event) error {
200+
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
201+
log.Error("unable to SetWriteDeadline: %v", err)
202+
return err
203+
}
204+
w, err := conn.NextWriter(websocket.TextMessage)
205+
if err != nil {
206+
log.Warn("Unable to get writer for websocket %v", err)
207+
return err
208+
}
147209

148-
}
210+
if err := json.NewEncoder(w).Encode(event); err != nil {
211+
log.Error("Unable to create encoder for %v %v", event, err)
212+
return err
213+
}
214+
if err := w.Close(); err != nil {
215+
log.Warn("Unable to close writer for websocket %v", err)
216+
return err
149217
}
218+
return nil
150219
}

web_src/js/features/notification.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ export function initNotificationsTable() {
2424
});
2525
}
2626

27-
async function receiveUpdateCount(data) {
27+
async function receiveUpdateCount(data, document) {
2828
console.log(data);
2929
try {
30+
console.log(window, document);
3031
const notificationCount = document.querySelector('.notification_count');
3132
if (data.Count > 0) {
3233
notificationCount.classList.remove('hidden');
@@ -36,6 +37,9 @@ async function receiveUpdateCount(data) {
3637

3738
notificationCount.textContent = `${data.Count}`;
3839
console.log(notificationCount);
40+
const oldDisplay = notificationCount.style.display;
41+
notificationCount.style.display = 'none';
42+
notificationCount.style.display = oldDisplay;
3943
await updateNotificationTable();
4044
} catch (error) {
4145
console.error(error, data);
@@ -62,9 +66,11 @@ export function initNotificationCount() {
6266
workerUrl = `${window.location.origin}${appSubUrl}/user/events`;
6367
}
6468

69+
const currentDocument = document;
70+
6571
if (worker) {
6672
worker.addEventListener('error', (event) => {
67-
console.error(event);
73+
console.error('error from listener: ', event);
6874
});
6975
worker.port.addEventListener('messageerror', () => {
7076
console.error('Unable to deserialize message');
@@ -75,12 +81,13 @@ export function initNotificationCount() {
7581
});
7682
worker.port.addEventListener('message', (event) => {
7783
if (!event.data || !event.data.type) {
78-
console.error(event);
84+
console.error('Unexpected event:', event);
7985
return;
8086
}
8187
console.log(event);
88+
console.log(currentDocument === document);
8289
if (event.data.type === 'notification-count') {
83-
const _promise = receiveUpdateCount(event.data.data).then(console.log('done'));
90+
const _promise = receiveUpdateCount(event.data.data, currentDocument).then(console.log('done'));
8491
} else if (event.data.type === 'error') {
8592
console.error(event.data);
8693
} else if (event.data.type === 'logout') {
@@ -98,6 +105,7 @@ export function initNotificationCount() {
98105
});
99106
worker.port.close();
100107
}
108+
console.log('done eventlistenter');
101109
});
102110
worker.port.addEventListener('error', (e) => {
103111
console.error(e);

web_src/js/features/websocket.sharedworker.js

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ class Source {
2828
});
2929
}
3030
});
31+
this.webSocket.addEventListener('close', (event) => {
32+
if (!this.webSocket) {
33+
return;
34+
}
35+
const oldWebSocket = this.webSocket;
36+
this.webSocket = null;
37+
this.notifyClients({
38+
type: 'close',
39+
data: event
40+
});
41+
oldWebSocket.close();
42+
});
3143
}
3244

3345
register(port) {
@@ -66,20 +78,14 @@ class Source {
6678

6779
close() {
6880
if (!this.webSocket) return;
69-
70-
this.webSocket.close();
81+
const oldWebSocket = this.webSocket;
7182
this.webSocket = null;
83+
oldWebSocket.close();
7284
}
7385

7486
listen(eventType) {
7587
if (this.listening[eventType]) return;
7688
this.listening[eventType] = true;
77-
this.webSocket.addEventListener(eventType, (event) => {
78-
this.notifyClients({
79-
type: eventType,
80-
data: event.data
81-
});
82-
});
8389
}
8490

8591
notifyClients(event) {
@@ -104,13 +110,16 @@ self.addEventListener('connect', (e) => {
104110
if (sourcesByUrl[url]) {
105111
// we have a Source registered to this url
106112
const source = sourcesByUrl[url];
107-
source.register(port);
108-
sourcesByPort[port] = source;
109-
return;
113+
if (source.webSocket) {
114+
source.register(port);
115+
sourcesByPort[port] = source;
116+
return;
117+
}
118+
sourcesByUrl[url] = null;
110119
}
111120
let source = sourcesByPort[port];
112121
if (source) {
113-
if (source.eventSource && source.url === url) return;
122+
if (source.webSocket && source.url === url) return;
114123

115124
// How this has happened I don't understand...
116125
// deregister from that source

0 commit comments

Comments
 (0)