Skip to content

test + refactor: staging #1129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 43 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d25929b
refactor: construct prefix list in-place
Jan 21, 2025
bf64fcd
test: `StorageDir`
Jan 21, 2025
adf0840
refactor: flatten out formats
Jan 21, 2025
87e9a33
Merge remote-tracking branch 'origin/main' into to-file
Jan 23, 2025
a59ca6a
refactor: staging is semantically different from storage
Jan 23, 2025
8c05d15
style: `StorageDir` ~> `Staging`
Jan 23, 2025
4d59e14
refactor: doesn't err
Jan 23, 2025
d50409f
refactor: move writers to staging
Jan 23, 2025
c3991f6
chore: cargo fmt
Jan 23, 2025
83f3259
refactor: Semantically ingestion happens into staging
Jan 23, 2025
100195e
Merge remote-tracking branch 'origin/main' into to-file
Jan 23, 2025
f0b18e9
test: empty staging
Jan 23, 2025
d15055d
fix: keep streams in memory
Jan 24, 2025
3154fb3
style: call it staging
Jan 24, 2025
d47f325
doc: make it clear what the code is doing
Jan 24, 2025
481c985
refactor: move code to where it truly belongs
Jan 24, 2025
f89c143
use `STAGING`
Jan 24, 2025
b750c0f
don't request what you already know
Jan 24, 2025
747c79e
refactor: convert into method
Jan 24, 2025
771b28e
test: fix path expectation
Jan 24, 2025
3a5613b
Merge branch 'main' into to-file
Jan 25, 2025
14c701c
Merge branch 'main' into to-file
Jan 28, 2025
64d3803
refactor: move out unrelated code
Jan 29, 2025
0676ca5
refactor: merge error types
Jan 29, 2025
0298ecd
fix: actually close the writers
Jan 29, 2025
8d30f51
test: multiple arrow to parquet
Jan 29, 2025
dc634dc
refactor: improve code reuse/readability
Jan 29, 2025
c0cca92
Merge branch 'main' into to-file
Jan 29, 2025
589d6e2
refactor + doc: `generate_prefix` working
Jan 29, 2025
43fa624
revert timeperiod changes
Jan 29, 2025
07b9aa4
Merge branch 'main' into to-file
Jan 29, 2025
4c82b75
Merge branch 'main' into to-file
Jan 30, 2025
321d08b
fix: ensure memory is also flushed
Jan 30, 2025
407d4f3
refactor: vec already knows the length
Jan 30, 2025
d20b66b
ci: lint fix
Jan 30, 2025
42d2013
refactor: update schema from storage
Jan 30, 2025
8b874bc
refactor: reader is not a util
Jan 30, 2025
9be6281
test: same minute ingestion
Jan 30, 2025
3c67d97
fix: empty custom partitioning
Jan 30, 2025
d7158c3
test: miss the current minute
Jan 30, 2025
e2d1ca3
fixes
Jan 30, 2025
a22f12a
Merge remote-tracking branch 'origin/main' into to-file
Jan 30, 2025
a40f763
Merge remote-tracking branch 'origin/main' into to-file
Feb 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
target
data*
staging/
staging/*
limitcache
examples
cert.pem
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
[dev-dependencies]
rstest = "0.23.0"
arrow = "53.0.0"
temp-dir = "0.1.14"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.18/build.zip"
Expand Down
4 changes: 2 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub struct BlobStoreArgs {
pub storage: AzureBlobConfig,
}

#[derive(Parser, Debug)]
#[derive(Parser, Debug, Default)]
pub struct Options {
// Authentication
#[arg(long, env = "P_USERNAME", help = "Admin username to be set for this Parseable server", default_value = DEFAULT_USERNAME)]
Expand Down Expand Up @@ -283,7 +283,7 @@ pub struct Options {
pub ingestor_endpoint: String,

#[command(flatten)]
oidc: Option<OidcConfig>,
pub oidc: Option<OidcConfig>,

// Kafka configuration (conditionally compiled)
#[cfg(any(
Expand Down
32 changes: 10 additions & 22 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
*/

pub mod format;
mod writer;

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;
use std::sync::Arc;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
use crate::{metadata, storage::StreamType};
use crate::{metadata, staging::STAGING, storage::StreamType};
use chrono::NaiveDateTime;
use std::collections::HashMap;

Expand All @@ -52,36 +50,32 @@ impl Event {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
key = format!("{key}{parsed_timestamp_to_min}");
key.push_str(&parsed_timestamp_to_min);
}

if !self.custom_partition_values.is_empty() {
let mut custom_partition_key = String::default();
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
custom_partition_key = format!("{custom_partition_key}&{k}={v}");
key.push_str(&format!("&{k}={v}"));
}
key = format!("{key}{custom_partition_key}");
}

let num_rows = self.rb.num_rows() as u64;
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

STREAM_WRITERS.append_to_local(
&self.stream_name,
STAGING.get_or_create_stream(&self.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
self.stream_type,
)?;

metadata::STREAM_INFO.update_stats(
&self.stream_name,
self.origin_format,
self.origin_size,
num_rows,
self.rb.num_rows(),
self.parsed_timestamp,
)?;

Expand All @@ -93,21 +87,16 @@ impl Event {
pub fn process_unchecked(&self) -> Result<(), EventError> {
let key = get_schema_key(&self.rb.schema().fields);

STREAM_WRITERS.append_to_local(
&self.stream_name,
STAGING.get_or_create_stream(&self.stream_name).push(
&key,
&self.rb,
self.parsed_timestamp,
&self.custom_partition_values,
&self.stream_type,
self.stream_type,
)?;

Ok(())
}

pub fn clear(&self, stream_name: &str) {
STREAM_WRITERS.clear(stream_name);
}
}

pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
Expand Down Expand Up @@ -138,14 +127,13 @@ pub mod error {
use arrow_schema::ArrowError;

use crate::metadata::error::stream_info::MetadataError;
use crate::staging::StagingError;
use crate::storage::ObjectStorageError;

use super::writer::errors::StreamWriterError;

#[derive(Debug, thiserror::Error)]
pub enum EventError {
#[error("Stream Writer Failed: {0}")]
StreamWriter(#[from] StreamWriterError),
StreamWriter(#[from] StagingError),
#[error("Metadata Error: {0}")]
Metadata(#[from] MetadataError),
#[error("Stream Writer Failed: {0}")]
Expand Down
94 changes: 0 additions & 94 deletions src/event/writer/file_writer.rs

This file was deleted.

Loading
Loading