-
Notifications
You must be signed in to change notification settings - Fork 645
Add support for graceful shutdown of the server #1672
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,22 @@ | ||
#![deny(warnings, clippy::all, rust_2018_idioms)] | ||
|
||
use cargo_registry::{boot, App, Env}; | ||
use jemalloc_ctl; | ||
use std::{ | ||
fs::File, | ||
sync::{mpsc::channel, Arc}, | ||
sync::{mpsc::channel, Arc, Mutex}, | ||
thread, | ||
time::Duration, | ||
}; | ||
|
||
use civet::Server as CivetServer; | ||
use conduit_hyper::Service as HyperService; | ||
use futures::Future; | ||
use jemalloc_ctl; | ||
use reqwest::Client; | ||
|
||
enum Server { | ||
Civet(CivetServer), | ||
Hyper(HyperService<conduit_middleware::MiddlewareBuilder>), | ||
Hyper(tokio::runtime::Runtime), | ||
} | ||
|
||
use Server::*; | ||
|
@@ -56,7 +59,34 @@ fn main() { | |
|
||
let server = if dotenv::var("USE_HYPER").is_ok() { | ||
println!("Booting with a hyper based server"); | ||
Hyper(HyperService::new(app, threads as usize)) | ||
let addr = ([127, 0, 0, 1], port).into(); | ||
let service = HyperService::new(app, threads as usize); | ||
let server = hyper::Server::bind(&addr).serve(service); | ||
|
||
let (tx, rx) = futures::sync::oneshot::channel::<()>(); | ||
let server = server | ||
.with_graceful_shutdown(rx) | ||
.map_err(|e| log::error!("Server error: {}", e)); | ||
|
||
ctrlc_handler(move || tx.send(()).unwrap_or(())); | ||
|
||
let mut rt = tokio::runtime::Builder::new() | ||
.core_threads(4) | ||
.name_prefix("hyper-server-worker-") | ||
.after_start(|| { | ||
log::debug!("Stared thread {}", thread::current().name().unwrap_or("?")) | ||
}) | ||
.before_stop(|| { | ||
log::debug!( | ||
"Stopping thread {}", | ||
thread::current().name().unwrap_or("?") | ||
) | ||
}) | ||
.build() | ||
.unwrap(); | ||
rt.spawn(server); | ||
|
||
Hyper(rt) | ||
} else { | ||
println!("Booting with a civet based server"); | ||
let mut cfg = civet::Config::new(); | ||
|
@@ -66,19 +96,43 @@ fn main() { | |
|
||
println!("listening on port {}", port); | ||
|
||
// Give tokio a chance to spawn the first worker thread | ||
thread::sleep(Duration::from_millis(10)); | ||
|
||
// Creating this file tells heroku to tell nginx that the application is ready | ||
// to receive traffic. | ||
if heroku { | ||
println!("Writing to /tmp/app-initialized"); | ||
File::create("/tmp/app-initialized").unwrap(); | ||
} | ||
|
||
if let Hyper(server) = server { | ||
let addr = ([127, 0, 0, 1], port).into(); | ||
server.run(addr); | ||
} else { | ||
// Civet server is already running, but we need to block the main thread forever | ||
// TODO: handle a graceful shutdown by just waiting for a SIG{INT,TERM} | ||
let (_tx, rx) = channel::<()>(); | ||
rx.recv().unwrap(); | ||
// Block the main thread until the server has shutdown | ||
match server { | ||
Hyper(rt) => rt.shutdown_on_idle().wait().unwrap(), | ||
Civet(server) => { | ||
let (tx, rx) = channel::<()>(); | ||
ctrlc_handler(move || tx.send(()).unwrap_or(())); | ||
rx.recv().unwrap(); | ||
drop(server); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The drop seems unnecessary given it'll be dropped at the end of the block which immediately follows. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to leave the drop explicit here. Removing the drop means |
||
} | ||
} | ||
|
||
println!("Server has gracefully shutdown!"); | ||
} | ||
|
||
fn ctrlc_handler<F>(f: F) | ||
where | ||
F: FnOnce() + Send + 'static, | ||
{ | ||
let call_once = Mutex::new(Some(f)); | ||
|
||
ctrlc::set_handler(move || { | ||
if let Some(f) = call_once.lock().unwrap().take() { | ||
println!("Starting graceful shutdown"); | ||
f(); | ||
} else { | ||
println!("Already sent signal to start graceful shutdown"); | ||
} | ||
}) | ||
.unwrap(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be configured with an environment variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's fine to leave this hard-coded for now. I added it mainly because sometimes we see the server boot with different levels of initial memory consumption and I'm trying to reduce potential variability (although I doubt this will really affect anything). Long term, I expect we can switch back to all the defaults provided by
Runtime::new()
, which will set the number of core threads to the CPU count.