Skip to content

Commit 861dde3

Browse files
committed
fix: address CodeRabbit review feedback
1 parent c46cac6 commit 861dde3

File tree

6 files changed

+23
-22
lines changed

6 files changed

+23
-22
lines changed

docker-compose-gcs-distributed-test.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ services:
5656
- P_MODE=ingest
5757
- P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
5858
- RUST_LOG=warn
59-
- GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/mpt-randd-8217aef869fd.json
59+
- GOOGLE_APPLICATION_CREDENTIALS=/parseable/svc/${GCS_CREDENTIALS_FILE:-key.json}
6060
networks:
6161
- parseable-internal
6262
healthcheck:
@@ -73,7 +73,7 @@ services:
7373
delay: 20s
7474
max_attempts: 3
7575
volumes:
76-
- "/home/opeyemi/Downloads/:/parseable/svc/:ro,z"
76+
- "${GCS_CREDENTIALS_PATH:-./credentials}:/parseable/svc/:ro,z"
7777

7878
quest:
7979
platform: linux/amd64

src/cli.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::connectors::kafka::config::KafkaConfig;
2727
use crate::{
2828
oidc::{self, OpenidConfig},
2929
option::{validation, Compression, Mode},
30-
storage::{AzureBlobConfig, FSConfig, GCSConfig, S3Config},
30+
storage::{AzureBlobConfig, FSConfig, GcsConfig, S3Config},
3131
};
3232

3333
/// Default username and password for Parseable server, used by default for local mode.
@@ -82,7 +82,7 @@ pub enum StorageOptions {
8282
Blob(BlobStoreArgs),
8383

8484
#[command(name = "gcs-store")]
85-
GCS(GCSStoreArgs),
85+
Gcs(GcsStoreArgs),
8686
}
8787

8888
#[derive(Parser)]
@@ -119,11 +119,11 @@ pub struct BlobStoreArgs {
119119
}
120120

121121
#[derive(Parser)]
122-
pub struct GCSStoreArgs {
122+
pub struct GcsStoreArgs {
123123
#[command(flatten)]
124124
pub options: Options,
125125
#[command(flatten)]
126-
pub storage: GCSConfig,
126+
pub storage: GcsConfig,
127127
#[cfg(feature = "kafka")]
128128
#[command(flatten)]
129129
pub kafka: KafkaConfig,

src/metrics/storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ pub mod azureblob {
127127
}
128128

129129
pub mod gcs {
130-
use crate::{metrics::METRICS_NAMESPACE, storage::GCSConfig};
130+
use crate::{metrics::METRICS_NAMESPACE, storage::GcsConfig};
131131
use once_cell::sync::Lazy;
132132
use prometheus::{HistogramOpts, HistogramVec};
133133

134134
use super::StorageMetrics;
135135

136136
pub static REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
137137
HistogramVec::new(
138-
HistogramOpts::new("gcs_response_time", "gcs Request Latency")
138+
HistogramOpts::new("gcs_response_time", "GCS Request Latency")
139139
.namespace(METRICS_NAMESPACE),
140140
&["method", "status"],
141141
)
@@ -151,7 +151,7 @@ pub mod gcs {
151151
.expect("metric can be created")
152152
});
153153

154-
impl StorageMetrics for GCSConfig {
154+
impl StorageMetrics for GcsConfig {
155155
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
156156
handler
157157
.registry

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub static PARSEABLE: Lazy<Parseable> = Lazy::new(|| match Cli::parse().storage
117117
args.kafka,
118118
Arc::new(args.storage),
119119
),
120-
StorageOptions::GCS(args) => Parseable::new(
120+
StorageOptions::Gcs(args) => Parseable::new(
121121
args.options,
122122
#[cfg(feature = "kafka")]
123123
args.kafka,

src/storage/gcs.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ use super::{
6666
{all-args}
6767
"
6868
)]
69-
70-
pub struct GCSConfig {
69+
pub struct GcsConfig {
7170
/// The endpoint to GCS or compatible object storage platform
7271
#[arg(
7372
long,
@@ -90,14 +89,14 @@ pub struct GCSConfig {
9089
/// Set client to skip tls verification
9190
#[arg(
9291
long,
93-
env = "P_S3_TLS_SKIP_VERIFY",
92+
env = "P_GCS_TLS_SKIP_VERIFY",
9493
value_name = "bool",
9594
default_value = "false"
9695
)]
9796
pub skip_tls: bool,
9897
}
9998

100-
impl GCSConfig {
99+
impl GcsConfig {
101100
fn get_default_builder(&self) -> GoogleCloudStorageBuilder {
102101
let mut client_options = ClientOptions::default()
103102
.with_allow_http(true)
@@ -121,7 +120,7 @@ impl GCSConfig {
121120
}
122121
}
123122

124-
impl ObjectStorageProvider for GCSConfig {
123+
impl ObjectStorageProvider for GcsConfig {
125124
fn name(&self) -> &'static str {
126125
"gcs"
127126
}
@@ -134,6 +133,8 @@ impl ObjectStorageProvider for GCSConfig {
134133
let gcs = MetricLayer::new(gcs);
135134

136135
let object_store_registry = DefaultObjectStoreRegistry::new();
136+
// Register GCS client under the "s3://" scheme so DataFusion can route
137+
// object store calls to our GoogleCloudStorage implementation.
137138
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
138139
object_store_registry.register_store(url.as_ref(), Arc::new(gcs));
139140

@@ -143,7 +144,7 @@ impl ObjectStorageProvider for GCSConfig {
143144
fn construct_client(&self) -> Arc<dyn ObjectStorage> {
144145
let gcs = self.get_default_builder().build().unwrap();
145146

146-
Arc::new(GCS {
147+
Arc::new(Gcs {
147148
client: Arc::new(gcs),
148149
bucket: self.bucket_name.clone(),
149150
root: StorePath::from(""),
@@ -167,13 +168,13 @@ impl ObjectStorageProvider for GCSConfig {
167168
}
168169

169170
#[derive(Debug)]
170-
pub struct GCS {
171+
pub struct Gcs {
171172
client: Arc<GoogleCloudStorage>,
172173
bucket: String,
173174
root: StorePath,
174175
}
175176

176-
impl GCS {
177+
impl Gcs {
177178
async fn _get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
178179
let instant = Instant::now();
179180

@@ -215,7 +216,7 @@ impl GCS {
215216
let source_str = source.to_string();
216217
if source_str.contains("<Code>NoSuchBucket</Code>") {
217218
return Err(ObjectStorageError::Custom(
218-
format!("Bucket '{}' does not exist in S3.", self.bucket).to_string(),
219+
format!("Bucket '{}' does not exist in GCS.", self.bucket).to_string(),
219220
));
220221
}
221222
}
@@ -335,7 +336,7 @@ impl GCS {
335336
} else {
336337
let bytes = tokio::fs::read(path).await?;
337338
let result = self.client.put(&key.into(), bytes.into()).await?;
338-
info!("Uploaded file to S3: {:?}", result);
339+
info!("Uploaded file to GCS: {:?}", result);
339340
Ok(())
340341
};
341342

@@ -406,7 +407,7 @@ impl GCS {
406407
}
407408

408409
#[async_trait]
409-
impl ObjectStorage for GCS {
410+
impl ObjectStorage for Gcs {
410411
async fn get_buffered_reader(
411412
&self,
412413
path: &RelativePath,

src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub mod store_metadata;
4747

4848
use self::retention::Retention;
4949
pub use azure_blob::AzureBlobConfig;
50-
pub use gcs::GCSConfig;
50+
pub use gcs::GcsConfig;
5151
pub use localfs::FSConfig;
5252
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
5353
pub use s3::S3Config;

0 commit comments

Comments
 (0)