@@ -118,6 +118,9 @@ struct Server{L <: Listener}
118118 connections:: Set{Connection}
119119 # server listenandserve loop task
120120 task:: Task
121+ # Protects the connections Set which is mutated in the listenloop
122+ # while potentially being accessed by the close method at the same time
123+ connections_lock:: ReentrantLock
121124end
122125
123126port (s:: Server ) = Int (s. listener. addr. port)
@@ -127,8 +130,10 @@ Base.wait(s::Server) = wait(s.task)
127130function forceclose (s:: Server )
128131 shutdown (s. on_shutdown)
129132 close (s. listener)
130- for c in s. connections
131- close (c)
133+ Base. @lock s. connections_lock begin
134+ for c in s. connections
135+ close (c)
136+ end
132137 end
133138 return wait (s. task)
134139end
@@ -166,14 +171,19 @@ function Base.close(s::Server)
166171 shutdown (s. on_shutdown)
167172 close (s. listener)
168173 # first pass to mark or request connections to close
169- for c in s. connections
170- requestclose! (c)
174+ Base. @lock s. connections_lock begin
175+ for c in s. connections
176+ requestclose! (c)
177+ end
171178 end
172179 # second pass to wait for connections to close
173180 # we wait for connections to empty because as
174181 # connections close themselves, they are removed
175182 # from our connections Set
176- while ! isempty (s. connections)
183+ while true
184+ Base. @lock s. connections_lock begin
185+ isempty (s. connections) && break
186+ end
177187 sleep (0.5 + rand () * 0.1 )
178188 end
179189 return wait (s. task)
@@ -346,25 +356,28 @@ function listen!(f, listener::Listener;
346356 access_log:: Union{Function,Nothing} = nothing ,
347357 verbose= false , kw... )
348358 conns = Set {Connection} ()
359+ conns_lock = ReentrantLock ()
349360 ready_to_accept = Threads. Event ()
350361 if verbose > 0
351362 tsk = @_spawn_interactive LoggingExtras. withlevel (Logging. Debug; verbosity= verbose) do
352- listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
363+ listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
353364 end
354365 else
355- tsk = @_spawn_interactive listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose)
366+ tsk = @_spawn_interactive listenloop (f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, conns_lock, verbose)
356367 end
357368 # wait until the listenloop enters the loop
358369 wait (ready_to_accept)
359- return Server (listener, on_shutdown, conns, tsk)
370+ return Server (listener, on_shutdown, conns, tsk, conns_lock )
360371end
361372
362373""" "
363374Main server loop.
364375Accepts new tcp connections and spawns async tasks to handle them."
365376"""
366- function listenloop (f, listener, conns, tcpisvalid,
367- max_connections, readtimeout, access_log, ready_to_accept, verbose)
377+ function listenloop (
378+ f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept,
379+ conns_lock, verbose
380+ )
368381 sem = Base. Semaphore (max_connections)
369382 verbose >= 0 && @infov 1 " Listening on: $(listener. hostname) :$(listener. hostport) , thread id: $(Threads. threadid ()) "
370383 notify (ready_to_accept)
@@ -382,13 +395,13 @@ function listenloop(f, listener, conns, tcpisvalid,
382395 end
383396 conn = Connection (io)
384397 conn. state = IDLE
385- push! (conns, conn)
398+ Base . @lock conns_lock push! (conns, conn)
386399 conn. host, conn. port = listener. hostname, listener. hostport
387400 @async try
388401 handle_connection (f, conn, listener, readtimeout, access_log)
389402 finally
390403 # handle_connection is in charge of closing the underlying io
391- delete! (conns, conn)
404+ Base . @lock conns_lock delete! (conns, conn)
392405 Base. release (sem)
393406 end
394407 catch e
0 commit comments