|
16 | 16 | // under the License.
|
17 | 17 |
|
18 | 18 | use std::collections::VecDeque;
|
19 |
| -use std::future::Future; |
20 |
| -use std::pin::Pin; |
21 |
| -use std::task::Context; |
22 | 19 | use std::task::Poll;
|
23 | 20 |
|
24 | 21 | use futures::poll;
|
25 |
| -use futures::stream::FuturesOrdered; |
26 |
| -use futures::FutureExt; |
27 |
| -use futures::StreamExt; |
28 | 22 |
|
29 | 23 | use crate::*;
|
30 | 24 |
|
@@ -271,253 +265,15 @@ impl<I: Send + 'static, O: Send + 'static> ConcurrentTasks<I, O> {
|
271 | 265 | }
|
272 | 266 | }
|
273 | 267 |
|
274 |
| -/// CONCURRENT_LARGE_THRESHOLD is the threshold to determine whether to use |
275 |
| -/// [`FuturesOrdered`] or not. |
276 |
| -/// |
277 |
| -/// The value of `8` is picked by random, no strict benchmark is done. |
278 |
| -/// Please raise an issue if you found the value is not good enough or you want to configure |
279 |
| -/// this value at runtime. |
280 |
| -const CONCURRENT_LARGE_THRESHOLD: usize = 8; |
281 |
| - |
282 |
| -/// ConcurrentFutures is a stream that can hold a stream of concurrent futures. |
283 |
| -/// |
284 |
| -/// - the order of the futures is the same. |
285 |
| -/// - the number of concurrent futures is limited by concurrent. |
286 |
| -/// - optimized for small number of concurrent futures. |
287 |
| -/// - zero cost for non-concurrent futures cases (concurrent == 1). |
288 |
| -pub struct ConcurrentFutures<F: Future + Unpin> { |
289 |
| - tasks: Tasks<F>, |
290 |
| - concurrent: usize, |
291 |
| -} |
292 |
| - |
293 |
| -/// Tasks is used to hold the entire task queue. |
294 |
| -enum Tasks<F: Future + Unpin> { |
295 |
| - /// The special case for concurrent == 1. |
296 |
| - /// |
297 |
| - /// It works exactly the same like `Option<Fut>` in a struct. |
298 |
| - Once(Option<F>), |
299 |
| - /// The special cases for concurrent is small. |
300 |
| - /// |
301 |
| - /// At this case, the cost to loop poll is lower than using `FuturesOrdered`. |
302 |
| - /// |
303 |
| - /// We will replace the future by `TaskResult::Ready` once it's ready to avoid consume it again. |
304 |
| - Small(VecDeque<TaskResult<F>>), |
305 |
| - /// The general cases for large concurrent. |
306 |
| - /// |
307 |
| - /// We use `FuturesOrdered` to avoid huge amount of poll on futures. |
308 |
| - Large(FuturesOrdered<F>), |
309 |
| -} |
310 |
| - |
311 |
| -impl<F: Future + Unpin> Unpin for Tasks<F> {} |
312 |
| - |
313 |
| -enum TaskResult<F: Future + Unpin> { |
314 |
| - Polling(F), |
315 |
| - Ready(F::Output), |
316 |
| -} |
317 |
| - |
318 |
| -impl<F> ConcurrentFutures<F> |
319 |
| -where |
320 |
| - F: Future + Unpin + 'static, |
321 |
| -{ |
322 |
| - /// Create a new ConcurrentFutures by specifying the number of concurrent futures. |
323 |
| - pub fn new(concurrent: usize) -> Self { |
324 |
| - if (0..2).contains(&concurrent) { |
325 |
| - Self { |
326 |
| - tasks: Tasks::Once(None), |
327 |
| - concurrent, |
328 |
| - } |
329 |
| - } else if (2..=CONCURRENT_LARGE_THRESHOLD).contains(&concurrent) { |
330 |
| - Self { |
331 |
| - tasks: Tasks::Small(VecDeque::with_capacity(concurrent)), |
332 |
| - concurrent, |
333 |
| - } |
334 |
| - } else { |
335 |
| - Self { |
336 |
| - tasks: Tasks::Large(FuturesOrdered::new()), |
337 |
| - concurrent, |
338 |
| - } |
339 |
| - } |
340 |
| - } |
341 |
| - |
342 |
| - /// Drop all tasks. |
343 |
| - pub fn clear(&mut self) { |
344 |
| - match &mut self.tasks { |
345 |
| - Tasks::Once(fut) => *fut = None, |
346 |
| - Tasks::Small(tasks) => tasks.clear(), |
347 |
| - Tasks::Large(tasks) => *tasks = FuturesOrdered::new(), |
348 |
| - } |
349 |
| - } |
350 |
| - |
351 |
| - /// Return the length of current concurrent futures (both ongoing and ready). |
352 |
| - pub fn len(&self) -> usize { |
353 |
| - match &self.tasks { |
354 |
| - Tasks::Once(fut) => fut.is_some() as usize, |
355 |
| - Tasks::Small(v) => v.len(), |
356 |
| - Tasks::Large(v) => v.len(), |
357 |
| - } |
358 |
| - } |
359 |
| - |
360 |
| - /// Return true if there is no futures in the queue. |
361 |
| - pub fn is_empty(&self) -> bool { |
362 |
| - self.len() == 0 |
363 |
| - } |
364 |
| - |
365 |
| - /// Return the number of remaining space to push new futures. |
366 |
| - pub fn remaining(&self) -> usize { |
367 |
| - self.concurrent - self.len() |
368 |
| - } |
369 |
| - |
370 |
| - /// Return true if there is remaining space to push new futures. |
371 |
| - pub fn has_remaining(&self) -> bool { |
372 |
| - self.remaining() > 0 |
373 |
| - } |
374 |
| - |
375 |
| - /// Push new future into the end of queue. |
376 |
| - pub fn push_back(&mut self, f: F) { |
377 |
| - debug_assert!( |
378 |
| - self.has_remaining(), |
379 |
| - "concurrent futures must have remaining space" |
380 |
| - ); |
381 |
| - |
382 |
| - match &mut self.tasks { |
383 |
| - Tasks::Once(fut) => { |
384 |
| - *fut = Some(f); |
385 |
| - } |
386 |
| - Tasks::Small(v) => v.push_back(TaskResult::Polling(f)), |
387 |
| - Tasks::Large(v) => v.push_back(f), |
388 |
| - } |
389 |
| - } |
390 |
| - |
391 |
| - /// Push new future into the start of queue, this task will be exactly the next to poll. |
392 |
| - pub fn push_front(&mut self, f: F) { |
393 |
| - debug_assert!( |
394 |
| - self.has_remaining(), |
395 |
| - "concurrent futures must have remaining space" |
396 |
| - ); |
397 |
| - |
398 |
| - match &mut self.tasks { |
399 |
| - Tasks::Once(fut) => { |
400 |
| - *fut = Some(f); |
401 |
| - } |
402 |
| - Tasks::Small(v) => v.push_front(TaskResult::Polling(f)), |
403 |
| - Tasks::Large(v) => v.push_front(f), |
404 |
| - } |
405 |
| - } |
406 |
| -} |
407 |
| - |
408 |
| -impl<F> futures::Stream for ConcurrentFutures<F> |
409 |
| -where |
410 |
| - F: Future + Unpin + 'static, |
411 |
| -{ |
412 |
| - type Item = F::Output; |
413 |
| - |
414 |
| - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
415 |
| - match &mut self.get_mut().tasks { |
416 |
| - Tasks::Once(fut) => match fut { |
417 |
| - Some(x) => x.poll_unpin(cx).map(|v| { |
418 |
| - *fut = None; |
419 |
| - Some(v) |
420 |
| - }), |
421 |
| - None => Poll::Ready(None), |
422 |
| - }, |
423 |
| - Tasks::Small(v) => { |
424 |
| - // Poll all tasks together. |
425 |
| - for task in v.iter_mut() { |
426 |
| - if let TaskResult::Polling(f) = task { |
427 |
| - match f.poll_unpin(cx) { |
428 |
| - Poll::Pending => {} |
429 |
| - Poll::Ready(res) => { |
430 |
| - // Replace with ready value if this future has been resolved. |
431 |
| - *task = TaskResult::Ready(res); |
432 |
| - } |
433 |
| - } |
434 |
| - } |
435 |
| - } |
436 |
| - |
437 |
| - // Pick the first one to check. |
438 |
| - match v.front_mut() { |
439 |
| - // Return pending if the first one is still polling. |
440 |
| - Some(TaskResult::Polling(_)) => Poll::Pending, |
441 |
| - Some(TaskResult::Ready(_)) => { |
442 |
| - let res = v.pop_front().unwrap(); |
443 |
| - match res { |
444 |
| - TaskResult::Polling(_) => unreachable!(), |
445 |
| - TaskResult::Ready(res) => Poll::Ready(Some(res)), |
446 |
| - } |
447 |
| - } |
448 |
| - None => Poll::Ready(None), |
449 |
| - } |
450 |
| - } |
451 |
| - Tasks::Large(v) => v.poll_next_unpin(cx), |
452 |
| - } |
453 |
| - } |
454 |
| -} |
455 |
| - |
456 | 268 | #[cfg(test)]
|
457 | 269 | mod tests {
|
458 |
| - use std::task::ready; |
459 | 270 | use std::time::Duration;
|
460 | 271 |
|
461 |
| - use futures::future::BoxFuture; |
462 |
| - use futures::Stream; |
463 | 272 | use rand::Rng;
|
464 | 273 | use tokio::time::sleep;
|
465 | 274 |
|
466 | 275 | use super::*;
|
467 | 276 |
|
468 |
| - struct Lister { |
469 |
| - size: usize, |
470 |
| - idx: usize, |
471 |
| - concurrent: usize, |
472 |
| - tasks: ConcurrentFutures<BoxFuture<'static, usize>>, |
473 |
| - } |
474 |
| - |
475 |
| - impl Stream for Lister { |
476 |
| - type Item = usize; |
477 |
| - |
478 |
| - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
479 |
| - // Randomly sleep for a while, simulate some io operations that up to 100 microseconds. |
480 |
| - let timeout = Duration::from_micros(rand::thread_rng().gen_range(0..100)); |
481 |
| - let idx = self.idx; |
482 |
| - if self.tasks.len() < self.concurrent && self.idx < self.size { |
483 |
| - let fut = async move { |
484 |
| - tokio::time::sleep(timeout).await; |
485 |
| - idx |
486 |
| - }; |
487 |
| - self.idx += 1; |
488 |
| - self.tasks.push_back(Box::pin(fut)); |
489 |
| - } |
490 |
| - |
491 |
| - if let Some(v) = ready!(self.tasks.poll_next_unpin(cx)) { |
492 |
| - Poll::Ready(Some(v)) |
493 |
| - } else { |
494 |
| - Poll::Ready(None) |
495 |
| - } |
496 |
| - } |
497 |
| - } |
498 |
| - |
499 |
| - #[tokio::test] |
500 |
| - async fn test_concurrent_futures() { |
501 |
| - let cases = vec![ |
502 |
| - ("once", 1), |
503 |
| - ("small", CONCURRENT_LARGE_THRESHOLD - 1), |
504 |
| - ("large", CONCURRENT_LARGE_THRESHOLD + 1), |
505 |
| - ]; |
506 |
| - |
507 |
| - for (name, concurrent) in cases { |
508 |
| - let lister = Lister { |
509 |
| - size: 1000, |
510 |
| - idx: 0, |
511 |
| - concurrent, |
512 |
| - tasks: ConcurrentFutures::new(concurrent), |
513 |
| - }; |
514 |
| - let expected: Vec<usize> = (0..1000).collect(); |
515 |
| - let result: Vec<usize> = lister.collect().await; |
516 |
| - |
517 |
| - assert_eq!(expected, result, "concurrent futures failed: {}", name); |
518 |
| - } |
519 |
| - } |
520 |
| - |
521 | 277 | #[tokio::test]
|
522 | 278 | async fn test_concurrent_tasks() {
|
523 | 279 | let executor = Executor::new();
|
|
0 commit comments