diff --git a/adapter/src/lib.rs b/adapter/src/lib.rs
index cc23f6bca..48180a531 100644
--- a/adapter/src/lib.rs
+++ b/adapter/src/lib.rs
@@ -3,12 +3,12 @@
 #![deny(clippy::match_bool)]
 use primitives::{Address, BigNum};
+use thiserror::Error;
 use tiny_keccak::Keccak;
 use web3::{
     ethabi::{encode, token::Token},
     types::{Address as EthAddress, U256},
 };
-use thiserror::Error;
 pub use self::dummy::DummyAdapter;
 pub use self::ethereum::EthereumAdapter;
@@ -25,10 +25,7 @@ pub enum AdapterTypes {
 #[error("{0}")]
 pub struct BalanceLeafError(String);
-pub fn get_signable_state_root(
-    channel_id: &[u8],
-    balance_root: &[u8; 32],
-) -> [u8; 32] {
+pub fn get_signable_state_root(channel_id: &[u8], balance_root: &[u8; 32]) -> [u8; 32] {
     let tokens = [
         Token::FixedBytes(channel_id.to_vec()),
         Token::FixedBytes(balance_root.to_vec()),
diff --git a/primitives/src/adapter.rs b/primitives/src/adapter.rs
index 884568566..5879c3be8 100644
--- a/primitives/src/adapter.rs
+++ b/primitives/src/adapter.rs
@@ -8,12 +8,7 @@ use std::{collections::HashMap, convert::From, fmt};
 pub type AdapterResult<T, AE> = Result<T, Error<AE>>;
 pub trait AdapterErrorKind: fmt::Debug + fmt::Display {}
-
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub struct Deposit {
-    pub total: BigNum,
-    pub still_on_create2: BigNum,
-}
+pub type Deposit = crate::Deposit<BigNum>;
 #[derive(Debug)]
 pub enum Error<AE: AdapterErrorKind> {
diff --git a/primitives/src/campaign.rs b/primitives/src/campaign.rs
index db1e8b28a..a51c6dfc9 100644
--- a/primitives/src/campaign.rs
+++ b/primitives/src/campaign.rs
@@ -358,6 +358,8 @@ pub mod validators {
 #[cfg(feature = "postgres")]
 mod postgres {
+    use crate::channel_v5::Channel;
+
     use super::{Active, Campaign, CampaignId, PricingBounds, Validators};
     use bytes::BytesMut;
     use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type};
@@ -368,7 +370,7 @@ mod postgres {
         fn from(row: &Row) -> Self {
             Self {
                 id: row.get("id"),
-                channel: row.get("channel"),
+                channel: Channel::from(row),
                 creator: row.get("creator"),
                 budget: row.get("budget"),
                 validators: row.get("validators"),
diff --git a/primitives/src/channel.rs b/primitives/src/channel.rs
index 84ee2b0ad..acaf6c89e 100644
--- a/primitives/src/channel.rs
+++ b/primitives/src/channel.rs
@@ -94,6 +94,7 @@ impl FromStr for ChannelId {
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
 #[serde(rename_all = "camelCase")]
+#[deprecated = "This is the old V4 Channel"]
 pub struct Channel {
     pub id: ChannelId,
     pub creator: ValidatorId,
@@ -161,6 +162,7 @@ impl PricingBounds {
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
 #[serde(rename_all = "camelCase")]
+#[deprecated = "This is the old V4 Channel Spec"]
 pub struct ChannelSpec {
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub title: Option<String>,
@@ -206,9 +208,11 @@ pub struct ChannelSpec {
 #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
 /// A (leader, follower) tuple
+#[deprecated = "This is the old V4 Channel ValidatorSpecs"]
 pub struct SpecValidators(ValidatorDesc, ValidatorDesc);
 #[derive(Debug)]
+#[deprecated = "This is the old V4 Channel ValidatorSpecs"]
 pub enum SpecValidator<'a> {
     Leader(&'a ValidatorDesc),
     Follower(&'a ValidatorDesc),
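
The `get_signable_state_root` change above is layout-only: the helper still ABI-encodes the `(channel_id, balance_root)` pair as two fixed-bytes tokens and keccak-hashes the encoding. A minimal sketch of a call site (the 32-byte arrays are made-up placeholder values, not real channel data):

```rust
use adapter::get_signable_state_root;

fn example() {
    // Placeholders; real callers pass the ChannelId bytes and the
    // merkle root of the balances tree.
    let channel_id = [0x11_u8; 32];
    let balance_root = [0x22_u8; 32];

    // keccak256(abi.encode(channel_id, balance_root)) — the digest that
    // validators sign when approving a new state.
    let signable: [u8; 32] = get_signable_state_root(&channel_id, &balance_root);
    assert_eq!(signable.len(), 32);
}
```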
diff --git a/primitives/src/channel_v5.rs b/primitives/src/channel_v5.rs
index af909fb5d..6334b31ca 100644
--- a/primitives/src/channel_v5.rs
+++ b/primitives/src/channel_v5.rs
@@ -127,65 +127,78 @@ mod test {
 #[cfg(feature = "postgres")]
 mod postgres {
-    use super::Channel;
+    use super::{Channel, Nonce};
     use bytes::BytesMut;
-    use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, Json, ToSql, Type};
+    use postgres_types::{accepts, to_sql_checked, FromSql, IsNull, ToSql, Type};
     use std::error::Error;
+    use tokio_postgres::Row;
-    impl<'a> FromSql<'a> for Channel {
+    impl<'a> FromSql<'a> for Nonce {
         fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<Self, Box<dyn Error + Sync + Send>> {
-            let json: Json<Channel> = FromSql::from_sql(ty, raw)?;
+            let nonce_string = String::from_sql(ty, raw)?;
-            Ok(json.0)
+            Ok(serde_json::from_value(serde_json::Value::String(
+                nonce_string,
+            ))?)
         }
-        accepts!(JSONB);
+        accepts!(VARCHAR);
     }
-    impl ToSql for Channel {
+    impl ToSql for Nonce {
         fn to_sql(
             &self,
             ty: &Type,
             w: &mut BytesMut,
         ) -> Result<IsNull, Box<dyn Error + Sync + Send>> {
-            Json(self).to_sql(ty, w)
+            self.0.to_string().to_sql(ty, w)
         }
-        accepts!(JSONB);
+        accepts!(VARCHAR);
         to_sql_checked!();
     }
+    impl From<&Row> for Channel {
+        fn from(row: &Row) -> Self {
+            Self {
+                leader: row.get("leader"),
+                follower: row.get("follower"),
+                guardian: row.get("guardian"),
+                token: row.get("token"),
+                nonce: row.get("nonce"),
+            }
+        }
+    }
+
     #[cfg(test)]
     mod test {
-        use crate::util::tests::prep_db::{postgres::POSTGRES_POOL, DUMMY_CAMPAIGN};
+        use crate::{channel_v5::Nonce, util::tests::prep_db::postgres::POSTGRES_POOL};
         #[tokio::test]
-        async fn channel_to_from_sql() {
+        async fn nonce_to_from_sql() {
             let client = POSTGRES_POOL.get().await.unwrap();
-            let channel = DUMMY_CAMPAIGN.channel.clone();
-            let sql_type = "JSONB";
+            let nonce = Nonce::from(123_456_789_u64);
+            let sql_type = "VARCHAR";
             // from SQL
             {
-                let channel_json = serde_json::to_string(&channel).expect("Should serialize");
-
-                let rows = client
-                    .query(&*format!("SELECT '{}'::{}", channel_json, sql_type), &[])
+                let row_nonce = client
+                    .query_one(&*format!("SELECT '{}'::{}", nonce, sql_type), &[])
                     .await
-                    .unwrap();
-                let result = rows[0].get(0);
+                    .unwrap()
+                    .get(0);
-                assert_eq!(&channel, &result);
+                assert_eq!(&nonce, &row_nonce);
             }
             // to SQL
             {
-                let rows = client
-                    .query(&*format!("SELECT $1::{}", sql_type), &[&channel])
+                let row_nonce = client
+                    .query_one(&*format!("SELECT $1::{}", sql_type), &[&nonce])
                     .await
-                    .unwrap();
-                let result = rows[0].get(0);
-                assert_eq!(&channel, &result);
+                    .unwrap()
+                    .get(0);
+                assert_eq!(&nonce, &row_nonce);
             }
         }
     }
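
Since `Nonce` now travels as `VARCHAR` instead of `JSONB`, the round trip depends on its `Display` output and its serde string deserialization being inverses. A small sketch of that property in isolation (no database needed; assumes the `From<u64>`, `Display`, `PartialEq` and serde impls used by the code above):

```rust
use primitives::channel_v5::Nonce;

fn nonce_string_round_trip() {
    let nonce = Nonce::from(123_456_789_u64);

    // `ToSql` writes `self.0.to_string()` — a plain decimal string.
    let as_string = nonce.to_string();

    // `FromSql` reads it back through serde, exactly like the impl above.
    let parsed: Nonce = serde_json::from_value(serde_json::Value::String(as_string))
        .expect("Should deserialize");

    assert_eq!(nonce, parsed);
}
```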
diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs
index 80de5db6d..9ec8d9339 100644
--- a/primitives/src/lib.rs
+++ b/primitives/src/lib.rs
@@ -1,5 +1,6 @@
 #![deny(rust_2018_idioms)]
 #![deny(clippy::all)]
+#![allow(deprecated)]
 use std::{error, fmt};
 pub use self::{
@@ -10,8 +11,10 @@ pub use self::{
     balances_map::{BalancesMap, UnifiedMap},
     big_num::BigNum,
     campaign::{Campaign, CampaignId},
-    channel::{Channel, ChannelId, ChannelSpec, SpecValidator, SpecValidators},
+    channel::{ChannelId, ChannelSpec, SpecValidator, SpecValidators},
+    channel_v5::Channel,
     config::Config,
+    deposit::Deposit,
     event_submission::EventSubmission,
     ipfs::IPFS,
     unified_num::UnifiedNum,
@@ -43,6 +46,43 @@ pub mod targeting;
 mod unified_num;
 pub mod validator;
+mod deposit {
+    use crate::{BigNum, UnifiedNum};
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+    #[serde(rename_all = "camelCase")]
+    pub struct Deposit<N> {
+        pub total: N,
+        pub still_on_create2: N,
+    }
+
+    impl Deposit<UnifiedNum> {
+        pub fn to_precision(&self, precision: u8) -> Deposit<BigNum> {
+            Deposit {
+                total: self.total.to_precision(precision),
+                still_on_create2: self.still_on_create2.to_precision(precision),
+            }
+        }
+
+        pub fn from_precision(
+            deposit: Deposit<BigNum>,
+            precision: u8,
+        ) -> Option<Deposit<UnifiedNum>> {
+            let total = UnifiedNum::from_precision(deposit.total, precision);
+            let still_on_create2 = UnifiedNum::from_precision(deposit.still_on_create2, precision);
+
+            match (total, still_on_create2) {
+                (Some(total), Some(still_on_create2)) => Some(Deposit {
+                    total,
+                    still_on_create2,
+                }),
+                _ => None,
+            }
+        }
+    }
+}
+
 pub mod util {
     pub use api::ApiUrl;
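
`UnifiedNum` is a fixed 8-decimal representation while ERC-20 tokens commonly use 18 decimals, so the two conversions above scale by the precision difference. A worked sketch of the round trip (amounts are illustrative and assume an 18-decimal token):

```rust
use primitives::{BigNum, Deposit, UnifiedNum};

fn deposit_precision_round_trip() {
    // 1 token with 18 decimals, as an adapter reports it on-chain.
    let onchain = Deposit {
        total: BigNum::from(1_000_000_000_000_000_000_u64),
        still_on_create2: BigNum::from(0),
    };

    // Scale 18-decimal numbers down to the 8-decimal `UnifiedNum`;
    // `None` would signal an overflow of the u64 backing store.
    let unified =
        Deposit::<UnifiedNum>::from_precision(onchain, 18).expect("Should fit in UnifiedNum");
    assert_eq!(unified.total, UnifiedNum::from(100_000_000)); // 1.00000000

    // And back up to 18 decimals for on-chain use.
    let back = unified.to_precision(18);
    assert_eq!(back.total, BigNum::from(1_000_000_000_000_000_000_u64));
}
```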
diff --git a/primitives/src/sentry.rs b/primitives/src/sentry.rs
index e5ae70135..011159949 100644
--- a/primitives/src/sentry.rs
+++ b/primitives/src/sentry.rs
@@ -1,8 +1,9 @@
 use crate::{
     balances::BalancesState,
+    channel::Channel as ChannelOld,
     spender::Spender,
     validator::{ApproveState, Heartbeat, MessageTypes, NewState, Type as MessageType},
-    Address, Balances, BigNum, Channel, ChannelId, ValidatorId, IPFS,
+    Address, Balances, BigNum, ChannelId, ValidatorId, IPFS,
 };
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
@@ -181,16 +182,6 @@ pub struct AggregateEvents {
     pub event_payouts: HashMap<Address, BigNum>,
 }
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ChannelListResponse {
-    pub channels: Vec<Channel>,
-    // TODO: Replace with `Pagination`
-    pub total_pages: u64,
-    pub total: u64,
-    pub page: u64,
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Pagination {
@@ -210,6 +201,12 @@ pub struct LastApprovedResponse {
     pub heartbeats: Option<Vec<MessageResponse<Heartbeat>>>,
 }
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LastApprovedQuery {
+    pub with_heartbeat: Option<String>,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 pub struct SuccessResponse {
     pub success: bool,
@@ -244,7 +241,7 @@ pub struct ValidatorMessageResponse {
 #[derive(Serialize, Deserialize, Debug)]
 pub struct EventAggregateResponse {
-    pub channel: Channel,
+    pub channel: ChannelOld,
     pub events: Vec<EventAggregate>,
 }
@@ -304,32 +301,28 @@ impl fmt::Display for ChannelReport {
 }
 pub mod channel_list {
-    use crate::ValidatorId;
-    use chrono::{serde::ts_seconds, DateTime, Utc};
+    use crate::{channel_v5::Channel, ValidatorId};
     use serde::{Deserialize, Serialize};
+    use super::Pagination;
+
+    #[derive(Debug, Serialize, Deserialize)]
+    #[serde(rename_all = "camelCase")]
+    pub struct ChannelListResponse {
+        pub channels: Vec<Channel>,
+        #[serde(flatten)]
+        pub pagination: Pagination,
+    }
+
     #[derive(Debug, Serialize, Deserialize)]
     pub struct ChannelListQuery {
-        #[serde(default = "default_page")]
+        #[serde(default)]
+        // default is `u64::default()` = `0`
         pub page: u64,
-        /// filters the list on `valid_until >= valid_until_ge`
-        /// It should be the same timestamp format as the `Channel.valid_until`: **seconds**
-        #[serde(with = "ts_seconds", default = "Utc::now", rename = "validUntil")]
-        pub valid_until_ge: DateTime<Utc>,
         pub creator: Option<ValidatorId>,
         /// filters the channels containing a specific validator if provided
         pub validator: Option<ValidatorId>,
     }
-
-    #[derive(Debug, Deserialize)]
-    #[serde(rename_all = "camelCase")]
-    pub struct LastApprovedQuery {
-        pub with_heartbeat: Option<String>,
-    }
-
-    fn default_page() -> u64 {
-        0
-    }
 }
 pub mod campaign {
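
Because the new `ChannelListResponse` embeds `Pagination` with `#[serde(flatten)]`, the wire format keeps the old flat shape even though the Rust struct is now nested. A sketch of the expected JSON (field values are invented):

```rust
use primitives::sentry::{channel_list::ChannelListResponse, Pagination};

fn flattened_pagination_shape() {
    let response = ChannelListResponse {
        channels: vec![],
        pagination: Pagination {
            total_pages: 1,
            total: 1,
            page: 0,
        },
    };

    let json = serde_json::to_value(&response).expect("Should serialize");
    // `flatten` lifts the pagination fields to the top level, and
    // `rename_all = "camelCase"` applies to both structs.
    assert_eq!(
        json,
        serde_json::json!({
            "channels": [],
            "totalPages": 1,
            "total": 1,
            "page": 0
        })
    );
}
```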
diff --git a/primitives/src/spender.rs b/primitives/src/spender.rs
index 365167174..e220f9ef9 100644
--- a/primitives/src/spender.rs
+++ b/primitives/src/spender.rs
@@ -1,14 +1,6 @@
-use crate::{channel_v5::Channel, Address, BalancesMap, UnifiedNum};
-use chrono::{DateTime, Utc};
+use crate::{channel_v5::Channel, Address, Deposit, UnifiedNum};
 use serde::{Deserialize, Serialize};
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(rename_all = "camelCase")]
-pub struct Deposit {
-    pub total: UnifiedNum,
-    pub still_on_create2: UnifiedNum,
-}
-
 #[derive(Serialize, Deserialize, Debug, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct SpenderLeaf {
@@ -26,27 +18,19 @@ pub struct Spender {
 pub struct Spendable {
     pub spender: Address,
     pub channel: Channel,
-    pub deposit: Deposit,
+    pub deposit: Deposit<UnifiedNum>,
 }
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct Aggregate {
-    pub spender: Address,
-    pub channel: Channel,
-    pub balances: BalancesMap,
-    pub created: DateTime<Utc>,
-}
 #[cfg(feature = "postgres")]
 mod postgres {
     use super::*;
     use tokio_postgres::Row;
-    impl From<Row> for Spendable {
-        fn from(row: Row) -> Self {
+    impl From<&Row> for Spendable {
+        fn from(row: &Row) -> Self {
             Self {
                 spender: row.get("spender"),
-                channel: row.get("channel"),
+                channel: Channel::from(row),
                 deposit: Deposit {
                     total: row.get("total"),
                     still_on_create2: row.get("still_on_create2"),
diff --git a/primitives/src/util/tests/prep_db.rs b/primitives/src/util/tests/prep_db.rs
index f0dcfcf94..9deb29339 100644
--- a/primitives/src/util/tests/prep_db.rs
+++ b/primitives/src/util/tests/prep_db.rs
@@ -1,7 +1,7 @@
 use crate::{
     campaign::{self, Active, Validators},
     channel::{Pricing, PricingBounds},
-    channel_v5::{self, Nonce},
+    channel_v5::Nonce,
     targeting::Rules,
     AdUnit, Address, BigNum, Campaign, Channel, ChannelId, ChannelSpec, EventSubmission,
     SpecValidators, UnifiedNum, ValidatorDesc, ValidatorId, IPFS,
 };
@@ -86,7 +86,7 @@ lazy_static! {
     pub static ref DUMMY_CAMPAIGN: Campaign = {
         Campaign {
             id: "0x936da01f9abd4d9d80c702af85c822a8".parse().expect("Should parse"),
-            channel: channel_v5::Channel {
+            channel: Channel {
                 leader: IDS["leader"],
                 follower: IDS["follower"],
                 guardian: IDS["tester"].to_address(),
@@ -110,10 +110,10 @@ lazy_static! {
         }
     };
-    pub static ref DUMMY_CHANNEL: Channel = {
+    pub static ref DUMMY_CHANNEL: crate::channel::Channel = {
         let nonce = BigNum::from(<Faker as Number>::between(100_000_000, 999_999_999));
-        Channel {
+        crate::channel::Channel {
             id: ChannelId::from_hex("061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088").expect("prep_db: failed to deserialize channel id"),
             creator: ValidatorId::try_from("033ed90e0fec3f3ea1c9b005c724d704501e0196").expect("Should be valid ValidatorId"),
             deposit_asset: "0x89d24A6b4CcB1B6fAA2625fE562bDD9a23260359".to_string(),
diff --git a/sentry/migrations/20190806011140_initial-tables/up.sql b/sentry/migrations/20190806011140_initial-tables/up.sql
index f611fafd0..19e46e586 100644
--- a/sentry/migrations/20190806011140_initial-tables/up.sql
+++ b/sentry/migrations/20190806011140_initial-tables/up.sql
@@ -1,7 +1,22 @@
+CREATE TABLE channels (
+    id varchar(66) NOT NULL,
+    leader varchar(42) NOT NULL,
+    follower varchar(42) NOT NULL,
+    guardian varchar(42) NOT NULL,
+    token varchar(42) NOT NULL,
+    -- Using varchar for U256 for simplicity
+    nonce varchar(78) NOT NULL,
+    -- In order to be able to order the channels for the `GET channel` request
+    created timestamp(2) with time zone NOT NULL,
+    -- Do not rename the Primary key constraint (`channels_pkey`)!
+    PRIMARY KEY (id)
+);
+
+CREATE INDEX idx_channel_created ON channels (created);
+
 CREATE TABLE campaigns (
     id varchar(34) NOT NULL,
     channel_id varchar(66) NOT NULL,
-    channel jsonb NOT NULL,
     creator varchar(42) NOT NULL,
     budget bigint NOT NULL,
     validators jsonb NOT NULL,
@@ -13,7 +28,8 @@
     created timestamp(2) with time zone NOT NULL,
     active_from timestamp(2) with time zone NULL,
     active_to timestamp(2) with time zone NOT NULL,
-    PRIMARY KEY (id)
+    PRIMARY KEY (id),
+    CONSTRAINT fk_campaigns_channel_id FOREIGN KEY (channel_id) REFERENCES channels (id) ON DELETE RESTRICT ON UPDATE RESTRICT
 );
 CREATE INDEX idx_campaign_active_to ON campaigns (active_to);
@@ -25,18 +41,18 @@ CREATE INDEX idx_campaign_created ON campaigns (created);
 CREATE TABLE spendable (
     spender varchar(42) NOT NULL,
     channel_id varchar(66) NOT NULL,
-    channel jsonb NOT NULL,
     total bigint NOT NULL,
     still_on_create2 bigint NOT NULL,
-    PRIMARY KEY (spender, channel_id)
+    PRIMARY KEY (spender, channel_id),
+    CONSTRAINT fk_spendable_channel_id FOREIGN KEY (channel_id) REFERENCES channels (id) ON DELETE RESTRICT ON UPDATE RESTRICT
 );
 CREATE TABLE validator_messages (
-    -- TODO: Should the validator message be reference to channel_id or campaign_id?
     channel_id varchar(66) NOT NULL,
     "from" varchar(255) NOT NULL,
     msg jsonb NOT NULL,
-    received timestamp(2) with time zone NOT NULL
+    received timestamp(2) with time zone NOT NULL,
+    CONSTRAINT fk_validator_messages_channel_id FOREIGN KEY (channel_id) REFERENCES channels (id) ON DELETE RESTRICT ON UPDATE RESTRICT
 );
 CREATE INDEX idx_validator_messages_received ON validator_messages (received);
@@ -55,20 +71,19 @@ CREATE INDEX idx_validator_messages_msg_state_root ON validator_messages ((msg
 -- count varchar NOT NULL,
 -- payout varchar NOT NULL
 -- );
-
 -- CREATE INDEX idx_event_aggregates_created ON event_aggregates (created);
-
 -- CREATE INDEX idx_event_aggregates_channel ON event_aggregates (channel_id);
-
 -- CREATE INDEX idx_event_aggregates_event_type ON event_aggregates (event_type);
-
 CREATE AGGREGATE jsonb_object_agg (jsonb) (
     SFUNC = 'jsonb_concat',
     STYPE = jsonb,
     INITCOND = '{}'
 );
-CREATE TYPE AccountingSide AS ENUM ('Earner', 'Spender');
+CREATE TYPE AccountingSide AS ENUM (
+    'Earner',
+    'Spender'
+);
 CREATE TABLE accounting (
     channel_id varchar(66) NOT NULL,
     side AccountingSide NOT NULL,
     "address" varchar(42) NOT NULL,
     amount bigint NOT NULL,
     updated timestamp(2) with time zone DEFAULT NULL NULL,
     created timestamp(2) with time zone NOT NULL,
     -- Do not rename the Primary key constraint (`accounting_pkey`)!
-    PRIMARY KEY (channel_id, side, "address")
-);
\ No newline at end of file
+    PRIMARY KEY (channel_id, side, "address"),
+    CONSTRAINT fk_accounting_channel_id FOREIGN KEY (channel_id) REFERENCES channels (id) ON DELETE RESTRICT ON UPDATE RESTRICT
+);
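
The migration makes `channels` the parent table of everything else: with `ON DELETE RESTRICT`, a channel row cannot disappear out from under a campaign, spendable, validator message, or accounting entry. A sketch of how a violation would surface through `tokio_postgres` (ids are made up and the connection setup is elided):

```rust
use tokio_postgres::error::SqlState;

/// A sketch: inserting a `spendable` row whose channel_id has no parent
/// row in `channels` is now rejected by `fk_spendable_channel_id`.
async fn fk_violation_example(client: &tokio_postgres::Client) {
    let result = client
        .execute(
            "INSERT INTO spendable (spender, channel_id, total, still_on_create2) VALUES ($1, $2, 0, 0)",
            &[
                &"0xce07CbB7e054514D590a0262C93070D838bFBA2e",
                &"0x0000000000000000000000000000000000000000000000000000000000000000",
            ],
        )
        .await;

    let err = result.expect_err("Should be rejected by the FK constraint");
    assert_eq!(err.code(), Some(&SqlState::FOREIGN_KEY_VIOLATION));
}
```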
diff --git a/sentry/src/analytics_recorder.rs b/sentry/src/analytics_recorder.rs
index 3d2d07d3c..ad9876373 100644
--- a/sentry/src/analytics_recorder.rs
+++ b/sentry/src/analytics_recorder.rs
@@ -101,13 +101,13 @@ pub async fn record(
         )
         .ignore();
         db.zincr(
-            format!("{}:{}:{}", ChannelReport::Hostname, event, channel.id),
+            format!("{}:{}:{}", ChannelReport::Hostname, event, channel.id()),
             hostname,
             1,
         )
         .ignore();
         db.zincr(
-            format!("{}:{}:{}", ChannelReport::HostnamePay, event, channel.id),
+            format!("{}:{}:{}", ChannelReport::HostnamePay, event, channel.id()),
             hostname,
             1,
         )
diff --git a/sentry/src/db.rs b/sentry/src/db.rs
index c238598cd..bf9dbf580 100644
--- a/sentry/src/db.rs
+++ b/sentry/src/db.rs
@@ -310,7 +310,8 @@ pub mod tests_postgres {
             .await?
             .simple_query(&queries)
             .await
-            .map_err(|err| PoolError::Backend(err))?;
+            .map_err(PoolError::Backend)
+            .expect("Should not error");
         assert_eq!(2, result.len());
         assert!(matches!(result[0], SimpleQueryMessage::CommandComplete(..)));
         assert!(matches!(result[1], SimpleQueryMessage::CommandComplete(..)));
diff --git a/sentry/src/db/accounting.rs b/sentry/src/db/accounting.rs
index 13d004890..63bc6a0f0 100644
--- a/sentry/src/db/accounting.rs
+++ b/sentry/src/db/accounting.rs
@@ -173,7 +173,10 @@ pub async fn spend_amount(
 mod test {
     use primitives::util::tests::prep_db::{ADDRESSES, DUMMY_CAMPAIGN};
-    use crate::db::tests_postgres::{setup_test_migrations, DATABASE_POOL};
+    use crate::db::{
+        insert_channel,
+        tests_postgres::{setup_test_migrations, DATABASE_POOL},
+    };
     use super::*;
@@ -184,8 +187,12 @@
         setup_test_migrations(database.pool.clone())
             .await
             .expect("Migrations should succeed");
+        // insert the channel into the DB
+        let channel = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+            .await
+            .expect("Should insert");
-        let channel_id = DUMMY_CAMPAIGN.channel.id();
+        let channel_id = channel.id();
         let earner = ADDRESSES["publisher"];
         let spender = ADDRESSES["creator"];
@@ -376,7 +383,12 @@
         .await
         .expect("Migrations should succeed");
-        let channel_id = DUMMY_CAMPAIGN.channel.id();
+        // insert the channel into the DB
+        let channel = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+            .await
+            .expect("Should insert");
+
+        let channel_id = channel.id();
         let earner = ADDRESSES["publisher"];
         let spender = ADDRESSES["creator"];
         let spender_as_earner = spender;
@@ -458,7 +470,12 @@
         .await
         .expect("Migrations should succeed");
-        let channel_id = DUMMY_CAMPAIGN.channel.id();
+        // insert the channel into the DB
+        let channel = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+            .await
+            .expect("Should insert");
+
+        let channel_id = channel.id();
         let earner = ADDRESSES["publisher"];
         let other_earner = ADDRESSES["publisher2"];
         let spender = ADDRESSES["creator"];
diff --git a/sentry/src/db/campaign.rs b/sentry/src/db/campaign.rs
index ea24ec6f7..1074dc78e 100644
--- a/sentry/src/db/campaign.rs
+++ b/sentry/src/db/campaign.rs
@@ -5,20 +5,19 @@ use tokio_postgres::types::Json;
 pub use campaign_remaining::CampaignRemaining;
 /// ```text
-/// INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to)
-/// VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
+/// INSERT INTO campaigns (id, channel_id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to)
+/// VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
 /// ```
 pub async fn insert_campaign(pool: &DbPool, campaign: &Campaign) -> Result<bool, PoolError> {
     let client = pool.get().await?;
     let ad_units = Json(campaign.ad_units.clone());
-    let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)").await?;
+    let stmt = client.prepare("INSERT INTO campaigns (id, channel_id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)").await?;
     let inserted = client
         .execute(
             &stmt,
             &[
                 &campaign.id,
                 &campaign.channel.id(),
-                &campaign.channel,
                 &campaign.creator,
                 &campaign.budget,
                 &campaign.validators,
@@ -39,15 +38,20 @@ pub async fn fetch_campaign(
 ) -> Result<Option<Campaign>, PoolError> {
     let client = pool.get().await?;
-    let statement = client.prepare("SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to FROM campaigns WHERE id = $1").await?;
+    // TODO: Check and update
+    let statement = client.prepare("SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM campaigns INNER JOIN channels
+    ON campaigns.channel_id=channels.id WHERE campaigns.id = $1").await?;
     let row = client.query_opt(&statement, &[&campaign]).await?;
@@ -56,15 +60,18 @@ pub async fn fetch_campaign(
 // TODO: We might need to use LIMIT to implement pagination
 /// ```text
-/// SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to
-/// FROM campaigns WHERE channel_id = $1
+/// SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to,
+/// channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce
+/// FROM campaigns INNER JOIN channels
+/// ON campaigns.channel_id=channels.id WHERE campaigns.channel_id = $1
 /// ```
 pub async fn get_campaigns_by_channel(
     pool: &DbPool,
     channel_id: &ChannelId,
 ) -> Result<Vec<Campaign>, PoolError> {
     let client = pool.get().await?;
-    let statement = client.prepare("SELECT id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to FROM campaigns WHERE channel_id = $1").await?;
+    let statement = client.prepare("SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM campaigns INNER JOIN channels
+    ON campaigns.channel_id=channels.id WHERE campaigns.channel_id = $1").await?;
     let rows = client.query(&statement, &[&channel_id]).await?;
@@ -75,13 +82,14 @@ pub async fn get_campaigns_by_channel(
 /// ```text
 /// UPDATE campaigns SET budget = $1, validators = $2, title = $3, pricing_bounds = $4, event_submission = $5, ad_units = $6, targeting_rules = $7
-/// WHERE id = $8
-/// RETURNING id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to
+/// FROM channels WHERE campaigns.id = $8 AND campaigns.channel_id=channels.id
+/// RETURNING campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to,
+/// channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce
 /// ```
 pub async fn update_campaign(pool: &DbPool, campaign: &Campaign) -> Result<Campaign, PoolError> {
     let client = pool.get().await?;
     let statement = client
-        .prepare("UPDATE campaigns SET budget = $1, validators = $2, title = $3, pricing_bounds = $4, event_submission = $5, ad_units = $6, targeting_rules = $7 WHERE id = $8 RETURNING id, channel, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, created, active_from, active_to")
+        .prepare("UPDATE campaigns SET budget = $1, validators = $2, title = $3, pricing_bounds = $4, event_submission = $5, ad_units = $6, targeting_rules = $7 FROM channels WHERE campaigns.id = $8 AND campaigns.channel_id=channels.id RETURNING campaigns.id, creator, budget, validators, title, pricing_bounds, event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce")
         .await?;
     let ad_units = Json(&campaign.ad_units);
@@ -389,7 +397,10 @@ mod test {
     use std::time::Duration;
     use tokio_postgres::error::SqlState;
-    use crate::db::tests_postgres::{setup_test_migrations, DATABASE_POOL};
+    use crate::db::{
+        insert_channel,
+        tests_postgres::{setup_test_migrations, DATABASE_POOL},
+    };
     use super::*;
@@ -403,6 +414,11 @@ mod test {
     let campaign = DUMMY_CAMPAIGN.clone();
+    // insert the channel into the DB
+    let _channel = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+        .await
+        .expect("Should insert");
+
     let non_existent_campaign = fetch_campaign(database.pool.clone(), &campaign.id)
         .await
         .expect("Should fetch successfully");
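
With the `channel` JSONB column gone, `Campaign: From<&Row>` (see `primitives/src/campaign.rs` above) rebuilds the channel via `Channel::from(row)`, so every query that materializes a `Campaign` must join in the `channels` columns. A hypothetical constant spelling out that invariant:

```rust
/// Hypothetical helper constant: the SELECT every campaign query now needs,
/// because `Channel::from(row)` reads "leader", "follower", "guardian",
/// "token" and "nonce" from the very same row as the campaign fields.
const CAMPAIGN_WITH_CHANNEL_SELECT: &str =
    "SELECT campaigns.id, creator, budget, validators, title, pricing_bounds, \
     event_submission, ad_units, targeting_rules, campaigns.created, active_from, active_to, \
     channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce \
     FROM campaigns INNER JOIN channels ON campaigns.channel_id = channels.id";
```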
diff --git a/sentry/src/db/channel.rs b/sentry/src/db/channel.rs
index 6aaadfc36..6e4ca7591 100644
--- a/sentry/src/db/channel.rs
+++ b/sentry/src/db/channel.rs
@@ -1,6 +1,5 @@
 use chrono::Utc;
-use primitives::{targeting::Rules, validator::MessageTypes, Channel, ChannelId, ValidatorId};
-use std::str::FromStr;
+use primitives::{validator::MessageTypes, Channel, ChannelId, ValidatorId};
 pub use list_channels::list_channels;
@@ -12,72 +11,63 @@ pub async fn get_channel_by_id(
 ) -> Result<Option<Channel>, PoolError> {
     let client = pool.get().await?;
-    let select = client.prepare("SELECT id, creator, deposit_asset, deposit_amount, valid_until, targeting_rules, spec, exhausted FROM channels WHERE id = $1 LIMIT 1").await?;
+    let select = client
+        .prepare("SELECT leader, follower, guardian, token, nonce FROM channels WHERE id = $1 LIMIT 1")
+        .await?;
-    let results = client.query(&select, &[&id]).await?;
+    let row = client.query_opt(&select, &[&id]).await?;
-    Ok(results.get(0).map(Channel::from))
+    Ok(row.as_ref().map(Channel::from))
 }
 pub async fn get_channel_by_id_and_validator(
     pool: &DbPool,
-    id: &ChannelId,
-    validator_id: &ValidatorId,
+    id: ChannelId,
+    validator: ValidatorId,
 ) -> Result<Option<Channel>, PoolError> {
     let client = pool.get().await?;
-    let validator = serde_json::Value::from_str(&format!(r#"[{{"id": "{}"}}]"#, validator_id))
-        .expect("Not a valid json");
-    let query = "SELECT id, creator, deposit_asset, deposit_amount, valid_until, targeting_rules, spec, exhausted FROM channels WHERE id = $1 AND spec->'validators' @> $2 LIMIT 1";
+    let query = "SELECT leader, follower, guardian, token, nonce FROM channels WHERE id = $1 AND (leader = $2 OR follower = $2) LIMIT 1";
     let select = client.prepare(query).await?;
-    let results = client.query(&select, &[&id, &validator]).await?;
+    let row = client.query_opt(&select, &[&id, &validator]).await?;
-    Ok(results.get(0).map(Channel::from))
+    Ok(row.as_ref().map(Channel::from))
 }
-pub async fn insert_channel(pool: &DbPool, channel: &Channel) -> Result<bool, PoolError> {
+/// Used to insert/get a Channel when creating a Campaign.
+/// If the channel already exists, it is returned instead.
+/// This call should never trigger a `SqlState::UNIQUE_VIOLATION`.
+///
+/// ```sql
+/// INSERT INTO channels (id, leader, follower, guardian, token, nonce, created)
+/// VALUES ($1, $2, $3, $4, $5, $6, NOW())
+/// ON CONFLICT ON CONSTRAINT channels_pkey DO UPDATE SET created=EXCLUDED.created
+/// RETURNING leader, follower, guardian, token, nonce
+/// ```
+pub async fn insert_channel(pool: &DbPool, channel: Channel) -> Result<Channel, PoolError> {
     let client = pool.get().await?;
-    let stmt = client.prepare("INSERT INTO channels (id, creator, deposit_asset, deposit_amount, valid_until, targeting_rules, spec, exhausted) values ($1, $2, $3, $4, $5, $6, $7, $8)").await?;
+    // We use the no-op `DO UPDATE SET created=EXCLUDED.created` because a plain
+    // `DO NOTHING` does not return the row's fields when there is a CONFLICT
+    let stmt = client.prepare("INSERT INTO channels (id, leader, follower, guardian, token, nonce, created) VALUES ($1, $2, $3, $4, $5, $6, NOW())
+    ON CONFLICT ON CONSTRAINT channels_pkey DO UPDATE SET created=EXCLUDED.created RETURNING leader, follower, guardian, token, nonce").await?;
     let row = client
-        .execute(
+        .query_one(
             &stmt,
             &[
-                &channel.id,
-                &channel.creator,
-                &channel.deposit_asset,
-                &channel.deposit_amount,
-                &channel.valid_until,
-                &channel.targeting_rules,
-                &channel.spec,
-                &channel.exhausted,
+                &channel.id(),
+                &channel.leader,
+                &channel.follower,
+                &channel.guardian,
+                &channel.token,
+                &channel.nonce,
             ],
         )
         .await?;
-    let inserted = row == 1;
-    Ok(inserted)
-}
-
-#[deprecated(note = "AIP#61 now uses the modify Campaign route for updating targeting rules")]
-pub async fn update_targeting_rules(
-    pool: &DbPool,
-    channel_id: &ChannelId,
-    targeting_rules: &Rules,
-) -> Result<bool, PoolError> {
-    let client = pool.get().await?;
-
-    let stmt = client
-        .prepare("UPDATE channels SET targeting_rules=$1 WHERE id=$2")
-        .await?;
-    let row = client
-        .execute(&stmt, &[&targeting_rules, &channel_id])
-        .await?;
-
-    let updated = row == 1;
-    Ok(updated)
+    Ok(Channel::from(&row))
 }
 pub async fn insert_validator_messages(
@@ -93,7 +83,7 @@ pub async fn insert_validator_messages(
     let row = client
         .execute(
             &stmt,
-            &[&channel.id, &from, &validator_message, &Utc::now()],
+            &[&channel.id(), &from, &validator_message, &Utc::now()],
         )
         .await?;
     let inserted = row == 1;
     Ok(inserted)
 }
-#[deprecated = "No longer needed for V5"]
-pub async fn update_exhausted_channel(
-    pool: &DbPool,
-    channel: &Channel,
-    index: u32,
-) -> Result<bool, PoolError> {
-    let client = pool.get().await?;
-
-    let stmt = client
-        .prepare("UPDATE channels SET exhausted[$1] = true WHERE id = $2")
-        .await?;
-    // WARNING: By default PostgreSQL uses a one-based numbering convention for arrays, that is, an array of n elements starts with array[1] and ends with array[n].
-    // this is why we add +1 to the index
-    let row = client.execute(&stmt, &[&(index + 1), &channel.id]).await?;
-
-    let updated = row == 1;
-    Ok(updated)
-}
-
 mod list_channels {
-    use chrono::{DateTime, Utc};
-    use primitives::{sentry::ChannelListResponse, Channel, ValidatorId};
+    use primitives::{
+        channel_v5::Channel,
+        sentry::{channel_list::ChannelListResponse, Pagination},
+        ValidatorId,
+    };
     use std::str::FromStr;
-    use tokio_postgres::types::{accepts, FromSql, ToSql, Type};
+    use tokio_postgres::types::{accepts, FromSql, Type};
     use crate::db::{DbPool, PoolError};
@@ -143,33 +117,40 @@ mod list_channels {
         accepts!(VARCHAR, TEXT);
     }
+    /// Lists the `Channel`s in `ASC` order.
+    /// This makes sure that if a new `Channel` is added
+    /// while we are scrolling through the pages it will not alter the `Channel`s' ordering
     pub async fn list_channels(
         pool: &DbPool,
         skip: u64,
         limit: u32,
-        creator: &Option<ValidatorId>,
-        validator: &Option<ValidatorId>,
-        valid_until_ge: &DateTime<Utc>,
+        validator: Option<ValidatorId>,
     ) -> Result<ChannelListResponse, PoolError> {
         let client = pool.get().await?;
-        let validator = validator.as_ref().map(|validator_id| {
-            serde_json::Value::from_str(&format!(r#"[{{"id": "{}"}}]"#, validator_id))
-                .expect("Not a valid json")
-        });
-        let (where_clauses, params) =
-            channel_list_query_params(creator, validator.as_ref(), valid_until_ge);
-        let total_count_params = (where_clauses.clone(), params.clone());
-        // To understand why we use Order by, see Postgres Documentation: https://www.postgresql.org/docs/8.1/queries-limit.html
-        let statement = format!("SELECT id, creator, deposit_asset, deposit_amount, valid_until, targeting_rules, spec, exhausted FROM channels WHERE {} ORDER BY spec->>'created' DESC LIMIT {} OFFSET {}", where_clauses.join(" AND "), limit, skip);
-        let stmt = client.prepare(&statement).await?;
+        let rows = match validator {
+            Some(validator) => {
+                let where_clause = "(leader = $1 OR follower = $1)".to_string();
+
+                let statement = format!("SELECT leader, follower, guardian, token, nonce, created FROM channels WHERE {} ORDER BY created ASC LIMIT {} OFFSET {}",
+                    where_clause, limit, skip);
+                let stmt = client.prepare(&statement).await?;
+
+                client.query(&stmt, &[&validator.to_string()]).await?
+            }
+            None => {
+                let statement = format!("SELECT id, leader, follower, guardian, token, nonce, created FROM channels ORDER BY created ASC LIMIT {} OFFSET {}",
+                    limit, skip);
+                let stmt = client.prepare(&statement).await?;
+
+                client.query(&stmt, &[]).await?
+            }
+        };
-        let rows = client.query(&stmt, params.as_slice()).await?;
         let channels = rows.iter().map(Channel::from).collect();
-        let total_count =
-            list_channels_total_count(pool, (&total_count_params.0, total_count_params.1)).await?;
+        let total_count = list_channels_total_count(pool, validator).await?;
         // fast ceil for total_pages
         let total_pages = if total_count == 0 {
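
The hunk above ends just inside the "fast ceil" computation; its body sits outside the diff context, but the usual integer formulation it refers to is `1 + (total_count - 1) / limit` for a non-zero count. A quick check of the edge cases (a sketch, assuming this is indeed the formula used):

```rust
fn fast_ceil(total_count: u64, limit: u64) -> u64 {
    if total_count == 0 {
        0
    } else {
        // integer ceil(total_count / limit) without floating point
        1 + ((total_count - 1) / limit)
    }
}

fn fast_ceil_examples() {
    assert_eq!(fast_ceil(0, 10), 0); // no channels -> no pages
    assert_eq!(fast_ceil(10, 10), 1); // exactly one full page
    assert_eq!(fast_ceil(11, 10), 2); // one overflowing row adds a page
}
```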
@@ -179,47 +160,83 @@ mod list_channels {
         };
         Ok(ChannelListResponse {
-            total_pages,
-            total: total_pages,
-            page: skip / limit as u64,
             channels,
+            pagination: Pagination {
+                total_pages,
+                total: total_count,
+                page: skip / limit as u64,
+            },
         })
     }
     async fn list_channels_total_count<'a>(
         pool: &DbPool,
-        (where_clauses, params): (&'a [String], Vec<&'a (dyn ToSql + Sync)>),
+        validator: Option<ValidatorId>,
     ) -> Result<u64, PoolError> {
         let client = pool.get().await?;
-        let statement = format!(
-            "SELECT COUNT(id)::varchar FROM channels WHERE {}",
-            where_clauses.join(" AND ")
-        );
-        let stmt = client.prepare(&statement).await?;
-        let row = client.query_one(&stmt, params.as_slice()).await?;
+        let row = match validator {
+            Some(validator) => {
+                let where_clause = "(leader = $1 OR follower = $1)".to_string();
+
+                let statement = format!(
+                    "SELECT COUNT(id)::varchar FROM channels WHERE {}",
+                    where_clause
+                );
+                let stmt = client.prepare(&statement).await?;
+
+                client.query_one(&stmt, &[&validator.to_string()]).await?
+            }
+            None => {
+                let statement = "SELECT COUNT(id)::varchar FROM channels";
+                let stmt = client.prepare(statement).await?;
+
+                client.query_one(&stmt, &[]).await?
+            }
+        };
         Ok(row.get::<_, TotalCount>(0).0)
     }
-
-    fn channel_list_query_params<'a>(
-        creator: &'a Option<ValidatorId>,
-        validator: Option<&'a serde_json::Value>,
-        valid_until_ge: &'a DateTime<Utc>,
-    ) -> (Vec<String>, Vec<&'a (dyn ToSql + Sync)>) {
-        let mut where_clauses = vec!["valid_until >= $1".to_string()];
-        let mut params: Vec<&(dyn ToSql + Sync)> = vec![valid_until_ge];
-
-        if let Some(creator) = creator {
-            where_clauses.push(format!("creator = ${}", params.len() + 1));
-            params.push(creator);
-        }
-
-        if let Some(validator) = validator {
-            where_clauses.push(format!("spec->'validators' @> ${}", params.len() + 1));
-            params.push(validator);
-        }
-
-        (where_clauses, params)
-    }
 }
+
+#[cfg(test)]
+mod test {
+    use primitives::util::tests::prep_db::DUMMY_CAMPAIGN;
+
+    use crate::db::{
+        insert_channel,
+        tests_postgres::{setup_test_migrations, DATABASE_POOL},
+    };
+
+    use super::list_channels::list_channels;
+
+    #[tokio::test]
+    async fn insert_and_list_channels_return_channels() {
+        let database = DATABASE_POOL.get().await.expect("Should get database");
+        setup_test_migrations(database.pool.clone())
+            .await
+            .expect("Should setup migrations");
+
+        let actual_channel = {
+            let insert = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+                .await
+                .expect("Should insert Channel");
+
+            // once inserted, the channel should only be returned by the function
+            let only_select = insert_channel(&database.pool, DUMMY_CAMPAIGN.channel)
+                .await
+                .expect("Should run insert with RETURNING on the Channel");
+
+            assert_eq!(insert, only_select);
+
+            only_select
+        };
+
+        let response = list_channels(&database.pool, 0, 10, None)
+            .await
+            .expect("Should list Channels");
+
+        assert_eq!(1, response.channels.len());
+        assert_eq!(DUMMY_CAMPAIGN.channel, actual_channel);
+    }
+}
diff --git a/sentry/src/db/event_aggregate.rs b/sentry/src/db/event_aggregate.rs
index 8f848adf4..89b9404e7 100644
--- a/sentry/src/db/event_aggregate.rs
+++ b/sentry/src/db/event_aggregate.rs
@@ -2,10 +2,11 @@ use chrono::{DateTime, Utc};
 use futures::pin_mut;
 use primitives::{
     balances::UncheckedState,
+    channel::Channel as ChannelOld,
     channel_v5::Channel as ChannelV5,
     sentry::{EventAggregate, MessageResponse},
     validator::{ApproveState, Heartbeat, NewState},
-    Address, BigNum, Channel, ChannelId, ValidatorId,
+    Address, BigNum, ChannelId, ValidatorId,
 };
 use std::{convert::TryFrom, ops::Add};
 use tokio_postgres::{
@@ -17,7 +18,7 @@ use super::{DbPool, PoolError};
 pub async fn latest_approve_state(
     pool: &DbPool,
-    channel: &Channel,
+    channel: &ChannelOld,
 ) -> Result<Option<MessageResponse<ApproveState>>, PoolError> {
     let client = pool.get().await?;
@@ -54,7 +55,7 @@ pub async fn latest_approve_state_v5(
 pub async fn latest_new_state(
     pool: &DbPool,
-    channel: &Channel,
+    channel: &ChannelOld,
     state_root: &str,
 ) -> Result<Option<MessageResponse<NewState<UncheckedState>>>, PoolError> {
     let client = pool.get().await?;
diff --git a/sentry/src/db/spendable.rs b/sentry/src/db/spendable.rs
index f3f18fb05..2aea1d7ad 100644
--- a/sentry/src/db/spendable.rs
+++ b/sentry/src/db/spendable.rs
@@ -3,12 +3,12 @@ use primitives::{spender::Spendable, Address, ChannelId};
 use super::{DbPool, PoolError};
 /// ```text
-/// INSERT INTO spendable (spender, channel_id, channel, total, still_on_create2)
-/// values ('0xce07CbB7e054514D590a0262C93070D838bFBA2e', '0x061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088', '{}', 10.00000000, 2.00000000);
+/// INSERT INTO spendable (spender, channel_id, total, still_on_create2)
+/// values ('0xce07CbB7e054514D590a0262C93070D838bFBA2e', '0x061d5e2a67d0a9a10f1c732bca12a676d83f79663a396f7d87b3e30b9b411088', 10.00000000, 2.00000000);
 /// ```
 pub async fn insert_spendable(pool: DbPool, spendable: &Spendable) -> Result<bool, PoolError> {
     let client = pool.get().await?;
-    let stmt = client.prepare("INSERT INTO spendable (spender, channel_id, channel, total, still_on_create2) values ($1, $2, $3, $4, $5)").await?;
+    let stmt = client.prepare("INSERT INTO spendable (spender, channel_id, total, still_on_create2) values ($1, $2, $3, $4)").await?;
     let row = client
         .execute(
@@ -16,7 +16,6 @@ pub async fn insert_spendable(pool: DbPool, spendable: &Spendable) -> Result<bool, PoolError> {
                 &spendable.spender,
                 &spendable.channel.id(),
-                &spendable.channel,
                 &spendable.deposit.total,
 pub async fn fetch_spendable(
     pool: DbPool,
     spender: &Address,
     channel_id: &ChannelId,
 ) -> Result<Option<Spendable>, PoolError> {
     let client = pool.get().await?;
-    let statement = client.prepare("SELECT spender, channel_id, channel, total, still_on_create2 FROM spendable WHERE spender = $1 AND channel_id = $2").await?;
+    let statement = client.prepare("SELECT spender, total, still_on_create2, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM spendable INNER JOIN channels ON channels.id = spendable.channel_id WHERE spender = $1 AND channel_id = $2").await?;
     let row = client.query_opt(&statement, &[spender, channel_id]).await?;
-    Ok(row.map(Spendable::from))
+    Ok(row.as_ref().map(Spendable::from))
 }
-static GET_ALL_SPENDERS_STATEMENT: &str = "SELECT spender, channel_id, channel, total, still_on_create2 FROM spendable WHERE channel_id = $1";
+static GET_ALL_SPENDERS_STATEMENT: &str = "SELECT spender, total, still_on_create2, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM spendable INNER JOIN channels ON channels.id = spendable.channel_id WHERE channel_id = $1";
 // TODO: Include pagination
 pub async fn get_all_spendables_for_channel(
@@ -55,12 +53,11 @@ pub async fn get_all_spendables_for_channel(
     let statement = client.prepare(GET_ALL_SPENDERS_STATEMENT).await?;
     let rows = client.query(&statement, &[channel_id]).await?;
-    let spendables: Vec<Spendable> = rows.into_iter().map(Spendable::from).collect();
-    Ok(spendables)
+    Ok(rows.iter().map(Spendable::from).collect())
 }
-static UPDATE_SPENDABLE_STATEMENT: &str = "INSERT INTO spendable(spender, channel_id, channel, total, still_on_create2) VALUES($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT spendable_pkey DO UPDATE SET total = $4, still_on_create2 = $5 WHERE spendable.spender = $1 AND spendable.channel_id = $2 RETURNING spender, channel_id, channel, total, still_on_create2";
+static UPDATE_SPENDABLE_STATEMENT: &str = "WITH inserted_spendable AS (INSERT INTO spendable(spender, channel_id, total, still_on_create2) VALUES($1, $2, $3, $4) ON CONFLICT ON CONSTRAINT spendable_pkey DO UPDATE SET total = $3, still_on_create2 = $4 WHERE spendable.spender = $1 AND spendable.channel_id = $2 RETURNING *) SELECT inserted_spendable.*, channels.leader, channels.follower, channels.guardian, channels.token, channels.nonce FROM inserted_spendable INNER JOIN channels ON inserted_spendable.channel_id = channels.id";
 // Updates spendable entry deposit or inserts a new spendable entry if it doesn't exist
 pub async fn update_spendable(pool: DbPool, spendable: &Spendable) -> Result<Spendable, PoolError> {
@@ -73,30 +70,32 @@ pub async fn update_spendable(pool: DbPool, spendable: &Spendable) -> Result<Spendable, PoolError> {
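
The rewritten `UPDATE_SPENDABLE_STATEMENT` wraps the upsert in a CTE so that one round trip both writes the row and reads back the joined `channels` columns that `Spendable: From<&Row>` needs. A usage sketch (it assumes a test inside the sentry crate, with the referenced channel already inserted; all values are illustrative):

```rust
use primitives::{spender::Spendable, Deposit, UnifiedNum};

use crate::db::{spendable::update_spendable, DbPool};

async fn upsert_twice(pool: DbPool, mut spendable: Spendable) {
    // First call inserts the (spender, channel_id) row...
    let inserted = update_spendable(pool.clone(), &spendable)
        .await
        .expect("Should insert");
    assert_eq!(inserted.deposit, spendable.deposit);

    // ...the second call hits `ON CONFLICT ON CONSTRAINT spendable_pkey`
    // and only updates the deposit columns.
    spendable.deposit = Deposit {
        total: UnifiedNum::from(200),
        still_on_create2: UnifiedNum::from(0),
    };
    let updated = update_spendable(pool, &spendable)
        .await
        .expect("Should update");
    assert_eq!(updated.deposit.total, UnifiedNum::from(200));
}
```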
diff --git a/sentry/src/lib.rs b/sentry/src/lib.rs
--- a/sentry/src/lib.rs
+++ b/sentry/src/lib.rs
@@ impl<A: Adapter + 'static> Application<A> {
         (route, _) if route.starts_with("/analytics") => analytics_router(req, self).await,
         // This is important because it prevents us from doing
         // expensive regex matching for routes without /channel
-        (path, _) if path.starts_with("/channel") => channels_router(req, self).await,
+        (path, _) if path.starts_with("/v5/channel") => channels_router(req, self).await,
         (path, _) if path.starts_with("/v5/campaign") => campaigns_router(req, self).await,
         _ => Err(ResponseError::NotFound),
     }
@@ -320,6 +321,8 @@ async fn channels_router(
             .map_or("".to_string(), |m| m.as_str().to_string())]);
         req.extensions_mut().insert(param);
+        req = ChannelLoad.call(req, app).await?;
+
         last_approved(req, app).await
     } else if let (Some(caps), &Method::GET) =
         (CHANNEL_STATUS_BY_CHANNEL_ID.captures(&path), method)
diff --git a/sentry/src/middleware/campaign.rs b/sentry/src/middleware/campaign.rs
index 2da58c1b8..759478a41 100644
--- a/sentry/src/middleware/campaign.rs
+++ b/sentry/src/middleware/campaign.rs
@@ -39,7 +39,10 @@ impl<A: Adapter + 'static> Middleware<A> for CampaignLoad {
 mod test {
     use primitives::{util::tests::prep_db::DUMMY_CAMPAIGN, Campaign};
-    use crate::{db::insert_campaign, test_util::setup_dummy_app};
+    use crate::{
+        db::{insert_campaign, insert_channel},
+        test_util::setup_dummy_app,
+    };
     use super::*;
@@ -86,6 +89,10 @@ mod test {
     // existing Campaign
     {
+        // insert Channel
+        insert_channel(&app.pool, campaign.channel)
+            .await
+            .expect("Should insert Channel");
         // insert Campaign
         assert!(insert_campaign(&app.pool, &campaign)
             .await
diff --git a/sentry/src/middleware/channel.rs b/sentry/src/middleware/channel.rs
index 3d1b4a712..f10869b6c 100644
--- a/sentry/src/middleware/channel.rs
+++ b/sentry/src/middleware/channel.rs
@@ -1,13 +1,12 @@
 use crate::{
     db::{get_channel_by_id, get_channel_by_id_and_validator},
     middleware::Middleware,
+    Application, ResponseError, RouteParams,
 };
-use crate::{Application, ResponseError, RouteParams};
 use futures::future::{BoxFuture, FutureExt};
 use hex::FromHex;
 use hyper::{Body, Request};
-use primitives::adapter::Adapter;
-use primitives::{ChannelId, ValidatorId};
+use primitives::{adapter::Adapter, ChannelId, ValidatorId};
 use std::convert::TryFrom;
 use async_trait::async_trait;
@@ -53,6 +52,7 @@ fn channel_load<'a, A: Adapter + 'static>(
 }
 #[derive(Debug)]
+#[deprecated = "No longer needed for V5"]
 pub struct ChannelIfActive;
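
The `req = ChannelLoad.call(req, app).await?;` line in the router is what lets `last_approved` (further down) drop its manual database lookup: the middleware parses the id from the route, loads the V5 `Channel`, and stashes it in the request extensions. A condensed sketch of the handler-side contract (assumes the V5 `Channel` is `Copy`, as the `*req.extensions().get::<ChannelV5>()` dereference in `last_approved` implies):

```rust
use hyper::{Body, Request};
use primitives::channel_v5::Channel;

/// After `ChannelLoad` has run, a handler can take the channel straight
/// from the request extensions instead of querying the database again.
fn channel_from_request(req: &Request<Body>) -> Option<Channel> {
    req.extensions().get::<Channel>().copied()
}
```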
 #[async_trait]
@@ -89,7 +89,7 @@ fn channel_if_active<'a, A: Adapter + 'static>(
     let validator_id = ValidatorId::try_from(&validator_id)
         .map_err(|_| ResponseError::BadRequest("Wrong Validator Id".to_string()))?;
-    let channel = get_channel_by_id_and_validator(&app.pool, &channel_id, &validator_id)
+    let channel = get_channel_by_id_and_validator(&app.pool, channel_id, validator_id)
         .await?
         .ok_or(ResponseError::NotFound)?;
diff --git a/sentry/src/routes/campaign.rs b/sentry/src/routes/campaign.rs
index 08ad1be66..ed68be63c 100644
--- a/sentry/src/routes/campaign.rs
+++ b/sentry/src/routes/campaign.rs
@@ -1,8 +1,9 @@
 use crate::{
     db::{
         accounting::{get_accounting, Side},
-        campaign::{get_campaigns_by_channel, insert_campaign, update_campaign},
-        spendable::fetch_spendable,
+        campaign::{get_campaigns_by_channel, update_campaign},
+        insert_campaign, insert_channel,
+        spendable::{fetch_spendable, update_spendable},
         CampaignRemaining, DbPool, RedisError,
     },
     success_response, Application, Auth, ResponseError,
@@ -10,10 +11,13 @@ use crate::{
 use deadpool_postgres::PoolError;
 use hyper::{Body, Request, Response};
 use primitives::{
-    adapter::Adapter,
+    adapter::{Adapter, AdapterErrorKind, Error as AdapterError},
     campaign_validator::Validator,
+    channel_v5::Channel,
+    config::TokenInfo,
     sentry::campaign_create::{CreateCampaign, ModifyCampaign},
-    Address, Campaign, UnifiedNum,
+    spender::Spendable,
+    Address, Campaign, Deposit, UnifiedNum,
 };
 use slog::error;
 use std::cmp::{max, Ordering};
@@ -40,6 +44,36 @@ pub enum Error {
     Pool(#[from] PoolError),
 }
+#[derive(Debug, Error)]
+pub enum LatestSpendableError<AE: AdapterErrorKind + 'static> {
+    #[error("Adapter: {0}")]
+    Adapter(#[from] AdapterError<AE>),
+    #[error("Overflow occurred while converting native token precision to unified precision")]
+    Overflow,
+    #[error("DB Pool error: {0}")]
+    Pool(#[from] PoolError),
+}
+
+/// Gets the latest Spendable from the Adapter and updates it in the Database
+/// before returning it
+pub async fn update_latest_spendable<A: Adapter>(
+    adapter: &A,
+    pool: &DbPool,
+    channel: Channel,
+    token: &TokenInfo,
+    address: Address,
+) -> Result<Spendable, LatestSpendableError<A::AdapterError>> {
+    let latest_deposit = adapter.get_deposit(&channel, &address).await?;
+
+    let spendable = Spendable {
+        spender: address,
+        channel,
+        deposit: Deposit::<UnifiedNum>::from_precision(latest_deposit, token.precision.get())
+            .ok_or(LatestSpendableError::Overflow)?,
+    };
+
+    Ok(update_spendable(pool.clone(), &spendable).await?)
+}
+
 pub async fn create_campaign<A: Adapter>(
     req: Request<Body>,
     app: &Application<A>,
 ) -> Result<Response<Body>, ResponseError> {
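
`LatestSpendableError::Overflow` covers the case where a deposit that is perfectly valid on-chain no longer fits once rescaled into `UnifiedNum`'s `u64` range. A small illustration of when `from_precision` yields `None` (the amount is contrived, and this assumes `UnifiedNum::from_precision` returns `None` on overflow, as the `Overflow` variant implies):

```rust
use std::str::FromStr;

use primitives::{BigNum, Deposit, UnifiedNum};

fn overflow_illustration() {
    // ~1.8e31 base units of an 18-decimal token: fine as a BigNum…
    let huge = Deposit {
        total: BigNum::from_str("18446744073709551615000000000000").expect("Should parse"),
        still_on_create2: BigNum::from(0),
    };

    // …but rescaled to 8 decimals it exceeds u64::MAX, which is exactly
    // the case `LatestSpendableError::Overflow` reports.
    assert!(Deposit::<UnifiedNum>::from_precision(huge, 18).is_none());
}
```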
@@ -67,8 +101,21 @@ pub async fn create_campaign(
         ));
     }
-    let error_response =
-        ResponseError::BadRequest("err occurred; please try again later".to_string());
+    let token = app
+        .config
+        .token_address_whitelist
+        .get(&campaign.channel.token)
+        .ok_or_else(|| ResponseError::BadRequest("Channel token is not whitelisted".to_string()))?;
+
+    // make sure that the Channel is available in the DB
+    // insert Channel
+    insert_channel(&app.pool, campaign.channel)
+        .await
+        .map_err(|error| {
+            error!(&app.logger, "{}", &error; "module" => "create_campaign");
+
+            ResponseError::BadRequest("Failed to fetch/create Channel".to_string())
+        })?;
     let total_remaining = {
         let accounting_spent = get_accounting(
@@ -81,14 +128,15 @@ pub async fn create_campaign(
         .map(|accounting| accounting.amount)
         .unwrap_or_default();
-        let latest_spendable =
-            fetch_spendable(app.pool.clone(), &campaign.creator, &campaign.channel.id())
-                .await?
-                .ok_or_else(|| {
-                    ResponseError::BadRequest(
-                        "No spendable amount found for the Campaign creator".to_string(),
-                    )
-                })?;
+        let latest_spendable = update_latest_spendable(
+            &app.adapter,
+            &app.pool,
+            campaign.channel,
+            token,
+            campaign.creator,
+        )
+        .await
+        .map_err(|err| ResponseError::BadRequest(err.to_string()))?;
         // Gets the latest Spendable for this (spender, channelId) pair
         let total_deposited = latest_spendable.deposit.total;
@@ -140,7 +188,8 @@ pub async fn create_campaign(
         ));
     }
-    // insert Campaign
+    // Channel insertion can never create a `SqlState::UNIQUE_VIOLATION`
+    // Insert the Campaign too
     match insert_campaign(&app.pool, &campaign).await {
         Err(error) => {
             error!(&app.logger, "{}", &error; "module" => "create_campaign");
@@ -150,7 +199,10 @@ pub async fn create_campaign(
                     "Campaign already exists".to_string(),
                 ))
             }
-            _ => Err(error_response),
+            _err => Err(ResponseError::BadRequest(
+                "Error occurred when inserting Campaign in Database; please try again later"
+                    .to_string(),
+            )),
         }
     }
     Ok(false) => Err(ResponseError::BadRequest(
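
The create flow above derives the creator's headroom as the latest deposit minus what accounting says has already been spent, and the new campaign's budget must fit inside it. A worked sketch of that arithmetic with `UnifiedNum` (figures invented, 8-decimal fixed point; assumes `UnifiedNum` implements `num_traits::CheckedSub`):

```rust
use num_traits::CheckedSub;
use primitives::UnifiedNum;

fn remaining_budget_check() {
    let total_deposited = UnifiedNum::from(200_000_000_000); // 2000.00000000
    let accounting_spent = UnifiedNum::from(50_000_000_000); //  500.00000000

    let total_remaining = total_deposited
        .checked_sub(&accounting_spent)
        .expect("Should not underflow"); // 1500.00000000

    // A 500.00000000 budget fits; a 1600.00000000 budget would not.
    let new_budget = UnifiedNum::from(50_000_000_000);
    assert!(new_budget <= total_remaining);
}
```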
@@ -577,6 +629,7 @@ pub mod insert_events {
     use redis::aio::MultiplexedConnection;
     use crate::db::{
+        insert_channel,
         redis_pool::TESTS_POOL,
         tests_postgres::{setup_test_migrations, DATABASE_POOL},
     };
@@ -679,6 +732,11 @@ pub mod insert_events {
         let campaign = DUMMY_CAMPAIGN.clone();
+        // make sure that the Channel is created in Database for the Accounting to work properly
+        insert_channel(&database.pool, campaign.channel)
+            .await
+            .expect("It should insert Channel");
+
         let publisher = ADDRESSES["publisher"];
         let leader = campaign.leader().unwrap();
@@ -726,7 +784,7 @@ pub mod insert_events {
         assert!(
             spend_event.is_ok(),
-            "Campaign budget has no remaining funds to spend"
+            "Campaign budget has no remaining funds to spend or an error occurred"
         );
         // Payout: 300
@@ -749,15 +807,10 @@ pub mod insert_events {
 #[cfg(test)]
 mod test {
     use super::{update_campaign::modify_campaign, *};
-    use crate::{
-        db::{accounting::update_accounting, spendable::insert_spendable},
-        test_util::setup_dummy_app,
-    };
+    use crate::test_util::setup_dummy_app;
     use hyper::StatusCode;
     use primitives::{
-        spender::{Deposit, Spendable},
-        util::tests::prep_db::DUMMY_CAMPAIGN,
-        ValidatorId,
+        adapter::Deposit, util::tests::prep_db::DUMMY_CAMPAIGN, BigNum, ChannelId, ValidatorId,
     };
     #[tokio::test]
@@ -766,6 +819,27 @@ mod test {
     /// Test with multiple campaigns (because of Budget) a modification of campaign
     async fn create_and_modify_with_multiple_campaigns() {
         let app = setup_dummy_app().await;
+        let dummy_campaign = DUMMY_CAMPAIGN.clone();
+
+        // this function should be called before each creation/modification of a Campaign!
+        let add_deposit_call = |channel: ChannelId, creator: Address, token: Address| {
+            app.adapter.add_deposit_call(
+                channel,
+                creator,
+                Deposit {
+                    // a deposit 4 times larger than the Campaign Budget
+                    total: UnifiedNum::from(200_000_000_000).to_precision(
+                        app.config
+                            .token_address_whitelist
+                            .get(&token)
+                            .expect("Should get token")
+                            .precision
+                            .get(),
+                    ),
+                    still_on_create2: BigNum::from(0),
+                },
+            )
+        };
         let build_request = |create_campaign: CreateCampaign| -> Request<Body> {
             let auth = Auth {
@@ -784,32 +858,11 @@ mod test {
         let campaign: Campaign = {
             // erases the CampaignId for the CreateCampaign request
-            let mut create = CreateCampaign::from(DUMMY_CAMPAIGN.clone());
+            let mut create = CreateCampaign::from(dummy_campaign);
             // 500.00000000
             create.budget = UnifiedNum::from(50_000_000_000);
-
-            let spendable = Spendable {
-                spender: create.creator,
-                channel: create.channel.clone(),
-                deposit: Deposit {
-                    // a deposit 4 times larger than the Campaign Budget
-                    total: UnifiedNum::from(200_000_000_000),
-                    still_on_create2: UnifiedNum::from(0),
-                },
-            };
-            assert!(insert_spendable(app.pool.clone(), &spendable)
-                .await
-                .expect("Should insert Spendable for Campaign creator"));
-
-            let _accounting = update_accounting(
-                app.pool.clone(),
-                create.channel.id(),
-                create.creator,
-                Side::Spender,
-                UnifiedNum::default(),
-            )
-            .await
-            .expect("Should create Accounting");
+            // prepare for Campaign creation
+            add_deposit_call(create.channel.id(), create.creator, create.channel.token);
             let create_response = create_campaign(build_request(create), &app)
                 .await
@@ -853,6 +906,12 @@ mod test {
             ad_units: None,
             targeting_rules: None,
         };
+        // prepare for Campaign modification
+        add_deposit_call(
+            campaign.channel.id(),
+            campaign.creator,
+            campaign.channel.token,
+        );
         let modified_campaign =
             modify_campaign(&app.pool, &app.campaign_remaining, campaign.clone(), modify)
@@ -872,6 +931,13 @@ mod test {
             // 500.00000000
             create_second.budget = UnifiedNum::from(50_000_000_000);
+            // prepare for Campaign creation
+            add_deposit_call(
+                create_second.channel.id(),
+                create_second.creator,
+                create_second.channel.token,
+            );
+
             let create_response = create_campaign(build_request(create_second), &app)
                 .await
                 .expect("Should create campaign");
@@ -896,6 +962,9 @@ mod test {
             // 600.00000000
             create.budget = UnifiedNum::from(60_000_000_000);
+            // prepare for Campaign creation
+            add_deposit_call(create.channel.id(), create.creator, create.channel.token);
+
             let create_err = create_campaign(build_request(create), &app)
                 .await
                 .expect_err("Should return Error response");
@@ -921,6 +990,13 @@ mod test {
             targeting_rules: None,
         };
+        // prepare for Campaign modification
+        add_deposit_call(
+            modified.channel.id(),
+            modified.creator,
+            modified.channel.token,
+        );
+
         let modified_campaign =
             modify_campaign(&app.pool, &app.campaign_remaining, modified, modify)
                 .await
@@ -940,6 +1016,9 @@ mod test {
             // 600.00000000
             create.budget = UnifiedNum::from(60_000_000_000);
+            // prepare for Campaign creation
+            add_deposit_call(create.channel.id(), create.creator, create.channel.token);
+
             let create_response = create_campaign(build_request(create), &app)
                 .await
                 .expect("Should return create campaign");
@@ -968,6 +1047,13 @@ mod test {
             targeting_rules: None,
         };
+        // prepare for Campaign modification
+        add_deposit_call(
+            modified.channel.id(),
+            modified.creator,
+            modified.channel.token,
+        );
+
         let modify_err = modify_campaign(&app.pool, &app.campaign_remaining, modified, modify)
             .await
            .expect_err("Should return Error response");
response"); diff --git a/sentry/src/routes/channel.rs b/sentry/src/routes/channel.rs index 7a91d2aa7..a423cd184 100644 --- a/sentry/src/routes/channel.rs +++ b/sentry/src/routes/channel.rs @@ -1,29 +1,25 @@ use crate::db::{ - event_aggregate::{ - latest_approve_state, latest_approve_state_v5, latest_heartbeats, latest_new_state, - latest_new_state_v5, - }, - get_channel_by_id, insert_channel, insert_validator_messages, list_channels, + event_aggregate::{latest_approve_state_v5, latest_heartbeats, latest_new_state_v5}, + insert_channel, insert_validator_messages, list_channels, spendable::{fetch_spendable, get_all_spendables_for_channel, update_spendable}, DbPool, PoolError, }; use crate::{success_response, Application, Auth, ResponseError, RouteParams}; use futures::future::try_join_all; -use hex::FromHex; use hyper::{Body, Request, Response}; use primitives::{ adapter::Adapter, balances::{CheckedState, UncheckedState}, + channel::Channel as ChannelOld, channel_v5::Channel as ChannelV5, config::TokenInfo, sentry::{ - channel_list::{ChannelListQuery, LastApprovedQuery}, - AllSpendersResponse, LastApproved, LastApprovedResponse, Pagination, SpenderResponse, - SuccessResponse, + channel_list::ChannelListQuery, AllSpendersResponse, LastApproved, LastApprovedQuery, + LastApprovedResponse, Pagination, SpenderResponse, SuccessResponse, }, - spender::{Deposit, Spendable, Spender, SpenderLeaf}, + spender::{Spendable, Spender, SpenderLeaf}, validator::{MessageTypes, NewState}, - Address, Channel, ChannelId, UnifiedNum, + Address, Channel, Deposit, UnifiedNum, }; use slog::{error, Logger}; use std::{collections::HashMap, str::FromStr}; @@ -36,12 +32,12 @@ pub async fn channel_status( use serde::Serialize; #[derive(Serialize)] struct ChannelStatusResponse<'a> { - channel: &'a Channel, + channel: &'a ChannelOld, } let channel = req .extensions() - .get::() + .get::() .expect("Request should have Channel"); let response = ChannelStatusResponse { channel }; @@ -49,13 +45,14 @@ pub async fn channel_status( Ok(success_response(serde_json::to_string(&response)?)) } +#[deprecated = "V5 Channel no longer needs creation of channel route"] pub async fn create_channel( req: Request, app: &Application, ) -> Result, ResponseError> { let body = hyper::body::to_bytes(req.into_body()).await?; - let channel = serde_json::from_slice::(&body) + let channel = serde_json::from_slice::(&body) .map_err(|e| ResponseError::FailedValidation(e.to_string()))?; // TODO AIP#61: No longer needed, remove! 
@@ -65,7 +62,7 @@ pub async fn create_channel(
     let error_response = ResponseError::BadRequest("err occurred; please try again later".into());
-    match insert_channel(&app.pool, &channel).await {
+    match insert_channel(&app.pool, channel).await {
         Err(error) => {
             error!(&app.logger, "{}", &error; "module" => "create_channel");
@@ -78,7 +75,6 @@ pub async fn create_channel(
             _ => Err(error_response),
         }
     }
-    Ok(false) => Err(error_response),
     _ => Ok(()),
 }?;
@@ -101,9 +97,7 @@ pub async fn channel_list(
         &app.pool,
         skip,
         app.config.channels_find_limit,
-        &query.creator,
-        &query.validator,
-        &query.valid_until_ge,
+        query.validator,
     )
     .await?;
@@ -125,14 +119,11 @@ pub async fn last_approved(
     req: Request<Body>,
     app: &Application<A>,
 ) -> Result<Response<Body>, ResponseError> {
-    // get request params
-    let route_params = req
+    // get request Channel
+    let channel = *req
         .extensions()
-        .get::<RouteParams>()
-        .expect("request should have route params");
-
-    let channel_id = ChannelId::from_hex(route_params.index(0))?;
-    let channel = get_channel_by_id(&app.pool, &channel_id).await?.unwrap();
+        .get::<ChannelV5>()
+        .ok_or(ResponseError::NotFound)?;
     let default_response = Response::builder()
         .header("Content-type", "application/json")
@@ -145,26 +136,26 @@ pub async fn last_approved(
     )
     .expect("should build response");
-    let approve_state = match latest_approve_state(&app.pool, &channel).await? {
+    let approve_state = match latest_approve_state_v5(&app.pool, &channel).await? {
         Some(approve_state) => approve_state,
         None => return Ok(default_response),
     };
     let state_root = approve_state.msg.state_root.clone();
-    let new_state = latest_new_state(&app.pool, &channel, &state_root).await?;
+    let new_state = latest_new_state_v5(&app.pool, &channel, &state_root).await?;
     if new_state.is_none() {
         return Ok(default_response);
     }
     let query = serde_urlencoded::from_str::<LastApprovedQuery>(req.uri().query().unwrap_or(""))?;
-    let validators = channel.spec.validators;
-    let channel_id = channel.id;
+    let validators = vec![channel.leader, channel.follower];
+    let channel_id = channel.id();
     let heartbeats = if query.with_heartbeat.is_some() {
         let result = try_join_all(
             validators
                 .iter()
-                .map(|validator| latest_heartbeats(&app.pool, &channel_id, &validator.id)),
+                .map(|validator| latest_heartbeats(&app.pool, &channel_id, validator)),
         )
         .await?;
         Some(result.into_iter().flatten().collect::<Vec<_>>())
@@ -199,7 +190,7 @@ pub async fn create_validator_messages(
     let channel = req
         .extensions()
-        .get::<Channel>()
+        .get::<ChannelV5>()
         .expect("Request should have Channel")
         .to_owned();
@@ -211,7 +202,7 @@ pub async fn create_validator_messages(
         .get("messages")
         .ok_or_else(|| ResponseError::BadRequest("missing messages body".to_string()))?;
-    match channel.spec.validators.find(&session.uid) {
+    match channel.find_validator(session.uid) {
         None => Err(ResponseError::Unauthorized),
         _ => {
             try_join_all(messages.iter().map(|message| {
@@ -226,6 +217,7 @@ pub async fn create_validator_messages(
     }
 }
+/// This will make sure to insert/get the `Channel` from DB before attempting to create the `Spendable`
 async fn create_or_update_spendable_document(
     adapter: &impl Adapter,
     token_info: &TokenInfo,
@@ -233,6 +225,8 @@ async fn create_or_update_spendable_document(
     channel: &ChannelV5,
     spender: Address,
 ) -> Result<Spendable, ResponseError> {
+    insert_channel(&pool, *channel).await?;
+
     let deposit = adapter.get_deposit(channel, &spender).await?;
     let total = UnifiedNum::from_precision(deposit.total, token_info.precision.get());
     let still_on_create2 =
@@ -452,7 +446,6 @@ mod test {
         .await
         .expect("should return None");
         assert!(spendable.is_none());
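
`LastApprovedQuery` is deserialized straight from the query string, so the camelCase rename is what ties `?withHeartbeat=true` to the `with_heartbeat` field. A sketch of the parsing step in isolation:

```rust
use primitives::sentry::LastApprovedQuery;

fn parse_last_approved_query() {
    // `rename_all = "camelCase"` maps the `withHeartbeat` key
    // onto the `with_heartbeat: Option<String>` field.
    let query: LastApprovedQuery =
        serde_urlencoded::from_str("withHeartbeat=true").expect("Should deserialize");
    assert_eq!(query.with_heartbeat.as_deref(), Some("true"));

    // The parameter is optional.
    let empty: LastApprovedQuery = serde_urlencoded::from_str("").expect("Should deserialize");
    assert!(empty.with_heartbeat.is_none());
}
```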
diff --git a/sentry/src/routes/event_aggregate.rs b/sentry/src/routes/event_aggregate.rs
index 987653fcb..312697259 100644
--- a/sentry/src/routes/event_aggregate.rs
+++ b/sentry/src/routes/event_aggregate.rs
@@ -2,7 +2,9 @@ use chrono::{serde::ts_milliseconds_option, DateTime, Utc};
 use hyper::{Body, Request, Response};
 use serde::Deserialize;
 
-use primitives::{adapter::Adapter, sentry::EventAggregateResponse, Channel};
+use primitives::{
+    adapter::Adapter, channel::Channel as ChannelOld, sentry::EventAggregateResponse,
+};
 
 use crate::{success_response, Application, Auth, ResponseError};
 
@@ -19,7 +21,7 @@ pub async fn list_channel_event_aggregates<A: Adapter>(
 ) -> Result<Response<Body>, ResponseError> {
     let channel = req
         .extensions()
-        .get::<Channel>()
+        .get::<ChannelOld>()
         .expect("Request should have Channel");
 
     let auth = req
diff --git a/sentry/src/routes/validator_message.rs b/sentry/src/routes/validator_message.rs
index e4c98d71b..1f739e14e 100644
--- a/sentry/src/routes/validator_message.rs
+++ b/sentry/src/routes/validator_message.rs
@@ -61,7 +61,8 @@ pub async fn list_validator_messages<A: Adapter>(
         .min(config_limit);
 
     let validator_messages =
-        get_validator_messages(&app.pool, &channel.id, validator_id, message_types, limit).await?;
+        get_validator_messages(&app.pool, &channel.id(), validator_id, message_types, limit)
+            .await?;
 
     let response = ValidatorMessageResponse { validator_messages };
 
diff --git a/sentry/src/spender.rs b/sentry/src/spender.rs
index 46f91bc7e..e0d411954 100644
--- a/sentry/src/spender.rs
+++ b/sentry/src/spender.rs
@@ -1,28 +1,3 @@
-use std::time::Duration;
-
-use dashmap::DashMap;
-use primitives::{spender::Aggregate, ChannelId};
-
-#[derive(Debug)]
-///
-pub struct Aggregator {
-    /// In-memory aggregates waiting to be saved to the underlying persistence storage (database)
-    aggregates: DashMap<ChannelId, Aggregate>,
-    /// Specifies how often the Aggregate should be stored in the underlying persistence storage (database)
-    throttle: Duration,
-}
-
-impl Aggregator {
-    /// Stores the aggregate to the database
-    pub fn store_aggregates() {
-        todo!("Store aggregate to DB")
-    }
-    /// Records new spending triggered by a Payout event
-    pub async fn record() {
-        todo!("Record a new payout")
-    }
-}
-
 pub mod fee {
     pub const PRO_MILLE: UnifiedNum = UnifiedNum::from_u64(1_000);
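Note (sketch, not part of the patch): with the in-memory `Aggregator` gone, `fee::PRO_MILLE` is what remains of this module — the per-mille (parts-per-thousand) denominator for validator fees. The arithmetic it anchors is just a scaled ratio; in plain `u64` terms (illustrative only — the real code would go through `UnifiedNum`'s checked, fixed-precision arithmetic):

    /// Illustrative only: a fee quoted per mille.
    /// pro_mille_fee(1_000_000, 30) == Some(30_000), i.e. a 3% cut.
    fn pro_mille_fee(payout: u64, fee_per_mille: u64) -> Option<u64> {
        payout.checked_mul(fee_per_mille)?.checked_div(1_000)
    }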
diff --git a/validator_worker/src/channel.rs b/validator_worker/src/channel.rs
index d1f7da83a..70541b4cd 100644
--- a/validator_worker/src/channel.rs
+++ b/validator_worker/src/channel.rs
@@ -9,17 +9,16 @@ use slog::Logger;
 use std::collections::{hash_map::Entry, HashSet};
 
 pub async fn channel_tick<A: Adapter + 'static>(
-    adapter: A,
+    sentry: &SentryApi<A>,
     config: &Config,
-    logger: &Logger,
     channel: Channel,
-    validators: Validators,
+    // validators: &Validators,
 ) -> Result<ChannelId, Error> {
+    let adapter = &sentry.adapter;
     let tick = channel
         .find_validator(adapter.whoami())
         .ok_or(Error::ChannelNotIntendedForUs)?;
 
-    let sentry = SentryApi::init(adapter, logger.clone(), config.clone(), validators)?;
     // 1. `GET /channel/:id/spender/all`
     let all_spenders = sentry.get_all_spenders(channel.id()).await?;
 
@@ -50,13 +49,13 @@ pub async fn channel_tick<A: Adapter + 'static>(
     // TODO: Add timeout
     match tick {
         primitives::Validator::Leader(_v) => {
-            let _leader_tick_status = leader::tick(&sentry, channel, accounting.balances, token)
+            let _leader_tick_status = leader::tick(sentry, channel, accounting.balances, token)
                 .await
                 .map_err(|err| Error::LeaderTick(channel.id(), TickError::Tick(Box::new(err))))?;
         }
         primitives::Validator::Follower(_v) => {
             let _follower_tick_status =
-                follower::tick(&sentry, channel, all_spenders, accounting.balances, token)
+                follower::tick(sentry, channel, all_spenders, accounting.balances, token)
                     .await
                     .map_err(|err| {
                         Error::FollowerTick(channel.id(), TickError::Tick(Box::new(err)))
@@ -70,14 +69,16 @@ pub async fn channel_tick<A: Adapter + 'static>(
 
 /// Fetches all `Campaign`s from Sentry and builds the `Channel`s to be processed
 /// along side all the `Validator`s' url & auth token
 pub async fn collect_channels<A: Adapter + 'static>(
-    adapter: A,
+    adapter: &A,
     sentry_url: &ApiUrl,
     _config: &Config,
     _logger: &Logger,
 ) -> Result<(HashSet<ChannelId>, Validators), reqwest::Error> {
     let whoami = adapter.whoami();
-    let campaigns = all_campaigns(sentry_url, whoami).await?;
+    // TODO: Move client creation
+    let client = reqwest::Client::new();
+    let campaigns = all_campaigns(client, sentry_url, whoami).await?;
     let channels = campaigns
         .iter()
         .map(|campaign| campaign.channel)
diff --git a/validator_worker/src/lib.rs b/validator_worker/src/lib.rs
index dc7d02e7c..e9c4eb88c 100644
--- a/validator_worker/src/lib.rs
+++ b/validator_worker/src/lib.rs
@@ -9,7 +9,7 @@ use primitives::{
 };
 use thiserror::Error;
 
-pub use self::sentry_interface::{all_channels, SentryApi};
+pub use self::sentry_interface::SentryApi;
 
 pub mod channel;
 pub mod error;
diff --git a/validator_worker/src/main.rs b/validator_worker/src/main.rs
index 5860b740a..82f560e43 100644
--- a/validator_worker/src/main.rs
+++ b/validator_worker/src/main.rs
@@ -23,7 +23,10 @@ use primitives::{
 };
 use slog::{error, info, Logger};
 use std::fmt::Debug;
-use validator_worker::channel::{channel_tick, collect_channels};
+use validator_worker::{
+    channel::{channel_tick, collect_channels},
+    SentryApi,
+};
 
 #[derive(Debug, Clone)]
 struct Args<A: Adapter> {
@@ -172,7 +175,7 @@ async fn infinite<A: Adapter + 'static>(args: Args<A>, logger: &Logger) {
 
 async fn all_channels_tick<A: Adapter + 'static>(args: Args<A>, logger: &Logger) {
     let (channels, validators) = match collect_channels(
-        args.adapter.clone(),
+        &args.adapter,
         &args.sentry_url,
         &args.config,
         logger,
@@ -187,15 +190,25 @@ async fn all_channels_tick<A: Adapter + 'static>(args: Args<A>, logger: &Logger
     };
     let channels_size = channels.len();
 
-    let tick_results = join_all(channels.into_iter().map(|channel| {
-        channel_tick(
-            args.adapter.clone(),
-            &args.config,
-            logger,
-            channel,
-            validators.clone(),
-        )
-    }))
+    // initialize SentryApi once we have all the Campaign Validators we need to propagate messages to
+    let sentry = match SentryApi::init(
+        args.adapter.clone(),
+        logger.clone(),
+        args.config.clone(),
+        validators.clone(),
+    ) {
+        Ok(sentry) => sentry,
+        Err(err) => {
+            error!(logger, "Failed to initialize SentryApi for all channels"; "SentryApi::init()" => ?err);
+            return;
+        }
+    };
+
+    let tick_results = join_all(
+        channels
+            .into_iter()
+            .map(|channel| channel_tick(&sentry, &args.config, channel)),
+    )
     .await;
 
     for channel_err in tick_results.into_iter().filter_map(Result::err) {
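Note (sketch, not part of the patch): both changes above move in the same direction — build expensive handles once and share them. `SentryApi` is now initialized a single time in `main.rs`, and the `TODO` in `collect_channels` is about doing the same for `reqwest::Client`, which owns a connection pool and is cheap to clone (internally `Arc`-backed). The sharing pattern in isolation (function name and URLs are illustrative):

    use futures::future::try_join;
    use reqwest::Client;

    /// Illustrative: one `Client` (one connection pool) serving concurrent
    /// requests; no new pool is created per call.
    async fn fetch_pair(
        client: &Client,
        url_a: &str,
        url_b: &str,
    ) -> Result<(String, String), reqwest::Error> {
        let (res_a, res_b) = try_join(client.get(url_a).send(), client.get(url_b).send()).await?;
        try_join(res_a.text(), res_b.text()).await
    }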
diff --git a/validator_worker/src/sentry_interface.rs b/validator_worker/src/sentry_interface.rs
index 96c6896b5..687084f22 100644
--- a/validator_worker/src/sentry_interface.rs
+++ b/validator_worker/src/sentry_interface.rs
@@ -1,22 +1,22 @@
 use std::{collections::HashMap, time::Duration};
 
 use chrono::{DateTime, Utc};
-use futures::future::{join_all, try_join_all, TryFutureExt};
+use futures::future::{join_all, TryFutureExt};
 use reqwest::{Client, Response};
 use slog::Logger;
 
 use primitives::{
     adapter::Adapter,
     balances::{CheckedState, UncheckedState},
-    channel::Channel as ChannelOld,
+    channel_v5::Channel,
     sentry::{
-        AccountingResponse, ChannelListResponse, EventAggregateResponse, LastApprovedResponse,
-        SuccessResponse, ValidatorMessageResponse,
+        AccountingResponse, EventAggregateResponse, LastApprovedResponse, SuccessResponse,
+        ValidatorMessageResponse,
     },
     spender::Spender,
     util::ApiUrl,
     validator::MessageTypes,
-    Address, {ChannelId, Config, ToETHChecksum, ValidatorId},
+    Address, Campaign, {ChannelId, Config, ValidatorId},
 };
 use thiserror::Error;
 
@@ -105,15 +105,12 @@ impl<A: Adapter + 'static> SentryApi<A> {
     ) -> Result<Option<MessageTypes>, Error> {
         let message_type = message_types.join("+");
 
-        // TODO: Fix endpoint URL
         let endpoint = self
             .whoami
             .url
             .join(&format!(
                 "v5/channel/{}/validator-messages/{}/{}?limit=1",
-                channel,
-                from.to_checksum(),
-                message_type
+                channel, from, message_type
             ))
             .expect("Should not error when creating endpoint url");
 
@@ -213,6 +210,21 @@ impl<A: Adapter + 'static> SentryApi<A> {
         .await
     }
 
+    /// Fetches all `Campaign`s from `sentry` by going through all pages and collecting the `Campaign`s into a single `Vec`
+    pub async fn all_campaigns(&self) -> Result<Vec<Campaign>, Error> {
+        Ok(
+            campaigns::all_campaigns(self.client.clone(), &self.whoami.url, self.adapter.whoami())
+                .await?,
+        )
+    }
+
+    pub async fn all_channels(&self) -> Result<Vec<Channel>, Error> {
+        Ok(
+            channels::all_channels(self.client.clone(), &self.whoami.url, self.adapter.whoami())
+                .await?,
+        )
+    }
+
     #[deprecated = "V5 no longer needs event aggregates"]
     pub async fn get_event_aggregates(
         &self,
@@ -266,48 +278,66 @@ async fn propagate_to<A: Adapter>(
     Ok(validator_id)
 }
 
-pub async fn all_channels(
-    sentry_url: &str,
-    whoami: &ValidatorId,
-) -> Result<Vec<ChannelOld>, reqwest::Error> {
-    let url = sentry_url.to_owned();
-    let first_page = fetch_page(url.clone(), 0, whoami).await?;
-
-    if first_page.total_pages < 2 {
-        Ok(first_page.channels)
-    } else {
-        let all: Vec<ChannelListResponse> =
-            try_join_all((1..first_page.total_pages).map(|i| fetch_page(url.clone(), i, whoami)))
-                .await?;
-
-        let result_all: Vec<ChannelOld> = std::iter::once(first_page)
-            .chain(all.into_iter())
-            .flat_map(|ch| ch.channels.into_iter())
-            .collect();
-        Ok(result_all)
+mod channels {
+    use futures::{future::try_join_all, TryFutureExt};
+    use primitives::{
+        channel_v5::Channel,
+        sentry::channel_list::{ChannelListQuery, ChannelListResponse},
+        util::ApiUrl,
+        ValidatorId,
+    };
+    use reqwest::{Client, Response};
+
+    pub async fn all_channels(
+        client: Client,
+        sentry_url: &ApiUrl,
+        whoami: ValidatorId,
+    ) -> Result<Vec<Channel>, reqwest::Error> {
+        let first_page = fetch_page(&client, sentry_url, 0, whoami).await?;
+
+        if first_page.pagination.total_pages < 2 {
+            Ok(first_page.channels)
+        } else {
+            let all: Vec<ChannelListResponse> = try_join_all(
+                (1..first_page.pagination.total_pages)
+                    .map(|i| fetch_page(&client, sentry_url, i, whoami)),
+            )
+            .await?;
+
+            let result_all: Vec<Channel> = std::iter::once(first_page)
+                .chain(all.into_iter())
+                .flat_map(|ch| ch.channels.into_iter())
+                .collect();
+            Ok(result_all)
+        }
     }
-}
 
-async fn fetch_page(
-    sentry_url: String,
-    page: u64,
-    validator: &ValidatorId,
-) -> Result<ChannelListResponse, reqwest::Error> {
-    let client = Client::new();
-
-    let query = [
-        format!("page={}", page),
-        format!("validator={}", validator.to_checksum()),
-    ]
-    .join("&");
-
-    client
-        .get(&format!("{}/channel/list?{}", sentry_url, query))
-        .send()
-        .and_then(|res: Response| res.json::<ChannelListResponse>())
-        .await
-}
+    async fn fetch_page(
+        client: &Client,
+        sentry_url: &ApiUrl,
+        page: u64,
+        validator: ValidatorId,
+    ) -> Result<ChannelListResponse, reqwest::Error> {
+        let query = ChannelListQuery {
+            page,
+            creator: None,
+            validator: Some(validator),
+        };
+
+        let endpoint = sentry_url
+            .join(&format!(
+                "v5/channel/list?{}",
+                serde_urlencoded::to_string(query).expect("Should not fail to serialize")
+            ))
+            .expect("Should not fail to create endpoint URL");
+
+        client
+            .get(endpoint)
+            .send()
+            .and_then(|res: Response| res.json::<ChannelListResponse>())
+            .await
+    }
+}
 
 pub mod campaigns {
     use chrono::Utc;
     use futures::future::try_join_all;
@@ -320,16 +350,18 @@ pub mod campaigns {
 
     /// Fetches all `Campaign`s from `sentry` by going through all pages and collecting the `Campaign`s into a single `Vec`
     pub async fn all_campaigns(
+        client: Client,
         sentry_url: &ApiUrl,
         whoami: ValidatorId,
     ) -> Result<Vec<Campaign>, reqwest::Error> {
-        let first_page = fetch_page(sentry_url, 0, whoami).await?;
+        let first_page = fetch_page(&client, sentry_url, 0, whoami).await?;
 
         if first_page.pagination.total_pages < 2 {
             Ok(first_page.campaigns)
         } else {
             let all = try_join_all(
-                (1..first_page.pagination.total_pages).map(|i| fetch_page(sentry_url, i, whoami)),
+                (1..first_page.pagination.total_pages)
+                    .map(|i| fetch_page(&client, sentry_url, i, whoami)),
             )
             .await?;
 
@@ -342,11 +374,11 @@ pub mod campaigns {
     async fn fetch_page(
+        client: &Client,
         sentry_url: &ApiUrl,
         page: u64,
         validator: ValidatorId,
     ) -> Result<CampaignListResponse, reqwest::Error> {
-        let client = Client::new();
         let query = CampaignListQuery {
             page,
             active_to_ge: Utc::now(),