Skip to content

async http sender #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions questdb-rs-ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 46 additions & 7 deletions questdb-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ crate-type = ["lib"]

[dependencies]
libc = "0.2"
socket2 = { version = "0.5.5", optional = true }
dns-lookup = "2.0.4"
base64ct = { version = "1.7", features = ["alloc"] }
rustls-pemfile = "2.0.0"
ryu = { version = "1.0" }
itoa = "1.0"
bytes = "1.10.1"

socket2 = { version = "0.5.5", optional = true }
aws-lc-rs = { version = "1.13", optional = true }
ring = { version = "0.17.14", optional = true }
rustls-pki-types = "1.0.1"
Expand All @@ -33,9 +35,21 @@ rustls-native-certs = { version = "0.8.1", optional = true }
webpki-roots = { version = "1.0.1", default-features = false, optional = true }
chrono = { version = "0.4.40", optional = true }

http = { version = "1.3.1", optional = true }

# We need to limit the `ureq` version to 3.0.x since we use
# the `ureq::unversioned` module which does not respect semantic versioning.
ureq = { version = "3.0.10, <3.1.0", default-features = false, features = ["_tls"], optional = true }

tokio = { version = "1.45.1", default-features = false, features = ["net"], optional = true }
tokio-rustls = { version = "0.26.2", default-features = false, optional = true }
#hyper = { version = "1.6.0", default-features = false, optional = true }
#http-body-util = { version = "0.1.3", optional = true }
#hyper-util = { version = "0.1.14", optional = true, features = ["client", "client-legacy", "http1"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"], optional = true }
lasso = { version = "0.7.3", features = ["multi-threaded"], optional = true }
crossbeam-queue = { version = "0.3.12", optional = true }

serde_json = { version = "1", optional = true }
questdb-confstr = "0.1.1"
rand = { version = "0.9.0", optional = true }
Expand All @@ -56,10 +70,10 @@ mio = { version = "1", features = ["os-poll", "net"] }
chrono = "0.4.31"
tempfile = "3"
webpki-roots = "1.0.1"
rstest = "0.25.0"
tokio = { version = "1.45.1", features = ["macros", "rt-multi-thread"]}

[features]
default = ["sync-sender", "tls-webpki-certs", "ring-crypto"]
default = ["sync-sender", "async-sender-http", "tls-webpki-certs", "ring-crypto"]

## Sync ILP/TCP + ILP/HTTP Sender
sync-sender = ["sync-sender-tcp", "sync-sender-http"]
Expand All @@ -68,7 +82,23 @@ sync-sender = ["sync-sender-tcp", "sync-sender-http"]
sync-sender-tcp = ["_sync-sender", "_sender-tcp", "dep:socket2"]

## Sync ILP/HTTP
sync-sender-http = ["_sync-sender", "_sender-http", "dep:ureq", "dep:serde_json", "dep:rand"]
sync-sender-http = [
"_sync-sender",
"_sender-http",
"dep:ureq"]

## Async ILP/HTTP Sender
async-sender-http = [
"_async-sender",
"_sender-http",
"dep:tokio",
"dep:tokio-rustls",
"dep:reqwest",
"dep:lasso",
"dep:crossbeam-queue"]

## Compatiblity alias.
ilp-over-http = ["sync-sender-tcp"]

## Allow use OS-provided root TLS certificates
tls-native-certs = ["dep:rustls-native-certs"]
Expand All @@ -92,9 +122,10 @@ json_tests = []
chrono_timestamp = ["chrono"]

# Hidden derived features, used in code to enable-disable code sections. Don't use directly.
_sender-tcp = []
_sender-http = []
_sync-sender = []
_sender-tcp = [] # enabled for sync-sender-tcp
_sender-http = ["dep:http", "dep:serde_json", "dep:rand"] # enabled for any(sync-sender-http, async-sender-http)
_sync-sender = [] # enabled for any(sync-sender-tcp, sync-sender-http)
_async-sender = [] # enabled for async-sender-http)

## Enable all cross-compatible features.
## The `aws-lc-crypto` and `ring-crypto` features are mutually exclusive,
Expand All @@ -112,6 +143,14 @@ almost-all-features = [
"ndarray"
]

[[example]]
name = "from_conf"
required-features = ["sync-sender-tcp"]

[[example]]
name = "from_env"
required-features = ["sync-sender-tcp"]

[[example]]
name = "basic"
required-features = ["chrono_timestamp", "ndarray"]
Expand Down
38 changes: 31 additions & 7 deletions questdb-rs/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub mod json_tests {
use crate::tests::{TestResult};
use base64ct::Base64;
use base64ct::Encoding;
use rstest::rstest;

fn matches_any_line(line: &[u8], expected: &[&str]) -> bool {
for &exp in expected {
Expand All @@ -144,12 +143,37 @@ pub mod json_tests {
// for line in serde_json::to_string_pretty(&spec).unwrap().split("\n") {
// writeln!(output, "/// {}", line)?;
// }
writeln!(output, "#[rstest]")?;
let test_name_slug = slugify!(&spec.test_name, separator = "_");
writeln!(output, "#[test]")?;
writeln!(
output,
"fn test_{:03}_{}(\n #[values(ProtocolVersion::V1, ProtocolVersion::V2)] version: ProtocolVersion,\n) -> TestResult {{",
index,
slugify!(&spec.test_name, separator = "_")
"fn test_{:03}_{}_v1() -> TestResult {{",
index, test_name_slug
)?;
writeln!(
output,
" _test_{:03}_{}(ProtocolVersion::V1)\n",
index, test_name_slug
)?;
writeln!(output, "}}");

writeln!(output, "#[test]")?;
writeln!(
output,
"fn test_{:03}_{}_v2() -> TestResult {{",
index, test_name_slug
)?;
writeln!(
output,
" _test_{:03}_{}(ProtocolVersion::V2)\n",
index, test_name_slug
)?;
writeln!(output, "}}");

writeln!(
output,
"fn _test_{:03}_{}(version: ProtocolVersion) -> TestResult {{",
index, test_name_slug
)?;
writeln!(output, " let mut buffer = Buffer::new(version);")?;

Expand Down Expand Up @@ -274,11 +298,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

#[cfg(not(any(feature = "_sender-tcp", feature = "_sender-http")))]
compile_error!(
"At least one of `sync-sender-tcp` or `sync-sender-http` features must be enabled"
"At least one of `sync-sender-tcp`, `sync-sender-http`, or `async-sender-http` features must be enabled"
);

#[cfg(not(any(feature = "aws-lc-crypto", feature = "ring-crypto")))]
compile_error!("You must enable exactly one of the `aws-lc-crypto` or `ring-crypto` features, but none are enabled.");
compile_error!("You must enable exactly one of the `aws-lc-crypto` or `ring-crypto` features, but neither are enabled.");

#[cfg(all(feature = "aws-lc-crypto", feature = "ring-crypto"))]
compile_error!("You must enable exactly one of the `aws-lc-crypto` or `ring-crypto` features, but both are enabled.");
Expand Down
199 changes: 199 additions & 0 deletions questdb-rs/src/ingress/async_sender/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2025 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
use std::future::Future;
use std::time::Duration;

use crate::error::{fmt, Result};
use crate::ingress::conf::SETTINGS_RETRY_TIMEOUT;
use crate::ingress::http_common::{
is_retriable_status_code, process_settings_response, ParsedResponseHeaders,
};
use crate::ingress::tls::TlsSettings;
use crate::ingress::ProtocolVersion;
use bytes::Bytes;
use rand::Rng;
use reqwest::{Client, RequestBuilder, StatusCode, Url};
use tokio::time::{sleep, Instant};

// TODO:
// * Implement Auth.
// * Implement TLS.

pub(super) struct HttpClient {
tls: Option<TlsSettings>,
auth: Option<String>,
client: Client,
}

impl HttpClient {
pub fn new(tls: Option<TlsSettings>, auth: Option<String>, user_agent: &str) -> Result<Self> {
let builder = Client::builder().user_agent(user_agent);
let client = match builder.build() {
Ok(client) => client,
Err(e) => return Err(fmt!(ConfigError, "Could not create http client: {}", e)),
};
Ok(Self { tls, auth, client })
}

pub async fn get(
&self,
url: &Url,
request_timeout: Duration,
) -> (bool, Result<(StatusCode, ParsedResponseHeaders, Bytes)>) {
let builder = self.client.get(url.clone()).timeout(request_timeout);
perform_request(builder).await
}

pub async fn get_with_retries(
&self,
url: &Url,
request_timeout: Duration,
retry_timeout: Duration,
) -> Result<(StatusCode, ParsedResponseHeaders, Bytes)> {
request_with_retries(|| self.get(url, request_timeout), retry_timeout).await
}

pub async fn post(
&self,
url: &Url,
body: Bytes,
request_timeout: Duration,
) -> (bool, Result<(StatusCode, ParsedResponseHeaders, Bytes)>) {
let builder = self
.client
.post(url.clone())
.timeout(request_timeout)
.body(body);
perform_request(builder).await
}

pub async fn post_with_retries(
&self,
url: &Url,
body: Bytes,
request_timeout: Duration,
retry_timeout: Duration,
) -> Result<(StatusCode, ParsedResponseHeaders, Bytes)> {
request_with_retries(
|| self.post(url, body.clone(), request_timeout),
retry_timeout,
)
.await
}
}

pub(super) fn build_url(tls: bool, host: &str, port: &str, path: &str) -> Result<Url> {
let schema = if tls { "https" } else { "http" };
let url_string = format!("{schema}://{host}:{port}/{path}");
let map_url_err = |url, e| fmt!(CouldNotResolveAddr, "could not parse url {url:?}: {e}");
Url::parse(&url_string).map_err(|e| map_url_err(&url_string, e))
}

fn map_reqwest_err(
err: reqwest::Error,
) -> (bool, Result<(StatusCode, ParsedResponseHeaders, Bytes)>) {
let mut need_retry = false;
if err.is_timeout() || err.is_connect() || err.is_redirect() {
need_retry = true;
}
if let Some(status) = err.status() {
if is_retriable_status_code(status) {
need_retry = true;
}
}
(
need_retry,
Err(fmt!(SocketError, "Error receiving HTTP response: {err}")),
)
}

async fn perform_request(
builder: RequestBuilder,
) -> (bool, Result<(StatusCode, ParsedResponseHeaders, Bytes)>) {
let response = match builder.send().await {
Ok(response) => response,
Err(err) => return map_reqwest_err(err),
};
let status = response.status();
let header_data = ParsedResponseHeaders::parse(response.headers());
match response.bytes().await {
Ok(bytes) => (
is_retriable_status_code(status),
Ok((status, header_data, bytes)),
),
Err(err) => map_reqwest_err(err),
}
}

async fn request_with_retries<F, Fut>(
mut do_request: F,
retry_timeout: Duration,
) -> Result<(StatusCode, ParsedResponseHeaders, Bytes)>
where
F: FnMut() -> Fut,
Fut: Future<Output = (bool, Result<(StatusCode, ParsedResponseHeaders, Bytes)>)>,
{
let (need_retry, last_response) = do_request().await;
if !need_retry || retry_timeout.is_zero() {
return last_response;
}

let mut rng = rand::rng();
let retry_end = Instant::now() + retry_timeout;
let mut retry_interval_ms = 10;
loop {
let jitter_ms = rng.random_range(-5i32..5);
let to_sleep_ms = retry_interval_ms + jitter_ms;
let to_sleep = Duration::from_millis(to_sleep_ms as u64);
if (Instant::now() + to_sleep) > retry_end {
return last_response;
}
sleep(to_sleep).await;
let (need_retry, last_response) = do_request().await;
if !need_retry {
return last_response;
}
retry_interval_ms = (retry_interval_ms * 2).min(1000);
}
}

pub(super) async fn read_server_settings(
client: &HttpClient,
settings_url: &Url,
default_max_name_len: usize,
request_timeout: Duration,
) -> Result<(Vec<ProtocolVersion>, usize)> {
let default_protocol_version = ProtocolVersion::V1;

let response = client
.get_with_retries(settings_url, request_timeout, SETTINGS_RETRY_TIMEOUT)
.await;

process_settings_response(
response,
settings_url.as_str(),
default_protocol_version,
default_max_name_len,
)
}
Loading
Loading