
Commit 8b35a9f

Author: Devdutt Shenoi
Merge branch 'main' into query-param
2 parents: 1138006 + 43a793d

14 files changed: +487, -198 lines

.github/workflows/build-push-edge-kafka.yaml

Lines changed: 15 additions & 2 deletions
@@ -37,11 +37,24 @@ jobs:
         with:
           images: parseable/parseable
 
-      - name: Build and push
+      - name: Build and push x86_64
         uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
         with:
           context: .
           file: ./Dockerfile.kafka
           push: true
           tags: parseable/parseable:edge-kafka
-          platforms: linux/amd64,linux/arm64
+          platforms: linux/amd64
+          build-args: |
+            LIB_DIR=x86_64-linux-gnu
+
+      - name: Build and push aarch64
+        uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
+        with:
+          context: .
+          file: ./Dockerfile.kafka
+          push: true
+          tags: parseable/parseable:edge-kafka
+          platforms: linux/arm64
+          build-args: |
+            LIB_DIR=aarch64-linux-gnu

.github/workflows/integration-test.yaml

Lines changed: 1 addition & 24 deletions
@@ -32,27 +32,4 @@ jobs:
       - name: Stop compose
         if: always()
         run: docker compose -f docker-compose-distributed-test.yaml down -v
-
-  docker-compose-test-with-kafka:
-    name: Quest Smoke and Load Tests for Standalone deployments with Kafka
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-      - name: Start compose
-        run: docker compose -f docker-compose-test-with-kafka.yaml up --build --exit-code-from quest
-      - name: Stop compose
-        if: always()
-        run: docker compose -f docker-compose-test-with-kafka.yaml down -v
-
-  docker-compose-distributed-test-with-kafka:
-    name: Quest Smoke and Load Tests for Distributed deployments with Kafka
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-      - name: Start compose
-        run: docker compose -f docker-compose-distributed-test-with-kafka.yaml up --build --exit-code-from quest
-      - name: Stop compose
-        if: always()
-        run: docker compose -f docker-compose-distributed-test-with-kafka.yaml down -v
+

Dockerfile.kafka

Lines changed: 6 additions & 5 deletions
@@ -49,16 +49,17 @@ RUN cargo build --release --features kafka
 # final stage
 FROM gcr.io/distroless/cc-debian12:latest
 
+# Copy only the libraries that binary needs since kafka is statically linked
+ARG LIB_DIR
+COPY --from=builder /usr/lib/${LIB_DIR}/libsasl2.so.2 /usr/lib/${LIB_DIR}/
+COPY --from=builder /usr/lib/${LIB_DIR}/libssl.so.3 /usr/lib/${LIB_DIR}/
+COPY --from=builder /usr/lib/${LIB_DIR}/libcrypto.so.3 /usr/lib/${LIB_DIR}/
+
 WORKDIR /parseable
 
 # Copy the Parseable binary from builder
 COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable
 
-# Copy only the libraries that binary needs since kafka is statically linked
-COPY --from=builder /usr/lib/x86_64-linux-gnu/libsasl2.so.2 /usr/lib/x86_64-linux-gnu/
-COPY --from=builder /usr/lib/x86_64-linux-gnu/libssl.so.3 /usr/lib/x86_64-linux-gnu/
-COPY --from=builder /usr/lib/x86_64-linux-gnu/libcrypto.so.3 /usr/lib/x86_64-linux-gnu/
-
 # Copy CA certificates
 COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
 

docker-compose-distributed-test-with-kafka.yaml

Lines changed: 4 additions & 0 deletions
@@ -94,6 +94,8 @@ services:
     build:
       context: .
       dockerfile: Dockerfile.kafka
+      args:
+        - LIB_DIR=x86_64-linux-gnu
     platform: linux/amd64
     command: [ "parseable", "s3-store", ]
     expose:
@@ -140,6 +142,8 @@ services:
     build:
       context: .
       dockerfile: Dockerfile.kafka
+      args:
+        - LIB_DIR=x86_64-linux-gnu
     platform: linux/amd64
     command: [ "parseable", "s3-store", ]
     expose:

docker-compose-test-with-kafka.yaml

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,8 @@ services:
     build:
       context: .
       dockerfile: Dockerfile.kafka
+      args:
+        - LIB_DIR=x86_64-linux-gnu
     platform: linux/amd64
     command: [ "parseable", "s3-store", ]
     ports:

src/alerts/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -330,7 +330,7 @@ pub struct RollingWindow {
     // should always be "now"
     pub eval_end: String,
     // x minutes (5m)
-    pub eval_frequency: u32,
+    pub eval_frequency: u64,
 }
 
 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
@@ -641,7 +641,7 @@ impl AlertConfig {
         columns
     }
 
-    pub fn get_eval_frequency(&self) -> u32 {
+    pub fn get_eval_frequency(&self) -> u64 {
         match &self.eval_type {
             EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_frequency,
         }
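
Note: one practical effect of widening eval_frequency to u64 is that the value can feed std::time::Duration::from_secs (which takes a u64) without a cast. A minimal, hypothetical sketch, not code from this commit, assuming the frequency is the number of minutes between evaluations:

    use std::{thread, time::Duration};

    /// Hypothetical alert loop: evaluate once every `eval_frequency` minutes.
    fn run_alert_loop(eval_frequency: u64) {
        // Duration::from_secs takes a u64, so no lossy conversion is needed here.
        let period = Duration::from_secs(eval_frequency * 60);
        loop {
            // evaluate the alert here, then sleep until the next evaluation
            thread::sleep(period);
        }
    }

    fn main() {
        run_alert_loop(5); // e.g. the "5m" case from the comment above
    }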

src/handlers/http/logstream.rs

Lines changed: 0 additions & 2 deletions
@@ -103,8 +103,6 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
         .unwrap()
         .into_iter()
         .filter(|logstream| {
-            warn!("logstream-\n{logstream:?}");
-
             Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
                 == crate::rbac::Response::Authorized
         })

src/handlers/http/modal/ingest_server.rs

Lines changed: 16 additions & 0 deletions
@@ -203,6 +203,11 @@ impl ParseableServer for IngestServer {
             sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
+        let (
+            mut remote_conversion_handler,
+            mut remote_conversion_outbox,
+            mut remote_conversion_inbox,
+        ) = sync::arrow_conversion().await;
 
         tokio::spawn(airplane::server());
 
@@ -219,12 +224,16 @@ impl ParseableServer for IngestServer {
                     // actix server finished .. stop other threads and stop the server
                     remote_sync_inbox.send(()).unwrap_or(());
                     localsync_inbox.send(()).unwrap_or(());
+                    remote_conversion_inbox.send(()).unwrap_or(());
                     if let Err(e) = localsync_handler.await {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
                     if let Err(e) = remote_sync_handler.await {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
+                    if let Err(e) = remote_conversion_handler.await {
+                        error!("Error joining remote_conversion_handler: {:?}", e);
+                    }
                     return e
                 },
                 _ = &mut localsync_outbox => {
@@ -238,6 +247,13 @@ impl ParseableServer for IngestServer {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
                     (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                },
+                _ = &mut remote_conversion_outbox => {
+                    // remote_conversion failed, this is recoverable by just starting remote_conversion thread again
+                    if let Err(e) = remote_conversion_handler.await {
+                        error!("Error joining remote_conversion_handler: {:?}", e);
+                    }
+                    (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
                 }
 
             }
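
Note: sync::arrow_conversion() returns the same (handler, outbox, inbox) triple as the other sync tasks: the server fires the inbox to request shutdown and respawns the task whenever the outbox resolves. A self-contained sketch of that supervision pattern, assuming tokio; spawn_worker and its 60-second tick are placeholders, not the crate's actual sync module:

    use std::time::Duration;
    use tokio::{sync::oneshot, task::JoinHandle};

    /// Placeholder worker: does periodic work until asked to stop, then reports that it exited.
    fn spawn_worker() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) {
        let (outbox_tx, outbox_rx) = oneshot::channel(); // worker -> supervisor: "I stopped"
        let (inbox_tx, mut inbox_rx) = oneshot::channel(); // supervisor -> worker: "please stop"
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = &mut inbox_rx => break, // graceful shutdown requested
                    _ = tokio::time::sleep(Duration::from_secs(60)) => {
                        // one round of periodic work (e.g. converting staged files)
                    }
                }
            }
            let _ = outbox_tx.send(()); // tell the supervisor this worker exited
        });
        (handle, outbox_rx, inbox_tx)
    }

    #[tokio::main]
    async fn main() {
        // Keep `_inbox` alive so the worker never sees a closed shutdown channel.
        let (mut handle, mut outbox, mut _inbox) = spawn_worker();
        loop {
            // The outbox resolving means the worker stopped: join it, then restart it,
            // mirroring the `_ = &mut remote_conversion_outbox => { ... }` arm above.
            let _ = (&mut outbox).await;
            let _ = handle.await;
            (handle, outbox, _inbox) = spawn_worker();
        }
    }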

src/handlers/http/modal/mod.rs

Lines changed: 13 additions & 1 deletion
@@ -139,7 +139,19 @@ pub trait ParseableServer {
 
         // Perform S3 sync and wait for completion
         info!("Starting data sync to S3...");
-        if let Err(e) = CONFIG.storage().get_object_store().sync(true).await {
+
+        if let Err(e) = CONFIG.storage().get_object_store().conversion(true).await {
+            warn!("Failed to convert arrow files to parquet. {:?}", e);
+        } else {
+            info!("Successfully converted arrow files to parquet.");
+        }
+
+        if let Err(e) = CONFIG
+            .storage()
+            .get_object_store()
+            .upload_files_from_staging()
+            .await
+        {
             warn!("Failed to sync local data with object store. {:?}", e);
         } else {
             info!("Successfully synced all data to S3.");

src/handlers/http/modal/server.rs

Lines changed: 16 additions & 1 deletion
@@ -134,7 +134,11 @@ impl ParseableServer for Server {
             sync::run_local_sync().await;
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
-
+        let (
+            mut remote_conversion_handler,
+            mut remote_conversion_outbox,
+            mut remote_conversion_inbox,
+        ) = sync::arrow_conversion().await;
         if CONFIG.options.send_analytics {
             analytics::init_analytics_scheduler()?;
         }
@@ -152,12 +156,16 @@ impl ParseableServer for Server {
                     // actix server finished .. stop other threads and stop the server
                     remote_sync_inbox.send(()).unwrap_or(());
                     localsync_inbox.send(()).unwrap_or(());
+                    remote_conversion_inbox.send(()).unwrap_or(());
                     if let Err(e) = localsync_handler.await {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
                     if let Err(e) = remote_sync_handler.await {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
+                    if let Err(e) = remote_conversion_handler.await {
+                        error!("Error joining remote_conversion_handler: {:?}", e);
+                    }
                     return e
                 },
                 _ = &mut localsync_outbox => {
@@ -171,6 +179,13 @@ impl ParseableServer for Server {
                         error!("Error joining remote_sync_handler: {:?}", e);
                     }
                     (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
+                },
+                _ = &mut remote_conversion_outbox => {
+                    // remote_conversion failed, this is recoverable by just starting remote_conversion thread again
+                    if let Err(e) = remote_conversion_handler.await {
+                        error!("Error joining remote_conversion_handler: {:?}", e);
+                    }
+                    (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
                 }
 
             };

src/lib.rs

Lines changed: 2 additions & 1 deletion
@@ -56,7 +56,8 @@ pub use handlers::http::modal::{
 use once_cell::sync::Lazy;
 use reqwest::{Client, ClientBuilder};
 
-pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
+pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
+pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
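
Note: with conversion split from upload, the two constants can drive independent tickers (conversion every 60 s, upload every 30 s). An illustrative sketch assuming tokio; the actual scheduling in the sync module may differ:

    use std::time::Duration;

    const STORAGE_CONVERSION_INTERVAL: u32 = 60;
    const STORAGE_UPLOAD_INTERVAL: u32 = 30;

    #[tokio::main]
    async fn main() {
        let mut convert = tokio::time::interval(Duration::from_secs(STORAGE_CONVERSION_INTERVAL as u64));
        let mut upload = tokio::time::interval(Duration::from_secs(STORAGE_UPLOAD_INTERVAL as u64));
        loop {
            tokio::select! {
                _ = convert.tick() => { /* convert staged arrow files to parquet */ }
                _ = upload.tick() => { /* upload finished parquet files to object storage */ }
            }
        }
    }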

src/staging/streams.rs

Lines changed: 55 additions & 6 deletions
@@ -19,7 +19,7 @@
 
 use std::{
     collections::HashMap,
-    fs::{remove_file, OpenOptions},
+    fs::{remove_file, File, OpenOptions},
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
@@ -165,6 +165,11 @@ impl<'a> Stream<'a> {
         paths
     }
 
+    /// Groups arrow files which are to be included in one parquet
+    ///
+    /// Excludes the arrow file being written for the current minute (data is still being written to that one)
+    ///
+    /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
         exclude: NaiveDateTime,
@@ -173,6 +178,8 @@ impl<'a> Stream<'a> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
         let mut arrow_files = self.arrow_files();
 
+        // if the shutdown signal is false i.e. normal condition
+        // don't keep the ones for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
                 !path
@@ -215,6 +222,45 @@ impl<'a> Stream<'a> {
             .collect()
     }
 
+    pub fn schema_files(&self) -> Vec<PathBuf> {
+        let Ok(dir) = self.data_path.read_dir() else {
+            return vec![];
+        };
+
+        dir.flatten()
+            .map(|file| file.path())
+            .filter(|file| file.extension().is_some_and(|ext| ext.eq("schema")))
+            .collect()
+    }
+
+    pub fn get_schemas_if_present(&self) -> Option<Vec<Schema>> {
+        let Ok(dir) = self.data_path.read_dir() else {
+            return None;
+        };
+
+        let mut schemas: Vec<Schema> = Vec::new();
+
+        for file in dir.flatten() {
+            if let Some(ext) = file.path().extension() {
+                if ext.eq("schema") {
+                    let file = File::open(file.path()).expect("Schema File should exist");
+
+                    let schema = match serde_json::from_reader(file) {
+                        Ok(schema) => schema,
+                        Err(_) => continue,
+                    };
+                    schemas.push(schema);
+                }
+            }
+        }
+
+        if !schemas.is_empty() {
+            Some(schemas)
+        } else {
+            None
+        }
+    }
+
     fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf {
         let filename = path.file_stem().unwrap().to_str().unwrap();
         let (_, filename) = filename.split_once('.').unwrap();
@@ -249,6 +295,9 @@ impl<'a> Stream<'a> {
         }
     }
 
+    /// This function reads arrow files, groups their schemas
+    ///
+    /// converts them into parquet files and returns a merged schema
     pub fn convert_disk_files_to_parquet(
         &self,
         time_partition: Option<&String>,
@@ -272,12 +321,12 @@ impl<'a> Stream<'a> {
         }
 
         // warn!("staging files-\n{staging_files:?}\n");
-        for (parquet_path, files) in staging_files {
+        for (parquet_path, arrow_files) in staging_files {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
-                .set(files.len() as i64);
+                .set(arrow_files.len() as i64);
 
-            for file in &files {
+            for file in &arrow_files {
                 let file_size = file.metadata().unwrap().len();
                 let file_type = file.extension().unwrap().to_str().unwrap();
 
@@ -286,7 +335,7 @@ impl<'a> Stream<'a> {
                     .add(file_size as i64);
             }
 
-            let record_reader = MergedReverseRecordReader::try_new(&files);
+            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
@@ -319,7 +368,7 @@ impl<'a> Stream<'a> {
             );
             remove_file(parquet_path).unwrap();
         } else {
-            for file in files {
+            for file in arrow_files {
                 // warn!("file-\n{file:?}\n");
                 let file_size = file.metadata().unwrap().len();
                 let file_type = file.extension().unwrap().to_str().unwrap();
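
Note: schema_files() and get_schemas_if_present() expose the ".schema" files written alongside staged arrow files. A hypothetical, standalone sketch of what a caller might do with them: read every ".schema" file in a staging directory and fold the results into a single schema. It assumes the arrow_schema crate's Schema (deserializable via its serde feature) and Schema::try_merge; the paths and error handling are illustrative, not Parseable's actual staging layout:

    use std::{fs::File, path::Path};

    use arrow_schema::Schema;

    /// Read all ".schema" files under `data_path` and merge them into one schema, if any parse.
    fn merged_staging_schema(data_path: &Path) -> Option<Schema> {
        let dir = data_path.read_dir().ok()?;

        let schemas: Vec<Schema> = dir
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| path.extension().is_some_and(|ext| ext.eq("schema")))
            .filter_map(|path| -> Option<Schema> {
                let file = File::open(&path).ok()?;
                serde_json::from_reader(file).ok() // skip files that fail to parse
            })
            .collect();

        if schemas.is_empty() {
            None
        } else {
            // Merge the per-file schemas, analogous to the merged schema returned by
            // convert_disk_files_to_parquet above.
            Schema::try_merge(schemas).ok()
        }
    }

    fn main() {
        // Hypothetical staging directory; adjust to wherever ".schema" files live.
        match merged_staging_schema(Path::new("staging/mystream")) {
            Some(schema) => println!("merged schema has {} fields", schema.fields().len()),
            None => println!("no readable .schema files found"),
        }
    }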
