@@ -6,6 +6,7 @@ use futures::future;
6
6
use futures:: stream:: BoxStream ;
7
7
use futures_util:: { Future , StreamExt , TryFutureExt , TryStreamExt } ;
8
8
use http_auth_basic:: Credentials ;
9
+ use rand:: distributions:: { Alphanumeric , DistString } ;
9
10
use tonic:: metadata:: MetadataMap ;
10
11
use tonic:: transport:: Server ;
11
12
use tonic:: { Request , Response , Status , Streaming } ;
@@ -22,6 +23,7 @@ use crate::livetail::{Message, LIVETAIL};
22
23
use crate :: metadata:: STREAM_INFO ;
23
24
use crate :: rbac:: map:: SessionKey ;
24
25
use crate :: rbac:: { self , Users } ;
26
+ use crate :: utils;
25
27
26
28
use super :: SESSION_COOKIE_NAME ;
27
29
@@ -90,10 +92,15 @@ impl FlightService for FlightServiceImpl {
90
92
. schema ( stream)
91
93
. map_err ( |err| Status :: failed_precondition ( err. to_string ( ) ) ) ?;
92
94
93
- let rx = LIVETAIL . new_pipe ( "a" . to_string ( ) , stream. to_string ( ) ) ;
94
- let rx = rx. filter_map ( |x| {
95
+ let rx = LIVETAIL . new_pipe (
96
+ Alphanumeric . sample_string ( & mut rand:: thread_rng ( ) , 32 ) ,
97
+ stream. to_string ( ) ,
98
+ ) ;
99
+
100
+ let adapter_schema = schema. clone ( ) ;
101
+ let rx = rx. filter_map ( move |x| {
95
102
future:: ready ( match x {
96
- Message :: Record ( t) => Some ( Ok ( t ) ) ,
103
+ Message :: Record ( t) => Some ( Ok ( utils :: arrow :: adapt_batch ( & adapter_schema , & t ) ) ) ,
97
104
_ => None ,
98
105
} )
99
106
} ) ;
0 commit comments