From fd8e614061529de38bda1117e845a16814b919f4 Mon Sep 17 00:00:00 2001 From: Bryan Burgers Date: Sun, 10 Mar 2019 15:01:00 -0500 Subject: [PATCH] WIP: Rate limit the publish crate endpoint A start to rate limiting the publish crate endpoint. Submitting to get early feedback on whether this is the right direction to go. --- **Why not use middleware?** I originally wanted something declarative, and thought middleware would be the ideal place for rate limiting. It looks like our framework does not have per-route middleware, and routing does not happen until after all of the middleware has run. I didn't see a clean way (other than parsing the URL a second time and maintaining two parallel lists of URL routes) to make this work. **Can we still do declarative rate limiting?** We can. In `src/router.rs`, I ended up with code like ``` api_router.push("/crates/new", c(krate::publish::publish).rl(rate_limit::RateLimitCategory::Publish)); ``` However, the initial use case is to rate limit new crates differently from new versions, and that will be imperative anyway, so I didn't pursue this any further. If this is valuable for other endpoints that don't require in-controller logic, we can add it later. --- Left to do: - [ ] Diesel migration to create a table - [ ] Complete the RateLimiterPostgres implementation - [ ] Select which limiter to use based on the environment - [ ] Write tests --- src/app.rs | 15 ++- src/controllers/krate/publish.rs | 4 + src/util/errors.rs | 25 +++++ src/util/mod.rs | 1 + src/util/rate_limit.rs | 167 +++++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 src/util/rate_limit.rs diff --git a/src/app.rs b/src/app.rs index 6d56a488cae..d908f997235 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,6 +1,6 @@ //! Application-wide components in a struct accessible from each request -use crate::{db, util::CargoResult, Config, Env}; +use crate::{db, util::CargoResult, util::rate_limit, Config, Env}; use std::{path::PathBuf, sync::Arc, time::Duration}; use diesel::r2d2; @@ -24,6 +24,9 @@ pub struct App { /// Only used in the development environment. pub git_repo_checkout: PathBuf, + /// The type of rate limiting to do. + pub rate_limiter: Box, + /// The server configuration pub config: Config, } @@ -84,11 +87,19 @@ impl App { .connection_customizer(Box::new(connection_config)) .thread_pool(thread_pool); + let diesel_database = db::diesel_pool(&config.db_url, config.env, diesel_db_config); + + // TODO: Use environment variable to select rate limiter style until we're confident that + // rate limiting doesn't add extra load. + let rate_limiter = Box::new(rate_limit::RateLimiterMemory::new()); + // let rate_limiter = Box::new(rate_limit::RateLimiterPostgres::new(diesel_database.clone())); + App { - diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config), + diesel_database, github, session_key: config.session_key.clone(), git_repo_checkout: config.git_repo_checkout.clone(), + rate_limiter, config: config.clone(), } } diff --git a/src/controllers/krate/publish.rs b/src/controllers/krate/publish.rs index c88cddcbbb6..4af178ca67e 100644 --- a/src/controllers/krate/publish.rs +++ b/src/controllers/krate/publish.rs @@ -9,6 +9,7 @@ use crate::git; use crate::render; use crate::util::{internal, ChainError, Maximums}; use crate::util::{read_fill, read_le_u32}; +use crate::util::rate_limit::*; use crate::controllers::prelude::*; use crate::models::dependency; @@ -73,6 +74,9 @@ pub fn publish(req: &mut dyn Request) -> CargoResult { ) })?; + // TODO: Do something different if this is a new crate or an existing crate + req.check_rate_limit(&user, RateLimitCategory::PublishCrate)?; + // Create a transaction on the database, if there are no errors, // commit the transactions to record a new or updated crate. conn.transaction(|| { diff --git a/src/util/errors.rs b/src/util/errors.rs index 49454a1d6e9..5362051be3c 100644 --- a/src/util/errors.rs +++ b/src/util/errors.rs @@ -259,6 +259,31 @@ impl fmt::Display for Unauthorized { } } +#[derive(Debug, Clone, Copy)] +pub struct TooManyRequests; + +impl CargoError for TooManyRequests { + fn description(&self) -> &str { + "too many requests" + } + + fn response(&self) -> Option { + let mut response = json_response(&Bad { + errors: vec![StringError { + detail: "too many requests have been seen recently".to_string(), + }], + }); + response.status = (429, "Too Many Requests"); + Some(response) + } +} + +impl fmt::Display for TooManyRequests { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "too many requests have been sent recently".fmt(f) + } +} + #[derive(Debug)] struct BadRequest(String); diff --git a/src/util/mod.rs b/src/util/mod.rs index a622deab011..a1df2abad07 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -16,6 +16,7 @@ mod io_util; mod request_helpers; mod request_proxy; pub mod rfc3339; +pub mod rate_limit; pub fn json_response(t: &T) -> Response { let json = serde_json::to_string(t).unwrap(); diff --git a/src/util/rate_limit.rs b/src/util/rate_limit.rs new file mode 100644 index 00000000000..656ae62682f --- /dev/null +++ b/src/util/rate_limit.rs @@ -0,0 +1,167 @@ +use conduit::Request; +use std::collections::HashMap; + +use crate::db::{DieselPool}; +use crate::middleware::app::RequestApp; +use crate::models::{User}; +use crate::util::errors::{CargoResult, TooManyRequests}; +use std::sync::{Arc, Mutex}; + +use chrono::{DateTime, Duration, Utc}; + +/// Settings for a rate-limited route. +#[derive(Debug, Clone)] +pub struct RateLimitSettings { + /// The code for this category. Can be stored in a database, etc. + pub key: String, + /// The maximum number of tokens that can be acquired. + pub max_amount: usize, + /// How often we refill + pub refill_time: Duration, + /// The number of tokens that are added during a refill. + pub refill_amount: usize, +} + +/// The result from a rate limit check. +#[derive(Debug, Clone, Copy)] +pub struct RateLimitResult { + /// The remaining number of requests available + remaining: usize, +} + +/// A type that can perform rate limiting. +pub trait RateLimiter { + fn check_limit_multiple(&self, tokens: u32, user: &User, category: RateLimitCategory) -> CargoResult; +} + +/// Rate limit using a postgresql database. +#[allow(missing_debug_implementations)] +#[derive(Clone)] +pub struct RateLimiterPostgres { + diesel_database: DieselPool, +} + +impl RateLimiterPostgres { + /// Create a new postgres rate limiter from the given database pool. + pub fn new(diesel_database: DieselPool) -> RateLimiterPostgres { + RateLimiterPostgres { + diesel_database, + } + } +} + +impl RateLimiter for RateLimiterPostgres { + fn check_limit_multiple(&self, _tokens: u32, _user: &User, _category: RateLimitCategory) -> CargoResult { + let _conn = self.diesel_database.get()?; + + // TODO: Database interaction. + Err(Box::new(TooManyRequests)) + } +} + +type UserId = i32; +type RateLimiterMemoryKey = (UserId, RateLimitCategory); +#[derive(Debug, Clone, Copy)] +struct RateLimiterMemoryValue { + pub value: usize, + // TODO: Time. + pub last_update: DateTime, +} + +/// Rate limit using an internal memory store. This may not be ideal in a load-balanced +/// environment, unless all requests from a user get routed to the same instance. +#[derive(Debug, Clone)] +pub struct RateLimiterMemory { + data: Arc>>, +} + +impl RateLimiterMemory { + /// Create a new memory-based rate limiter + pub fn new() -> RateLimiterMemory { + RateLimiterMemory { + data: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl RateLimiter for RateLimiterMemory { + fn check_limit_multiple(&self, tokens: u32, user: &User, category: RateLimitCategory) -> CargoResult { + let mut data = self.data.lock().unwrap(); + let settings = category.settings(); + let now = Utc::now(); + let mut entry = data.entry((user.id, category)).or_insert_with(|| RateLimiterMemoryValue { value: settings.max_amount, last_update: now }); + println!("Previous entry: {:?}", entry); + let mut now2 = now; + let mut refill_count = 0; + while now2 > entry.last_update + settings.refill_time { + now2 = now2 - settings.refill_time; + refill_count += 1; + } + entry.value = std::cmp::min( + settings.max_amount, + entry.value + refill_count * settings.refill_amount); + + entry.last_update = std::cmp::min( + now, + entry.last_update + (settings.refill_time * refill_count as i32)); + + if entry.value < tokens as usize { + return Err(Box::new(TooManyRequests)); + } + + entry.value -= tokens as usize; + + Ok(RateLimitResult { remaining: entry.value }) + } +} + +/// A rate limiter that does not limit at all. +#[derive(Debug, Clone, Copy)] +pub struct RateLimiterUnlimited; + +impl RateLimiter for RateLimiterUnlimited { + fn check_limit_multiple(&self, _tokens: u32, _user: &User, category: RateLimitCategory) -> CargoResult { + println!("Unlimited rate limiter!"); + let settings = category.settings(); + Ok(RateLimitResult { remaining: settings.max_amount }) + } +} + +/// All of the possible rate limit buckets. When rate limiting a new endpoint, add it here and set +/// the settings below. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub enum RateLimitCategory { + PublishCrate, + // How often a new crate can be uploaded + NewCrate, + // How often an uploaded crate can be uploaded + NewVersion, + // How often a request for crate info can be made + CrateInfo, +} + +impl RateLimitCategory { + pub fn settings(&self) -> RateLimitSettings { + use RateLimitCategory::*; + match *self { + PublishCrate => RateLimitSettings { key: "publish-crate".into(), max_amount: 3, refill_time: Duration::seconds(60), refill_amount: 1 }, + NewCrate => RateLimitSettings { key: "new-crate".into(), max_amount: 3, refill_time: Duration::seconds(60), refill_amount: 1 }, + NewVersion => RateLimitSettings { key: "new_version".into(), max_amount: 60, refill_time: Duration::seconds(1), refill_amount: 1 }, + CrateInfo => RateLimitSettings { key: "crate-info".into(), max_amount: 5, refill_time: Duration::seconds(10), refill_amount: 1 }, + } + } +} + +/// A trait that makes it possible to call `check_rate_limit` directly on a request object. +pub trait RequestRateLimit { + /// Check the rate limit for the given endpoint. This function consumes a single token from the + /// token bucket. + fn check_rate_limit(&mut self, user: &User, category: RateLimitCategory) -> CargoResult; +} + +impl RequestRateLimit for T { + fn check_rate_limit(&mut self, user: &User, category: RateLimitCategory) -> CargoResult { + let limiter = &self.app().rate_limiter; + limiter.check_limit_multiple(1, user, category) + } +}