Skip to content

Commit 34b342d

Browse files
committed
Add SIGTERM handler for a graceful shutdown
Changes the wait_for_connection function in the Listen trait to be non-blocking. That allows checking in the main loop if the SIGTERM signal has been raised. Signed-off-by: Hugues de Valon <[email protected]>
1 parent 3cad90c commit 34b342d

File tree

6 files changed

+79
-19
lines changed

6 files changed

+79
-19
lines changed

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@ authors = ["Ionut Mihalcea <[email protected]>",
55
"Hugues de Valon <[email protected]>"]
66
edition = "2018"
77

8+
[[bin]]
9+
name = "parsec"
10+
path = "src/bin/main.rs"
11+
812
[dependencies]
913
parsec-interface = { git = "https://github.com/parallaxsecond/parsec-interface-rs", tag = "0.1.0" }
1014
rand = "0.7.2"
1115
base64 = "0.10.1"
1216
uuid = "0.7.4"
1317
threadpool = "1.7.1"
1418
std-semaphore = "0.1.0"
19+
signal-hook = "0.1.10"
1520

1621
[dev-dependencies]
1722
parsec-client-test = { git = "https://github.com/parallaxsecond/parsec-client-test", tag = "0.1.2" }

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ This project uses the following third party crates:
138138
* threadpool (Apache-2.0)
139139
* std-semaphore (MIT and Apache-2.0)
140140
* num_cpus (MIT and Apache-2.0)
141+
* signal-hook (MIT and Apache-2.0)
141142

142143
This project uses the following third party libraries:
143144
* [Mbed Crypto](https://github.com/ARMmbed/mbed-crypto) (Apache-2.0)

src/bin/main.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,14 @@ use parsec::providers::{
2525
use parsec_interface::operations_protobuf::ProtobufConverter;
2626
use parsec_interface::requests::AuthType;
2727
use parsec_interface::requests::{BodyType, ProviderID};
28+
use signal_hook::flag;
29+
use signal_hook::SIGTERM;
30+
use std::io::Error;
2831
use std::path::PathBuf;
32+
use std::sync::atomic::{AtomicBool, Ordering};
2933
use std::sync::Arc;
3034
use std::sync::RwLock;
35+
use std::thread;
3136
use std::time::Duration;
3237
use threadpool::Builder;
3338

@@ -91,23 +96,38 @@ fn build_components() -> (FrontEndHandler, impl Listen) {
9196
(front_end, listener)
9297
}
9398

94-
fn main() {
99+
fn main() -> Result<(), Error> {
95100
let (front_end_handler, listener) = build_components();
96101
// Multiple threads can not just have a reference of the front end handler because they could
97102
// outlive the run function. It is needed to give them all ownership of the front end handler
98103
// through an Arc.
99104
let front_end_handler = Arc::from(front_end_handler);
100105

106+
// Register a boolean set to true when the SIGTERM signal is received.
107+
let kill_signal = Arc::new(AtomicBool::new(false));
108+
flag::register(SIGTERM, kill_signal.clone())?;
109+
101110
let threadpool = Builder::new().build();
102111

103112
loop {
104-
if let Some(stream) = listener.wait_on_connection() {
113+
if kill_signal.load(Ordering::Relaxed) {
114+
println!("SIGTERM signal received.");
115+
break;
116+
}
117+
118+
if let Some(stream) = listener.accept() {
105119
let front_end_handler = front_end_handler.clone();
106120
threadpool.execute(move || {
107121
front_end_handler.handle_request(stream);
108122
});
109123
} else {
110-
println!("Error on establishing last connection, continuing...");
124+
//TODO: this value should come from the configuration.
125+
thread::sleep(Duration::from_millis(10));
111126
}
112127
}
128+
129+
println!("Shutting down PARSEC, waiting for all threads to finish.");
130+
threadpool.join();
131+
132+
Ok(())
113133
}

src/front/domain_socket.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515
use std::fs;
16+
use std::io::ErrorKind;
1617
use std::os::unix::net::UnixListener;
1718
use std::path::Path;
1819
use std::time::Duration;
@@ -34,7 +35,7 @@ pub struct DomainSocketListener {
3435
timeout: Duration,
3536
}
3637

37-
impl Listen for DomainSocketListener {
38+
impl DomainSocketListener {
3839
/// Initialise the connection to the Unix socket.
3940
///
4041
/// # Panics
@@ -53,21 +54,25 @@ impl Listen for DomainSocketListener {
5354
Err(err) => panic!(err),
5455
};
5556

57+
// Set the socket as non-blocking.
58+
listener_val
59+
.set_nonblocking(true)
60+
.expect("Could not set the socket as non-blocking");
61+
5662
self.listener = Some(listener_val);
5763
}
64+
}
5865

66+
impl Listen for DomainSocketListener {
5967
fn set_timeout(&mut self, duration: Duration) {
6068
self.timeout = duration;
6169
}
6270

63-
fn wait_on_connection(&self) -> Option<Box<dyn ReadWrite + Send>> {
71+
fn accept(&self) -> Option<Box<dyn ReadWrite + Send>> {
6472
if let Some(listener) = &self.listener {
65-
let stream_result = listener
66-
.incoming()
67-
.next()
68-
.expect("The Incoming iterator should never return None!");
73+
let stream_result = listener.accept();
6974
match stream_result {
70-
Ok(stream) => {
75+
Ok((stream, _)) => {
7176
if let Err(err) = stream.set_read_timeout(Some(self.timeout)) {
7277
println!("Failed to set read timeout ({})", err);
7378
None
@@ -79,7 +84,11 @@ impl Listen for DomainSocketListener {
7984
}
8085
}
8186
Err(err) => {
82-
println!("Failed to connect with a UnixStream ({})", err);
87+
// Check if the error is because no connections are currently present.
88+
if err.kind() != ErrorKind::WouldBlock {
89+
// Only log the real errors.
90+
println!("Failed to connect with a UnixStream ({})", err);
91+
}
8392
None
8493
}
8594
}

src/front/listener.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,20 @@ pub trait ReadWrite: std::io::Read + std::io::Write {}
2121
impl<T: std::io::Read + std::io::Write> ReadWrite for T {}
2222

2323
pub trait Listen {
24-
/// Initialise the internals of the listener.
25-
fn init(&mut self);
26-
2724
/// Set the timeout on read and write calls on any stream returned by this listener.
2825
fn set_timeout(&mut self, duration: Duration);
2926

30-
/// Blocking call that waits for incoming connections and returns a stream (a Read and Write
31-
/// trait object). Requests are read from the stream and responses are written to it.
32-
/// Streams returned by this method should have a timeout period as set by the `set_timeout`
33-
/// method.
27+
/// Non-blocking call that gets the next client connection and returns a stream
28+
/// (a Read and Write trait object). Requests are read from the stream and responses are written
29+
/// to it. Streams returned by this method should have a timeout period as set by the
30+
/// `set_timeout` method.
31+
/// If no connections are present, return `None`.
3432
/// If there are any errors in establishing the connection other than the missing
3533
/// initialization, the implementation should log them and return `None`.
3634
/// `Send` is needed because the stream is moved to a thread.
3735
///
3836
/// # Panics
3937
///
4038
/// If the listener has not been initialised before, with the `init` method.
41-
fn wait_on_connection(&self) -> Option<Box<dyn ReadWrite + Send>>;
39+
fn accept(&self) -> Option<Box<dyn ReadWrite + Send>>;
4240
}

0 commit comments

Comments
 (0)