From c59fa800451f4bfd44947ca4c4cbabc0ce082220 Mon Sep 17 00:00:00 2001 From: Opeyemi Folorunsho Date: Sat, 12 Jul 2025 14:32:57 +0100 Subject: [PATCH 01/12] feat: add gcs implementation for storage --- Cargo.lock | 1 + Cargo.toml | 24 +- Dockerfile.debug | 2 +- docker-compose-gcs-distributed-test.yaml | 114 ++++ src/cli.rs | 18 +- src/metrics/storage.rs | 39 ++ src/parseable/mod.rs | 6 + src/storage/gcs.rs | 679 +++++++++++++++++++++++ src/storage/mod.rs | 2 + 9 files changed, 878 insertions(+), 7 deletions(-) create mode 100644 docker-compose-gcs-distributed-test.yaml create mode 100644 src/storage/gcs.rs diff --git a/Cargo.lock b/Cargo.lock index 5cc606f2a..83a281d20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3240,6 +3240,7 @@ dependencies = [ "rand", "reqwest 0.12.12", "ring", + "rustls-pemfile 2.2.0", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index aa2d2de11..382c22142 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,14 +10,19 @@ build = "build.rs" [dependencies] # Arrow and DataFusion ecosystem arrow = "54.0.0" -arrow-array = "54.0.0" +arrow-array = "54.0.0" arrow-flight = { version = "54.0.0", features = ["tls"] } arrow-ipc = { version = "54.0.0", features = ["zstd"] } arrow-json = "54.0.0" arrow-schema = { version = "54.0.0", features = ["serde"] } arrow-select = "54.0.0" datafusion = "45.0.0" -object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] } +object_store = { version = "0.11.2", features = [ + "cloud", + "aws", + "azure", + "gcp", +] } parquet = "54.0.0" # Web server and HTTP-related @@ -34,7 +39,11 @@ tower-http = { version = "0.6.1", features = ["cors"] } url = "2.4.0" # Connectors dependencies -rdkafka = { version = "0.37", optional = true, features = ["cmake-build", "tracing", "libz-static"] } +rdkafka = { version = "0.37", optional = true, features = [ + "cmake-build", + "tracing", + "libz-static", +] } sasl2-sys = { version = "0.1.22", optional = true, features = ["vendored"] } # Authentication and Security @@ -144,7 +153,14 @@ assets-sha1 = "3e703ef8bedf8ae55fd31713f6267ad14ad3d29d" [features] debug = [] -kafka = ["rdkafka", "rdkafka/ssl-vendored", "rdkafka/ssl", "rdkafka/sasl", "sasl2-sys", "sasl2-sys/vendored"] +kafka = [ + "rdkafka", + "rdkafka/ssl-vendored", + "rdkafka/ssl", + "rdkafka/sasl", + "sasl2-sys", + "sasl2-sys/vendored", +] [profile.release-lto] inherits = "release" diff --git a/Dockerfile.debug b/Dockerfile.debug index 35ba5fbae..de7880003 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -14,7 +14,7 @@ # along with this program. If not, see . # build stage -FROM rust:1.84.0-bookworm AS builder +FROM docker.io/rust:1.84.0-bookworm AS builder LABEL org.opencontainers.image.title="Parseable" LABEL maintainer="Parseable Team " diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml new file mode 100644 index 000000000..347b0f03b --- /dev/null +++ b/docker-compose-gcs-distributed-test.yaml @@ -0,0 +1,114 @@ +networks: + parseable-internal: + +services: + # query server + parseable-query: + container_name: parseable-query + build: + context: . 
+ dockerfile: Dockerfile.debug + platform: linux/amd64 + command: ["parseable", "gcs-store"] + ports: + - "8000:8000" + environment: + - P_S3_URL=http://minio:9000 + - P_S3_ACCESS_KEY=parseable + - P_S3_SECRET_KEY=supersecret + - P_S3_REGION=us-east-1 + - P_S3_BUCKET=parseable + - P_STAGING_DIR=/tmp/data + - P_USERNAME=parseableadmin + - P_PASSWORD=parseableadmin + - P_CHECK_UPDATE=false + - P_PARQUET_COMPRESSION_ALGO=snappy + - P_MODE=query + - RUST_LOG=warn + networks: + - parseable-internal + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"] + interval: 15s + timeout: 20s + retries: 5 + deploy: + restart_policy: + condition: on-failure + delay: 20s + max_attempts: 3 + # ingest server one + parseable-ingest-one: + container_name: parseable-ingest-one + build: + context: . + dockerfile: Dockerfile.debug + platform: linux/amd64 + command: ["parseable", "gcs-store"] + ports: + - "8000" + environment: + - P_S3_URL=http://minio:9000 + - P_S3_ACCESS_KEY=parseable + - P_S3_SECRET_KEY=supersecret + - P_S3_REGION=us-east-1 + - P_S3_BUCKET=parseable + - P_STAGING_DIR=/tmp/data + - P_USERNAME=parseableadmin + - P_PASSWORD=parseableadmin + - P_CHECK_UPDATE=false + - P_PARQUET_COMPRESSION_ALGO=snappy + - P_MODE=ingest + - P_INGESTOR_ENDPOINT=parseable-ingest-one:8000 + - RUST_LOG=warn + networks: + - parseable-internal + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"] + interval: 15s + timeout: 20s + retries: 5 + depends_on: + parseable-query: + condition: service_healthy + deploy: + restart_policy: + condition: on-failure + delay: 20s + max_attempts: 3 + + quest: + platform: linux/amd64 + image: ghcr.io/parseablehq/quest:main + pull_policy: always + command: + [ + "load", + "http://parseable-query:8000", + "parseableadmin", + "parseableadmin", + "20", + "10", + "5m", + "minio:9000", + "parseable", + "supersecret", + "parseable", + "http://parseable-ingest-one:8000", + "parseableadmin", + "parseableadmin", + ] + networks: + - parseable-internal + depends_on: + parseable-query: + condition: service_healthy + parseable-ingest-one: + condition: service_healthy + minio: + condition: service_healthy + deploy: + restart_policy: + condition: on-failure + delay: 20s + max_attempts: 3 diff --git a/src/cli.rs b/src/cli.rs index cfc74d81d..a26d3d966 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -27,7 +27,7 @@ use crate::connectors::kafka::config::KafkaConfig; use crate::{ oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, - storage::{AzureBlobConfig, FSConfig, S3Config}, + storage::{AzureBlobConfig, FSConfig, GCSConfig, S3Config}, }; /// Default username and password for Parseable server, used by default for local mode. 
@@ -80,6 +80,9 @@ pub enum StorageOptions { #[command(name = "blob-store")] Blob(BlobStoreArgs), + + #[command(name = "gcs-store")] + GCS(GCSStoreArgs), } #[derive(Parser)] @@ -115,6 +118,17 @@ pub struct BlobStoreArgs { pub kafka: KafkaConfig, } +#[derive(Parser)] +pub struct GCSStoreArgs { + #[command(flatten)] + pub options: Options, + #[command(flatten)] + pub storage: GCSConfig, + #[cfg(feature = "kafka")] + #[command(flatten)] + pub kafka: KafkaConfig, +} + #[derive(Parser, Debug, Default)] pub struct Options { // Authentication @@ -338,7 +352,7 @@ pub struct Options { #[arg( long, - env = "P_MEMORY_THRESHOLD", + env = "P_MEMORY_THRESHOLD", default_value = "80.0", value_parser = validation::validate_percentage, help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring" diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs index a91c431cb..316ab7825 100644 --- a/src/metrics/storage.rs +++ b/src/metrics/storage.rs @@ -125,3 +125,42 @@ pub mod azureblob { } } } + +pub mod gcs { + use crate::{metrics::METRICS_NAMESPACE, storage::GCSConfig}; + use once_cell::sync::Lazy; + use prometheus::{HistogramOpts, HistogramVec}; + + use super::StorageMetrics; + + pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("gcs_response_time", "gcs Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); + + pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("query_gcs_response_time", "GCS Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); + + impl StorageMetrics for GCSConfig { + fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { + handler + .registry + .register(Box::new(REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); + handler + .registry + .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); + } + } +} diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 439f71ee0..970cf4f40 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -117,6 +117,12 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage args.kafka, Arc::new(args.storage), ), + StorageOptions::GCS(args) => Parseable::new( + args.options, + #[cfg(feature = "kafka")] + args.kafka, + Arc::new(args.storage), + ), }); /// All state related to parseable, in one place. diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs new file mode 100644 index 000000000..a2fa23ec7 --- /dev/null +++ b/src/storage/gcs.rs @@ -0,0 +1,679 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use std::{ + collections::{BTreeMap, HashSet}, + path::Path, + sync::Arc, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use datafusion::{ + datasource::listing::ListingTableUrl, + execution::{ + object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl}, + runtime_env::RuntimeEnvBuilder, + }, +}; +use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; +use object_store::{ + buffered::BufReader, + gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}, + limit::LimitStore, + path::Path as StorePath, + BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig, +}; +use relative_path::{RelativePath, RelativePathBuf}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; +use tracing::{error, info}; + +use crate::{ + handlers::http::users::USERS_ROOT_DIR, + metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics}, + parseable::LogStream, +}; + +use super::{ + metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, + MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, + STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, +}; + +#[derive(Debug, Clone, clap::Args)] +#[command( + name = "GCS config", + about = "Start Parseable with GCS or compatible as storage", + help_template = "\ +{about-section} +{all-args} +" +)] + +pub struct GCSConfig { + /// The endpoint to GCS or compatible object storage platform + #[arg(long, env = "P_S3_URL", value_name = "url", required = true)] + pub endpoint_url: String, + + /// The GCS or compatible object storage bucket to be used for storage + #[arg( + long, + env = "P_GCS_BUCKET", + value_name = "bucket-name", + required = true + )] + pub bucket_name: String, + + /// Set client to skip tls verification + #[arg( + long, + env = "P_S3_TLS_SKIP_VERIFY", + value_name = "bool", + default_value = "false" + )] + pub skip_tls: bool, +} + +impl GCSConfig { + fn get_default_builder(&self) -> GoogleCloudStorageBuilder { + let mut client_options = ClientOptions::default() + .with_allow_http(true) + .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS)) + .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS)); + + if self.skip_tls { + client_options = client_options.with_allow_invalid_certificates(true) + } + let retry_config = RetryConfig { + max_retries: 5, + retry_timeout: Duration::from_secs(30), + backoff: BackoffConfig::default(), + }; + + let builder = GoogleCloudStorageBuilder::new() + .with_bucket_name(&self.bucket_name) + .with_retry(retry_config); + + builder.with_client_options(client_options) + } +} + +impl ObjectStorageProvider for GCSConfig { + fn name(&self) -> &'static str { + "gcs" + } + + fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder { + let gcs = self.get_default_builder().build().unwrap(); + + // limit objectstore to a concurrent request limit + let gcs = LimitStore::new(gcs, super::MAX_OBJECT_STORE_REQUESTS); + let gcs = MetricLayer::new(gcs); + + let object_store_registry = DefaultObjectStoreRegistry::new(); + let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap(); + object_store_registry.register_store(url.as_ref(), Arc::new(gcs)); + + RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(object_store_registry)) + } + + fn construct_client(&self) -> Arc { + let gcs = self.get_default_builder().build().unwrap(); + + Arc::new(GCS { + client: 
Arc::new(gcs), + bucket: self.bucket_name.clone(), + root: StorePath::from(""), + }) + } + + fn get_endpoint(&self) -> String { + format!("{}/{}", self.endpoint_url, self.bucket_name) + } + + fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { + self.register_metrics(handler) + } + + fn get_object_store(&self) -> Arc { + static STORE: once_cell::sync::OnceCell> = + once_cell::sync::OnceCell::new(); + + STORE.get_or_init(|| self.construct_client()).clone() + } +} + +#[derive(Debug)] +pub struct GCS { + client: Arc, + bucket: String, + root: StorePath, +} + +impl GCS { + async fn _get_object(&self, path: &RelativePath) -> Result { + let instant = Instant::now(); + + let resp = self.client.get(&to_object_store_path(path)).await; + + match resp { + Ok(resp) => { + let time = instant.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(time); + let body = resp.bytes().await.unwrap(); + Ok(body) + } + Err(err) => { + let time = instant.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "400"]) + .observe(time); + Err(err.into()) + } + } + } + + async fn _put_object( + &self, + path: &RelativePath, + resource: PutPayload, + ) -> Result<(), ObjectStorageError> { + let time = Instant::now(); + let resp = self.client.put(&to_object_store_path(path), resource).await; + let status = if resp.is_ok() { "200" } else { "400" }; + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["PUT", status]) + .observe(time); + + if let Err(object_store::Error::NotFound { source, .. }) = &resp { + let source_str = source.to_string(); + if source_str.contains("NoSuchBucket") { + return Err(ObjectStorageError::Custom( + format!("Bucket '{}' does not exist in S3.", self.bucket).to_string(), + )); + } + } + + resp.map(|_| ()).map_err(|err| err.into()) + } + + async fn _delete_prefix(&self, key: &str) -> Result<(), ObjectStorageError> { + let object_stream = self.client.list(Some(&(key.into()))); + + object_stream + .for_each_concurrent(None, |x| async { + match x { + Ok(obj) => { + if (self.client.delete(&obj.location).await).is_err() { + error!("Failed to fetch object during delete stream"); + } + } + Err(_) => { + error!("Failed to fetch object during delete stream"); + } + }; + }) + .await; + + Ok(()) + } + + async fn _list_streams(&self) -> Result, ObjectStorageError> { + let mut result_file_list = HashSet::new(); + let resp = self.client.list_with_delimiter(None).await?; + + let streams = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .map(|name| name.as_ref().to_string()) + .filter(|name| name != PARSEABLE_ROOT_DIRECTORY && name != USERS_ROOT_DIR) + .collect::>(); + + for stream in streams { + let stream_path = + object_store::path::Path::from(format!("{}/{}", &stream, STREAM_ROOT_DIRECTORY)); + let resp = self.client.list_with_delimiter(Some(&stream_path)).await?; + if resp + .objects + .iter() + .any(|name| name.location.filename().unwrap().ends_with("stream.json")) + { + result_file_list.insert(stream); + } + } + + Ok(result_file_list) + } + + async fn _list_dates(&self, stream: &str) -> Result, ObjectStorageError> { + let resp = self + .client + .list_with_delimiter(Some(&(stream.into()))) + .await?; + + let common_prefixes = resp.common_prefixes; + + // return prefixes at the root level + let dates: Vec<_> = common_prefixes + .iter() + .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/"))) + .map(String::from) + .collect(); + + 
Ok(dates) + } + + async fn _list_manifest_files( + &self, + stream: &str, + ) -> Result>, ObjectStorageError> { + let mut result_file_list: BTreeMap> = BTreeMap::new(); + let resp = self + .client + .list_with_delimiter(Some(&(stream.into()))) + .await?; + + let dates = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != stream && name.as_ref() != STREAM_ROOT_DIRECTORY) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for date in dates { + let date_path = object_store::path::Path::from(format!("{}/{}", stream, &date)); + let resp = self.client.list_with_delimiter(Some(&date_path)).await?; + let manifests: Vec = resp + .objects + .iter() + .filter(|name| name.location.filename().unwrap().ends_with("manifest.json")) + .map(|name| name.location.to_string()) + .collect(); + result_file_list.entry(date).or_default().extend(manifests); + } + Ok(result_file_list) + } + async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { + let instant = Instant::now(); + + // // TODO: Uncomment this when multipart is fixed + // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64; + + let should_multipart = false; + + let res = if should_multipart { + // self._upload_multipart(key, path).await + // this branch will never get executed + Ok(()) + } else { + let bytes = tokio::fs::read(path).await?; + let result = self.client.put(&key.into(), bytes.into()).await?; + info!("Uploaded file to S3: {:?}", result); + Ok(()) + }; + + let status = if res.is_ok() { "200" } else { "400" }; + let time = instant.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["UPLOAD_PARQUET", status]) + .observe(time); + + res + } + + async fn _upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError> { + let mut file = OpenOptions::new().read(true).open(path).await?; + let location = &to_object_store_path(key); + + let mut async_writer = self.client.put_multipart(location).await?; + + let meta = file.metadata().await?; + let total_size = meta.len() as usize; + if total_size < MIN_MULTIPART_UPLOAD_SIZE { + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + self.client.put(location, data.into()).await?; + // async_writer.put_part(data.into()).await?; + // async_writer.complete().await?; + return Ok(()); + } else { + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + + // let mut upload_parts = Vec::new(); + + let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0; + let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE; + let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 }; + + // Upload each part + for part_number in 0..(total_parts) { + let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE; + let end_pos = if part_number == num_full_parts && has_final_partial_part { + // Last part might be smaller than 5MB (which is allowed) + total_size + } else { + // All other parts must be at least 5MB + start_pos + MIN_MULTIPART_UPLOAD_SIZE + }; + + // Extract this part's data + let part_data = data[start_pos..end_pos].to_vec(); + + // Upload the part + async_writer.put_part(part_data.into()).await?; + + // upload_parts.push(part_number as u64 + 1); + } + if let Err(err) = async_writer.complete().await { + error!("Failed to complete multipart upload. 
{:?}", err); + async_writer.abort().await?; + }; + } + Ok(()) + } +} + +#[async_trait] +impl ObjectStorage for GCS { + async fn get_buffered_reader( + &self, + path: &RelativePath, + ) -> Result { + let path = &to_object_store_path(path); + let meta = self.client.head(path).await?; + + let store: Arc = self.client.clone(); + let buf = object_store::buffered::BufReader::new(store, &meta); + Ok(buf) + } + async fn upload_multipart( + &self, + key: &RelativePath, + path: &Path, + ) -> Result<(), ObjectStorageError> { + self._upload_multipart(key, path).await + } + async fn head(&self, path: &RelativePath) -> Result { + Ok(self.client.head(&to_object_store_path(path)).await?) + } + + async fn get_object(&self, path: &RelativePath) -> Result { + Ok(self._get_object(path).await?) + } + + async fn get_objects( + &self, + base_path: Option<&RelativePath>, + filter_func: Box bool + Send>, + ) -> Result, ObjectStorageError> { + let instant = Instant::now(); + + let prefix = if let Some(base_path) = base_path { + to_object_store_path(base_path) + } else { + self.root.clone() + }; + + let mut list_stream = self.client.list(Some(&prefix)); + + let mut res = vec![]; + + while let Some(meta) = list_stream.next().await.transpose()? { + let ingestor_file = filter_func(meta.location.filename().unwrap().to_string()); + + if !ingestor_file { + continue; + } + + let byts = self + .get_object( + RelativePath::from_path(meta.location.as_ref()) + .map_err(ObjectStorageError::PathError)?, + ) + .await?; + + res.push(byts); + } + + let instant = instant.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(instant); + + Ok(res) + } + + async fn get_ingestor_meta_file_paths( + &self, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + let mut path_arr = vec![]; + let mut object_stream = self.client.list(Some(&self.root)); + + while let Some(meta) = object_stream.next().await.transpose()? { + let flag = meta.location.filename().unwrap().starts_with("ingestor"); + + if flag { + path_arr.push(RelativePathBuf::from(meta.location.as_ref())); + } + } + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(time); + + Ok(path_arr) + } + + async fn get_stream_file_paths( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { + let time = Instant::now(); + let mut path_arr = vec![]; + let path = to_object_store_path(&RelativePathBuf::from(stream_name)); + let mut object_stream = self.client.list(Some(&path)); + + while let Some(meta) = object_stream.next().await.transpose()? 
{ + let flag = meta.location.filename().unwrap().starts_with(".ingestor"); + + if flag { + path_arr.push(RelativePathBuf::from(meta.location.as_ref())); + } + } + + path_arr.push(RelativePathBuf::from_iter([ + stream_name, + STREAM_METADATA_FILE_NAME, + ])); + path_arr.push(RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME])); + + let time = time.elapsed().as_secs_f64(); + REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(time); + + Ok(path_arr) + } + + async fn put_object( + &self, + path: &RelativePath, + resource: Bytes, + ) -> Result<(), ObjectStorageError> { + self._put_object(path, resource.into()) + .await + .map_err(|err| ObjectStorageError::ConnectionError(Box::new(err)))?; + + Ok(()) + } + + async fn delete_prefix(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + self._delete_prefix(path.as_ref()).await?; + + Ok(()) + } + + async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError> { + Ok(self.client.delete(&to_object_store_path(path)).await?) + } + + async fn check(&self) -> Result<(), ObjectStorageError> { + Ok(self + .client + .head(&to_object_store_path(&parseable_json_path())) + .await + .map(|_| ())?) + } + + async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + self._delete_prefix(stream_name).await?; + + Ok(()) + } + + async fn try_delete_node_meta(&self, node_filename: String) -> Result<(), ObjectStorageError> { + let file = RelativePathBuf::from(&node_filename); + match self.client.delete(&to_object_store_path(&file)).await { + Ok(_) => Ok(()), + Err(err) => { + // if the object is not found, it is not an error + // the given url path was incorrect + if matches!(err, object_store::Error::NotFound { .. }) { + error!("Node does not exist"); + Err(err.into()) + } else { + error!("Error deleting node meta file: {:?}", err); + Err(err.into()) + } + } + } + } + + async fn list_streams(&self) -> Result, ObjectStorageError> { + self._list_streams().await + } + + async fn list_old_streams(&self) -> Result, ObjectStorageError> { + let resp = self.client.list_with_delimiter(None).await?; + + let common_prefixes = resp.common_prefixes; // get all dirs + + // return prefixes at the root level + let dirs: HashSet<_> = common_prefixes + .iter() + .filter_map(|path| path.parts().next()) + .map(|name| name.as_ref().to_string()) + .filter(|x| x != PARSEABLE_ROOT_DIRECTORY) + .collect(); + + let stream_json_check = FuturesUnordered::new(); + + for dir in &dirs { + let key = format!("{dir}/{STREAM_METADATA_FILE_NAME}"); + let task = async move { self.client.head(&StorePath::from(key)).await.map(|_| ()) }; + stream_json_check.push(task); + } + + stream_json_check.try_collect::<()>().await?; + + Ok(dirs) + } + + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { + let streams = self._list_dates(stream_name).await?; + + Ok(streams) + } + + async fn list_manifest_files( + &self, + stream_name: &str, + ) -> Result>, ObjectStorageError> { + let files = self._list_manifest_files(stream_name).await?; + + Ok(files) + } + + async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> { + self._upload_file(key, path).await?; + + Ok(()) + } + + fn absolute_url(&self, prefix: &RelativePath) -> object_store::path::Path { + object_store::path::Path::parse(prefix).unwrap() + } + + fn query_prefixes(&self, prefixes: Vec) -> Vec { + prefixes + .into_iter() + .map(|prefix| { + let path = format!("s3://{}/{}", &self.bucket, prefix); + 
ListingTableUrl::parse(path).unwrap() + }) + .collect() + } + + fn store_url(&self) -> url::Url { + url::Url::parse(&format!("s3://{}", self.bucket)).unwrap() + } + + async fn list_dirs(&self) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from("/"); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + Ok(resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .map(|name| name.as_ref().to_string()) + .collect::>()) + } + + async fn list_dirs_relative( + &self, + relative_path: &RelativePath, + ) -> Result, ObjectStorageError> { + let prefix = object_store::path::Path::from(relative_path.as_str()); + let resp = self.client.list_with_delimiter(Some(&prefix)).await?; + + Ok(resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .map(|name| name.as_ref().to_string()) + .collect::>()) + } + + fn get_bucket_name(&self) -> String { + self.bucket.clone() + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 29dc2ea13..163870271 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -37,6 +37,7 @@ use std::fmt::Debug; mod azure_blob; pub mod field_stats; +mod gcs; mod localfs; mod metrics_layer; pub mod object_storage; @@ -46,6 +47,7 @@ pub mod store_metadata; use self::retention::Retention; pub use azure_blob::AzureBlobConfig; +pub use gcs::GCSConfig; pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; From 76929165144dab91e47963d2ffc81f42b4b08ef1 Mon Sep 17 00:00:00 2001 From: Opeyemi Folorunsho Date: Sun, 13 Jul 2025 13:52:05 +0100 Subject: [PATCH 02/12] feat: update helm chart to accomodate gcs service account secret --- helm/templates/ingestor-statefulset.yaml | 47 +++++++++---- helm/templates/querier-statefulset.yaml | 67 ++++++++++++------ helm/templates/standalone-deployment.yaml | 23 ++++-- helm/values.yaml | 85 ++++++++++++----------- 4 files changed, 139 insertions(+), 83 deletions(-) diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml index 5143e78f3..33e4003c5 100644 --- a/helm/templates/ingestor-statefulset.yaml +++ b/helm/templates/ingestor-statefulset.yaml @@ -66,6 +66,12 @@ spec: tolerations: {{- toYaml . | nindent 8 }} {{- end }} + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + volumes: + - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + secret: + secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + {{- end }} containers: - name: {{ .Chart.Name }} securityContext: @@ -74,19 +80,19 @@ spec: imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} args: - /usr/bin/parseable - - {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }} + - {{ .Values.parseable.store | quote }} - --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . 
}}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }} env: - {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }} - - name: {{ $key }} - value: {{ tpl $value $ | quote }} - name: HOSTNAME valueFrom: fieldRef: apiVersion: v1 fieldPath: metadata.name + {{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }} + - name: {{ $key }} + value: {{ tpl $value $ | quote }} {{- end }} - + {{- if .Values.parseable.auditLogging.enabled }} - name: P_AUDIT_LOGGER value: {{ .Values.parseable.auditLogging.p_server | quote }} @@ -111,11 +117,13 @@ spec: {{- end }} {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - name: GOOGLE_APPLICATION_CREDENTIALS + value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }} {{- range $secret := .Values.parseable.gcsModeSecret.secrets }} {{- range $key := $secret.keys }} {{- $envPrefix := $secret.prefix | default "" | upper }} {{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }} - - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}} + - name: {{ $envPrefix }}{{ $envKey }} valueFrom: secretKeyRef: name: {{ $secret.name }} @@ -137,30 +145,39 @@ spec: {{- end }} {{- end }} {{- end }} + - name: P_MODE value: "ingest" + {{- if .Values.parseable.kafkaConnector.enabled }} - {{- range $key, $value := .Values.parseable.kafkaConnector.env }} + {{- range $key, $value := .Values.parseable.kafkaConnector.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} - {{- end }} + {{- end }} {{- end }} ports: - containerPort: {{ .Values.parseable.highAvailability.ingestor.port }} - {{- with .Values.readinessProbe }} + {{- with .Values.readinessProbe }} readinessProbe: - {{ toYaml . | nindent 12 }} - {{- end }} + {{- toYaml . | nindent 10 }} + {{- end }} resources: - {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }} - {{- if .Values.parseable.persistence.ingestor.enabled }} + {{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 10 }} + {{- if or .Values.parseable.persistence.ingestor.enabled (and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled) }} volumeMounts: + {{- if .Values.parseable.persistence.ingestor.enabled }} - mountPath: "/parseable/staging" name: stage-volume {{- end }} - volumeClaimTemplates: + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }} + name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + readOnly: true + {{- end }} + {{- end }} {{- if .Values.parseable.persistence.ingestor.enabled }} + volumeClaimTemplates: - metadata: name: stage-volume spec: @@ -171,4 +188,4 @@ spec: requests: storage: {{ .Values.parseable.persistence.ingestor.size | quote }} {{- end }} -{{- end }} \ No newline at end of file +{{- end }} diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml index 31333a189..de1f28298 100644 --- a/helm/templates/querier-statefulset.yaml +++ b/helm/templates/querier-statefulset.yaml @@ -29,8 +29,10 @@ spec: minReadySeconds: 2 template: metadata: + {{- with .Values.parseable.podAnnotations }} annotations: - {{- .Values.parseable.podAnnotations | toYaml | nindent 8 }} + {{- toYaml . | nindent 8 }} + {{- end }} labels: {{- .Values.parseable.podLabels | toYaml | nindent 8 }} {{- include "parseable.querierLabelsSelector" . 
| nindent 8 }} @@ -39,19 +41,33 @@ spec: serviceAccountName: {{ include "parseable.serviceAccountName" . }} {{- with .Values.parseable.toleration }} tolerations: - {{ toYaml . | nindent 8 }} + {{- toYaml . | nindent 8 }} {{- end }} {{- with .Values.parseable.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} {{- end }} + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + volumes: + - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + secret: + secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + - name: stage-volume + emptyDir: {} + {{- else }} + volumes: + - name: stage-volume + emptyDir: {} + {{- end }} containers: - name: {{ .Chart.Name }} securityContext: {{- toYaml .Values.parseable.securityContext | nindent 10 }} image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }} imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} - args: ["/usr/bin/parseable", {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }}] + args: + - "/usr/bin/parseable" + - {{ .Values.parseable.store | quote }} env: - name: HOSTNAME valueFrom: @@ -66,7 +82,7 @@ spec: - name: P_MAX_DISK_USAGE_PERCENT value: "95.0" {{- end }} - {{- range $key, $value := .Values.parseable.env }} + {{- range $key, $value := .Values.parseable.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} {{- end }} @@ -95,11 +111,13 @@ spec: {{- end }} {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - name: GOOGLE_APPLICATION_CREDENTIALS + value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }} {{- range $secret := .Values.parseable.gcsModeSecret.secrets }} {{- range $key := $secret.keys }} {{- $envPrefix := $secret.prefix | default "" | upper }} {{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }} - - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}} + - name: {{ $envPrefix }}{{ $envKey }} valueFrom: secretKeyRef: name: {{ $secret.name }} @@ -107,7 +125,7 @@ spec: {{- end }} {{- end }} {{- end }} - + {{- if and .Values.parseable.blobModeSecret .Values.parseable.blobModeSecret.enabled }} {{- range $secret := .Values.parseable.blobModeSecret.secrets }} {{- range $key := $secret.keys }} @@ -121,14 +139,15 @@ spec: {{- end }} {{- end }} {{- end }} + ports: - containerPort: 8000 {{- with .Values.parseable.readinessProbe }} readinessProbe: - {{ toYaml . | nindent 12 }} + {{- toYaml . 
| nindent 10 }} {{- end }} resources: - {{- toYaml .Values.parseable.resources | nindent 12 }} + {{- toYaml .Values.parseable.resources | nindent 10 }} volumeMounts: - mountPath: "/parseable/staging" name: stage-volume @@ -136,19 +155,21 @@ spec: - mountPath: "/parseable/hot-tier" name: hot-tier-volume {{- end }} - volumes: - - emptyDir: {} - name: stage-volume - {{- if .Values.parseable.sidecar.enabled}} + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }} + name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + readOnly: true + {{- end }} + {{- if .Values.parseable.sidecar.enabled }} - name: {{ .Chart.Name }}-sidecar securityContext: - {{- toYaml .Values.parseable.securityContext | nindent 8 }} + {{- toYaml .Values.parseable.securityContext | nindent 10 }} image: {{ .Values.parseable.sidecar.image.repository }}:{{ .Values.parseable.sidecar.image.tag }} imagePullPolicy: {{ .Values.parseable.sidecar.image.pullPolicy }} - command: {{ .Values.parseable.sidecar.command }} - args: {{ .Values.parseable.sidecar.args }} + command: {{ .Values.parseable.sidecar.command }} + args: {{ .Values.parseable.sidecar.args }} env: - {{- range $key, $value := .Values.parseable.sidecar.env }} + {{- range $key, $value := .Values.parseable.sidecar.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} {{- end }} @@ -156,10 +177,11 @@ spec: - containerPort: {{ .Values.parseable.sidecar.ports }} resources: {{- toYaml .Values.parseable.sidecar.resources | nindent 10 }} - volumeMounts: {{ .Values.parseable.sidecar.volumeMounts | toYaml | nindent 10 }} + volumeMounts: + {{- .Values.parseable.sidecar.volumeMounts | toYaml | nindent 10 }} {{- end }} - volumeClaimTemplates: {{- if .Values.parseable.persistence.querier.enabled }} + volumeClaimTemplates: - metadata: name: hot-tier-volume spec: @@ -178,8 +200,13 @@ spec: resources: requests: storage: 5Gi + {{- if .Values.parseable.sidecar.enabled }} + {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 2 }} + {{- end }} + {{- else }} + {{- if .Values.parseable.sidecar.enabled }} + volumeClaimTemplates: + {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 2 }} {{- end }} - {{- if .Values.parseable.sidecar.enabled}} - {{- .Values.parseable.sidecar.volumeClaimTemplates | toYaml | nindent 4 }} {{- end }} {{- end }} diff --git a/helm/templates/standalone-deployment.yaml b/helm/templates/standalone-deployment.yaml index 968d237b8..20627bfe3 100644 --- a/helm/templates/standalone-deployment.yaml +++ b/helm/templates/standalone-deployment.yaml @@ -36,14 +36,14 @@ spec: imagePullPolicy: {{ .Values.parseable.image.pullPolicy }} # Uncomment to debug # command: [ "/bin/sh", "-c", "sleep 1000000" ] - args: [ "/usr/bin/parseable", {{ if eq .Values.parseable.store "gcs-store" }}"s3-store"{{ else }}{{ .Values.parseable.store | quote }}{{ end }}] + args: [ "/usr/bin/parseable", {{ .Values.parseable.store | quote }}] env: - name: HOSTNAME valueFrom: fieldRef: apiVersion: v1 fieldPath: metadata.name - {{- range $key, $value := .Values.parseable.env }} + {{- range $key, $value := .Values.parseable.env }} - name: {{ $key }} value: {{ tpl $value $ | quote }} {{- end }} @@ -92,11 +92,13 @@ spec: {{- end }} {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - name: GOOGLE_APPLICATION_CREDENTIALS + value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }} {{- range $secret := 
.Values.parseable.gcsModeSecret.secrets }} {{- range $key := $secret.keys }} {{- $envPrefix := $secret.prefix | default "" | upper }} {{- $envKey := $key | upper | replace "." "_" | replace "-" "_" }} - - name: {{ $envPrefix }}{{ $envKey | replace "GCS" "S3"}} + - name: {{ $envPrefix }}{{ $envKey }} valueFrom: secretKeyRef: name: {{ $secret.name }} @@ -119,21 +121,30 @@ spec: {{- end }} {{- end }} - ports: - containerPort: 8000 {{- with .Values.readinessProbe }} readinessProbe: - {{ toYaml . | nindent 12 }} + {{- toYaml . | nindent 12 }} {{- end }} resources: {{- toYaml .Values.parseable.resources | nindent 12 }} volumeMounts: + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - mountPath: {{ .Values.parseable.gcsModeSecret.auth.mount_path }} + name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + readOnly: true + {{- end }} - mountPath: "/parseable/data" name: data-volume - mountPath: "/parseable/staging" name: stage-volume volumes: + {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }} + - name: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + secret: + secretName: {{ .Values.parseable.gcsModeSecret.auth.secret_name }} + {{- end }} {{- if .Values.parseable.persistence.staging.enabled }} - name: stage-volume persistentVolumeClaim: @@ -158,4 +169,4 @@ spec: tolerations: {{- toYaml . | nindent 8 }} {{- end }} -{{- end }} # Closing for "if eq .Values.parseable.highAvailability.enabled false" +{{- end }} diff --git a/helm/values.yaml b/helm/values.yaml index 16279d08c..b43f58460 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -4,13 +4,13 @@ parseable: tag: "v2.3.3" pullPolicy: Always ## object store can be local-store, s3-store, blob-store or gcs-store. - store: local-store + store: gcs-store ## Set to true if you want to deploy Parseable in a HA mode (multiple ingestors + hot tier) ## Please note that highAvailability is not supported in local mode highAvailability: - enabled: false + enabled: true ingestor: - affinity: { } + affinity: {} # podAntiAffinity: # requiredDuringSchedulingIgnoredDuringExecution: # - labelSelector: @@ -21,9 +21,9 @@ parseable: port: 8000 extraLabels: app: parseable - podAnnotations: { } - nodeSelector: { } - tolerations: [ ] + podAnnotations: {} + nodeSelector: {} + tolerations: [] labels: app: parseable component: ingestor @@ -137,7 +137,11 @@ parseable: - s3.bucket - s3.region gcsModeSecret: - enabled: false + enabled: true + auth: + secret_name: parseable-env-secret + secret_key: key.json + mount_path: /var/secrets/google/key.json secrets: - name: parseable-env-secret prefix: P_ @@ -148,15 +152,12 @@ parseable: - staging.dir - fs.dir - gcs.url - - gcs.access.key - - gcs.secret.key - gcs.bucket - - gcs.region serviceAccount: create: true name: "parseable" - annotations: { } - nodeSelector: { } + annotations: {} + nodeSelector: {} service: type: ClusterIP port: 80 @@ -164,7 +165,7 @@ parseable: httpGet: path: /api/v1/readiness port: 8000 - toleration: [ ] + toleration: [] resources: limits: cpu: 500m @@ -173,7 +174,7 @@ parseable: cpu: 250m memory: 1Gi ## works only when highAvailability is enabled - ## Set it to true if you want to deploy Parseable + ## Set it to true if you want to deploy Parseable ## Query node with a sidecar sidecar: enabled: false @@ -181,8 +182,8 @@ parseable: repository: busybox tag: latest pullPolicy: IfNotPresent - command: [ ] - args: [ ] + command: [] + args: [] env: RUST_LOG: warn ports: 8000 @@ -193,7 +194,7 @@ parseable: - metadata: name: 
test-volume spec: - accessModes: [ "ReadWriteOnce" ] + accessModes: ["ReadWriteOnce"] resources: requests: storage: 1Gi @@ -217,36 +218,36 @@ parseable: fsGroupChangePolicy: "Always" nameOverride: "" fullnameOverride: "" - affinity: { } + affinity: {} podLabels: app: parseable component: query - tolerations: [ ] + tolerations: [] ## Use this section to create ServiceMonitor object for ## this Parseable deployment. Read more on ServiceMonitor ## here: https://prometheus-operator.dev/docs/api-reference/api/#monitoring.coreos.com/v1.ServiceMonitor metrics: serviceMonitor: enabled: false - labels: { } + labels: {} namespace: "" spec: jobLabel: "" - targetLabels: [ ] - podTargetLabels: [ ] - endpoints: [ ] - selector: { } - namespaceSelector: { } + targetLabels: [] + podTargetLabels: [] + endpoints: [] + selector: {} + namespaceSelector: {} sampleLimit: 0 - scrapeProtocols: [ ] + scrapeProtocols: [] targetLimit: 0 labelLimit: 0 labelNameLengthLimit: 0 labelValueLengthLimit: 0 keepDroppedTargets: 0 - attachMetadata: { } + attachMetadata: {} scrapeClass: "" - bodySizeLimit: { } + bodySizeLimit: {} kafkaConnector: enabled: false env: @@ -319,7 +320,7 @@ vector: image: repository: timberio/vector pullPolicy: IfNotPresent - pullSecrets: [ ] + pullSecrets: [] tag: "" sha: "" replicas: 1 @@ -332,7 +333,7 @@ vector: create: true serviceAccount: create: true - annotations: { } + annotations: {} name: automountToken: true podLabels: @@ -344,13 +345,13 @@ vector: service: enabled: true type: "ClusterIP" - annotations: { } - topologyKeys: [ ] - ports: [ ] + annotations: {} + topologyKeys: [] + ports: [] externalTrafficPolicy: "" loadBalancerIP: "" ipFamilyPolicy: "" - ipFamilies: [ ] + ipFamilies: [] serviceHeadless: enabled: true dnsPolicy: ClusterFirst @@ -376,7 +377,7 @@ vector: - kubernetes_logs encoding: codec: json - uri: 'http://parseable.parseable.svc.cluster.local/api/v1/ingest' + uri: "http://parseable.parseable.svc.cluster.local/api/v1/ingest" auth: strategy: basic user: admin @@ -386,7 +387,7 @@ vector: X-P-Stream: vectordemo healthcheck: enabled: true - path: 'http://parseable.parseable.svc.cluster.local/api/v1/liveness' + path: "http://parseable.parseable.svc.cluster.local/api/v1/liveness" port: 80 # Default values for fluent-bit. @@ -413,7 +414,7 @@ fluent-bit: tag: latest serviceAccount: create: true - annotations: { } + annotations: {} name: rbac: create: true @@ -424,8 +425,8 @@ fluent-bit: type: ClusterIP port: 2020 loadBalancerClass: - loadBalancerSourceRanges: [ ] - labels: { } + loadBalancerSourceRanges: [] + labels: {} livenessProbe: httpGet: path: / @@ -515,9 +516,9 @@ fluent-bit: Server_Port 80 Username {{ .Values.serverUsername }} Password {{ .Values.serverPassword }} - Stream k8s-events + Stream k8s-events - upstream: { } + upstream: {} customParsers: | [PARSER] @@ -533,7 +534,7 @@ fluent-bit: # Regex ^(?\S+Z) stderr F (?\S+ \S+) \[(?\S+)\] (?\d+#\d+): \*(?\d+) (?.*?) 
client: (?\S+), server: (?\S+), request: "(?\S+) (?\S+) HTTP/\S+", upstream: "(?[^"]+)", host: "(?\S+)"$ # Time_Key timestamp # Time_Format %Y/%m/%d %H:%M:%S - + # [PARSER] # Name nginx_access # Format regex From 9893a83a3046fbd75057fecb2cfee7b2924eea54 Mon Sep 17 00:00:00 2001 From: solomonope Date: Sun, 13 Jul 2025 15:13:33 +0100 Subject: [PATCH 03/12] feat: add local docker compose --- docker-compose-gcs-distributed-test.yaml | 30 ++++++++++-------------- src/parseable/mod.rs | 2 ++ src/storage/gcs.rs | 10 ++++++-- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml index 347b0f03b..ead5276d0 100644 --- a/docker-compose-gcs-distributed-test.yaml +++ b/docker-compose-gcs-distributed-test.yaml @@ -13,11 +13,7 @@ services: ports: - "8000:8000" environment: - - P_S3_URL=http://minio:9000 - - P_S3_ACCESS_KEY=parseable - - P_S3_SECRET_KEY=supersecret - - P_S3_REGION=us-east-1 - - P_S3_BUCKET=parseable + - P_GCS_BUCKET=parseable-test-gcs-local - P_STAGING_DIR=/tmp/data - P_USERNAME=parseableadmin - P_PASSWORD=parseableadmin @@ -25,6 +21,7 @@ services: - P_PARQUET_COMPRESSION_ALGO=snappy - P_MODE=query - RUST_LOG=warn + - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/mpt-randd-8217aef869fd.json networks: - parseable-internal healthcheck: @@ -36,7 +33,9 @@ services: restart_policy: condition: on-failure delay: 20s - max_attempts: 3 + max_attempts: + volumes: + - "/home/opeyemi/Downloads/:/parseable/svc/:ro,z" # ingest server one parseable-ingest-one: container_name: parseable-ingest-one @@ -48,11 +47,7 @@ services: ports: - "8000" environment: - - P_S3_URL=http://minio:9000 - - P_S3_ACCESS_KEY=parseable - - P_S3_SECRET_KEY=supersecret - - P_S3_REGION=us-east-1 - - P_S3_BUCKET=parseable + - P_GCS_BUCKET=parseable-test-gcs-local - P_STAGING_DIR=/tmp/data - P_USERNAME=parseableadmin - P_PASSWORD=parseableadmin @@ -61,6 +56,7 @@ services: - P_MODE=ingest - P_INGESTOR_ENDPOINT=parseable-ingest-one:8000 - RUST_LOG=warn + - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/mpt-randd-8217aef869fd.json networks: - parseable-internal healthcheck: @@ -76,6 +72,8 @@ services: condition: on-failure delay: 20s max_attempts: 3 + volumes: + - "/home/opeyemi/Downloads/:/parseable/svc/:ro,z" quest: platform: linux/amd64 @@ -90,10 +88,10 @@ services: "20", "10", "5m", - "minio:9000", - "parseable", - "supersecret", - "parseable", + "storage.googleapis.com", + "GOOG1ESWXFI3QEE6X2SEYKIQDK6PONOW3QPKDD3HJZOBE34H6PWQWWQHYYBSI", + "1Cfhir7cep9ej56UlLX6RKmkuV2BMcC2+BDFoSoa", + "parseable-test-gcs-local", "http://parseable-ingest-one:8000", "parseableadmin", "parseableadmin", @@ -105,8 +103,6 @@ services: condition: service_healthy parseable-ingest-one: condition: service_healthy - minio: - condition: service_healthy deploy: restart_policy: condition: on-failure diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 970cf4f40..168685523 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -249,6 +249,8 @@ impl Parseable { return "S3 bucket"; } else if self.storage.name() == "blob_store" { return "Azure Blob Storage"; + } else if self.storage.name() == "gcs" { + return "Google Object Store"; } "Unknown" } diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index a2fa23ec7..d24500fef 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -69,7 +69,13 @@ use super::{ pub struct GCSConfig { /// The endpoint to GCS or compatible object storage platform - #[arg(long, env = "P_S3_URL", value_name = "url", required 
= true)] + #[arg( + long, + env = "P_GCS_URL", + value_name = "url", + default_value = "https://storage.googleapis.com", + required = false + )] pub endpoint_url: String, /// The GCS or compatible object storage bucket to be used for storage @@ -107,7 +113,7 @@ impl GCSConfig { backoff: BackoffConfig::default(), }; - let builder = GoogleCloudStorageBuilder::new() + let builder = GoogleCloudStorageBuilder::from_env() .with_bucket_name(&self.bucket_name) .with_retry(retry_config); From c46cac6456e0331b781badd8dc0f1db9c8a600de Mon Sep 17 00:00:00 2001 From: solomonope Date: Sun, 13 Jul 2025 15:16:29 +0100 Subject: [PATCH 04/12] chore: remove secret --- docker-compose-gcs-distributed-test.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml index ead5276d0..fa927e69f 100644 --- a/docker-compose-gcs-distributed-test.yaml +++ b/docker-compose-gcs-distributed-test.yaml @@ -89,8 +89,8 @@ services: "10", "5m", "storage.googleapis.com", - "GOOG1ESWXFI3QEE6X2SEYKIQDK6PONOW3QPKDD3HJZOBE34H6PWQWWQHYYBSI", - "1Cfhir7cep9ej56UlLX6RKmkuV2BMcC2+BDFoSoa", + "", + "", "parseable-test-gcs-local", "http://parseable-ingest-one:8000", "parseableadmin", From 861dde3d88598ca8ffe35d582621f1d09974e360 Mon Sep 17 00:00:00 2001 From: solomonope Date: Sun, 13 Jul 2025 18:52:19 +0100 Subject: [PATCH 05/12] feat: fix coderabbit feedback --- docker-compose-gcs-distributed-test.yaml | 4 ++-- src/cli.rs | 8 ++++---- src/metrics/storage.rs | 6 +++--- src/parseable/mod.rs | 2 +- src/storage/gcs.rs | 23 ++++++++++++----------- src/storage/mod.rs | 2 +- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml index fa927e69f..0bbc5260f 100644 --- a/docker-compose-gcs-distributed-test.yaml +++ b/docker-compose-gcs-distributed-test.yaml @@ -56,7 +56,7 @@ services: - P_MODE=ingest - P_INGESTOR_ENDPOINT=parseable-ingest-one:8000 - RUST_LOG=warn - - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/mpt-randd-8217aef869fd.json + - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/${GCS_CREDENTIALS_FILE:-key.json} networks: - parseable-internal healthcheck: @@ -73,7 +73,7 @@ services: delay: 20s max_attempts: 3 volumes: - - "/home/opeyemi/Downloads/:/parseable/svc/:ro,z" + - "${GCS_CREDENTIALS_PATH:-./credentials}:/parseable/svc/:ro,z" quest: platform: linux/amd64 diff --git a/src/cli.rs b/src/cli.rs index a26d3d966..a4bce12b0 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -27,7 +27,7 @@ use crate::connectors::kafka::config::KafkaConfig; use crate::{ oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, - storage::{AzureBlobConfig, FSConfig, GCSConfig, S3Config}, + storage::{AzureBlobConfig, FSConfig, GcsConfig, S3Config}, }; /// Default username and password for Parseable server, used by default for local mode. 
@@ -82,7 +82,7 @@ pub enum StorageOptions { Blob(BlobStoreArgs), #[command(name = "gcs-store")] - GCS(GCSStoreArgs), + Gcs(GcsStoreArgs), } #[derive(Parser)] @@ -119,11 +119,11 @@ pub struct BlobStoreArgs { } #[derive(Parser)] -pub struct GCSStoreArgs { +pub struct GcsStoreArgs { #[command(flatten)] pub options: Options, #[command(flatten)] - pub storage: GCSConfig, + pub storage: GcsConfig, #[cfg(feature = "kafka")] #[command(flatten)] pub kafka: KafkaConfig, diff --git a/src/metrics/storage.rs b/src/metrics/storage.rs index 316ab7825..f96a317d9 100644 --- a/src/metrics/storage.rs +++ b/src/metrics/storage.rs @@ -127,7 +127,7 @@ pub mod azureblob { } pub mod gcs { - use crate::{metrics::METRICS_NAMESPACE, storage::GCSConfig}; + use crate::{metrics::METRICS_NAMESPACE, storage::GcsConfig}; use once_cell::sync::Lazy; use prometheus::{HistogramOpts, HistogramVec}; @@ -135,7 +135,7 @@ pub mod gcs { pub static REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { HistogramVec::new( - HistogramOpts::new("gcs_response_time", "gcs Request Latency") + HistogramOpts::new("gcs_response_time", "GCS Request Latency") .namespace(METRICS_NAMESPACE), &["method", "status"], ) @@ -151,7 +151,7 @@ pub mod gcs { .expect("metric can be created") }); - impl StorageMetrics for GCSConfig { + impl StorageMetrics for GcsConfig { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { handler .registry diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 168685523..87496cfc1 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -117,7 +117,7 @@ pub static PARSEABLE: Lazy = Lazy::new(|| match Cli::parse().storage args.kafka, Arc::new(args.storage), ), - StorageOptions::GCS(args) => Parseable::new( + StorageOptions::Gcs(args) => Parseable::new( args.options, #[cfg(feature = "kafka")] args.kafka, diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index d24500fef..24f346f21 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -66,8 +66,7 @@ use super::{ {all-args} " )] - -pub struct GCSConfig { +pub struct GcsConfig { /// The endpoint to GCS or compatible object storage platform #[arg( long, @@ -90,14 +89,14 @@ pub struct GCSConfig { /// Set client to skip tls verification #[arg( long, - env = "P_S3_TLS_SKIP_VERIFY", + env = "P_GCS_TLS_SKIP_VERIFY", value_name = "bool", default_value = "false" )] pub skip_tls: bool, } -impl GCSConfig { +impl GcsConfig { fn get_default_builder(&self) -> GoogleCloudStorageBuilder { let mut client_options = ClientOptions::default() .with_allow_http(true) @@ -121,7 +120,7 @@ impl GCSConfig { } } -impl ObjectStorageProvider for GCSConfig { +impl ObjectStorageProvider for GcsConfig { fn name(&self) -> &'static str { "gcs" } @@ -134,6 +133,8 @@ impl ObjectStorageProvider for GCSConfig { let gcs = MetricLayer::new(gcs); let object_store_registry = DefaultObjectStoreRegistry::new(); + // Register GCS client under the "s3://" scheme so DataFusion can route + // object store calls to our GoogleCloudStorage implementatio let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap(); object_store_registry.register_store(url.as_ref(), Arc::new(gcs)); @@ -143,7 +144,7 @@ impl ObjectStorageProvider for GCSConfig { fn construct_client(&self) -> Arc { let gcs = self.get_default_builder().build().unwrap(); - Arc::new(GCS { + Arc::new(Gcs { client: Arc::new(gcs), bucket: self.bucket_name.clone(), root: StorePath::from(""), @@ -167,13 +168,13 @@ impl ObjectStorageProvider for GCSConfig { } #[derive(Debug)] -pub struct GCS { +pub struct Gcs { 
client: Arc, bucket: String, root: StorePath, } -impl GCS { +impl Gcs { async fn _get_object(&self, path: &RelativePath) -> Result { let instant = Instant::now(); @@ -215,7 +216,7 @@ impl GCS { let source_str = source.to_string(); if source_str.contains("NoSuchBucket") { return Err(ObjectStorageError::Custom( - format!("Bucket '{}' does not exist in S3.", self.bucket).to_string(), + format!("Bucket '{}' does not exist in GCS.", self.bucket).to_string(), )); } } @@ -335,7 +336,7 @@ impl GCS { } else { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await?; - info!("Uploaded file to S3: {:?}", result); + info!("Uploaded file to GCS: {:?}", result); Ok(()) }; @@ -406,7 +407,7 @@ impl GCS { } #[async_trait] -impl ObjectStorage for GCS { +impl ObjectStorage for Gcs { async fn get_buffered_reader( &self, path: &RelativePath, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 163870271..d160760d5 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -47,7 +47,7 @@ pub mod store_metadata; use self::retention::Retention; pub use azure_blob::AzureBlobConfig; -pub use gcs::GCSConfig; +pub use gcs::GcsConfig; pub use localfs::FSConfig; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::S3Config; From b6d4d549f7d837bca47b78cc195e64a9b78cfe74 Mon Sep 17 00:00:00 2001 From: solomonope Date: Mon, 14 Jul 2025 09:43:37 +0100 Subject: [PATCH 06/12] feat: fix localpath for docker-compose --- docker-compose-gcs-distributed-test.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose-gcs-distributed-test.yaml b/docker-compose-gcs-distributed-test.yaml index 0bbc5260f..a4a99b946 100644 --- a/docker-compose-gcs-distributed-test.yaml +++ b/docker-compose-gcs-distributed-test.yaml @@ -21,7 +21,7 @@ services: - P_PARQUET_COMPRESSION_ALGO=snappy - P_MODE=query - RUST_LOG=warn - - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/mpt-randd-8217aef869fd.json + - GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/${GCS_CREDENTIALS_FILE:-key.json} networks: - parseable-internal healthcheck: @@ -35,7 +35,7 @@ services: delay: 20s max_attempts: volumes: - - "/home/opeyemi/Downloads/:/parseable/svc/:ro,z" + - "${GCS_CREDENTIALS_PATH:-./credentials}:/parseable/svc/:ro,z" # ingest server one parseable-ingest-one: container_name: parseable-ingest-one From 3193302075a50755cf0c6e9f92dd8596a56bcf98 Mon Sep 17 00:00:00 2001 From: solomonope Date: Mon, 14 Jul 2025 10:20:15 +0100 Subject: [PATCH 07/12] feat: register path with datafusion --- src/storage/gcs.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 24f346f21..e8ae811a4 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -133,9 +133,9 @@ impl ObjectStorageProvider for GcsConfig { let gcs = MetricLayer::new(gcs); let object_store_registry = DefaultObjectStoreRegistry::new(); - // Register GCS client under the "s3://" scheme so DataFusion can route - // object store calls to our GoogleCloudStorage implementatio - let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap(); + // Register GCS client under the "gs://" scheme so DataFusion can route + // object store calls to our GoogleCloudStorage implementation + let url = ObjectStoreUrl::parse(format!("gs://{}", &self.bucket_name)).unwrap(); object_store_registry.register_store(url.as_ref(), Arc::new(gcs)); RuntimeEnvBuilder::new().with_object_store_registry(Arc::new(object_store_registry)) @@ -643,14 +643,14 @@ 
         prefixes
             .into_iter()
             .map(|prefix| {
-                let path = format!("s3://{}/{}", &self.bucket, prefix);
+                let path = format!("gs://{}/{}", &self.bucket, prefix);
                 ListingTableUrl::parse(path).unwrap()
             })
             .collect()
     }
 
     fn store_url(&self) -> url::Url {
-        url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
+        url::Url::parse(&format!("gs://{}", self.bucket)).unwrap()
     }
 
     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError> {

From 1fa7989d9b3ddd09dcdc61c25ec2e552406519dc Mon Sep 17 00:00:00 2001
From: solomonope
Date: Mon, 14 Jul 2025 10:54:34 +0100
Subject: [PATCH 08/12] feat: use gcs metrics handler

---
 src/storage/gcs.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
index e8ae811a4..d41fd6896 100644
--- a/src/storage/gcs.rs
+++ b/src/storage/gcs.rs
@@ -23,6 +23,11 @@ use std::{
     time::{Duration, Instant},
 };
 
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{gcs::REQUEST_RESPONSE_TIME, StorageMetrics},
+    parseable::LogStream,
+};
 use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{
@@ -44,12 +49,6 @@ use relative_path::{RelativePath, RelativePathBuf};
 use tokio::{fs::OpenOptions, io::AsyncReadExt};
 use tracing::{error, info};
 
-use crate::{
-    handlers::http::users::USERS_ROOT_DIR,
-    metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
-    parseable::LogStream,
-};
-
 use super::{
     metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
     ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,

From 05e81673cee7cf33bef05d8e5dc268ac83ac2c79 Mon Sep 17 00:00:00 2001
From: solomonope
Date: Mon, 14 Jul 2025 13:03:42 +0100
Subject: [PATCH 09/12] feat: remove dead code and dead comments

---
 src/storage/gcs.rs | 31 ++++++-------------------------
 1 file changed, 6 insertions(+), 25 deletions(-)

diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
index d41fd6896..6ae7c494c 100644
--- a/src/storage/gcs.rs
+++ b/src/storage/gcs.rs
@@ -155,7 +155,7 @@ impl ObjectStorageProvider for GcsConfig {
     }
 
     fn register_store_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
-        self.register_metrics(handler)
+        self.register_metrics(handler);
     }
 
     fn get_object_store(&self) -> Arc<dyn ObjectStorage> {
@@ -323,29 +323,16 @@ impl Gcs {
     async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let instant = Instant::now();
 
-        // // TODO: Uncomment this when multipart is fixed
-        // let should_multipart = std::fs::metadata(path)?.len() > MULTIPART_UPLOAD_SIZE as u64;
+        let bytes = tokio::fs::read(path).await?;
+        let result = self.client.put(&key.into(), bytes.into()).await?;
+        info!("Uploaded file to GCS: {:?}", result);
 
-        let should_multipart = false;
-
-        let res = if should_multipart {
-            // self._upload_multipart(key, path).await
-            // this branch will never get executed
-            Ok(())
-        } else {
-            let bytes = tokio::fs::read(path).await?;
-            let result = self.client.put(&key.into(), bytes.into()).await?;
-            info!("Uploaded file to GCS: {:?}", result);
-            Ok(())
-        };
-
-        let status = if res.is_ok() { "200" } else { "400" };
         let time = instant.elapsed().as_secs_f64();
         REQUEST_RESPONSE_TIME
-            .with_label_values(&["UPLOAD_PARQUET", status])
+            .with_label_values(&["UPLOAD_PARQUET", "200"])
             .observe(time);
 
-        res
+        Ok(())
     }
 
     async fn _upload_multipart(
@@ -364,15 +351,11 @@ impl Gcs {
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
             self.client.put(location, data.into()).await?;
-            // async_writer.put_part(data.into()).await?;
-            // async_writer.complete().await?;
             return Ok(());
         } else {
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
 
-            // let mut upload_parts = Vec::new();
-
             let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
             let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
             let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
@@ -393,8 +376,6 @@ impl Gcs {
                 // Upload the part
                 async_writer.put_part(part_data.into()).await?;
-
-                // upload_parts.push(part_number as u64 + 1);
             }
             if let Err(err) = async_writer.complete().await {
                 error!("Failed to complete multipart upload. {:?}", err);
                 async_writer.abort().await?;

From b93ae8705cf03f1a38d8422b9e26a353b461aabe Mon Sep 17 00:00:00 2001
From: solomonope
Date: Mon, 14 Jul 2025 13:14:25 +0100
Subject: [PATCH 10/12] feat: improve error handling for _upload_multipart

---
 src/storage/gcs.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs
index 6ae7c494c..292bf3f02 100644
--- a/src/storage/gcs.rs
+++ b/src/storage/gcs.rs
@@ -378,8 +378,13 @@ impl Gcs {
                 async_writer.put_part(part_data.into()).await?;
             }
             if let Err(err) = async_writer.complete().await {
-                error!("Failed to complete multipart upload. {:?}", err);
-                async_writer.abort().await?;
+                if let Err(abort_err) = async_writer.abort().await {
+                    error!(
+                        "Failed to abort multipart upload after completion failure: {:?}",
+                        abort_err
+                    );
+                }
+                return Err(err.into());
             };
         }
         Ok(())

From 4f2494cdeb17d8d8ba1a06beb0ffc49ab063223c Mon Sep 17 00:00:00 2001
From: Opeyemi Folorunsho
Date: Mon, 14 Jul 2025 15:32:31 +0100
Subject: [PATCH 11/12] feat: mount secret correctly for helm chart

---
 helm/templates/ingestor-statefulset.yaml | 2 +-
 helm/templates/querier-statefulset.yaml  | 2 +-
 helm/values.yaml                         | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/helm/templates/ingestor-statefulset.yaml b/helm/templates/ingestor-statefulset.yaml
index 33e4003c5..73185cd2b 100644
--- a/helm/templates/ingestor-statefulset.yaml
+++ b/helm/templates/ingestor-statefulset.yaml
@@ -118,7 +118,7 @@ spec:
 
           {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
           - name: GOOGLE_APPLICATION_CREDENTIALS
-            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
+            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
           {{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
           {{- range $key := $secret.keys }}
           {{- $envPrefix := $secret.prefix | default "" | upper }}
diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml
index de1f28298..d0b4a2187 100644
--- a/helm/templates/querier-statefulset.yaml
+++ b/helm/templates/querier-statefulset.yaml
@@ -112,7 +112,7 @@ spec:
 
           {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
           - name: GOOGLE_APPLICATION_CREDENTIALS
-            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
+            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
           {{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
           {{- range $key := $secret.keys }}
           {{- $envPrefix := $secret.prefix | default "" | upper }}
diff --git a/helm/values.yaml b/helm/values.yaml
index b43f58460..591f8d8ac 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -4,7 +4,7 @@ parseable:
     tag: "v2.3.3"
     pullPolicy: Always
   ## object store can be local-store, s3-store, blob-store or gcs-store.
-  store: gcs-store
+  store: local-store
   ## Set to true if you want to deploy Parseable in a HA mode (multiple ingestors + hot tier)
   ## Please note that highAvailability is not supported in local mode
   highAvailability:
@@ -141,7 +141,7 @@ parseable:
     auth:
       secret_name: parseable-env-secret
       secret_key: key.json
-      mount_path: /var/secrets/google/key.json
+      mount_path: /var/secrets/google
     secrets:
       - name: parseable-env-secret
        prefix: P_

From 989464ab61ba045ed018a5dc34c669c7a1c4016e Mon Sep 17 00:00:00 2001
From: Opeyemi Folorunsho
Date: Mon, 14 Jul 2025 16:15:04 +0100
Subject: [PATCH 12/12] feat: update secret mounting for querier

---
 helm/templates/querier-statefulset.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/helm/templates/querier-statefulset.yaml b/helm/templates/querier-statefulset.yaml
index d0b4a2187..de1f28298 100644
--- a/helm/templates/querier-statefulset.yaml
+++ b/helm/templates/querier-statefulset.yaml
@@ -112,7 +112,7 @@ spec:
 
           {{- if and .Values.parseable.gcsModeSecret .Values.parseable.gcsModeSecret.enabled }}
           - name: GOOGLE_APPLICATION_CREDENTIALS
-            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}
+            value: {{ .Values.parseable.gcsModeSecret.auth.mount_path }}/{{ .Values.parseable.gcsModeSecret.auth.secret_key }}
           {{- range $secret := .Values.parseable.gcsModeSecret.secrets }}
           {{- range $key := $secret.keys }}
           {{- $envPrefix := $secret.prefix | default "" | upper }}
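
With PATCH 11/12 and PATCH 12/12 applied, both the ingestor and querier statefulsets render GOOGLE_APPLICATION_CREDENTIALS as the auth mount_path joined with the secret_key. A minimal values override for running the chart in GCS mode could then look like the sketch below. It only reuses keys that appear in helm/values.yaml and in the templates above; how the Kubernetes secret with the service-account JSON is created beforehand (for example with kubectl create secret generic) is an assumption about the surrounding setup, not part of this patch series.

parseable:
  store: gcs-store
  gcsModeSecret:
    enabled: true                        # the statefulset templates gate the env var on this flag
    auth:
      secret_name: parseable-env-secret  # Kubernetes secret holding the service-account JSON
      secret_key: key.json               # file name inside that secret
      mount_path: /var/secrets/google    # directory where the secret is mounted

With these values the pods resolve the credentials file at /var/secrets/google/key.json, matching what the querier template produces after the final patch.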