-
-
Notifications
You must be signed in to change notification settings - Fork 137
feat: prism post datasets API #1236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThis change adds a dataset service endpoint to the HTTP server. The implementation updates the route configuration in both the query server and server modules by registering a new method for datasets. A new asynchronous handler is introduced to process POST requests for datasets, including JSON parsing and authorization checks. Additionally, new request and response structures, along with an updated error enum in the Prism module, support dataset queries. Finally, minor improvements have been made to JSON conversion for Arrow record batches. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Server
participant Handler
participant PrismModule
Client->>Server: POST /datasets (JSON payload)
Server->>Handler: Forward dataset request
Handler->>PrismModule: Execute get_datasets()
PrismModule-->>Handler: Return dataset info or error
Handler->>Server: Wrap response as JSON
Server-->>Client: HTTP JSON response
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Tip ⚡🧪 Multi-step agentic review comment chat (experimental)
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (10)
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
src/prism/logstream/mod.rs (2)
196-213
: Well-documented dataset response structure.
PrismDatasetResponse
encapsulates essential stream metadata, statistics, retention, and query results. The doc comments are thorough. Consider storingdistinct_sources
in a typed struct if the shape is known, but usingserde_json::Value
is acceptable for flexibility.
229-345
: Robust dataset retrieval and aggregation logic.
- The loop processes each stream, skipping unavailable ones, which strengthens resilience.
- Hot tier information and distinct entry queries are well-integrated.
- Swallowing errors in distinct lookups is suitable for partial success, but consider logging warnings on failure for better observability.
- Parallelizing the loop (using tasks or rayon) might improve performance for large stream lists, but the current sequential approach is simpler to maintain.
Overall, the code is well-structured, and the error handling strategy aligns with partial success semantics.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
src/handlers/http/modal/query_server.rs
(1 hunks)src/handlers/http/modal/server.rs
(2 hunks)src/handlers/http/prism_logstream.rs
(1 hunks)src/prism/logstream/mod.rs
(5 hunks)src/response.rs
(1 hunks)src/utils/arrow/mod.rs
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: coverage
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (23)
src/handlers/http/modal/query_server.rs (1)
79-80
: Looks good! Added prism datasets service.The addition of the datasets service aligns well with the existing infrastructure and properly follows the established pattern next to the logstream service.
src/handlers/http/modal/server.rs (1)
184-193
: Implementation of datasets endpoint follows best practices.The implementation follows the established pattern for route declaration and properly includes the necessary authorization checks for stream info, stats, and retention.
src/response.rs (1)
36-36
: Good refactoring to simplify JSON conversion.This change removes an unnecessary intermediate step, directly passing the record batches to the conversion function. This simplifies the code while maintaining the same functionality.
src/handlers/http/prism_logstream.rs (3)
20-20
: Updated imports to support JSON handling.Added the Json type import to support parsing request bodies for the new datasets endpoint.
24-24
: Added necessary imports for dataset functionality.Included PrismDatasetRequest to support the new datasets endpoint functionality.
33-38
: Well-implemented handler for the datasets endpoint.This handler properly:
- Takes a JSON payload via the
Json
extractor- Processes the request asynchronously
- Returns a properly formatted JSON response
- Includes appropriate error handling
The comment also clearly explains that this endpoint combines functionality from multiple other endpoints.
src/utils/arrow/mod.rs (3)
93-93
: Refactored function signature for enhanced clarity.Switching to a slice of owned
RecordBatch
objects (&[RecordBatch]
) ensures a more standard API and simplifies usage, preventing potential reference lifetime pitfalls.
96-98
: Looping over batches is straightforward and concise.Writing each
RecordBatch
in a loop is a clear and maintainable approach. The usage of the?
operator for error propagation is consistent with idiomatic Rust error handling.
194-194
: Consistent with the updated function signature.Switching from
vec![&r]
tovec![r]
matches the new function parameter style. This change eliminates the need for references toRecordBatch
in this context.src/prism/logstream/mod.rs (14)
25-27
: New imports for serialization, JSON handling, and debugging.Adding
serde
,serde_json
, andtracing
imports is appropriate for the newly introduced dataset and logging features.
36-36
: Extended query imports.Introducing
Query
andQueryError
in the imports indicates new query functionalities specific to this module, streamlining error handling and query building.
38-38
: Hot Tier Manager integration.Importing
HotTierError
,HotTierManager
, andStreamHotTier
suggests advanced stream management and a new dimension of error handling for hot tier capabilities.
40-40
: Expanded query utilities.Bringing in
execute
,CountsRequest
,CountsResponse
, andQUERY_SESSION
aligns with your new approach to handle count queries within the streaming logic.
43-46
: Utility imports for record batch JSON conversion and time parsing.Using
record_batches_to_json
andTimeRange
helps unify record transformations and date handling, improving modularity and readability across the codebase.
215-227
: Clear request structure for dataset queries.
PrismDatasetRequest
neatly defines the data needed to retrieve datasets. The user-friendly doc comments help clarify field usage. Ensure upstream validation rejects invalid or empty time strings if needed.
355-355
: New error annotation for hot tier.
#[error("Hottier: {0}")]
extends clarity by rendering a descriptive message when a hot tier-related error occurs.
356-357
: Introducing Hottier variant with from-attribute.
Hottier(#[from] HotTierError)
andQuery(#[from] QueryError)
make error handling consistent and concise throughthiserror
.
359-359
: Readable time parse error message.
#[error("TimeParse: {0}")]
ensures that users can quickly identify problems with time string inputs.
360-361
: Time parsing and execute error variants.
TimeParse(#[from] TimeParseError)
and#[error("Execute: {0}")]
unify typical parse and execution failures within the same error enumeration.
363-364
: Empty stream list checks.Returning a
BAD_REQUEST
(see below) for an empty stream list is a sensible choice, guiding the user to provide at least one stream.
373-373
: Mapping hot tier errors to 500.
PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR
is consistent with critical server-side failure logic.
374-375
: Query and time parse error statuses.
- Query errors remain 500, indicating back-end failures.
- Time parse errors map to 404, guiding the user to correct the time string usage.
377-377
: Empty request yields a 400 Bad Request.Returning
StatusCode::BAD_REQUEST
helps users recognize that their request must contain valid streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/prism/logstream/mod.rs (1)
244-315
: Consider parallel processing for multiple streams.For requests with many streams, processing each stream sequentially might lead to increased response times. Consider using parallel processing with
futures::future::join_all
to improve performance for multi-stream requests.- let mut responses = vec![]; - for stream in self.streams.iter() { - // existing processing logic - } + // Create a vector of futures for each stream + let futures = self.streams.iter().map(|stream| { + let stream = stream.clone(); + let start_time = self.start_time.clone(); + let end_time = self.end_time.clone(); + let key = key.clone(); + + async move { + // Skip if unauthorized or stream not found + if Users.authorize(key, Action::ListStream, Some(&stream), None) + != crate::rbac::Response::Authorized + { + warn!("Unauthorized access requested for stream: {stream}"); + return None; + } + + if PARSEABLE.check_or_load_stream(&stream).await { + debug!("Stream not found: {stream}"); + return None; + } + + // Process stream (existing logic) + match process_single_stream(&stream, &start_time, &end_time).await { + Ok(response) => Some(response), + Err(e) => { + warn!("Error processing stream {stream}: {e}"); + None + } + } + } + }); + + // Process all streams in parallel + let responses: Vec<_> = futures::future::join_all(futures) + .await + .into_iter() + .filter_map(|result| result) + .collect();You would need to extract the stream processing logic into a separate
process_single_stream
function.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/handlers/http/prism_logstream.rs
(1 hunks)src/prism/logstream/mod.rs
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: coverage
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (5)
src/handlers/http/prism_logstream.rs (1)
32-38
: Function implementation looks good.The new
post_datasets
function correctly extracts the session key from the request, processes the dataset query, and returns a JSON response. The implementation follows the established pattern in the codebase.src/prism/logstream/mod.rs (4)
197-214
: Well-structured response model with comprehensive documentation.The
PrismDatasetResponse
struct is appropriately designed to encapsulate all relevant information about a dataset, including stream info, stats, retention policy, and query results. The documentation clearly explains each field's purpose.
216-228
: Request model follows good design practices.The
PrismDatasetRequest
struct properly uses Rust idioms like#[serde(rename_all = "camelCase")]
for JSON serialization/deserialization and provides clear field documentation.
367-376
: Good error handling extension.The additions to the
PrismLogstreamError
enum appropriately cover all the new error cases that might occur during dataset operations. This ensures proper error propagation and handling throughout the system.
385-389
: Appropriate HTTP status codes for error types.The status code mapping for the new error variants is appropriate, with authentication errors returning 401, time parsing errors returning 404, and internal processing errors returning 500.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/handlers/http/prism_logstream.rs (1)
36-45
: New endpoint implementation for dataset retrieval.The new
post_datasets
function is well-structured and follows established patterns:
- It extracts the session key for authentication
- Passes the session key to the dataset retrieval method
- Returns the result as JSON
The function correctly uses async/await with proper error propagation through the
?
operator.Consider adding more specific documentation about:
- The expected format of the request payload
- The response structure
- Any authorization requirements
Enhance the function documentation to be more specific about request/response format:
-/// A combination of /stats, /retention, /hottier, /info, /counts and /query +/// Retrieves dataset information for a Prism logstream +/// +/// Takes a JSON payload with stream identifier and time range, and returns +/// dataset details including creation time, stream type, log source, and +/// various statistics related to ingestion and storage. +/// +/// Requires authentication via session key.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/handlers/http/prism_logstream.rs
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: coverage
🔇 Additional comments (4)
src/handlers/http/prism_logstream.rs (4)
19-22
: Added HttpRequest and Json imports for new functionality.The imports are correctly added to support the new
post_datasets
function, providing the necessary components for handling JSON payloads and accessing HTTP request details.
24-27
: Updated imports to include PrismDatasetRequest type.The imports are properly organized to include the new
PrismDatasetRequest
type and the session key extraction utility, maintaining good code organization.
37-40
: Function signature follows best practices.The function signature correctly uses Actix's JSON extractor pattern for the request body and includes the HttpRequest parameter for session key extraction.
41-44
:❓ Verification inconclusive
Verify authorization checks in the get_datasets implementation.
The session key extraction and passing looks good. However, this code doesn't show the implementation of the actual authorization check that happens within
get_datasets
.
🏁 Script executed:
#!/bin/bash # Check if there are proper authorization checks in the get_datasets implementation rg -A 10 "fn get_datasets.*session_key" --type rustLength of output: 53
ACTION: Additional authorization verification required via manual or extended regex search
The session key extraction logic is solid, but our initial regex search did not locate the
get_datasets
implementation for verifying its internal authorization checks. Please investigate further by:
- Locating the full definition of
fn get_datasets
to inspect for any authorization-related logic.- Checking for keywords like
authorize
,permission
, orauth
in the function’s implementation.For example, run this extended shell script to search all Rust files for the definition and check for authorization check patterns:
#!/bin/bash # Find all occurrences of the get_datasets function and inspect following lines for authorization logic rg "fn get_datasets" --type rust # For each file found, display the function's context and look for authorization keywords for file in $(rg -l "fn get_datasets" --type rust); do echo "Inspecting file: $file" rg -A 15 "fn get_datasets" "$file" | grep -iE "authorize|permission|auth" donePlease verify the output manually to ensure that appropriate authorization checks are implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/prism/logstream/mod.rs (1)
226-312
: Consider making time ranges configurable in dataset retrieval.The implementation is robust with good error handling and authorization checks. However, the time ranges for
CountsRequest
are hardcoded to "1h" and "now". Consider making these configurable through the API request to improve flexibility.#[derive(Deserialize, Default)] #[serde(rename_all = "camelCase")] pub struct PrismDatasetRequest { /// List of stream names to query #[serde(default)] streams: Vec<String>, + /// Start time for queries (default: "1h") + #[serde(default = "default_start_time")] + start_time: String, + /// End time for queries (default: "now") + #[serde(default = "default_end_time")] + end_time: String, } + +fn default_start_time() -> String { + "1h".to_string() +} + +fn default_end_time() -> String { + "now".to_string() +}Then update the relevant parts of the implementation to use these values:
let records = CountsRequest { stream: stream.clone(), - start_time: "1h".to_owned(), - end_time: "now".to_owned(), + start_time: self.start_time.clone(), + end_time: self.end_time.clone(), num_bins: 1, }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
src/handlers/http/prism_logstream.rs
(1 hunks)src/prism/logstream/mod.rs
(5 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/prism/logstream/mod.rs (1)
Learnt from: de-sh
PR: parseablehq/parseable#1236
File: src/prism/logstream/mod.rs:332-332
Timestamp: 2025-03-13T11:39:52.587Z
Learning: SQL injection concerns can be ignored in this codebase as all SQL queries are run against immutable data streams, limiting the potential impact of any injection.
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Build Default x86_64-pc-windows-msvc
- GitHub Check: Build Kafka aarch64-apple-darwin
- GitHub Check: Build Default aarch64-unknown-linux-gnu
- GitHub Check: Build Default x86_64-apple-darwin
- GitHub Check: Build Default aarch64-apple-darwin
- GitHub Check: Build Default x86_64-unknown-linux-gnu
- GitHub Check: Quest Smoke and Load Tests for Standalone deployments
- GitHub Check: Quest Smoke and Load Tests for Distributed deployments
- GitHub Check: Build Kafka x86_64-unknown-linux-gnu
- GitHub Check: coverage
🔇 Additional comments (8)
src/handlers/http/prism_logstream.rs (3)
20-21
: New imports support dataset API endpoint functionality.The additional imports (
Json
,HttpRequest
) facilitate the JSON parsing and request handling capabilities needed for the new dataset POST endpoint.
24-27
: Module imports updated to include dataset-related components.The imports correctly include the new
PrismDatasetRequest
type and reuse the existing authorization utility function.
36-49
: Well-implemented dataset POST handler with clear error handling.The implementation follows established patterns for Actix Web handlers:
- Properly extracts session key for authorization
- Handles optional JSON payload with sensible defaults
- Delegates dataset retrieval logic to the model layer
- Returns a properly formatted JSON response
This maintains separation of concerns between HTTP handling and business logic.
src/prism/logstream/mod.rs (5)
197-214
: Well-structured and documented response type for dataset queries.The
PrismDatasetResponse
struct provides a comprehensive representation of dataset information with clear field documentation. The structure follows good API design by grouping related information together.
216-224
: Request structure uses proper serialization attributes.The
PrismDatasetRequest
struct is properly annotated with#[serde(rename_all = "camelCase")]
for consistent JSON field naming in the API. The#[serde(default)]
on thestreams
field ensures the API remains backward compatible if the field is omitted.
323-353
: SQL query construction appears safe in this context.The method builds a SQL query by directly interpolating user-provided field names. While this would normally be a SQL injection risk, I understand from the provided context that SQL injection concerns are mitigated in this codebase because queries are run against immutable data streams.
The method effectively retrieves distinct entries and processes the results appropriately. Consider also making the time range configurable here, matching the previous recommendation.
364-373
: Error types appropriately expanded to cover all failure modes.The new error variants in
PrismLogstreamError
properly account for all possible failure modes in the dataset retrieval process. This ensures that errors are accurately reported and can be handled appropriately.
382-386
: Status codes for new error types are appropriate.The mapping of error types to HTTP status codes follows RESTful conventions:
- Internal server errors for processing failures
- Not found for time parsing errors (consistent with resource not found semantics)
- Unauthorized for authentication errors
|
Signed-off-by: Devdutt Shenoi <[email protected]>
Fixes #XXXX.
Description
Example
Request:
Response:
NOTE: returns for all streams when none are mentioned or body is empty
This PR has:
Summary by CodeRabbit
Summary by CodeRabbit
New Features
Refactor