Skip to content

Commit 20b7929

Browse files
committed
Fix the bug of listner intermediary state
1 parent 7f029e8 commit 20b7929

File tree

1 file changed

+57
-21
lines changed

1 file changed

+57
-21
lines changed

packages/transport-tcp/src/listener.ts

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,21 @@ export interface TCPListenerMetrics {
5555
events: CounterGroup
5656
}
5757

58-
type Status = { started: false } | {
59-
started: true
58+
enum TCPListenerStatusCode {
59+
/**
60+
* When server object is initialized but we don't know the listening address yet or
61+
* the server object is stopped manually, can be resumed only by calling listen()
62+
**/
63+
INERT = 'inert',
64+
/* When listener is aware of the address but the server is not started listening */
65+
INITIALIZED = 'initializing',
66+
LISTENING = 'listening',
67+
/* During the connection limits */
68+
PAUSED = 'paused',
69+
}
70+
71+
type Status = { code: TCPListenerStatusCode.INERT } | {
72+
code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INERT>
6073
listeningAddr: Multiaddr
6174
peerId: string | null
6275
netConfig: NetConfig
@@ -66,7 +79,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
6679
private readonly server: net.Server
6780
/** Keep track of open connections to destroy in case of timeout */
6881
private readonly connections = new Set<MultiaddrConnection>()
69-
private status: Status = { started: false }
82+
private status: Status = { code: TCPListenerStatusCode.INERT }
7083
private metrics?: TCPListenerMetrics
7184
private addr: string
7285

@@ -144,6 +157,8 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
144157
this.dispatchEvent(new CustomEvent<Error>('error', { detail: err }))
145158
})
146159
.on('close', () => {
160+
if(this.status.code === TCPListenerStatusCode.PAUSED) return
161+
147162
this.metrics?.status.update({
148163
[this.addr]: SERVER_STATUS_DOWN
149164
})
@@ -152,6 +167,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
152167
}
153168

154169
private onSocket (socket: net.Socket): void {
170+
if(this.status.code === TCPListenerStatusCode.INERT) {
171+
throw new Error('Server is is not listening yet')
172+
}
155173
// Avoid uncaught errors caused by unstable connections
156174
socket.on('error', err => {
157175
log('socket error', err)
@@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
161179
let maConn: MultiaddrConnection
162180
try {
163181
maConn = toMultiaddrConnection(socket, {
164-
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
182+
listeningAddr: this.status.code ? this.status.listeningAddr : undefined,
165183
socketInactivityTimeout: this.context.socketInactivityTimeout,
166184
socketCloseTimeout: this.context.socketCloseTimeout,
167185
metrics: this.metrics?.events,
@@ -191,7 +209,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
191209
// another process during the time the server if closed. In that case there's not much
192210
// we can do. netListen() will be called again every time a connection is dropped, which
193211
// acts as an eventual retry mechanism. onListenError allows the consumer act on this.
194-
this.netListen().catch(e => {
212+
this.resume().catch(e => {
195213
log.error('error attempting to listen server once connection count under limit', e)
196214
this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
197215
})
@@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
206224
this.context.closeServerOnMaxConnections != null &&
207225
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
208226
) {
209-
this.netClose()
227+
this.pause(false).catch(e => {
228+
log.error('error attempting to close server once connection count over limit', e)
229+
})
210230
}
211231

212232
this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
@@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
232252
}
233253

234254
getAddrs (): Multiaddr[] {
235-
if (!this.status.started) {
255+
if (this.status.code === TCPListenerStatusCode.INERT) {
236256
return []
237257
}
238258

@@ -264,7 +284,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
264284
}
265285

266286
async listen (ma: Multiaddr): Promise<void> {
267-
if (this.status.started) {
287+
if (this.status.code === TCPListenerStatusCode.LISTENING || this.status.code === TCPListenerStatusCode.PAUSED ) {
268288
throw Error('server is already listening')
269289
}
270290

@@ -273,26 +293,31 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
273293
const { backlog } = this.context
274294

275295
this.status = {
276-
started: true,
296+
code: TCPListenerStatusCode.INITIALIZED,
277297
listeningAddr,
278298
peerId,
279299
netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
280300
}
281301

282-
await this.netListen()
302+
await this.resume()
283303
}
284304

285305
async close (): Promise<void> {
286306
await Promise.all(
287307
Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) })
288308
)
289309

290-
// netClose already checks if server.listening
291-
this.netClose()
310+
await this.pause(true)
292311
}
293312

294-
private async netListen (): Promise<void> {
295-
if (!this.status.started || this.server.listening) {
313+
/**
314+
* Can resume a stopped or start an inert server
315+
*/
316+
private async resume (): Promise<void> {
317+
if (
318+
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
319+
this.status.code === TCPListenerStatusCode.PAUSED) ||
320+
this.server.listening) {
296321
return
297322
}
298323

@@ -303,12 +328,20 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
303328
this.server.once('error', reject)
304329
this.server.listen(netConfig, resolve)
305330
})
306-
331+
this.status = { ...this.status, code: TCPListenerStatusCode.LISTENING }
307332
log('Listening on %s', this.server.address())
308333
}
309334

310-
private netClose (): void {
311-
if (!this.status.started || !this.server.listening) {
335+
private async pause (permanent: boolean): Promise<void> {
336+
if(this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
337+
this.status = { code: TCPListenerStatusCode.INERT }
338+
return
339+
}
340+
341+
if (
342+
!(this.status.code === TCPListenerStatusCode.INITIALIZED ||
343+
this.status.code === TCPListenerStatusCode.LISTENING) ||
344+
!this.server.listening) {
312345
return
313346
}
314347

@@ -326,9 +359,12 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listene
326359
// Stops the server from accepting new connections and keeps existing connections.
327360
// 'close' event is emitted only emitted when all connections are ended.
328361
// The optional callback will be called once the 'close' event occurs.
329-
//
330-
// NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
331-
// to pass a callback to close.
332-
this.server.close()
362+
363+
// We need to set this status before closing server, so other procedures are aware
364+
// during the time the server is closing
365+
this.status = permanent ? { code: TCPListenerStatusCode.INERT } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
366+
await new Promise<void>((resolve, reject) => {
367+
this.server.close( err => { err ? reject(err) : resolve() })
368+
})
333369
}
334370
}

0 commit comments

Comments
 (0)