
Commit 1c0e509

Author: Devdutt Shenoi
Merge remote-tracking branch 'origin/main' into reflect
2 parents 367a5a6 + 6fe35a6, commit 1c0e509

File tree: 19 files changed (+437, -237 lines)


.github/workflows/build-push-edge-debug.yaml

Lines changed: 0 additions & 2 deletions
@@ -45,5 +45,3 @@ jobs:
           push: true
           tags: parseable/parseable:edge-debug
           platforms: linux/amd64,linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max

.github/workflows/build-push-edge.yaml

Lines changed: 0 additions & 2 deletions
@@ -45,5 +45,3 @@ jobs:
           push: true
           tags: parseable/parseable:edge
           platforms: linux/amd64,linux/arm64
-          cache-from: type=gha
-          cache-to: type=gha,mode=max

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 2 deletions
@@ -1,6 +1,6 @@
 [package]
 name = "parseable"
-version = "1.7.4"
+version = "1.7.5"
 authors = ["Parseable Team <[email protected]>"]
 edition = "2021"
 rust-version = "1.83.0"
@@ -128,7 +128,6 @@ sha1_smol = { version = "1.0", features = ["std"] }
 static-files = "0.2"
 ureq = "2.12"
 url = "2.5"
-vergen = { version = "9.0", features = ["build", "cargo", "rustc", "si"] }
 vergen-gitcl = { version = "1.0", features = ["build", "cargo", "rustc", "si"] }
 zip = { version = "2.3", default-features = false, features = ["deflate"] }
 anyhow = "1.0"

Dockerfile

Lines changed: 3 additions & 10 deletions
@@ -14,30 +14,23 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

 # build stage
-FROM rust:1.84.0-bookworm AS builder
-
+FROM rust:1.84.0-bookworm AS builder

 LABEL org.opencontainers.image.title="Parseable"
 LABEL maintainer="Parseable Team <[email protected]>"
 LABEL org.opencontainers.image.vendor="Parseable Inc"
 LABEL org.opencontainers.image.licenses="AGPL-3.0"

 WORKDIR /parseable
-
-# Cache dependencies
-COPY Cargo.toml Cargo.lock build.rs ./
-RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build --release && rm -rf src
-
-# Build the actual binary
-COPY src ./src
+COPY . .
 RUN cargo build --release

 # final stage
 FROM gcr.io/distroless/cc-debian12:latest

 WORKDIR /parseable

-# Copy the static binary into the final image
+# Copy the static shell into base image.
 COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

 CMD ["/usr/bin/parseable"]

src/cli.rs

Lines changed: 8 additions & 0 deletions
@@ -270,6 +270,14 @@ pub struct Options {
     )]
     pub row_group_size: usize,

+    #[arg(
+        long,
+        env = "P_EXECUTION_BATCH_SIZE",
+        default_value = "20000",
+        help = "batch size for query execution"
+    )]
+    pub execution_batch_size: usize,
+
     #[arg(
         long = "compression-algo",
         env = "P_PARQUET_COMPRESSION_ALGO",
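
The new flag plugs into the existing clap-derive Options struct and resolves in the usual order: CLI flag, then the P_EXECUTION_BATCH_SIZE environment variable, then the default. A minimal sketch, assuming clap 4 with the derive and env features enabled; the struct below is a stripped-down stand-in, not the real Options:

    use clap::Parser;

    // Stand-in for Parseable's real `Options` struct, showing only
    // the newly added argument.
    #[derive(Parser, Debug)]
    struct Options {
        // Resolution order: `--execution-batch-size` flag, then the
        // P_EXECUTION_BATCH_SIZE env var, then the default of 20000.
        #[arg(
            long,
            env = "P_EXECUTION_BATCH_SIZE",
            default_value = "20000",
            help = "batch size for query execution"
        )]
        execution_batch_size: usize,
    }

    fn main() {
        let opts = Options::parse();
        println!("execution batch size: {}", opts.execution_batch_size);
    }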

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 3 deletions
@@ -16,20 +16,20 @@
  *
  */

-use std::sync::Arc;
-
 use async_trait::async_trait;
 use futures_util::StreamExt;
 use rdkafka::consumer::{CommitMode, Consumer};
 use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, error};

 use crate::{
     connectors::common::processor::Processor,
     event::{
         format::{json, EventFormat, LogSourceEntry},
-        Event as ParseableEvent,
+        Event as ParseableEvent, USER_AGENT_KEY,
     },
     parseable::PARSEABLE,
     storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
             }
         }

+        let mut p_custom_fields = HashMap::new();
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
+
         let p_event = json::Event::new(Value::Array(json_vec)).into_event(
             stream_name.to_string(),
             total_payload_size,
@@ -85,6 +88,7 @@ impl ParseableSinkProcessor {
             time_partition.as_ref(),
             schema_version,
             StreamType::UserDefined,
+            &p_custom_fields,
         )?;

         Ok(p_event)
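
With this change, every event ingested through the Kafka sink is stamped with its origin before it reaches `into_event`. A minimal, std-only sketch of the map being built (the constant mirrors USER_AGENT_KEY from src/event/mod.rs):

    use std::collections::HashMap;

    // Mirrors the USER_AGENT_KEY constant added in src/event/mod.rs.
    const USER_AGENT_KEY: &str = "p_user_agent";

    // Build the per-event custom fields the sink passes to `into_event`;
    // downstream, each entry becomes an extra column on the record batch.
    fn kafka_custom_fields() -> HashMap<String, String> {
        let mut p_custom_fields = HashMap::new();
        p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
        p_custom_fields
    }

    fn main() {
        let fields = kafka_custom_fields();
        assert_eq!(fields.get("p_user_agent").map(String::as_str), Some("kafka"));
    }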

src/event/format/json.rs

Lines changed: 2 additions & 0 deletions
@@ -149,6 +149,7 @@ impl EventFormat for Event {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partition) => {
@@ -168,6 +169,7 @@ impl EventFormat for Event {
             static_schema_flag,
             time_partition,
             schema_version,
+            p_custom_fields,
         )?;

         Ok(super::Event {

src/event/format/mod.rs

Lines changed: 6 additions & 18 deletions
@@ -33,7 +33,7 @@ use serde_json::Value;
 use crate::{
     metadata::SchemaVersion,
     storage::StreamType,
-    utils::arrow::{get_field, get_timestamp_array, replace_columns},
+    utils::arrow::{add_parseable_fields, get_field},
 };

 use super::{Event, DEFAULT_TIMESTAMP_KEY};
@@ -145,9 +145,10 @@ pub trait EventFormat: Sized {
         static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) = self.to_data(
+        let (data, schema, is_first) = self.to_data(
             storage_schema,
             time_partition,
             schema_version,
@@ -161,16 +162,6 @@
             ));
         };

-        // add the p_timestamp field to the event schema to the 0th index
-        schema.insert(
-            0,
-            Arc::new(Field::new(
-                DEFAULT_TIMESTAMP_KEY,
-                DataType::Timestamp(TimeUnit::Millisecond, None),
-                true,
-            )),
-        );
-
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
@@ -179,12 +170,8 @@ pub trait EventFormat: Sized {
         new_schema =
             update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);

-        let mut rb = Self::decode(data, new_schema.clone())?;
-        rb = replace_columns(
-            rb.schema(),
-            &rb,
-            &[(0, Arc::new(get_timestamp_array(p_timestamp, rb.num_rows())))],
-        );
+        let rb = Self::decode(data, new_schema.clone())?;
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;

         Ok((rb, is_first))
     }
@@ -222,6 +209,7 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
         stream_type: StreamType,
+        p_custom_fields: &HashMap<String, String>,
     ) -> Result<Event, AnyError>;
 }

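
The removed inline logic inserted p_timestamp at schema index 0 and then overwrote column 0 after decoding; both steps, plus the new per-event custom fields, are folded into the add_parseable_fields helper from utils::arrow, whose body this commit does not show. A hypothetical sketch of its shape, assuming the arrow-array, arrow-schema, and chrono crates (names and behavior here are an assumption, not the project's actual implementation):

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
    use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
    use chrono::{DateTime, Utc};

    // Hypothetical reimplementation for illustration only; the real helper
    // lives in `utils::arrow` and is not part of this diff.
    fn add_parseable_fields(
        rb: RecordBatch,
        p_timestamp: DateTime<Utc>,
        p_custom_fields: &HashMap<String, String>,
    ) -> Result<RecordBatch, ArrowError> {
        let num_rows = rb.num_rows();
        let mut fields: Vec<Field> = rb
            .schema()
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        let mut columns: Vec<ArrayRef> = rb.columns().to_vec();

        // p_timestamp goes to index 0, as the removed inline code did.
        fields.insert(
            0,
            Field::new("p_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        );
        columns.insert(
            0,
            Arc::new(TimestampMillisecondArray::from_value(
                p_timestamp.timestamp_millis(),
                num_rows,
            )),
        );

        // One constant-valued Utf8 column per custom field (e.g. p_user_agent);
        // note that HashMap iteration order is unspecified.
        for (key, value) in p_custom_fields {
            fields.push(Field::new(key, DataType::Utf8, true));
            columns.push(Arc::new(StringArray::from(vec![value.as_str(); num_rows])) as ArrayRef);
        }

        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    }

    fn main() -> Result<(), ArrowError> {
        let rb = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new("msg", DataType::Utf8, false)])),
            vec![Arc::new(StringArray::from(vec!["hello"])) as ArrayRef],
        )?;
        let mut custom = HashMap::new();
        custom.insert("p_user_agent".to_string(), "kafka".to_string());
        let rb = add_parseable_fields(rb, Utc::now(), &custom)?;
        assert_eq!(rb.num_columns(), 3); // p_timestamp + msg + p_user_agent
        Ok(())
    }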

src/event/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -35,6 +35,9 @@ use chrono::NaiveDateTime;
 use std::collections::HashMap;

 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
+pub const USER_AGENT_KEY: &str = "p_user_agent";
+pub const SOURCE_IP_KEY: &str = "p_src_ip";
+pub const FORMAT_KEY: &str = "p_format";

 #[derive(Clone)]
 pub struct Event {

src/handlers/http/audit.rs

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ use actix_web::{
     middleware::Next,
 };
 use actix_web_httpauth::extractors::basic::BasicAuth;
+use http::header::USER_AGENT;
 use ulid::Ulid;

 use crate::{
@@ -85,7 +86,7 @@ pub async fn audit_log_middleware(
         )
         .with_user_agent(
             req.headers()
-                .get("User-Agent")
+                .get(USER_AGENT)
                 .and_then(|a| a.to_str().ok())
                 .unwrap_or_default(),
         )
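
Swapping the "User-Agent" string literal for the typed USER_AGENT constant moves header-name typos from runtime lookups to compile-time errors. A small sketch of the same extraction outside the middleware, assuming actix-web 4 (the user_agent helper is illustrative, not part of the commit):

    use actix_web::http::header::USER_AGENT;
    use actix_web::HttpRequest;

    // Illustrative helper mirroring the middleware's extraction logic:
    // a missing or non-UTF-8 header degrades to an empty string.
    fn user_agent(req: &HttpRequest) -> &str {
        req.headers()
            .get(USER_AGENT)
            .and_then(|a| a.to_str().ok())
            .unwrap_or_default()
    }

    fn main() {
        let req = actix_web::test::TestRequest::default()
            .insert_header((USER_AGENT, "curl/8.5.0"))
            .to_http_request();
        assert_eq!(user_agent(&req), "curl/8.5.0");
    }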
