Skip to content

Commit b15f1f3

Browse files
committed
Replace leaf search cancelation tracking task with future
1 parent 74d77ce commit b15f1f3

File tree

3 files changed

+53
-32
lines changed

3 files changed

+53
-32
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-search/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ http = { workspace = true }
2222
itertools = { workspace = true }
2323
mockall = { workspace = true }
2424
once_cell = { workspace = true }
25+
pin-project = { workspace = true }
2526
postcard = { workspace = true }
2627
prost = { workspace = true }
2728
rayon = { workspace = true }

quickwit/quickwit-search/src/service.rs

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::future::Future;
1516
use std::pin::Pin;
1617
use std::str::FromStr;
1718
use std::sync::Arc;
19+
use std::task::{ready, Context, Poll};
1820
use std::time::{Duration, Instant};
1921

2022
use async_trait::async_trait;
2123
use bytes::Bytes;
24+
use pin_project::{pin_project, pinned_drop};
2225
use quickwit_common::uri::Uri;
2326
use quickwit_config::SearcherConfig;
2427
use quickwit_doc_mapper::DocMapper;
@@ -35,7 +38,7 @@ use quickwit_storage::{
3538
MemorySizedCache, QuickwitCache, SplitCache, StorageCache, StorageResolver,
3639
};
3740
use tantivy::aggregation::AggregationLimitsGuard;
38-
use tokio::sync::{oneshot, Semaphore};
41+
use tokio::sync::Semaphore;
3942
use tokio_stream::wrappers::UnboundedReceiverStream;
4043

4144
use crate::leaf::multi_index_leaf_search;
@@ -203,17 +206,18 @@ impl SearchService for SearchServiceImpl {
203206
.iter()
204207
.map(|req| req.split_offsets.len())
205208
.sum::<usize>();
206-
let completion_tx = start_leaf_search_metric_recording(num_splits).await;
207-
let leaf_search_response_result = multi_index_leaf_search(
208-
self.searcher_context.clone(),
209-
leaf_search_request,
210-
&self.storage_resolver,
211-
)
212-
.await;
213-
214-
completion_tx.send(leaf_search_response_result.is_ok()).ok();
215209

216-
leaf_search_response_result
210+
LeafSearchMetricsFuture {
211+
tracked: multi_index_leaf_search(
212+
self.searcher_context.clone(),
213+
leaf_search_request,
214+
&self.storage_resolver,
215+
),
216+
start: Instant::now(),
217+
targeted_splits: num_splits,
218+
status: None,
219+
}
220+
.await
217221
}
218222

219223
async fn fetch_docs(
@@ -527,37 +531,52 @@ impl SearcherContext {
527531
}
528532
}
529533

530-
/// Spawns a task that records leaf search metrics either
531-
/// - when the result is received through the returned channel (success or failure)
532-
/// - the returned channels are dropped (cancelled)
533-
#[must_use]
534-
async fn start_leaf_search_metric_recording(num_splits: usize) -> oneshot::Sender<bool> {
535-
let (completion_tx, completion_rx) = tokio::sync::oneshot::channel();
536-
let start = Instant::now();
534+
/// Wrapper around the search future to track metrics.
535+
#[pin_project(PinnedDrop)]
536+
struct LeafSearchMetricsFuture<F>
537+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
538+
{
539+
#[pin]
540+
tracked: F,
541+
start: Instant,
542+
targeted_splits: usize,
543+
status: Option<&'static str>,
544+
}
537545

538-
tokio::spawn(async move {
539-
let label_values = if let Ok(is_success) = completion_rx.await {
540-
if is_success {
541-
["success"]
542-
} else {
543-
["error"]
544-
}
545-
} else {
546-
["cancelled"]
547-
};
546+
#[pinned_drop]
547+
impl<F> PinnedDrop for LeafSearchMetricsFuture<F>
548+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
549+
{
550+
fn drop(self: Pin<&mut Self>) {
551+
let label_values = [self.status.unwrap_or("cancelled")];
548552
SEARCH_METRICS
549553
.leaf_search_requests_total
550554
.with_label_values(label_values)
551555
.inc();
552556
SEARCH_METRICS
553557
.leaf_search_request_duration_seconds
554558
.with_label_values(label_values)
555-
.observe(start.elapsed().as_secs_f64());
559+
.observe(self.start.elapsed().as_secs_f64());
556560
SEARCH_METRICS
557561
.leaf_search_targeted_splits
558562
.with_label_values(label_values)
559-
.observe(num_splits as f64);
560-
});
563+
.observe(self.targeted_splits as f64);
564+
}
565+
}
561566

562-
completion_tx
567+
impl<F> Future for LeafSearchMetricsFuture<F>
568+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
569+
{
570+
type Output = Result<LeafSearchResponse, SearchError>;
571+
572+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
573+
let this = self.project();
574+
let response = ready!(this.tracked.poll(cx));
575+
*this.status = if response.is_ok() {
576+
Some("success")
577+
} else {
578+
Some("error")
579+
};
580+
Poll::Ready(Ok(response?))
581+
}
563582
}

0 commit comments

Comments
 (0)