Skip to content

Commit 48b90ff

Browse files
add user-agent for kafka
1 parent ae7deb5 commit 48b90ff

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,20 @@
1616
*
1717
*/
1818

19-
use std::sync::Arc;
20-
2119
use async_trait::async_trait;
2220
use futures_util::StreamExt;
2321
use rdkafka::consumer::{CommitMode, Consumer};
2422
use serde_json::Value;
23+
use std::collections::HashMap;
24+
use std::sync::Arc;
2525
use tokio_stream::wrappers::ReceiverStream;
2626
use tracing::{debug, error};
2727

2828
use crate::{
2929
connectors::common::processor::Processor,
3030
event::{
3131
format::{json, EventFormat, LogSourceEntry},
32-
Event as ParseableEvent,
32+
Event as ParseableEvent, USER_AGENT_KEY,
3333
},
3434
parseable::PARSEABLE,
3535
storage::StreamType,
@@ -76,6 +76,9 @@ impl ParseableSinkProcessor {
7676
}
7777
}
7878

79+
let mut p_custom_fields = HashMap::new();
80+
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "kafka".to_string());
81+
7982
let p_event = json::Event::new(Value::Array(json_vec)).into_event(
8083
stream_name.to_string(),
8184
total_payload_size,
@@ -85,6 +88,7 @@ impl ParseableSinkProcessor {
8588
time_partition.as_ref(),
8689
schema_version,
8790
StreamType::UserDefined,
91+
&p_custom_fields,
8892
)?;
8993

9094
Ok(p_event)

0 commit comments

Comments
 (0)