Skip to content

Commit 24be36f

Browse files
authored
Add Prometheus metrics for Datafusion calls to S3 store (#527)
1 parent 3523480 commit 24be36f

File tree

4 files changed

+283
-0
lines changed

4 files changed

+283
-0
lines changed

server/src/metrics/storage.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,25 @@ pub mod s3 {
6464
.expect("metric can be created")
6565
});
6666

67+
pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
68+
HistogramVec::new(
69+
HistogramOpts::new("query_s3_response_time", "S3 Request Latency")
70+
.namespace(METRICS_NAMESPACE),
71+
&["method", "status"],
72+
)
73+
.expect("metric can be created")
74+
});
75+
6776
impl StorageMetrics for S3Config {
6877
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
6978
handler
7079
.registry
7180
.register(Box::new(REQUEST_RESPONSE_TIME.clone()))
7281
.expect("metric can be registered");
82+
handler
83+
.registry
84+
.register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
85+
.expect("metric can be registered");
7386
}
7487
}
7588
}

server/src/storage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use chrono::Local;
2323
use std::fmt::Debug;
2424

2525
mod localfs;
26+
mod metrics_layer;
2627
mod object_storage;
2728
pub mod retention;
2829
mod s3;

server/src/storage/metrics_layer.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
use std::{
2+
ops::Range,
3+
task::{Context, Poll},
4+
time,
5+
};
6+
7+
use async_trait::async_trait;
8+
use bytes::Bytes;
9+
use futures_util::{stream::BoxStream, Stream, StreamExt};
10+
use object_store::{
11+
path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
12+
};
13+
use tokio::io::AsyncWrite;
14+
15+
use crate::metrics::storage::s3::QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME;
16+
17+
#[derive(Debug)]
18+
pub struct MetricLayer<T: ObjectStore> {
19+
inner: T,
20+
}
21+
22+
impl<T: ObjectStore> MetricLayer<T> {
23+
pub fn new(inner: T) -> Self {
24+
Self { inner }
25+
}
26+
}
27+
28+
impl<T: ObjectStore> std::fmt::Display for MetricLayer<T> {
29+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30+
write!(f, "Metric({})", self.inner)
31+
}
32+
}
33+
34+
#[async_trait]
35+
impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
36+
async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> {
37+
let time = time::Instant::now();
38+
self.inner.put(location, bytes).await?;
39+
let elapsed = time.elapsed().as_secs_f64();
40+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
41+
.with_label_values(&["PUT", "200"])
42+
.observe(elapsed);
43+
return Ok(());
44+
}
45+
46+
// todo completly tracking multipart upload
47+
async fn put_multipart(
48+
&self,
49+
location: &Path,
50+
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
51+
let time = time::Instant::now();
52+
let (id, write) = self.inner.put_multipart(location).await?;
53+
let elapsed = time.elapsed().as_secs_f64();
54+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
55+
.with_label_values(&["PUT_MULTIPART", "200"])
56+
.observe(elapsed);
57+
58+
Ok((id, write))
59+
}
60+
61+
async fn abort_multipart(
62+
&self,
63+
location: &Path,
64+
multipart_id: &MultipartId,
65+
) -> object_store::Result<()> {
66+
let time = time::Instant::now();
67+
let elapsed = time.elapsed().as_secs_f64();
68+
self.inner.abort_multipart(location, multipart_id).await?;
69+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
70+
.with_label_values(&["PUT_MULTIPART_ABORT", "200"])
71+
.observe(elapsed);
72+
Ok(())
73+
}
74+
75+
async fn append(
76+
&self,
77+
location: &Path,
78+
) -> object_store::Result<Box<dyn AsyncWrite + Unpin + Send>> {
79+
let time = time::Instant::now();
80+
let write = self.inner.append(location).await?;
81+
let elapsed = time.elapsed().as_secs_f64();
82+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
83+
.with_label_values(&["APPEND", "200"])
84+
.observe(elapsed);
85+
86+
Ok(write)
87+
}
88+
89+
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
90+
let time = time::Instant::now();
91+
let res = self.inner.get(location).await?;
92+
let elapsed = time.elapsed().as_secs_f64();
93+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
94+
.with_label_values(&["GET", "200"])
95+
.observe(elapsed);
96+
Ok(res)
97+
}
98+
99+
async fn get_opts(
100+
&self,
101+
location: &Path,
102+
options: GetOptions,
103+
) -> object_store::Result<GetResult> {
104+
let time = time::Instant::now();
105+
let res = self.inner.get_opts(location, options).await?;
106+
let elapsed = time.elapsed().as_secs_f64();
107+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
108+
.with_label_values(&["GET_OPTS", "200"])
109+
.observe(elapsed);
110+
Ok(res)
111+
}
112+
113+
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
114+
let time = time::Instant::now();
115+
let res = self.inner.get_range(location, range).await?;
116+
let elapsed = time.elapsed().as_secs_f64();
117+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
118+
.with_label_values(&["GET_RANGE", "200"])
119+
.observe(elapsed);
120+
Ok(res)
121+
}
122+
123+
async fn get_ranges(
124+
&self,
125+
location: &Path,
126+
ranges: &[Range<usize>],
127+
) -> object_store::Result<Vec<Bytes>> {
128+
let time = time::Instant::now();
129+
let res = self.inner.get_ranges(location, ranges).await?;
130+
let elapsed = time.elapsed().as_secs_f64();
131+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
132+
.with_label_values(&["GET_RANGES", "200"])
133+
.observe(elapsed);
134+
Ok(res)
135+
}
136+
137+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
138+
let time = time::Instant::now();
139+
let res = self.inner.head(location).await?;
140+
let elapsed = time.elapsed().as_secs_f64();
141+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
142+
.with_label_values(&["HEAD", "200"])
143+
.observe(elapsed);
144+
Ok(res)
145+
}
146+
147+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
148+
let time = time::Instant::now();
149+
let res = self.inner.delete(location).await?;
150+
let elapsed = time.elapsed().as_secs_f64();
151+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
152+
.with_label_values(&["DELETE", "200"])
153+
.observe(elapsed);
154+
Ok(res)
155+
}
156+
157+
fn delete_stream<'a>(
158+
&'a self,
159+
locations: BoxStream<'a, object_store::Result<Path>>,
160+
) -> BoxStream<'a, object_store::Result<Path>> {
161+
self.inner.delete_stream(locations)
162+
}
163+
164+
async fn list(
165+
&self,
166+
prefix: Option<&Path>,
167+
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
168+
let time = time::Instant::now();
169+
let inner = self.inner.list(prefix).await?;
170+
let res = StreamMetricWrapper {
171+
time,
172+
labels: ["LIST", "200"],
173+
inner,
174+
};
175+
Ok(Box::pin(res))
176+
}
177+
178+
async fn list_with_offset(
179+
&self,
180+
prefix: Option<&Path>,
181+
offset: &Path,
182+
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
183+
let time = time::Instant::now();
184+
let inner = self.inner.list_with_offset(prefix, offset).await?;
185+
let res = StreamMetricWrapper {
186+
time,
187+
labels: ["LIST_OFFSET", "200"],
188+
inner,
189+
};
190+
Ok(Box::pin(res))
191+
}
192+
193+
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
194+
let time = time::Instant::now();
195+
let res = self.inner.list_with_delimiter(prefix).await?;
196+
let elapsed = time.elapsed().as_secs_f64();
197+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
198+
.with_label_values(&["LIST_DELIM", "200"])
199+
.observe(elapsed);
200+
Ok(res)
201+
}
202+
203+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
204+
let time = time::Instant::now();
205+
let res = self.inner.copy(from, to).await?;
206+
let elapsed = time.elapsed().as_secs_f64();
207+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
208+
.with_label_values(&["COPY", "200"])
209+
.observe(elapsed);
210+
Ok(res)
211+
}
212+
213+
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
214+
let time = time::Instant::now();
215+
let res = self.inner.rename(from, to).await?;
216+
let elapsed = time.elapsed().as_secs_f64();
217+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
218+
.with_label_values(&["RENAME", "200"])
219+
.observe(elapsed);
220+
Ok(res)
221+
}
222+
223+
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
224+
let time = time::Instant::now();
225+
let res = self.inner.copy_if_not_exists(from, to).await?;
226+
let elapsed = time.elapsed().as_secs_f64();
227+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
228+
.with_label_values(&["COPY_IF", "200"])
229+
.observe(elapsed);
230+
Ok(res)
231+
}
232+
233+
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
234+
let time = time::Instant::now();
235+
let res = self.inner.rename_if_not_exists(from, to).await?;
236+
let elapsed = time.elapsed().as_secs_f64();
237+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
238+
.with_label_values(&["RENAME_IF", "200"])
239+
.observe(elapsed);
240+
Ok(res)
241+
}
242+
}
243+
244+
struct StreamMetricWrapper<'a, const N: usize, T> {
245+
time: time::Instant,
246+
labels: [&'static str; N],
247+
inner: BoxStream<'a, T>,
248+
}
249+
250+
impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> {
251+
type Item = T;
252+
253+
fn poll_next(
254+
mut self: std::pin::Pin<&mut Self>,
255+
cx: &mut Context<'_>,
256+
) -> Poll<Option<Self::Item>> {
257+
match self.inner.poll_next_unpin(cx) {
258+
t @ Poll::Ready(None) => {
259+
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
260+
.with_label_values(&self.labels)
261+
.observe(self.time.elapsed().as_secs_f64());
262+
t
263+
}
264+
t => t,
265+
}
266+
}
267+
}

server/src/storage/s3.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::time::{Duration, Instant};
4141
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
4242
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};
4343

44+
use super::metrics_layer::MetricLayer;
4445
use super::{object_storage, ObjectStorageProvider};
4546

4647
// in bytes
@@ -178,6 +179,7 @@ impl ObjectStorageProvider for S3Config {
178179

179180
// limit objectstore to a concurrent request limit
180181
let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
182+
let s3 = MetricLayer::new(s3);
181183

182184
let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
183185
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();

0 commit comments

Comments
 (0)