diff --git a/.gitignore b/.gitignore index 57ea8e65e..22df045e3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ target data* -staging/ +staging/* limitcache examples cert.pem diff --git a/Cargo.lock b/Cargo.lock index 908219d4c..00a83ec92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3284,6 +3284,7 @@ dependencies = [ "sha2", "static-files", "sysinfo", + "temp-dir", "thiserror 2.0.9", "tokio", "tokio-stream", @@ -4559,6 +4560,12 @@ dependencies = [ "libc", ] +[[package]] +name = "temp-dir" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc1ee6eef34f12f765cb94725905c6312b6610ab2b0940889cfe58dae7bc3c72" + [[package]] name = "tempfile" version = "3.10.1" diff --git a/Cargo.toml b/Cargo.toml index 97832b7f3..b565185d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,6 +129,7 @@ zip = { version = "2.2.0", default-features = false, features = ["deflate"] } [dev-dependencies] rstest = "0.23.0" arrow = "53.0.0" +temp-dir = "0.1.14" [package.metadata.parseable_ui] assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.18/build.zip" diff --git a/src/cli.rs b/src/cli.rs index 24dbd7429..727ed4337 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -102,7 +102,7 @@ pub struct BlobStoreArgs { pub storage: AzureBlobConfig, } -#[derive(Parser, Debug)] +#[derive(Parser, Debug, Default)] pub struct Options { // Authentication #[arg(long, env = "P_USERNAME", help = "Admin username to be set for this Parseable server", default_value = DEFAULT_USERNAME)] @@ -283,7 +283,7 @@ pub struct Options { pub ingestor_endpoint: String, #[command(flatten)] - oidc: Option, + pub oidc: Option, // Kafka configuration (conditionally compiled) #[cfg(any( diff --git a/src/event/mod.rs b/src/event/mod.rs index ce4c5d23e..1178c7138 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -18,7 +18,6 @@ */ pub mod format; -mod writer; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; @@ -26,8 +25,7 @@ use itertools::Itertools; use std::sync::Arc; use self::error::EventError; -pub use self::writer::STREAM_WRITERS; -use crate::{metadata, storage::StreamType}; +use crate::{metadata, staging::STAGING, storage::StreamType}; use chrono::NaiveDateTime; use std::collections::HashMap; @@ -52,36 +50,32 @@ impl Event { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); - key = format!("{key}{parsed_timestamp_to_min}"); + key.push_str(&parsed_timestamp_to_min); } if !self.custom_partition_values.is_empty() { - let mut custom_partition_key = String::default(); for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) { - custom_partition_key = format!("{custom_partition_key}&{k}={v}"); + key.push_str(&format!("&{k}={v}")); } - key = format!("{key}{custom_partition_key}"); } - let num_rows = self.rb.num_rows() as u64; if self.is_first_event { commit_schema(&self.stream_name, self.rb.schema())?; } - STREAM_WRITERS.append_to_local( - &self.stream_name, + STAGING.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, - &self.stream_type, + self.stream_type, )?; metadata::STREAM_INFO.update_stats( &self.stream_name, self.origin_format, self.origin_size, - num_rows, + self.rb.num_rows(), self.parsed_timestamp, )?; @@ -93,21 +87,16 @@ impl Event { pub fn process_unchecked(&self) -> Result<(), EventError> { let key = 
get_schema_key(&self.rb.schema().fields); - STREAM_WRITERS.append_to_local( - &self.stream_name, + STAGING.get_or_create_stream(&self.stream_name).push( &key, &self.rb, self.parsed_timestamp, &self.custom_partition_values, - &self.stream_type, + self.stream_type, )?; Ok(()) } - - pub fn clear(&self, stream_name: &str) { - STREAM_WRITERS.clear(stream_name); - } } pub fn get_schema_key(fields: &[Arc]) -> String { @@ -138,14 +127,13 @@ pub mod error { use arrow_schema::ArrowError; use crate::metadata::error::stream_info::MetadataError; + use crate::staging::StagingError; use crate::storage::ObjectStorageError; - use super::writer::errors::StreamWriterError; - #[derive(Debug, thiserror::Error)] pub enum EventError { #[error("Stream Writer Failed: {0}")] - StreamWriter(#[from] StreamWriterError), + StreamWriter(#[from] StagingError), #[error("Metadata Error: {0}")] Metadata(#[from] MetadataError), #[error("Stream Writer Failed: {0}")] diff --git a/src/event/writer/file_writer.rs b/src/event/writer/file_writer.rs deleted file mode 100644 index fed5afa7c..000000000 --- a/src/event/writer/file_writer.rs +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - * - */ - -use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; -use derive_more::{Deref, DerefMut}; -use std::collections::HashMap; -use std::fs::{File, OpenOptions}; -use std::path::PathBuf; - -use super::errors::StreamWriterError; -use crate::storage::staging::StorageDir; -use chrono::NaiveDateTime; - -pub struct ArrowWriter { - pub writer: StreamWriter, -} - -#[derive(Deref, DerefMut, Default)] -pub struct FileWriter(HashMap); - -impl FileWriter { - // append to a existing stream - pub fn push( - &mut self, - stream_name: &str, - schema_key: &str, - record: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> Result<(), StreamWriterError> { - match self.get_mut(schema_key) { - Some(writer) => { - writer.writer.write(record)?; - } - // entry is not present thus we create it - None => { - // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (_, writer) = init_new_stream_writer_file( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - self.insert(schema_key.to_owned(), ArrowWriter { writer }); - } - }; - - Ok(()) - } - - pub fn close_all(self) { - for mut writer in self.0.into_values() { - _ = writer.writer.finish(); - } - } -} - -fn init_new_stream_writer_file( - stream_name: &str, - schema_key: &str, - record: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, -) -> Result<(PathBuf, StreamWriter), StreamWriterError> { - let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values); - std::fs::create_dir_all(dir.data_path)?; - - let file = OpenOptions::new().create(true).append(true).open(&path)?; - - let mut stream_writer = StreamWriter::try_new(file, &record.schema()) - .expect("File and RecordBatch both are checked"); - - stream_writer.write(record)?; - Ok((path, stream_writer)) -} diff --git a/src/event/writer/mod.rs b/src/event/writer/mod.rs deleted file mode 100644 index 895cd59ed..000000000 --- a/src/event/writer/mod.rs +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - * - */ - -mod file_writer; -mod mem_writer; - -use std::{ - collections::HashMap, - sync::{Arc, Mutex, RwLock}, -}; - -use crate::{ - option::{Mode, CONFIG}, - storage::StreamType, -}; - -use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; -use arrow_array::RecordBatch; -use arrow_schema::Schema; -use chrono::NaiveDateTime; -use derive_more::{Deref, DerefMut}; -use once_cell::sync::Lazy; - -pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); - -#[derive(Default)] -pub struct Writer { - pub mem: MemWriter<16384>, - pub disk: FileWriter, -} - -impl Writer { - fn push( - &mut self, - stream_name: &str, - schema_key: &str, - rb: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> Result<(), StreamWriterError> { - self.disk.push( - stream_name, - schema_key, - rb, - parsed_timestamp, - custom_partition_values, - )?; - self.mem.push(schema_key, rb); - Ok(()) - } - - fn push_mem(&mut self, schema_key: &str, rb: &RecordBatch) -> Result<(), StreamWriterError> { - self.mem.push(schema_key, rb); - Ok(()) - } -} - -#[derive(Deref, DerefMut, Default)] -pub struct WriterTable(RwLock>>); - -impl WriterTable { - // Concatenates record batches and puts them in memory store for each event. - pub fn append_to_local( - &self, - stream_name: &str, - schema_key: &str, - record: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), StreamWriterError> { - if !self.read().unwrap().contains_key(stream_name) { - // Gets write privileges only for inserting a writer - self.write() - .unwrap() - .insert(stream_name.to_owned(), Mutex::new(Writer::default())); - } - - // Updates the writer with only read privileges - self.handle_existing_writer( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - stream_type, - )?; - - Ok(()) - } - - /// Update writer for stream when it already exists - fn handle_existing_writer( - &self, - stream_name: &str, - schema_key: &str, - record: &RecordBatch, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - stream_type: &StreamType, - ) -> Result<(), StreamWriterError> { - let hashmap_guard = self.read().unwrap(); - let mut writer = hashmap_guard - .get(stream_name) - .expect("Stream exists") - .lock() - .unwrap(); - if CONFIG.options.mode != Mode::Query || *stream_type == StreamType::Internal { - writer.push( - stream_name, - schema_key, - record, - parsed_timestamp, - custom_partition_values, - )?; - } else { - writer.push_mem(stream_name, record)?; - } - - Ok(()) - } - - pub fn clear(&self, stream_name: &str) { - let map = self.write().unwrap(); - if let Some(writer) = map.get(stream_name) { - let w = &mut writer.lock().unwrap().mem; - w.clear(); - } - } - - pub fn delete_stream(&self, stream_name: &str) { - self.write().unwrap().remove(stream_name); - } - - pub fn unset_all(&self) { - let mut table = self.write().unwrap(); - let map = std::mem::take(&mut *table); - drop(table); - for writer in map.into_values() { - let writer = writer.into_inner().unwrap(); - writer.disk.close_all(); - } - } - - pub fn recordbatches_cloned( - &self, - stream_name: &str, - schema: &Arc, - ) -> Option> { - let records = self - .0 - .read() - .unwrap() - .get(stream_name)? 
- .lock() - .unwrap() - .mem - .recordbatch_cloned(schema); - - Some(records) - } -} - -pub mod errors { - - #[derive(Debug, thiserror::Error)] - pub enum StreamWriterError { - #[error("Arrow writer failed: {0}")] - Writer(#[from] arrow_schema::ArrowError), - #[error("Io Error when creating new file: {0}")] - Io(#[from] std::io::Error), - } -} diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 5edfdba21..448c986ef 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -39,6 +39,7 @@ use crate::handlers::livetail::cross_origin_config; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::CONFIG; use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::staging::STAGING; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, @@ -231,10 +232,12 @@ impl FlightService for AirServiceImpl { .collect::>(); let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; */ + // Taxi out airplane let out = into_flight_data(records); - if let Some(event) = event { - event.clear(&stream_name); + if event.is_some() { + // Clear staging of stream once airplane has taxied + STAGING.clear(&stream_name); } let time = time.elapsed().as_secs_f64(); @@ -242,6 +245,7 @@ impl FlightService for AirServiceImpl { .with_label_values(&[&format!("flight-query-{}", stream_name)]) .observe(time); + // Airplane takes off 🛫 out } diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index a0516d318..c9e55b5d5 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -17,6 +17,7 @@ */ use crate::option::CONFIG; +use crate::staging::STAGING; use actix_web::body::MessageBody; use actix_web::dev::{ServiceRequest, ServiceResponse}; use actix_web::error::ErrorServiceUnavailable; @@ -56,8 +57,8 @@ pub async fn shutdown() { let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; *shutdown_flag = true; - // Sync to local - crate::event::STREAM_WRITERS.unset_all(); + // Sync staging + STAGING.flush_all(); } pub async fn readiness() -> HttpResponse { diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 2aa544dbe..90e8f863f 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -32,11 +32,12 @@ use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STO use crate::option::{Mode, CONFIG}; use crate::rbac::role::Action; use crate::rbac::Users; +use crate::staging::{Stream, STAGING}; +use crate::stats; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; -use crate::storage::{retention::Retention, StorageDir}; +use crate::storage::retention::Retention; use crate::storage::{StreamInfo, StreamType}; use crate::utils::actix::extract_session_key_from_req; -use crate::{event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; @@ -66,7 +67,7 @@ pub async fn delete(stream_name: Path) -> Result) -> Result) -> Result = Lazy::new(|| { + // all the files should be in the staging directory root + let entries = + std::fs::read_dir(&CONFIG.options.local_staging_path).expect("Couldn't read from file"); + let url = get_url(); + let port = url.port().unwrap_or(80).to_string(); + let url = url.to_string(); + + for entry in entries { + // cause the staging directory will have only one file with ingestor in the name + // so the JSON Parse should not error unless the file is corrupted + let path = 
entry.expect("Should be a directory entry").path(); + let flag = path + .file_name() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .contains("ingestor"); + + if flag { + // get the ingestor metadata from staging + let text = std::fs::read(path).expect("File should be present"); + let mut meta: Value = serde_json::from_slice(&text).expect("Valid JSON"); + + // migrate the staging meta + let obj = meta + .as_object_mut() + .expect("Could Not parse Ingestor Metadata Json"); + + if obj.get("flight_port").is_none() { + obj.insert( + "flight_port".to_owned(), + Value::String(CONFIG.options.flight_port.to_string()), + ); + } + + let mut meta: IngestorMetadata = + serde_json::from_value(meta).expect("Couldn't write to disk"); + + // compare url endpoint and port + if meta.domain_name != url { + info!( + "Domain Name was Updated. Old: {} New: {}", + meta.domain_name, url + ); + meta.domain_name = url; + } + + if meta.port != port { + info!("Port was Updated. Old: {} New: {}", meta.port, port); + meta.port = port; + } + + let token = base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.options.username, CONFIG.options.password + )); + + let token = format!("Basic {}", token); + + if meta.token != token { + // TODO: Update the message to be more informative with username and password + info!( + "Credentials were Updated. Old: {} New: {}", + meta.token, token + ); + meta.token = token; + } + + meta.put_on_disk(CONFIG.staging_dir()) + .expect("Couldn't write to disk"); + return meta; + } + } -/// ! have to use a guard before using it -pub static INGESTOR_META: Lazy = - Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json")); + let store = CONFIG.storage().get_object_store(); + let out = IngestorMetadata::new( + port, + url, + DEFAULT_VERSION.to_string(), + store.get_bucket_name(), + &CONFIG.options.username, + &CONFIG.options.password, + get_ingestor_id(), + CONFIG.options.flight_port.to_string(), + ); + + out.put_on_disk(CONFIG.staging_dir()) + .expect("Should Be valid Json"); + out +}); pub struct IngestServer; diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 89fc7021e..d36372b2b 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -24,6 +24,7 @@ pub mod server; pub mod ssl_acceptor; pub mod utils; +use std::path::Path; use std::sync::Arc; use actix_web::middleware::from_fn; @@ -203,6 +204,21 @@ impl IngestorMetadata { pub fn get_ingestor_id(&self) -> String { self.ingestor_id.clone() } + + /// Puts the ingestor info into the staging. + /// + /// This function takes the ingestor info as a parameter and stores it in staging. + /// # Parameters + /// + /// * `staging_path`: Staging root directory. 
+ pub fn put_on_disk(&self, staging_path: &Path) -> anyhow::Result<()> { + let file_name = format!("ingestor.{}.json", self.ingestor_id); + let file_path = staging_path.join(file_name); + + std::fs::write(file_path, serde_json::to_vec(&self)?)?; + + Ok(()) + } } #[cfg(test)] diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 83a8fbd13..1b41660a6 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -32,7 +32,6 @@ use tracing::{error, warn}; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); use crate::{ - event, handlers::http::{ base_path_without_preceding_slash, cluster::{ @@ -48,8 +47,9 @@ use crate::{ hottier::HotTierManager, metadata::{self, STREAM_INFO}, option::CONFIG, + staging::{Stream, STAGING}, stats::{self, Stats}, - storage::{StorageDir, StreamType}, + storage::StreamType, }; pub async fn delete(stream_name: Path) -> Result { @@ -69,7 +69,7 @@ pub async fn delete(stream_name: Path) -> Result) -> Result Result<(), MetadataError> { let parsed_date = parsed_timestamp.date().to_string(); @@ -368,18 +368,6 @@ impl StreamInfo { } } -fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Schema { - let staging_files = StorageDir::new(stream_name).arrow_files(); - let record_reader = MergedRecordReader::try_new(&staging_files).unwrap(); - if record_reader.readers.is_empty() { - return current_schema; - } - - let schema = record_reader.merged_schema(); - - Schema::try_merge(vec![schema, current_schema]).unwrap() -} - ///this function updates the data type of time partition field /// from utf-8 to timestamp if it is not already timestamp /// and updates the schema in the storage @@ -450,7 +438,9 @@ pub async fn load_stream_metadata_on_server_start( fetch_stats_from_storage(stream_name, stats).await; load_daily_metrics(&snapshot.manifest_list, stream_name); - let schema = update_schema_from_staging(stream_name, schema); + let schema = STAGING + .get_or_create_stream(stream_name) + .updated_schema(schema); let schema = HashMap::from_iter( schema .fields diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index 4385c93b0..5de34a690 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -21,9 +21,8 @@ use rand::distributions::DistString; use serde_json::{json, Map, Value as JsonValue}; use crate::{ - handlers::http::modal::IngestorMetadata, - option::CONFIG, - storage::{object_storage::ingestor_metadata_path, staging}, + handlers::http::modal::IngestorMetadata, option::CONFIG, + storage::object_storage::ingestor_metadata_path, }; /* @@ -197,7 +196,7 @@ pub async fn migrate_ingester_metadata() -> anyhow::Result. + * + * + */ + +use once_cell::sync::Lazy; +pub use streams::{Stream, Streams}; + +mod reader; +mod streams; +mod writer; + +#[derive(Debug, thiserror::Error)] +pub enum StagingError { + #[error("Unable to create recordbatch stream")] + Arrow(#[from] arrow_schema::ArrowError), + #[error("Could not generate parquet file")] + Parquet(#[from] parquet::errors::ParquetError), + #[error("IO Error {0}")] + ObjectStorage(#[from] std::io::Error), + #[error("Could not generate parquet file")] + Create, +} + +/// Staging is made up of multiple streams, each stream's context is housed in a single `Stream` object. +/// `STAGING` is a globally shared mapping of `Streams` that are in staging. 
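The `STAGING` static declared on the next line is the usual lazily-initialised registry: a `Lazy` map behind an `RwLock`, where lookups take only the read lock and the write lock is held just long enough to insert a missing stream. A minimal, self-contained sketch of that pattern (using `once_cell` as the module above does, a placeholder `StreamState` in place of the real `Stream`, and `entry().or_default()` in place of `Stream::new`):

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

use once_cell::sync::Lazy;

/// Placeholder for the per-stream staging context (`Stream` in this PR).
#[derive(Default)]
struct StreamState;

/// Globally shared registry of streams in staging, mirroring the `STAGING` static.
static REGISTRY: Lazy<RwLock<HashMap<String, Arc<StreamState>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

fn get_or_create(stream_name: &str) -> Arc<StreamState> {
    // Fast path: most calls find the stream under the shared read lock.
    if let Some(existing) = REGISTRY.read().unwrap().get(stream_name) {
        return existing.clone();
    }
    // Slow path: take the write lock only to insert the missing entry.
    REGISTRY
        .write()
        .unwrap()
        .entry(stream_name.to_owned())
        .or_default()
        .clone()
}

fn main() {
    let a = get_or_create("demo");
    let b = get_or_create("demo");
    assert!(Arc::ptr_eq(&a, &b)); // both callers share the same handle
}
```

The real `Streams::get_or_create_stream` further down does the same read-then-write dance, constructing missing entries with `Stream::new(&CONFIG.options, stream_name)` instead of `or_default()`.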
+pub static STAGING: Lazy = Lazy::new(Streams::default); diff --git a/src/utils/arrow/reverse_reader.rs b/src/staging/reader.rs similarity index 69% rename from src/utils/arrow/reverse_reader.rs rename to src/staging/reader.rs index bce5cd695..6df0dc324 100644 --- a/src/utils/arrow/reverse_reader.rs +++ b/src/staging/reader.rs @@ -14,17 +14,148 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * + * */ use std::{ + fs::{remove_file, File}, io::{self, BufReader, Read, Seek, SeekFrom}, + path::PathBuf, + sync::Arc, vec::IntoIter, }; -use arrow_array::{RecordBatch, UInt64Array}; +use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; -use arrow_select::take::take; +use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; +use itertools::kmerge_by; +use tracing::error; + +use crate::{ + event::DEFAULT_TIMESTAMP_KEY, + utils::arrow::{adapt_batch, reverse}, +}; + +#[derive(Debug)] +pub struct MergedRecordReader { + pub readers: Vec>>, +} + +impl MergedRecordReader { + pub fn try_new(files: &[PathBuf]) -> Result { + let mut readers = Vec::with_capacity(files.len()); + + for file in files { + //remove empty files before reading + if file.metadata().unwrap().len() == 0 { + error!("Invalid file detected, removing it: {:?}", file); + remove_file(file).unwrap(); + } else { + let Ok(reader) = + StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) + else { + error!("Invalid file detected, ignoring it: {:?}", file); + continue; + }; + + readers.push(reader); + } + } + + Ok(Self { readers }) + } + + pub fn merged_schema(&self) -> Schema { + Schema::try_merge( + self.readers + .iter() + .map(|reader| reader.schema().as_ref().clone()), + ) + .unwrap() + } +} + +#[derive(Debug)] +pub struct MergedReverseRecordReader { + pub readers: Vec>>>, +} + +impl MergedReverseRecordReader { + pub fn try_new(files: &[PathBuf]) -> Self { + let mut readers = Vec::with_capacity(files.len()); + for file in files { + let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else { + error!("Invalid file detected, ignoring it: {:?}", file); + continue; + }; + + readers.push(reader); + } + + Self { readers } + } + + pub fn merged_iter( + self, + schema: Arc, + time_partition: Option, + ) -> impl Iterator { + let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten()); + kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| { + // Capture time_partition by value + let a_time = get_timestamp_millis(a, time_partition.clone()); + let b_time = get_timestamp_millis(b, time_partition.clone()); + a_time > b_time + }) + .map(|batch| reverse(&batch)) + .map(move |batch| adapt_batch(&schema, &batch)) + } + + pub fn merged_schema(&self) -> Schema { + Schema::try_merge( + self.readers + .iter() + .map(|reader| reader.schema().as_ref().clone()), + ) + .unwrap() + } +} + +fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { + match time_partition { + Some(time_partition) => { + let time_partition = time_partition.as_str(); + match batch.column_by_name(time_partition) { + Some(column) => column + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + None => get_default_timestamp_millis(batch), + } + } + None => get_default_timestamp_millis(batch), + } +} +fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { + match batch + .column(0) + .as_any() + .downcast_ref::() + { + // Ideally we 
expect the first column to be a timestamp (because we add the timestamp column first in the writer) + Some(array) => array.value(0), + // In case the first column is not a timestamp, we fallback to look for default timestamp column across all columns + None => batch + .column_by_name(DEFAULT_TIMESTAMP_KEY) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + } +} /// OffsetReader takes in a reader and list of offset and sizes and /// provides a reader over the file by reading only the offsets @@ -142,16 +273,6 @@ pub fn get_reverse_reader( Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap()) } -pub fn reverse(rb: &RecordBatch) -> RecordBatch { - let indices = UInt64Array::from_iter_values((0..rb.num_rows()).rev().map(|x| x as u64)); - let arrays = rb - .columns() - .iter() - .map(|col| take(&col, &indices, None).unwrap()) - .collect(); - RecordBatch::try_new(rb.schema(), arrays).unwrap() -} - // return limit for fn find_limit_and_type( reader: &mut (impl Read + Seek), diff --git a/src/staging/streams.rs b/src/staging/streams.rs new file mode 100644 index 000000000..967f6d682 --- /dev/null +++ b/src/staging/streams.rs @@ -0,0 +1,772 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + +use std::{ + collections::HashMap, + fs::{remove_file, OpenOptions}, + path::{Path, PathBuf}, + process, + sync::{Arc, Mutex, RwLock}, +}; + +use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; +use arrow_schema::Schema; +use chrono::{NaiveDateTime, Timelike, Utc}; +use derive_more::{Deref, DerefMut}; +use itertools::Itertools; +use parquet::{ + arrow::ArrowWriter, + basic::Encoding, + file::properties::{WriterProperties, WriterPropertiesBuilder}, + format::SortingColumn, + schema::types::ColumnPath, +}; +use rand::distributions::DistString; +use tracing::error; + +use crate::{ + cli::Options, + event::DEFAULT_TIMESTAMP_KEY, + handlers::http::modal::ingest_server::INGESTOR_META, + metrics, + option::{Mode, CONFIG}, + storage::{StreamType, OBJECT_STORE_DATA_GRANULARITY}, + utils::minute_to_slot, +}; + +use super::{ + reader::{MergedRecordReader, MergedReverseRecordReader}, + writer::Writer, + StagingError, +}; + +const ARROW_FILE_EXTENSION: &str = "data.arrows"; + +pub type StreamRef<'a> = Arc>; + +/// State of staging associated with a single stream of data in parseable. 
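One aside on `src/staging/reader.rs` above before the `Stream` type itself: `MergedReverseRecordReader::merged_iter` uses `itertools::kmerge_by` to interleave several already-reversed readers into a single newest-first stream, comparing per-batch timestamps. A tiny, self-contained illustration of that ordering, with plain integers standing in for batch timestamps:

```rust
use itertools::kmerge_by;

fn main() {
    // Each source is already sorted newest-first, like the reversed arrow readers.
    let a = vec![30i64, 20, 10];
    let b = vec![25i64, 15, 5];

    // `kmerge_by` repeatedly takes from whichever head satisfies the comparator,
    // so the merged stream stays in descending (newest-first) order overall.
    let merged: Vec<i64> = kmerge_by([a, b], |x: &i64, y: &i64| x > y).collect();
    assert_eq!(merged, vec![30, 25, 20, 15, 10, 5]);
}
```

In the reader the elements are `RecordBatch`es keyed on the time-partition (or default timestamp) column, and the arrow files being merged are the ones written by the `Stream` type defined next.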
+pub struct Stream<'a> { + pub stream_name: String, + pub data_path: PathBuf, + pub options: &'a Options, + pub writer: Mutex, +} + +impl<'a> Stream<'a> { + pub fn new(options: &'a Options, stream_name: impl Into) -> StreamRef<'a> { + let stream_name = stream_name.into(); + let data_path = options.local_stream_data_path(&stream_name); + + Arc::new(Self { + stream_name, + data_path, + options, + writer: Mutex::new(Writer::default()), + }) + } + + // Concatenates record batches and puts them in memory store for each event. + pub fn push( + &self, + schema_key: &str, + record: &RecordBatch, + parsed_timestamp: NaiveDateTime, + custom_partition_values: &HashMap, + stream_type: StreamType, + ) -> Result<(), StagingError> { + let mut guard = self.writer.lock().unwrap(); + if self.options.mode != Mode::Query || stream_type == StreamType::Internal { + match guard.disk.get_mut(schema_key) { + Some(writer) => { + writer.write(record)?; + } + None => { + // entry is not present thus we create it + let file_path = self.path_by_current_time( + schema_key, + parsed_timestamp, + custom_partition_values, + ); + std::fs::create_dir_all(&self.data_path)?; + + let file = OpenOptions::new() + .create(true) + .append(true) + .open(&file_path)?; + + let mut writer = StreamWriter::try_new(file, &record.schema()) + .expect("File and RecordBatch both are checked"); + + writer.write(record)?; + guard.disk.insert(schema_key.to_owned(), writer); + } + }; + } + + guard.mem.push(schema_key, record); + + Ok(()) + } + + pub fn path_by_current_time( + &self, + stream_hash: &str, + parsed_timestamp: NaiveDateTime, + custom_partition_values: &HashMap, + ) -> PathBuf { + let mut hostname = hostname::get().unwrap().into_string().unwrap(); + if self.options.mode == Mode::Ingest { + hostname.push_str(&INGESTOR_META.get_ingestor_id()); + } + let filename = format!( + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", + Utc::now().format("%Y%m%dT%H%M"), + parsed_timestamp.date(), + parsed_timestamp.hour(), + minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + custom_partition_values + .iter() + .sorted_by_key(|v| v.0) + .map(|(key, value)| format!("{key}={value}.")) + .join("") + ); + self.data_path.join(filename) + } + + pub fn arrow_files(&self) -> Vec { + let Ok(dir) = self.data_path.read_dir() else { + return vec![]; + }; + + let paths = dir + .flatten() + .map(|file| file.path()) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) + .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) + .collect(); + + paths + } + + pub fn arrow_files_grouped_exclude_time( + &self, + exclude: NaiveDateTime, + shutdown_signal: bool, + ) -> HashMap> { + let mut grouped_arrow_file: HashMap> = HashMap::new(); + let mut arrow_files = self.arrow_files(); + + if !shutdown_signal { + arrow_files.retain(|path| { + !path + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) + }); + } + + let random_string = + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); + for arrow_file_path in arrow_files { + if arrow_file_path.metadata().unwrap().len() == 0 { + error!( + "Invalid arrow file {:?} detected for stream {}, removing it", + &arrow_file_path, self.stream_name + ); + remove_file(&arrow_file_path).unwrap(); + } else { + let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + } + 
grouped_arrow_file + } + + pub fn parquet_files(&self) -> Vec { + let Ok(dir) = self.data_path.read_dir() else { + return vec![]; + }; + + dir.flatten() + .map(|file| file.path()) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet"))) + .collect() + } + + fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { + let filename = path.file_stem().unwrap().to_str().unwrap(); + let (_, filename) = filename.split_once('.').unwrap(); + assert!(filename.contains('.'), "contains the delim `.`"); + let filename_with_random_number = format!("{filename}.{random_string}.arrows"); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename_with_random_number); + parquet_path.set_extension("parquet"); + parquet_path + } + + pub fn recordbatches_cloned(&self, schema: &Arc) -> Vec { + self.writer.lock().unwrap().mem.recordbatch_cloned(schema) + } + + pub fn clear(&self) { + self.writer.lock().unwrap().mem.clear(); + } + + fn flush(&self) { + let mut disk_writers = { + let mut writer = self.writer.lock().unwrap(); + // Flush memory + writer.mem.clear(); + // Take schema -> disk writer mapping + std::mem::take(&mut writer.disk) + }; + + // Flush disk + for writer in disk_writers.values_mut() { + _ = writer.finish(); + } + } + + pub fn convert_disk_files_to_parquet( + &self, + time_partition: Option<&String>, + custom_partition: Option<&String>, + shutdown_signal: bool, + ) -> Result, StagingError> { + let mut schemas = Vec::new(); + + let time = chrono::Utc::now().naive_utc(); + let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal); + if staging_files.is_empty() { + metrics::STAGING_FILES + .with_label_values(&[&self.stream_name]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "arrows"]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "parquet"]) + .set(0); + } + + // warn!("staging files-\n{staging_files:?}\n"); + for (parquet_path, files) in staging_files { + metrics::STAGING_FILES + .with_label_values(&[&self.stream_name]) + .set(files.len() as i64); + + for file in &files { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, file_type]) + .add(file_size as i64); + } + + let record_reader = MergedReverseRecordReader::try_new(&files); + if record_reader.readers.is_empty() { + continue; + } + let merged_schema = record_reader.merged_schema(); + + let props = parquet_writer_props( + self.options, + &merged_schema, + time_partition, + custom_partition, + ) + .build(); + schemas.push(merged_schema.clone()); + let schema = Arc::new(merged_schema); + let parquet_file = OpenOptions::new() + .create(true) + .append(true) + .open(&parquet_path) + .map_err(|_| StagingError::Create)?; + let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?; + for ref record in record_reader.merged_iter(schema, time_partition.cloned()) { + writer.write(record)?; + } + + writer.close()?; + if parquet_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { + error!( + "Invalid parquet file {:?} detected for stream {}, removing it", + &parquet_path, &self.stream_name + ); + remove_file(parquet_path).unwrap(); + } else { + for file in files { + // warn!("file-\n{file:?}\n"); + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + if 
remove_file(file.clone()).is_err() { + error!("Failed to delete file. Unstable state"); + process::abort() + } + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, file_type]) + .sub(file_size as i64); + } + } + } + + if schemas.is_empty() { + return Ok(None); + } + + Ok(Some(Schema::try_merge(schemas).unwrap())) + } + + pub fn updated_schema(&self, current_schema: Schema) -> Schema { + let staging_files = self.arrow_files(); + let record_reader = MergedRecordReader::try_new(&staging_files).unwrap(); + if record_reader.readers.is_empty() { + return current_schema; + } + + let schema = record_reader.merged_schema(); + + Schema::try_merge(vec![schema, current_schema]).unwrap() + } +} + +fn parquet_writer_props( + options: &Options, + merged_schema: &Schema, + time_partition: Option<&String>, + custom_partition: Option<&String>, +) -> WriterPropertiesBuilder { + // Determine time partition field + let time_partition_field = time_partition.map_or(DEFAULT_TIMESTAMP_KEY, |tp| tp.as_str()); + + // Find time partition index + let time_partition_idx = merged_schema.index_of(time_partition_field).unwrap_or(0); + + let mut props = WriterProperties::builder() + .set_max_row_group_size(options.row_group_size) + .set_compression(options.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec![time_partition_field.to_string()]), + Encoding::DELTA_BINARY_PACKED, + ); + + // Create sorting columns + let mut sorting_column_vec = vec![SortingColumn { + column_idx: time_partition_idx as i32, + descending: true, + nulls_first: true, + }]; + + // Describe custom partition column encodings and sorting + if let Some(custom_partition) = custom_partition { + for partition in custom_partition.split(',') { + if let Ok(idx) = merged_schema.index_of(partition) { + let column_path = ColumnPath::new(vec![partition.to_string()]); + props = props.set_column_encoding(column_path, Encoding::DELTA_BYTE_ARRAY); + + sorting_column_vec.push(SortingColumn { + column_idx: idx as i32, + descending: true, + nulls_first: true, + }); + } + } + } + + // Set sorting columns + props.set_sorting_columns(Some(sorting_column_vec)) +} + +#[derive(Deref, DerefMut, Default)] +pub struct Streams(RwLock>>); + +impl Streams { + /// Try to get the handle of a stream in staging, if it doesn't exist return `None`. + pub fn get_stream(&self, stream_name: &str) -> Option> { + self.read().unwrap().get(stream_name).cloned() + } + + /// Get the handle to a stream in staging, create one if it doesn't exist + pub fn get_or_create_stream(&self, stream_name: &str) -> StreamRef<'static> { + if let Some(staging) = self.get_stream(stream_name) { + return staging; + } + + let staging = Stream::new(&CONFIG.options, stream_name); + + // Gets write privileges only for creating the stream when it doesn't already exist. 
+ self.write() + .unwrap() + .insert(stream_name.to_owned(), staging.clone()); + + staging + } + + pub fn clear(&self, stream_name: &str) { + if let Some(stream) = self.write().unwrap().get(stream_name) { + stream.clear(); + } + } + + pub fn delete_stream(&self, stream_name: &str) { + self.write().unwrap().remove(stream_name); + } + + pub fn flush_all(&self) { + let streams = self.read().unwrap(); + + for staging in streams.values() { + staging.flush() + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray}; + use arrow_schema::{DataType, Field, TimeUnit}; + use chrono::{NaiveDate, TimeDelta}; + use temp_dir::TempDir; + use tokio::time::sleep; + + use super::*; + + #[test] + fn test_staging_new_with_valid_stream() { + let stream_name = "test_stream"; + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + assert_eq!( + staging.data_path, + options.local_stream_data_path(stream_name) + ); + } + + #[test] + fn test_staging_with_special_characters() { + let stream_name = "test_stream_!@#$%^&*()"; + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + assert_eq!( + staging.data_path, + options.local_stream_data_path(stream_name) + ); + } + + #[test] + fn test_staging_data_path_initialization() { + let stream_name = "example_stream"; + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + assert_eq!( + staging.data_path, + options.local_stream_data_path(stream_name) + ); + } + + #[test] + fn test_staging_with_alphanumeric_stream_name() { + let stream_name = "test123stream"; + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + assert_eq!( + staging.data_path, + options.local_stream_data_path(stream_name) + ); + } + + #[test] + fn test_arrow_files_empty_directory() { + let temp_dir = TempDir::new().unwrap(); + + let options = Options { + local_staging_path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + let staging = Stream::new(&options, "test_stream"); + + let files = staging.arrow_files(); + + assert!(files.is_empty()); + } + + #[test] + fn generate_correct_path_with_current_time_and_no_custom_partitioning() { + let stream_name = "test_stream"; + let stream_hash = "abc123"; + let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) + .unwrap() + .and_hms_opt(12, 30, 0) + .unwrap(); + let custom_partition_values = HashMap::new(); + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + let expected_path = staging.data_path.join(format!( + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}", + Utc::now().format("%Y%m%dT%H%M"), + parsed_timestamp.date(), + parsed_timestamp.hour(), + minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + hostname::get().unwrap().into_string().unwrap() + )); + + let generated_path = + staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values); + + assert_eq!(generated_path, expected_path); + } + + #[test] + fn generate_correct_path_with_current_time_and_custom_partitioning() { + let stream_name = "test_stream"; + let stream_hash = "abc123"; + let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) + .unwrap() + .and_hms_opt(12, 30, 0) + .unwrap(); + let mut custom_partition_values = HashMap::new(); + custom_partition_values.insert("key1".to_string(), "value1".to_string()); + 
custom_partition_values.insert("key2".to_string(), "value2".to_string()); + + let options = Options::default(); + let staging = Stream::new(&options, stream_name); + + let expected_path = staging.data_path.join(format!( + "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", + Utc::now().format("%Y%m%dT%H%M"), + parsed_timestamp.date(), + parsed_timestamp.hour(), + minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + hostname::get().unwrap().into_string().unwrap() + )); + + let generated_path = + staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values); + + assert_eq!(generated_path, expected_path); + } + + #[test] + fn test_convert_to_parquet_with_empty_staging() -> Result<(), StagingError> { + let temp_dir = TempDir::new()?; + let options = Options { + local_staging_path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + let stream = "test_stream".to_string(); + let result = + Stream::new(&options, &stream).convert_disk_files_to_parquet(None, None, false)?; + assert!(result.is_none()); + // Verify metrics were set to 0 + let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); + assert_eq!(staging_files, 0); + let storage_size_arrows = metrics::STORAGE_SIZE + .with_label_values(&["staging", &stream, "arrows"]) + .get(); + assert_eq!(storage_size_arrows, 0); + let storage_size_parquet = metrics::STORAGE_SIZE + .with_label_values(&["staging", &stream, "parquet"]) + .get(); + assert_eq!(storage_size_parquet, 0); + Ok(()) + } + + fn write_log(staging: &StreamRef, schema: &Schema, mins: i64) { + let time: NaiveDateTime = Utc::now() + .checked_sub_signed(TimeDelta::minutes(mins)) + .unwrap() + .naive_utc(); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + staging + .push( + "abc", + &batch, + time, + &HashMap::new(), + StreamType::UserDefined, + ) + .unwrap(); + staging.flush(); + } + + #[test] + fn different_minutes_multiple_arrow_files_to_parquet() { + let temp_dir = TempDir::new().unwrap(); + let stream_name = "test_stream"; + let options = Options { + local_staging_path: temp_dir.path().to_path_buf(), + row_group_size: 1048576, + ..Default::default() + }; + let staging = Stream::new(&options, stream_name); + + // Create test arrow files + let schema = Schema::new(vec![ + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ]); + + for i in 0..3 { + write_log(&staging, &schema, i); + } + // verify the arrow files exist in staging + assert_eq!(staging.arrow_files().len(), 3); + drop(staging); + + // Start with a fresh staging + let staging = Stream::new(&options, stream_name); + let result = staging + .convert_disk_files_to_parquet(None, None, true) + .unwrap(); + + assert!(result.is_some()); + let result_schema = result.unwrap(); + assert_eq!(result_schema.fields().len(), 3); + + // Verify parquet files were created and the arrow files deleted + assert_eq!(staging.parquet_files().len(), 3); + assert_eq!(staging.arrow_files().len(), 0); + } + + #[test] + fn same_minute_multiple_arrow_files_to_parquet() { + let temp_dir = TempDir::new().unwrap(); + let stream_name = "test_stream"; + let options = Options { + 
local_staging_path: temp_dir.path().to_path_buf(), + row_group_size: 1048576, + ..Default::default() + }; + let staging: Arc> = Stream::new(&options, stream_name); + + // Create test arrow files + let schema = Schema::new(vec![ + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ]); + + for _ in 0..3 { + write_log(&staging, &schema, 0); + } + // verify the arrow files exist in staging + assert_eq!(staging.arrow_files().len(), 1); + drop(staging); + + // Start with a fresh staging + let staging = Stream::new(&options, stream_name); + let result = staging + .convert_disk_files_to_parquet(None, None, true) + .unwrap(); + + assert!(result.is_some()); + let result_schema = result.unwrap(); + assert_eq!(result_schema.fields().len(), 3); + + // Verify parquet files were created and the arrow files deleted + assert_eq!(staging.parquet_files().len(), 1); + assert_eq!(staging.arrow_files().len(), 0); + } + + #[tokio::test] + async fn miss_current_arrow_file_when_converting_to_parquet() { + let temp_dir = TempDir::new().unwrap(); + let stream_name = "test_stream"; + let options = Options { + local_staging_path: temp_dir.path().to_path_buf(), + row_group_size: 1048576, + ..Default::default() + }; + let staging = Stream::new(&options, stream_name); + + // Create test arrow files + let schema = Schema::new(vec![ + Field::new( + DEFAULT_TIMESTAMP_KEY, + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("id", DataType::Int32, false), + Field::new("value", DataType::Utf8, false), + ]); + + // 2 logs in the previous minutes + for i in 0..2 { + write_log(&staging, &schema, i); + } + sleep(Duration::from_secs(60)).await; + + write_log(&staging, &schema, 0); + + // verify the arrow files exist in staging + assert_eq!(staging.arrow_files().len(), 3); + drop(staging); + + // Start with a fresh staging + let staging = Stream::new(&options, stream_name); + let result = staging + .convert_disk_files_to_parquet(None, None, false) + .unwrap(); + + assert!(result.is_some()); + let result_schema = result.unwrap(); + assert_eq!(result_schema.fields().len(), 3); + + // Verify parquet files were created and the arrow file left + assert_eq!(staging.parquet_files().len(), 2); + assert_eq!(staging.arrow_files().len(), 1); + } +} diff --git a/src/event/writer/mem_writer.rs b/src/staging/writer.rs similarity index 90% rename from src/event/writer/mem_writer.rs rename to src/staging/writer.rs index d24077333..c43252f14 100644 --- a/src/event/writer/mem_writer.rs +++ b/src/staging/writer.rs @@ -14,17 +14,29 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * + * */ -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fs::File, + sync::Arc, +}; use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; use crate::utils::arrow::adapt_batch; +#[derive(Default)] +pub struct Writer { + pub mem: MemWriter<16384>, + pub disk: HashMap>, +} + /// Structure to keep recordbatches in memory. /// /// Any new schema is updated in the schema map. 
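The `MemWriter`/`MutableBuffer` pair documented above buffers incoming record batches in memory and hands back a filled chunk once a size budget `N` is crossed (the hunks that follow replace the explicit `rows` counter with checks on the buffer itself). A rough, std-only sketch of the buffering idea, using `Vec<u64>` as a stand-in for `RecordBatch` and ignoring the batch-splitting the real code performs at the boundary:

```rust
/// Toy row-bounded buffer: batches accumulate until the budget `N` is crossed,
/// at which point everything buffered so far is handed back to the caller.
struct BoundedBuffer<const N: usize> {
    batches: Vec<Vec<u64>>, // stand-in for `Vec<RecordBatch>`
    rows: usize,
}

impl<const N: usize> BoundedBuffer<N> {
    fn new() -> Self {
        Self { batches: Vec::new(), rows: 0 }
    }

    /// Returns the previously buffered batches once the budget is crossed.
    fn push(&mut self, batch: Vec<u64>) -> Option<Vec<Vec<u64>>> {
        if self.rows + batch.len() >= N {
            let full = std::mem::take(&mut self.batches);
            self.rows = batch.len();
            self.batches.push(batch);
            Some(full)
        } else {
            self.rows += batch.len();
            self.batches.push(batch);
            None
        }
    }
}

fn main() {
    let mut buf: BoundedBuffer<4> = BoundedBuffer::new();
    assert!(buf.push(vec![1, 2]).is_none()); // still under budget
    let drained = buf.push(vec![3, 4, 5]).expect("budget crossed");
    assert_eq!(drained.len(), 1); // the earlier batch moves to the read buffer
}
```

The real `MutableBuffer::push` additionally slices the incoming batch at the boundary so the overflow carries over into the next buffer.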
@@ -67,12 +79,11 @@ impl MemWriter { self.schema_map.clear(); self.read_buffer.clear(); self.mutable_buffer.inner.clear(); - self.mutable_buffer.rows = 0; } pub fn recordbatch_cloned(&self, schema: &Arc) -> Vec { let mut read_buffer = self.read_buffer.clone(); - if self.mutable_buffer.rows > 0 { + if !self.mutable_buffer.inner.is_empty() { let rb = concat_records(schema, &self.mutable_buffer.inner); read_buffer.push(rb) } @@ -93,13 +104,12 @@ fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { #[derive(Debug, Default)] pub struct MutableBuffer { pub inner: Vec, - pub rows: usize, } impl MutableBuffer { fn push(&mut self, rb: &RecordBatch) -> Option> { - if self.rows + rb.num_rows() >= N { - let left = N - self.rows; + if self.inner.len() + rb.num_rows() >= N { + let left = N - self.inner.len(); let right = rb.num_rows() - left; let left_slice = rb.slice(0, left); let right_slice = if left < rb.num_rows() { @@ -111,16 +121,13 @@ impl MutableBuffer { // take all records let src = Vec::with_capacity(self.inner.len()); let inner = std::mem::replace(&mut self.inner, src); - self.rows = 0; if let Some(right_slice) = right_slice { - self.rows = right_slice.num_rows(); self.inner.push(right_slice); } Some(inner) } else { - self.rows += rb.num_rows(); self.inner.push(rb.clone()); None } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b33cc7b7b..75b3e736f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -35,11 +35,9 @@ mod metrics_layer; pub(crate) mod object_storage; pub mod retention; mod s3; -pub mod staging; mod store_metadata; use self::retention::Retention; -pub use self::staging::StorageDir; pub use azure_blob::AzureBlobConfig; pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 4dc8603df..aaa628364 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -17,11 +17,10 @@ */ use super::{ - retention::Retention, staging::convert_disk_files_to_parquet, ObjectStorageError, - ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, -}; -use super::{ - LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY + retention::Retention, LogStream, ObjectStorageError, ObjectStoreFormat, Owner, Permisssion, + StorageMetadata, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, + PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; use crate::alerts::AlertConfig; @@ -31,6 +30,7 @@ use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metadata::SchemaVersion; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; +use crate::staging::STAGING; use crate::{ catalog::{self, manifest::Manifest, snapshot::Snapshot}, metadata::STREAM_INFO, @@ -588,15 +588,14 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let custom_partition = STREAM_INFO .get_custom_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let dir = StorageDir::new(stream); - let schema = convert_disk_files_to_parquet( - stream, - &dir, - time_partition, - custom_partition.clone(), - shutdown_signal, - ) - .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let staging = STAGING.get_or_create_stream(stream); 
+ let schema = staging + .convert_disk_files_to_parquet( + time_partition.as_ref(), + custom_partition.as_ref(), + shutdown_signal, + ) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if let Some(schema) = schema { let static_schema_flag = STREAM_INFO @@ -607,8 +606,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } - let parquet_files = dir.parquet_files(); - for file in parquet_files { + for file in staging.parquet_files() { let filename = file .file_name() .expect("only parquet files are returned by iterator") diff --git a/src/storage/staging.rs b/src/storage/staging.rs deleted file mode 100644 index aab6603d9..000000000 --- a/src/storage/staging.rs +++ /dev/null @@ -1,461 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - * - */ - -use crate::{ - event::DEFAULT_TIMESTAMP_KEY, - handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION}, - metrics, - option::{Mode, CONFIG}, - storage::OBJECT_STORE_DATA_GRANULARITY, - utils::{ - self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url, - hostname_unchecked, - }, -}; -use anyhow::anyhow; -use arrow_schema::{ArrowError, Schema}; -use base64::Engine; -use chrono::{NaiveDateTime, Timelike, Utc}; -use itertools::Itertools; -use parquet::{ - arrow::ArrowWriter, - basic::Encoding, - errors::ParquetError, - file::properties::{WriterProperties, WriterPropertiesBuilder}, - format::SortingColumn, - schema::types::ColumnPath, -}; -use rand::distributions::DistString; -use serde_json::Value as JsonValue; -use std::{ - collections::HashMap, - fs, - path::{Path, PathBuf}, - process, - sync::Arc, -}; -use tracing::{error, info}; - -const ARROW_FILE_EXTENSION: &str = "data.arrows"; -// const PARQUET_FILE_EXTENSION: &str = "data.parquet"; - -#[derive(Debug)] -pub struct StorageDir { - pub data_path: PathBuf, -} - -impl StorageDir { - pub fn new(stream_name: &str) -> Self { - let data_path = CONFIG.options.local_stream_data_path(stream_name); - - Self { data_path } - } - - pub fn file_time_suffix( - time: NaiveDateTime, - custom_partition_values: &HashMap, - extention: &str, - ) -> String { - let mut uri = utils::date_to_prefix(time.date()) - + &utils::hour_to_prefix(time.hour()) - + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); - if !custom_partition_values.is_empty() { - uri = uri + &utils::custom_partition_to_prefix(custom_partition_values); - } - let local_uri = str::replace(&uri, "/", "."); - let hostname = hostname_unchecked(); - if CONFIG.options.mode == Mode::Ingest { - let id = INGESTOR_META.get_ingestor_id(); - format!("{local_uri}{hostname}{id}.{extention}") - } else { - format!("{local_uri}{hostname}.{extention}") - } - } - - fn filename_by_time( - stream_hash: &str, - time: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> String { - format!( - "{}.{}", 
- stream_hash, - Self::file_time_suffix(time, custom_partition_values, ARROW_FILE_EXTENSION) - ) - } - - fn filename_by_current_time( - stream_hash: &str, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> String { - Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values) - } - - pub fn path_by_current_time( - &self, - stream_hash: &str, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> PathBuf { - let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string(); - let mut filename = - Self::filename_by_current_time(stream_hash, parsed_timestamp, custom_partition_values); - filename = format!("{}{}", server_time_in_min, filename); - self.data_path.join(filename) - } - - pub fn arrow_files(&self) -> Vec { - let Ok(dir) = self.data_path.read_dir() else { - return vec![]; - }; - - let paths = dir - .flatten() - .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) - .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) - .collect(); - - paths - } - - pub fn arrow_files_grouped_exclude_time( - &self, - exclude: NaiveDateTime, - stream: &str, - shutdown_signal: bool, - ) -> HashMap> { - let mut grouped_arrow_file: HashMap> = HashMap::new(); - let mut arrow_files = self.arrow_files(); - - if !shutdown_signal { - arrow_files.retain(|path| { - !path - .file_name() - .unwrap() - .to_str() - .unwrap() - .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) - }); - } - - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); - for arrow_file_path in arrow_files { - if arrow_file_path.metadata().unwrap().len() == 0 { - error!( - "Invalid arrow file {:?} detected for stream {}, removing it", - &arrow_file_path, stream - ); - fs::remove_file(&arrow_file_path).unwrap(); - } else { - let key = Self::arrow_path_to_parquet(&arrow_file_path, random_string.clone()); - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); - } - } - grouped_arrow_file - } - - pub fn parquet_files(&self) -> Vec { - let Ok(dir) = self.data_path.read_dir() else { - return vec![]; - }; - - dir.flatten() - .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("parquet"))) - .collect() - } - - fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { - let filename = path.file_stem().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); - let filename = filename.rsplit_once('.').expect("contains the delim `.`"); - let filename = format!("{}.{}", filename.0, filename.1); - let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - parquet_path.set_extension("parquet"); - parquet_path - } -} - -// pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { -// let data_path = CONFIG.options.local_stream_data_path(stream_name); -// let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION); -// -// data_path.join(dir) -// } - -pub fn convert_disk_files_to_parquet( - stream: &str, - dir: &StorageDir, - time_partition: Option, - custom_partition: Option, - shutdown_signal: bool, -) -> Result, MoveDataError> { - let mut schemas = Vec::new(); - - let time = chrono::Utc::now().naive_utc(); - let staging_files = dir.arrow_files_grouped_exclude_time(time, stream, shutdown_signal); - if 
staging_files.is_empty() { - metrics::STAGING_FILES.with_label_values(&[stream]).set(0); - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, "arrows"]) - .set(0); - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, "parquet"]) - .set(0); - } - - // warn!("staging files-\n{staging_files:?}\n"); - for (parquet_path, files) in staging_files { - metrics::STAGING_FILES - .with_label_values(&[stream]) - .set(files.len() as i64); - - for file in &files { - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, file_type]) - .add(file_size as i64); - } - - let record_reader = MergedReverseRecordReader::try_new(&files).unwrap(); - if record_reader.readers.is_empty() { - continue; - } - let merged_schema = record_reader.merged_schema(); - let mut index_time_partition: usize = 0; - if let Some(time_partition) = time_partition.as_ref() { - index_time_partition = merged_schema.index_of(time_partition).unwrap(); - } - let mut custom_partition_fields: HashMap = HashMap::new(); - if let Some(custom_partition) = custom_partition.as_ref() { - for custom_partition_field in custom_partition.split(',') { - let index = merged_schema.index_of(custom_partition_field).unwrap(); - custom_partition_fields.insert(custom_partition_field.to_string(), index); - } - } - let props = parquet_writer_props( - time_partition.clone(), - index_time_partition, - custom_partition_fields, - ) - .build(); - schemas.push(merged_schema.clone()); - let schema = Arc::new(merged_schema); - let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?; - for ref record in record_reader.merged_iter(schema, time_partition.clone()) { - writer.write(record)?; - } - - writer.close()?; - if parquet_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { - error!( - "Invalid parquet file {:?} detected for stream {}, removing it", - &parquet_path, stream - ); - fs::remove_file(parquet_path).unwrap(); - } else { - for file in files { - // warn!("file-\n{file:?}\n"); - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - if fs::remove_file(file.clone()).is_err() { - error!("Failed to delete file. 
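The core of the loop above is plain arrow-to-parquet streaming: create an ArrowWriter over the target file, push every merged RecordBatch through it, and close it so the footer is written (the FOOTER_SIZE check that follows depends on that close). A reduced, self-contained sketch of just that step:

use std::{fs::File, path::Path, sync::Arc};

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use parquet::{arrow::ArrowWriter, errors::ParquetError, file::properties::WriterProperties};

// Minimal arrows -> parquet write step; reader construction, metrics and the
// footer-size validation from the surrounding loop are intentionally left out.
fn write_parquet(
    path: &Path,
    schema: Arc<Schema>,
    batches: impl Iterator<Item = RecordBatch>,
    props: WriterProperties,
) -> Result<(), ParquetError> {
    let file = File::create(path).map_err(|e| ParquetError::General(e.to_string()))?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    for batch in batches {
        writer.write(&batch)?;
    }
    writer.close()?; // flushes remaining row groups and writes the parquet footer
    Ok(())
}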
Unstable state"); - process::abort() - } - metrics::STORAGE_SIZE - .with_label_values(&["staging", stream, file_type]) - .sub(file_size as i64); - } - } - } - - if !schemas.is_empty() { - Ok(Some(Schema::try_merge(schemas).unwrap())) - } else { - Ok(None) - } -} - -pub fn parquet_writer_props( - time_partition: Option, - index_time_partition: usize, - custom_partition_fields: HashMap, -) -> WriterPropertiesBuilder { - let index_time_partition: i32 = index_time_partition as i32; - let mut time_partition_field = DEFAULT_TIMESTAMP_KEY.to_string(); - if let Some(time_partition) = time_partition { - time_partition_field = time_partition; - } - let mut sorting_column_vec: Vec = Vec::new(); - sorting_column_vec.push(SortingColumn { - column_idx: index_time_partition, - descending: true, - nulls_first: true, - }); - let mut props = WriterProperties::builder() - .set_max_row_group_size(CONFIG.options.row_group_size) - .set_compression(CONFIG.options.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![time_partition_field]), - Encoding::DELTA_BINARY_PACKED, - ); - - for (field, index) in custom_partition_fields { - let field = ColumnPath::new(vec![field]); - let encoding = Encoding::DELTA_BYTE_ARRAY; - props = props.set_column_encoding(field, encoding); - let sorting_column = SortingColumn { - column_idx: index as i32, - descending: true, - nulls_first: true, - }; - sorting_column_vec.push(sorting_column); - } - props = props.set_sorting_columns(Some(sorting_column_vec)); - - props -} - -pub fn get_ingestor_info() -> anyhow::Result { - let path = PathBuf::from(&CONFIG.options.local_staging_path); - - // all the files should be in the staging directory root - let entries = std::fs::read_dir(path)?; - let url = get_url(); - let port = url.port().unwrap_or(80).to_string(); - let url = url.to_string(); - - for entry in entries { - // cause the staging directory will have only one file with ingestor in the name - // so the JSON Parse should not error unless the file is corrupted - let path = entry?.path(); - let flag = path - .file_name() - .unwrap_or_default() - .to_str() - .unwrap_or_default() - .contains("ingestor"); - - if flag { - // get the ingestor metadata from staging - let mut meta: JsonValue = serde_json::from_slice(&std::fs::read(path)?)?; - - // migrate the staging meta - let obj = meta - .as_object_mut() - .ok_or_else(|| anyhow!("Could Not parse Ingestor Metadata Json"))?; - - if obj.get("flight_port").is_none() { - obj.insert( - "flight_port".to_owned(), - JsonValue::String(CONFIG.options.flight_port.to_string()), - ); - } - - let mut meta: IngestorMetadata = serde_json::from_value(meta)?; - - // compare url endpoint and port - if meta.domain_name != url { - info!( - "Domain Name was Updated. Old: {} New: {}", - meta.domain_name, url - ); - meta.domain_name = url; - } - - if meta.port != port { - info!("Port was Updated. Old: {} New: {}", meta.port, port); - meta.port = port; - } - - let token = base64::prelude::BASE64_STANDARD.encode(format!( - "{}:{}", - CONFIG.options.username, CONFIG.options.password - )); - - let token = format!("Basic {}", token); - - if meta.token != token { - // TODO: Update the message to be more informative with username and password - info!( - "Credentials were Updated. 
Old: {} New: {}", - meta.token, token - ); - meta.token = token; - } - - put_ingestor_info(meta.clone())?; - return Ok(meta); - } - } - - let store = CONFIG.storage().get_object_store(); - let out = IngestorMetadata::new( - port, - url, - DEFAULT_VERSION.to_string(), - store.get_bucket_name(), - &CONFIG.options.username, - &CONFIG.options.password, - get_ingestor_id(), - CONFIG.options.flight_port.to_string(), - ); - - put_ingestor_info(out.clone())?; - Ok(out) -} - -/// Puts the ingestor info into the staging. -/// -/// This function takes the ingestor info as a parameter and stores it in staging. -/// # Parameters -/// -/// * `ingestor_info`: The ingestor info to be stored. -pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { - let path = PathBuf::from(&CONFIG.options.local_staging_path); - let file_name = format!("ingestor.{}.json", info.ingestor_id); - let file_path = path.join(file_name); - - std::fs::write(file_path, serde_json::to_vec(&info)?)?; - - Ok(()) -} - -#[derive(Debug, thiserror::Error)] -pub enum MoveDataError { - #[error("Unable to create recordbatch stream")] - Arrow(#[from] ArrowError), - #[error("Could not generate parquet file")] - Parquet(#[from] ParquetError), - #[error("IO Error {0}")] - ObjectStorage(#[from] std::io::Error), - #[error("Could not generate parquet file")] - Create, -} diff --git a/src/sync.rs b/src/sync.rs index d843e3a3a..24e8be765 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -25,6 +25,7 @@ use tracing::{error, info, warn}; use crate::alerts::{alerts_utils, AlertConfig, AlertError}; use crate::option::CONFIG; +use crate::staging::STAGING; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; pub async fn object_store_sync() -> ( @@ -98,7 +99,7 @@ pub async fn run_local_sync() -> ( scheduler .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) .run(|| async { - crate::event::STREAM_WRITERS.unset_all(); + STAGING.flush_all(); }); loop { diff --git a/src/utils/arrow/merged_reader.rs b/src/utils/arrow/merged_reader.rs deleted file mode 100644 index 3248bd37d..000000000 --- a/src/utils/arrow/merged_reader.rs +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
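get_ingestor_info and put_ingestor_info, deleted here, treat the staging directory as the source of truth for ingestor.*.json and patch files written before the flight_port field existed prior to deserializing them. A small sketch of that migration step on the raw JSON value (the field name comes from this diff; everything else is illustrative):

use serde_json::Value;

// Inject a missing "flight_port" into an older ingestor metadata document so it
// can still be deserialized into IngestorMetadata.
fn migrate_ingestor_meta(mut meta: Value, flight_port: &str) -> Value {
    if let Some(obj) = meta.as_object_mut() {
        obj.entry("flight_port")
            .or_insert_with(|| Value::String(flight_port.to_string()));
    }
    meta
}

put_ingestor_info then writes the (possibly updated) metadata back to staging as ingestor.{ingestor_id}.json, which is also where a freshly minted record lands when no existing file is found.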
- * - * - */ - -use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_ipc::reader::StreamReader; -use arrow_schema::Schema; -use itertools::kmerge_by; -use std::{ - fs::{self, File}, - io::BufReader, - path::PathBuf, - sync::Arc, -}; -use tracing::error; - -use super::{ - adapt_batch, - reverse_reader::{reverse, OffsetReader}, -}; -use crate::{event::DEFAULT_TIMESTAMP_KEY, utils}; - -#[derive(Debug)] -pub struct MergedRecordReader { - pub readers: Vec>>, -} - -impl MergedRecordReader { - pub fn try_new(files: &[PathBuf]) -> Result { - let mut readers = Vec::with_capacity(files.len()); - - for file in files { - //remove empty files before reading - if file.metadata().unwrap().len() == 0 { - error!("Invalid file detected, removing it: {:?}", file); - fs::remove_file(file).unwrap(); - } else { - let Ok(reader) = - StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) - else { - error!("Invalid file detected, ignoring it: {:?}", file); - continue; - }; - - readers.push(reader); - } - } - - Ok(Self { readers }) - } - - pub fn merged_schema(&self) -> Schema { - Schema::try_merge( - self.readers - .iter() - .map(|reader| reader.schema().as_ref().clone()), - ) - .unwrap() - } -} - -#[derive(Debug)] -pub struct MergedReverseRecordReader { - pub readers: Vec>>>, -} - -impl MergedReverseRecordReader { - pub fn try_new(files: &[PathBuf]) -> Result { - let mut readers = Vec::with_capacity(files.len()); - for file in files { - let Ok(reader) = - utils::arrow::reverse_reader::get_reverse_reader(File::open(file).unwrap()) - else { - error!("Invalid file detected, ignoring it: {:?}", file); - continue; - }; - - readers.push(reader); - } - - Ok(Self { readers }) - } - - pub fn merged_iter( - self, - schema: Arc, - time_partition: Option, - ) -> impl Iterator { - let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten()); - kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| { - // Capture time_partition by value - let a_time = get_timestamp_millis(a, time_partition.clone()); - let b_time = get_timestamp_millis(b, time_partition.clone()); - a_time > b_time - }) - .map(|batch| reverse(&batch)) - .map(move |batch| adapt_batch(&schema, &batch)) - } - - pub fn merged_schema(&self) -> Schema { - Schema::try_merge( - self.readers - .iter() - .map(|reader| reader.schema().as_ref().clone()), - ) - .unwrap() - } -} - -fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { - match time_partition { - Some(time_partition) => { - let time_partition = time_partition.as_str(); - match batch.column_by_name(time_partition) { - Some(column) => column - .as_any() - .downcast_ref::() - .unwrap() - .value(0), - None => get_default_timestamp_millis(batch), - } - } - None => get_default_timestamp_millis(batch), - } -} -fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { - match batch - .column(0) - .as_any() - .downcast_ref::() - { - // Ideally we expect the first column to be a timestamp (because we add the timestamp column first in the writer) - Some(array) => array.value(0), - // In case the first column is not a timestamp, we fallback to look for default timestamp column across all columns - None => batch - .column_by_name(DEFAULT_TIMESTAMP_KEY) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .value(0), - } -} diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index 2cbdbf0a5..3cdc5193c 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -42,19 +42,17 @@ use std::sync::Arc; -use 
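The deleted MergedReverseRecordReader leans on itertools::kmerge_by to lazily merge several already-sorted arrow streams by their leading timestamp, newest first, before reversing and adapting each batch. The merge itself is easy to miss inside the reader plumbing; a minimal standalone illustration with plain integers standing in for record batches:

use itertools::kmerge_by;

// k-way merge of descending-sorted inputs: with `a > b` as the comparator, the
// element that compares greater is taken first, so the combined stream stays in
// descending order, mirroring how merged_iter orders batches by timestamp.
fn merge_descending(streams: Vec<Vec<i64>>) -> Vec<i64> {
    kmerge_by(streams, |a: &i64, b: &i64| a > b).collect()
}

For example, merge_descending(vec![vec![9, 5, 1], vec![8, 3]]) yields [9, 8, 5, 3, 1].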
arrow_array::{Array, RecordBatch, TimestampMillisecondArray}; +use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array}; use arrow_schema::Schema; +use arrow_select::take::take; use chrono::Utc; use itertools::Itertools; pub mod batch_adapter; pub mod flight; -pub mod merged_reader; -pub mod reverse_reader; use anyhow::Result; pub use batch_adapter::adapt_batch; -pub use merged_reader::MergedRecordReader; use serde_json::{Map, Value}; /// Replaces columns in a record batch with new arrays. @@ -139,6 +137,16 @@ pub fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) } +pub fn reverse(rb: &RecordBatch) -> RecordBatch { + let indices = UInt64Array::from_iter_values((0..rb.num_rows()).rev().map(|x| x as u64)); + let arrays = rb + .columns() + .iter() + .map(|col| take(&col, &indices, None).unwrap()) + .collect(); + RecordBatch::try_new(rb.schema(), arrays).unwrap() +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 55ec1d1dd..11943fe37 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -32,18 +32,12 @@ use crate::rbac::Users; use actix::extract_session_key_from_req; use actix_web::HttpRequest; use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc}; -use itertools::Itertools; use regex::Regex; use sha2::{Digest, Sha256}; -use std::collections::HashMap; use std::env; use tracing::debug; use url::Url; -pub fn hostname_unchecked() -> String { - hostname::get().unwrap().into_string().unwrap() -} - /// Convert minutes to a slot range /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { @@ -61,30 +55,6 @@ pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { Some(format!("{block_start:02}-{block_end:02}")) } -pub fn date_to_prefix(date: NaiveDate) -> String { - let date = format!("date={date}/"); - date.replace("UTC", "") -} - -pub fn custom_partition_to_prefix(custom_partition: &HashMap) -> String { - let mut prefix = String::default(); - for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) { - prefix.push_str(&format!("{key}={value}/", key = key, value = value)); - } - prefix -} - -pub fn hour_to_prefix(hour: u32) -> String { - format!("hour={hour:02}/") -} - -pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option { - Some(format!( - "minute={}/", - minute_to_slot(minute, data_granularity)? - )) -} - pub fn get_url() -> Url { if CONFIG.options.ingestor_endpoint.is_empty() { return format!( @@ -93,8 +63,9 @@ pub fn get_url() -> Url { CONFIG.options.address ) .parse::() // if the value was improperly set, this will panic before hand - .unwrap_or_else(|err| panic!("{}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", - err, CONFIG.options.address)); + .unwrap_or_else(|err| { + panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `:` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", CONFIG.options.address) + }); } let ingestor_endpoint = &CONFIG.options.ingestor_endpoint;
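The reverse helper that replaces reverse_reader's batch flipping builds a descending index vector and routes every column through arrow_select::take, so it handles any column type without per-array-kind code. A usage sketch in the spirit of the tests in this module; it assumes reverse is in scope from utils::arrow as added above:

use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

// Row order flips, schema and row count stay the same.
fn reverse_example() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    let reversed = reverse(&batch); // assumed in scope: utils::arrow::reverse
    let col = reversed
        .column(0)
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!((col.value(0), col.value(1), col.value(2)), (3, 2, 1));
}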