Skip to content

Commit 7f4da55

Browse files
authored
add live tail support for stream (#516)
This PR adds support for live tailing a stream. With this PR, server exposes a gRPC server on a configurable port (default 8001) and uses Arrow Flight as the communication protocol. As live tail is requested for a stream, Server sends events from that steam, before the local staging and parquet conversion. This means, server will use the event record batch to create a flight stream and send over to the client. fixes #503
1 parent 24be36f commit 7f4da55

File tree

10 files changed

+767
-42
lines changed

10 files changed

+767
-42
lines changed

Cargo.lock

Lines changed: 292 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ actix-web = { version = "4.3", features = ["rustls"] }
2424
actix-cors = "0.6"
2525
actix-web-prometheus = { version = "0.1" }
2626
actix-web-static-files = "4.0"
27+
mime = "0.3.17"
28+
29+
### LiveTail server deps
30+
arrow-flight = "42.0.0"
31+
tonic = "0.9.0"
32+
tonic-web = "0.9.0"
33+
tower-http = { version = "0.4.4", features = ["cors"] }
2734

2835
### other dependencies
2936
anyhow = { version = "1.0", features = ["backtrace"] }
@@ -91,6 +98,9 @@ nom = "7.1.3"
9198
humantime = "2.1.0"
9299
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
93100
url = "2.4.0"
101+
http-auth-basic = "0.3.3"
102+
cookies = "0.0.1"
103+
cookie = "0.17.0"
94104

95105
[build-dependencies]
96106
cargo_toml = "0.15"

server/src/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ fn build_metrics() -> HashMap<String, Value> {
155155
metrics
156156
}
157157

158-
pub async fn init_analytics_scheduler() {
158+
pub fn init_analytics_scheduler() {
159159
log::info!("Setting up schedular for anonymous user analytics");
160160

161161
let mut scheduler = AsyncScheduler::new();

server/src/event.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ impl Event {
6363
num_rows,
6464
)?;
6565

66+
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
67+
6668
if let Err(e) = metadata::STREAM_INFO
6769
.check_alerts(&self.stream_name, self.rb)
6870
.await

server/src/handlers.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,15 @@
1717
*/
1818

1919
pub mod http;
20+
pub mod livetail;
2021

2122
const PREFIX_TAGS: &str = "x-p-tag-";
2223
const PREFIX_META: &str = "x-p-meta-";
2324
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2425
const FILL_NULL_OPTION_KEY: &str = "send_null";
2526
const SEPARATOR: char = '^';
27+
28+
const OIDC_SCOPE: &str = "openid profile email";
29+
const COOKIE_AGE_DAYS: usize = 7;
30+
const SESSION_COOKIE_NAME: &str = "session";
31+
const USER_COOKIE_NAME: &str = "username";

server/src/handlers/http/oidc.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use ulid::Ulid;
3131
use url::Url;
3232

3333
use crate::{
34+
handlers::{COOKIE_AGE_DAYS, OIDC_SCOPE, SESSION_COOKIE_NAME, USER_COOKIE_NAME},
3435
oidc::{Claims, DiscoveredClient},
3536
option::CONFIG,
3637
rbac::{
@@ -42,10 +43,6 @@ use crate::{
4243
utils::actix::extract_session_key_from_req,
4344
};
4445

45-
// fetch common personalization scope to determine username.
46-
const SCOPE: &str = "openid profile email";
47-
const COOKIE_AGE_DAYS: usize = 7;
48-
4946
/// Struct representing query params returned from oidc provider
5047
#[derive(Deserialize, Debug)]
5148
pub struct Login {
@@ -182,7 +179,7 @@ fn redirect_to_oidc(
182179
) -> HttpResponse {
183180
let redirect = query.into_inner().redirect.to_string();
184181
let auth_url = oidc_client.auth_url(&Options {
185-
scope: Some(SCOPE.into()),
182+
scope: Some(OIDC_SCOPE.into()),
186183
state: Some(redirect),
187184
..Default::default()
188185
});
@@ -222,7 +219,7 @@ fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse {
222219
}
223220

224221
fn cookie_session(id: Ulid) -> Cookie<'static> {
225-
let authorization_cookie = Cookie::build("session", id.to_string())
222+
let authorization_cookie = Cookie::build(SESSION_COOKIE_NAME, id.to_string())
226223
.max_age(time::Duration::days(COOKIE_AGE_DAYS as i64))
227224
.same_site(SameSite::Strict)
228225
.path("/")
@@ -231,7 +228,7 @@ fn cookie_session(id: Ulid) -> Cookie<'static> {
231228
}
232229

233230
fn cookie_username(username: &str) -> Cookie<'static> {
234-
let authorization_cookie = Cookie::build("username", username.to_string())
231+
let authorization_cookie = Cookie::build(USER_COOKIE_NAME, username.to_string())
235232
.max_age(time::Duration::days(COOKIE_AGE_DAYS as i64))
236233
.same_site(SameSite::Strict)
237234
.path("/")

server/src/handlers/livetail.rs

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::net::SocketAddr;
20+
21+
use arrow_array::RecordBatch;
22+
use arrow_flight::encode::FlightDataEncoderBuilder;
23+
use cookie::Cookie;
24+
use futures::stream::BoxStream;
25+
use futures_util::{Future, StreamExt, TryFutureExt, TryStreamExt};
26+
use http_auth_basic::Credentials;
27+
use rand::distributions::{Alphanumeric, DistString};
28+
use tonic::metadata::MetadataMap;
29+
use tonic::transport::Server;
30+
use tonic::{Request, Response, Status, Streaming};
31+
32+
use arrow_flight::{
33+
flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action,
34+
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
35+
HandshakeResponse, PutResult, SchemaResult, Ticket,
36+
};
37+
use tonic_web::GrpcWebLayer;
38+
use tower_http::cors::{Any, CorsLayer};
39+
40+
use crate::livetail::{Message, LIVETAIL};
41+
use crate::metadata::STREAM_INFO;
42+
use crate::option::CONFIG;
43+
use crate::rbac::map::SessionKey;
44+
use crate::rbac::{self, Users};
45+
use crate::utils;
46+
47+
use super::SESSION_COOKIE_NAME;
48+
49+
#[derive(Clone)]
50+
pub struct FlightServiceImpl {}
51+
52+
#[tonic::async_trait]
53+
impl FlightService for FlightServiceImpl {
54+
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
55+
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
56+
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
57+
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
58+
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
59+
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
60+
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
61+
62+
async fn handshake(
63+
&self,
64+
_request: Request<Streaming<HandshakeRequest>>,
65+
) -> Result<Response<Self::HandshakeStream>, Status> {
66+
Err(Status::unimplemented(
67+
"handshake is disabled in favour of direct authentication and authorization",
68+
))
69+
}
70+
71+
async fn list_flights(
72+
&self,
73+
_request: Request<Criteria>,
74+
) -> Result<Response<Self::ListFlightsStream>, Status> {
75+
Err(Status::unimplemented("Implement list_flights"))
76+
}
77+
78+
async fn get_flight_info(
79+
&self,
80+
_request: Request<FlightDescriptor>,
81+
) -> Result<Response<FlightInfo>, Status> {
82+
Err(Status::unimplemented("Implement get_flight_info"))
83+
}
84+
85+
async fn get_schema(
86+
&self,
87+
_request: Request<FlightDescriptor>,
88+
) -> Result<Response<SchemaResult>, Status> {
89+
Err(Status::unimplemented("Implement get_schema"))
90+
}
91+
92+
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
93+
let key = extract_session_key(req.metadata())?;
94+
let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket)
95+
.map_err(|err| Status::internal(err.to_string()))?;
96+
let stream = extract_stream(&ticket)?;
97+
log::info!("livetail requested for stream {}", stream);
98+
match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) {
99+
rbac::Response::Authorized => (),
100+
rbac::Response::UnAuthorized => {
101+
return Err(Status::permission_denied(
102+
"user is not authenticated to access this resource",
103+
))
104+
}
105+
rbac::Response::ReloadRequired => {
106+
return Err(Status::unauthenticated("reload required"))
107+
}
108+
}
109+
110+
let schema = STREAM_INFO
111+
.schema(stream)
112+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
113+
114+
let rx = LIVETAIL.new_pipe(
115+
Alphanumeric.sample_string(&mut rand::thread_rng(), 32),
116+
stream.to_string(),
117+
);
118+
119+
let adapter_schema = schema.clone();
120+
let rx = rx.map(move |x| match x {
121+
Message::Record(t) => Ok(utils::arrow::adapt_batch(&adapter_schema, &t)),
122+
Message::Skipped(_) => {
123+
log::warn!("livetail channel capacity is full.");
124+
Ok(RecordBatch::new_empty(adapter_schema.clone()))
125+
}
126+
});
127+
128+
let rb_stream = FlightDataEncoderBuilder::new()
129+
.with_schema(schema)
130+
.build(rx);
131+
132+
let rb_stream = rb_stream.map_err(|err| Status::unknown(err.to_string()));
133+
Ok(Response::new(Box::pin(rb_stream)))
134+
}
135+
136+
async fn do_put(
137+
&self,
138+
_request: Request<Streaming<FlightData>>,
139+
) -> Result<Response<Self::DoPutStream>, Status> {
140+
Err(Status::unimplemented("Implement do_put"))
141+
}
142+
143+
async fn do_action(
144+
&self,
145+
_request: Request<Action>,
146+
) -> Result<Response<Self::DoActionStream>, Status> {
147+
Err(Status::unimplemented("Implement do_action"))
148+
}
149+
150+
async fn list_actions(
151+
&self,
152+
_request: Request<Empty>,
153+
) -> Result<Response<Self::ListActionsStream>, Status> {
154+
Err(Status::unimplemented("Implement list_actions"))
155+
}
156+
157+
async fn do_exchange(
158+
&self,
159+
_request: Request<Streaming<FlightData>>,
160+
) -> Result<Response<Self::DoExchangeStream>, Status> {
161+
Err(Status::unimplemented("Implement do_exchange"))
162+
}
163+
}
164+
165+
pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send {
166+
let mut addr: SocketAddr = CONFIG
167+
.parseable
168+
.address
169+
.parse()
170+
.expect("valid socket address");
171+
addr.set_port(CONFIG.parseable.grpc_port);
172+
173+
let service = FlightServiceImpl {};
174+
175+
let svc = FlightServiceServer::new(service);
176+
177+
let cors = CorsLayer::new()
178+
// allow `GET` and `POST` when accessing the resource
179+
.allow_methods(Any)
180+
.allow_headers(Any)
181+
.allow_origin(Any);
182+
// allow requests from any origin
183+
184+
Server::builder()
185+
.accept_http1(true)
186+
.layer(cors)
187+
.layer(GrpcWebLayer::new())
188+
.add_service(svc)
189+
.serve(addr)
190+
.map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send>)
191+
}
192+
193+
fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
194+
body.as_object()
195+
.ok_or(Status::invalid_argument("expected object in request body"))?
196+
.get("stream")
197+
.ok_or(Status::invalid_argument("stream key value is not provided"))?
198+
.as_str()
199+
.ok_or(Status::invalid_argument("stream key value is invalid"))
200+
}
201+
202+
fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
203+
// Extract username and password from the request using basic auth extractor.
204+
let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
205+
username: creds.user_id,
206+
password: creds.password,
207+
});
208+
209+
if let Some(basic) = basic {
210+
return Ok(basic);
211+
}
212+
213+
let session = extract_cookie(headers)
214+
.map(|cookie| ulid::Ulid::from_string(cookie.value()))
215+
.transpose()
216+
.map_err(|_| Status::invalid_argument("Cookie is tampered with or invalid"))?;
217+
218+
if let Some(session) = session {
219+
return Ok(SessionKey::SessionId(session));
220+
}
221+
222+
Err(Status::unauthenticated("No authentication method supplied"))
223+
}
224+
225+
fn extract_basic_auth(header: &MetadataMap) -> Option<Credentials> {
226+
let creds = header
227+
.get("Authorization")
228+
.and_then(|value| value.to_str().ok())
229+
.and_then(|value| Credentials::from_header(value.to_string()).ok());
230+
creds
231+
}
232+
233+
fn extract_cookie(header: &MetadataMap) -> Option<Cookie> {
234+
let cookies = header
235+
.get("Cookies")
236+
.and_then(|value| value.to_str().ok())
237+
.map(Cookie::split_parse)?;
238+
239+
cookies
240+
.flatten()
241+
.find(|cookie| cookie.name() == SESSION_COOKIE_NAME)
242+
}

0 commit comments

Comments
 (0)