Skip to content

Add middleware to prioritize download traffic #2479

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl App {
/// - GitHub OAuth
/// - Database connection pools
/// - A `git2::Repository` instance from the index repo checkout (that server.rs ensures exists)
pub fn new(config: &Config, http_client: Option<Client>) -> App {
pub fn new(config: Config, http_client: Option<Client>) -> App {
use oauth2::prelude::*;
use oauth2::{AuthUrl, ClientId, ClientSecret, Scope, TokenUrl};
use url::Url;
Expand Down Expand Up @@ -126,7 +126,7 @@ impl App {
read_only_replica_database,
github,
session_key: config.session_key.clone(),
config: config.clone(),
config,
http_client,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = cargo_registry::Config::default();
let client = Client::new();

let app = App::new(&config, Some(client));
let app = App::new(config.clone(), Some(client));
let app = cargo_registry::build_handler(Arc::new(app));

// On every server restart, ensure the categories available in the database match
Expand Down
29 changes: 27 additions & 2 deletions src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use self::log_connection_pool_status::LogConnectionPoolStatus;
use self::static_or_continue::StaticOrContinue;

pub mod app;
mod balance_capacity;
mod block_traffic;
pub mod current_user;
mod debug;
Expand Down Expand Up @@ -46,8 +47,6 @@ pub fn build_middleware(app: Arc<App>, endpoints: R404) -> MiddlewareBuilder {
if env == Env::Development {
// Print a log for each request.
m.add(Debug);
// Locally serve crates and readmes
m.around(StaticOrContinue::new("local_uploads"));
}

if env::var_os("DEBUG_REQUESTS").is_some() {
Expand All @@ -74,13 +73,39 @@ pub fn build_middleware(app: Arc<App>, endpoints: R404) -> MiddlewareBuilder {

// Note: The following `m.around()` middleware is run from bottom to top

// This is currently the final middleware to run. If a middleware layer requires a database
// connection, it should be run after this middleware so that the potential pool usage can be
// tracked here.
//
// In production we currently have 2 equally sized pools (primary and a read-only replica).
// Because such a large portion of production traffic is for download requests (which update
// download counts), we consider only the primary pool here.
if let Ok(capacity) = env::var("DB_POOL_SIZE") {
if let Ok(capacity) = capacity.parse() {
if capacity >= 10 {
println!(
"Enabling BalanceCapacity middleware with {} pool capacity",
capacity
);
m.around(balance_capacity::BalanceCapacity::new(capacity))
} else {
println!("BalanceCapacity middleware not enabled. DB_POOL_SIZE is too low.");
}
}
}

// Serve the static files in the *dist* directory, which are the frontend assets.
// Not needed for the backend tests.
if env != Env::Test {
m.around(EmberHtml::new("dist"));
m.around(StaticOrContinue::new("dist"));
}

if env == Env::Development {
// Locally serve crates and readmes
m.around(StaticOrContinue::new("local_uploads"));
}

m.around(Head::default());

for (header, blocked_values) in config.blocked_traffic {
Expand Down
115 changes: 115 additions & 0 deletions src/middleware/balance_capacity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//! Reject certain requests as instance load reaches capacity.
//!
//! The primary goal of this middleware is to avoid starving the download endpoint of resources.
//! When bots send many parallel requests that run slow database queries, download requests may
//! block and eventually time out waiting for a database connection.
//!
//! Bots must continue to respect our crawler policy, but until we can manually block bad clients
//! we should avoid dropping download requests even if that means rejecting some legitimate
//! requests to other endpoints.

use std::sync::atomic::{AtomicUsize, Ordering};

use super::prelude::*;
use conduit::{RequestExt, StatusCode};

/// Middleware state used to reject non-download requests as the number of in-flight requests
/// approaches `capacity`.
///
/// All three thresholds are compared against the load computed in `call`, which is the
/// in-flight request count expressed as a percentage of `capacity`.
#[derive(Default)]
pub(super) struct BalanceCapacity {
/// The wrapped handler; `None` until `with_handler` stores one.
handler: Option<Box<dyn Handler>>,
/// Denominator used when converting the in-flight request count into a load percentage.
capacity: usize,
/// Number of requests currently inside `call`; incremented on entry and decremented on exit.
in_flight_requests: AtomicUsize,
/// Load percentage at or above which the in-flight request count is added to the request log.
log_at_percentage: usize,
/// Load percentage at or above which non-download requests with a safe method are rejected.
throttle_at_percentage: usize,
/// Load percentage at or above which every non-download request is rejected.
dl_only_at_percentage: usize,
}

impl BalanceCapacity {
pub fn new(capacity: usize) -> Self {
Self {
handler: None,
capacity,
in_flight_requests: AtomicUsize::new(0),
log_at_percentage: read_env_percentage("WEB_CAPACITY_LOG_PCT", 20),
throttle_at_percentage: read_env_percentage("WEB_CAPACITY_THROTTLE_PCT", 70),
dl_only_at_percentage: read_env_percentage("WEB_CAPACITY_DL_ONLY_PCT", 80),
}
}
}

impl AroundMiddleware for BalanceCapacity {
    /// Stores the handler that accepted requests are forwarded to, discarding any handler that
    /// was installed earlier.
    fn with_handler(&mut self, handler: Box<dyn Handler>) {
        drop(self.handler.replace(handler));
    }
}

impl Handler for BalanceCapacity {
    /// Counts the request as in flight, then either forwards it to the inner handler or rejects
    /// it with an over-capacity response, depending on the current load.
    ///
    /// Load is the in-flight request count (including this request) as a percentage of
    /// `capacity`. Download requests (`/api/v1/crates/.../download`) are always forwarded.
    /// Other requests are rejected when the load reaches `throttle_at_percentage` and the method
    /// is safe, or when the load reaches `dl_only_at_percentage` regardless of method.
    ///
    /// # Panics
    ///
    /// Panics if no inner handler has been installed with `with_handler`.
    fn call(&self, request: &mut dyn RequestExt) -> AfterResult {
        // The _drop_on_exit ensures the counter is decremented for all exit paths (including panics)
        let (_drop_on_exit, count) = RequestCounter::add_one(&self.in_flight_requests);
        let handler = self
            .handler
            .as_ref()
            .expect("BalanceCapacity must be given a handler before it is called");

        // A capacity of zero cannot absorb any load, so treat it as fully saturated instead of
        // panicking on a division by zero. The multiplication saturates rather than overflowing.
        let load = count
            .saturating_mul(100)
            .checked_div(self.capacity)
            .unwrap_or(usize::MAX);

        // Begin logging request count so early stages of load increase can be located
        if load >= self.log_at_percentage {
            super::log_request::add_custom_metadata(request, "in_flight_requests", count);
        }

        // Download requests are always accepted
        let path = request.path();
        let is_download = path.starts_with("/api/v1/crates/") && path.ends_with("/download");
        if is_download {
            return handler.call(request);
        }

        // Reject read-only requests as load nears capacity. Bots are likely to send only safe
        // requests and this helps prioritize requests that users may be reluctant to retry.
        if load >= self.throttle_at_percentage && request.method().is_safe() {
            return over_capacity_response(request);
        }

        // As load reaches capacity, all non-download requests are rejected
        if load >= self.dl_only_at_percentage {
            return over_capacity_response(request);
        }

        handler.call(request)
    }
}

/// Builds the response returned for requests rejected because the instance is over capacity.
///
/// Records `cause = "over capacity"` in the request's custom log metadata, then returns a
/// `503 Service Unavailable` response with a fixed plain-text body and a matching
/// `Content-Length` header. A failure from the response builder is converted with `box_error`.
fn over_capacity_response(request: &mut dyn RequestExt) -> AfterResult {
// TODO: Generate an alert so we can investigate
super::log_request::add_custom_metadata(request, "cause", "over capacity");
let body = "Service temporarily unavailable";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll want an explicit dropped_due_to_low_capacity=true or similar item in the log, to know at a glance why that request returned a 503.

Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.header(header::CONTENT_LENGTH, body.len())
.body(Body::from_static(body.as_bytes()))
.map_err(box_error)
}

/// Reads a percentage threshold from the environment variable `name`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or does not parse as a
/// `usize`. The parsed value is not range-checked, so values above 100 are returned as-is.
fn read_env_percentage(name: &str, default: usize) -> usize {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .unwrap_or(default)
}

// FIXME(JTG): I've copied the following from my `conduit-hyper` crate. Once we transition from
// `civet`, we could pass the in_flight_request count from `conduit-hyper` via a request extension.

/// Guard tied to a shared atomic counter: creating it adds one to the counter and dropping it
/// subtracts one again.
struct RequestCounter<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> RequestCounter<'a> {
    /// Increments `counter` and returns the guard together with the counter's value after the
    /// increment.
    fn add_one(counter: &'a AtomicUsize) -> (Self, usize) {
        let updated = counter.fetch_add(1, Ordering::SeqCst) + 1;
        let guard = RequestCounter { counter };
        (guard, updated)
    }
}

impl<'a> Drop for RequestCounter<'a> {
    /// Gives back the slot taken by `add_one`.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::SeqCst);
    }
}
2 changes: 1 addition & 1 deletion src/tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn build_app(
None
};

let app = App::new(&config, client);
let app = App::new(config, client);
t!(t!(app.primary_database.get()).begin_test_transaction());
let app = Arc::new(app);
let handler = cargo_registry::build_handler(Arc::clone(&app));
Expand Down