
WIP: Rate limit the publish crate endpoint #1676

Closed · wants to merge 1 commit into from
15 changes: 13 additions & 2 deletions src/app.rs
@@ -1,6 +1,6 @@
//! Application-wide components in a struct accessible from each request

use crate::{db, util::CargoResult, Config, Env};
use crate::{db, util::CargoResult, util::rate_limit, Config, Env};
use std::{path::PathBuf, sync::Arc, time::Duration};

use diesel::r2d2;
@@ -24,6 +24,9 @@ pub struct App {
/// Only used in the development environment.
pub git_repo_checkout: PathBuf,

/// The type of rate limiting to do.
pub rate_limiter: Box<dyn rate_limit::RateLimiter + Send + Sync>,

/// The server configuration
pub config: Config,
}
@@ -84,11 +87,19 @@ impl App {
.connection_customizer(Box::new(connection_config))
.thread_pool(thread_pool);

let diesel_database = db::diesel_pool(&config.db_url, config.env, diesel_db_config);

// TODO: Use an environment variable to select the rate limiter style until we're
// confident that rate limiting doesn't add extra load.
let rate_limiter = Box::new(rate_limit::RateLimiterMemory::new());
// let rate_limiter = Box::new(rate_limit::RateLimiterPostgres::new(diesel_database.clone()));

App {
diesel_database: db::diesel_pool(&config.db_url, config.env, diesel_db_config),
diesel_database,
github,
session_key: config.session_key.clone(),
git_repo_checkout: config.git_repo_checkout.clone(),
rate_limiter,
config: config.clone(),
}
}
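Note: one way to satisfy the TODO in App::new above is to choose the limiter from an environment variable at startup. A minimal sketch, assuming a hypothetical RATE_LIMITER variable (not part of this PR) and defaulting to the unlimited implementation until the new code path is trusted:

    // Sketch only: slot this into App::new in place of the hard-coded choice above.
    let rate_limiter: Box<dyn rate_limit::RateLimiter + Send + Sync> =
        match std::env::var("RATE_LIMITER").as_ref().map(String::as_str) {
            Ok("postgres") => Box::new(rate_limit::RateLimiterPostgres::new(diesel_database.clone())),
            Ok("memory") => Box::new(rate_limit::RateLimiterMemory::new()),
            // Anything else (including an unset variable) disables limiting.
            _ => Box::new(rate_limit::RateLimiterUnlimited),
        };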
4 changes: 4 additions & 0 deletions src/controllers/krate/publish.rs
@@ -9,6 +9,7 @@ use crate::git;
use crate::render;
use crate::util::{internal, ChainError, Maximums};
use crate::util::{read_fill, read_le_u32};
use crate::util::rate_limit::*;

use crate::controllers::prelude::*;
use crate::models::dependency;
@@ -73,6 +74,9 @@ pub fn publish(req: &mut dyn Request) -> CargoResult<Response> {
)
})?;

// TODO: Do something different if this is a new crate or an existing crate
req.check_rate_limit(&user, RateLimitCategory::PublishCrate)?;

// Create a transaction on the database, if there are no errors,
// commit the transactions to record a new or updated crate.
conn.transaction(|| {
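Note: one possible shape for the TODO above, distinguishing a brand-new crate from a new version of an existing one so the NewCrate and NewVersion buckets can be used instead of the catch-all PublishCrate. The Crate::by_name lookup and the surrounding names are assumptions about the existing publish handler, not something this diff adds:

    // Sketch only: pick the bucket based on whether the crate already exists.
    // `Crate::by_name(..).first(..).optional()` is assumed from the surrounding
    // crates.io code, not added by this PR.
    let already_published = Crate::by_name(&new_crate.name)
        .first::<Crate>(&*conn)
        .optional()?
        .is_some();

    let category = if already_published {
        RateLimitCategory::NewVersion
    } else {
        RateLimitCategory::NewCrate
    };

    req.check_rate_limit(&user, category)?;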
25 changes: 25 additions & 0 deletions src/util/errors.rs
@@ -259,6 +259,31 @@ impl fmt::Display for Unauthorized {
}
}

#[derive(Debug, Clone, Copy)]
pub struct TooManyRequests;
Review comment (Contributor):
This should have a retry after field.
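A minimal sketch of that suggestion, assuming the hint is stored as a std::time::Duration and surfaced through the standard Retry-After header (the headers field layout on conduit's Response is an assumption here):

    // Sketch only: carry a retry hint with the error and expose it to clients.
    #[derive(Debug, Clone, Copy)]
    pub struct TooManyRequests {
        pub retry_after: std::time::Duration,
    }

    impl CargoError for TooManyRequests {
        fn description(&self) -> &str {
            "too many requests"
        }

        fn response(&self) -> Option<Response> {
            let retry_secs = self.retry_after.as_secs();
            let mut response = json_response(&Bad {
                errors: vec![StringError {
                    detail: format!("too many requests have been seen recently; retry in {} seconds", retry_secs),
                }],
            });
            response.status = (429, "Too Many Requests");
            // Assumption: conduit's Response exposes headers as a map of String -> Vec<String>.
            response.headers.insert("Retry-After".to_string(), vec![retry_secs.to_string()]);
            Some(response)
        }
    }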


impl CargoError for TooManyRequests {
fn description(&self) -> &str {
"too many requests"
}

fn response(&self) -> Option<Response> {
let mut response = json_response(&Bad {
errors: vec![StringError {
detail: "too many requests have been seen recently".to_string(),
}],
});
response.status = (429, "Too Many Requests");
Some(response)
}
}

impl fmt::Display for TooManyRequests {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"too many requests have been sent recently".fmt(f)
}
}

#[derive(Debug)]
struct BadRequest(String);

1 change: 1 addition & 0 deletions src/util/mod.rs
@@ -16,6 +16,7 @@ mod io_util;
mod request_helpers;
mod request_proxy;
pub mod rfc3339;
pub mod rate_limit;

pub fn json_response<T: Serialize>(t: &T) -> Response {
let json = serde_json::to_string(t).unwrap();
167 changes: 167 additions & 0 deletions src/util/rate_limit.rs
@@ -0,0 +1,167 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use chrono::{DateTime, Duration, Utc};
use conduit::Request;

use crate::db::DieselPool;
use crate::middleware::app::RequestApp;
use crate::models::User;
use crate::util::errors::{CargoResult, TooManyRequests};

/// Settings for a rate-limited route.
#[derive(Debug, Clone)]
pub struct RateLimitSettings {
/// The code for this category. Can be stored in a database, etc.
pub key: String,
/// The maximum number of tokens that can be acquired.
pub max_amount: usize,
/// How often tokens are refilled.
pub refill_time: Duration,
/// The number of tokens that are added during a refill.
pub refill_amount: usize,
Review comment (Contributor):
I don't think we need to make this configurable. It'll always be 1 for the foreseeable future.

}

/// The result from a rate limit check.
#[derive(Debug, Clone, Copy)]
pub struct RateLimitResult {
/// The remaining number of requests available
remaining: usize,
}

/// A type that can perform rate limiting.
pub trait RateLimiter {
fn check_limit_multiple(&self, tokens: u32, user: &User, category: RateLimitCategory) -> CargoResult<RateLimitResult>;
}

/// Rate limit using a postgresql database.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RateLimiterPostgres {
diesel_database: DieselPool,
}

impl RateLimiterPostgres {
/// Create a new postgres rate limiter from the given database pool.
pub fn new(diesel_database: DieselPool) -> RateLimiterPostgres {
RateLimiterPostgres {
diesel_database,
}
}
}

impl RateLimiter for RateLimiterPostgres {
fn check_limit_multiple(&self, _tokens: u32, _user: &User, _category: RateLimitCategory) -> CargoResult<RateLimitResult> {
let _conn = self.diesel_database.get()?;

// TODO: Database interaction. Until that lands, always reject the request.
Err(Box::new(TooManyRequests))
}
}

type UserId = i32;
type RateLimiterMemoryKey = (UserId, RateLimitCategory);
#[derive(Debug, Clone, Copy)]
struct RateLimiterMemoryValue {
pub value: usize,
// TODO: Time.
pub last_update: DateTime<Utc>,
}

/// Rate limit using an internal memory store. This may not be ideal in a load-balanced
/// environment, unless all requests from a user get routed to the same instance.
#[derive(Debug, Clone)]
pub struct RateLimiterMemory {
data: Arc<Mutex<HashMap<RateLimiterMemoryKey, RateLimiterMemoryValue>>>,
}

impl RateLimiterMemory {
/// Create a new memory-based rate limiter
pub fn new() -> RateLimiterMemory {
RateLimiterMemory {
data: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl RateLimiter for RateLimiterMemory {
fn check_limit_multiple(&self, tokens: u32, user: &User, category: RateLimitCategory) -> CargoResult<RateLimitResult> {
let mut data = self.data.lock().unwrap();
let settings = category.settings();
let now = Utc::now();
let entry = data.entry((user.id, category)).or_insert_with(|| RateLimiterMemoryValue { value: settings.max_amount, last_update: now });
println!("Previous entry: {:?}", entry);
// Count how many whole refill intervals have elapsed since the last update.
let mut now2 = now;
let mut refill_count = 0;
while now2 > entry.last_update + settings.refill_time {
now2 = now2 - settings.refill_time;
refill_count += 1;
}
entry.value = std::cmp::min(
settings.max_amount,
entry.value + refill_count * settings.refill_amount);

entry.last_update = std::cmp::min(
now,
entry.last_update + (settings.refill_time * refill_count as i32));

if entry.value < tokens as usize {
return Err(Box::new(TooManyRequests));
}

entry.value -= tokens as usize;

Ok(RateLimitResult { remaining: entry.value })
}
}

/// A rate limiter that does not limit at all.
#[derive(Debug, Clone, Copy)]
pub struct RateLimiterUnlimited;

impl RateLimiter for RateLimiterUnlimited {
fn check_limit_multiple(&self, _tokens: u32, _user: &User, category: RateLimitCategory) -> CargoResult<RateLimitResult> {
println!("Unlimited rate limiter!");
let settings = category.settings();
Ok(RateLimitResult { remaining: settings.max_amount })
}
}

/// All of the possible rate limit buckets. When rate limiting a new endpoint, add it here and set
/// the settings below.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RateLimitCategory {
// How often any crate publish (new crate or new version) can happen
PublishCrate,
// How often a new crate can be uploaded
NewCrate,
// How often a new version of an existing crate can be uploaded
NewVersion,
// How often a request for crate info can be made
CrateInfo,
}

impl RateLimitCategory {
pub fn settings(&self) -> RateLimitSettings {
use RateLimitCategory::*;
match *self {
PublishCrate => RateLimitSettings { key: "publish-crate".into(), max_amount: 3, refill_time: Duration::seconds(60), refill_amount: 1 },
NewCrate => RateLimitSettings { key: "new-crate".into(), max_amount: 3, refill_time: Duration::seconds(60), refill_amount: 1 },
NewVersion => RateLimitSettings { key: "new-version".into(), max_amount: 60, refill_time: Duration::seconds(1), refill_amount: 1 },
CrateInfo => RateLimitSettings { key: "crate-info".into(), max_amount: 5, refill_time: Duration::seconds(10), refill_amount: 1 },
}
}
}

/// A trait that makes it possible to call `check_rate_limit` directly on a request object.
pub trait RequestRateLimit {
/// Check the rate limit for the given endpoint. This function consumes a single token from the
/// token bucket.
fn check_rate_limit(&mut self, user: &User, category: RateLimitCategory) -> CargoResult<RateLimitResult>;
}

impl<T: Request + ?Sized> RequestRateLimit for T {
fn check_rate_limit(&mut self, user: &User, category: RateLimitCategory) -> CargoResult<RateLimitResult> {
let limiter = &self.app().rate_limiter;
limiter.check_limit_multiple(1, user, category)
}
}
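
Note on the numbers above: with the PublishCrate settings (max_amount 3, one token refilled every 60 seconds), a user can publish three times back to back; the fourth attempt inside the same minute is rejected with TooManyRequests, and roughly one more publish becomes available each minute afterwards. A test-style sketch of that behaviour against the in-memory limiter, assuming it lives in a #[cfg(test)] module at the bottom of this file (so the private `remaining` field is visible) and that a hypothetical test_user() helper builds a User model:

    // Sketch only: the PublishCrate bucket starts full (3 tokens) and empties
    // after three checks inside one refill window.
    #[test]
    fn publish_crate_bucket_empties_after_three_requests() {
        let limiter = RateLimiterMemory::new();
        let user = test_user();

        for expected_remaining in (0..3usize).rev() {
            match limiter.check_limit_multiple(1, &user, RateLimitCategory::PublishCrate) {
                Ok(result) => assert_eq!(result.remaining, expected_remaining),
                Err(_) => panic!("expected the request to be within the limit"),
            }
        }

        // The fourth request in the same 60-second window is rejected.
        assert!(limiter
            .check_limit_multiple(1, &user, RateLimitCategory::PublishCrate)
            .is_err());
    }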