Skip to content

Commit c56ccca

Browse files
signature update for kafka processor
1 parent 2c0bfe7 commit c56ccca

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

src/connectors/kafka/processor.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use tracing::{debug, error};
2828
use crate::{
2929
connectors::common::processor::Processor,
3030
event::{
31-
format::{json, EventFormat, LogSource},
31+
format::{json, EventFormat, LogSourceEntry},
3232
Event as ParseableEvent,
3333
},
3434
parseable::PARSEABLE,
@@ -49,9 +49,14 @@ impl ParseableSinkProcessor {
4949
.first()
5050
.map(|r| r.topic.as_str())
5151
.unwrap_or_default();
52+
let log_source_entry = LogSourceEntry::default();
5253

5354
PARSEABLE
54-
.create_stream_if_not_exists(stream_name, StreamType::UserDefined, LogSource::Json)
55+
.create_stream_if_not_exists(
56+
stream_name,
57+
StreamType::UserDefined,
58+
vec![log_source_entry],
59+
)
5560
.await?;
5661

5762
let stream = PARSEABLE.get_stream(stream_name)?;

0 commit comments

Comments
 (0)