Skip to content

Commit 35e8e33

Browse files
authored
Minor: Add implementation examples to ExecutionPlan::execute (#8013)
* Add implementation examples to ExecutionPlan::execute * Review feedback
1 parent 94dac76 commit 35e8e33

File tree

1 file changed

+104
-0
lines changed
  • datafusion/physical-plan/src

1 file changed

+104
-0
lines changed

datafusion/physical-plan/src/lib.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,110 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
236236
}
237237

238238
/// Begin execution of `partition`, returning a stream of [`RecordBatch`]es.
239+
///
240+
/// # Implementation Examples
241+
///
242+
/// ## Return Precomputed Batch
243+
///
244+
/// We can return a precomputed batch as a stream
245+
///
246+
/// ```
247+
/// # use std::sync::Arc;
248+
/// # use arrow_array::RecordBatch;
249+
/// # use arrow_schema::SchemaRef;
250+
/// # use datafusion_common::Result;
251+
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
252+
/// # use datafusion_physical_plan::memory::MemoryStream;
253+
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
254+
/// struct MyPlan {
255+
/// batch: RecordBatch,
256+
/// }
257+
///
258+
/// impl MyPlan {
259+
/// fn execute(
260+
/// &self,
261+
/// partition: usize,
262+
/// context: Arc<TaskContext>
263+
/// ) -> Result<SendableRecordBatchStream> {
264+
/// let fut = futures::future::ready(Ok(self.batch.clone()));
265+
/// let stream = futures::stream::once(fut);
266+
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
267+
/// }
268+
/// }
269+
/// ```
270+
///
271+
/// ## Async Compute Batch
272+
///
273+
/// We can also lazily compute a RecordBatch when the returned stream is polled
274+
///
275+
/// ```
276+
/// # use std::sync::Arc;
277+
/// # use arrow_array::RecordBatch;
278+
/// # use arrow_schema::SchemaRef;
279+
/// # use datafusion_common::Result;
280+
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
281+
/// # use datafusion_physical_plan::memory::MemoryStream;
282+
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
283+
/// struct MyPlan {
284+
/// schema: SchemaRef,
285+
/// }
286+
///
287+
/// async fn get_batch() -> Result<RecordBatch> {
288+
/// todo!()
289+
/// }
290+
///
291+
/// impl MyPlan {
292+
/// fn execute(
293+
/// &self,
294+
/// partition: usize,
295+
/// context: Arc<TaskContext>
296+
/// ) -> Result<SendableRecordBatchStream> {
297+
/// let fut = get_batch();
298+
/// let stream = futures::stream::once(fut);
299+
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
300+
/// }
301+
/// }
302+
/// ```
303+
///
304+
/// ## Async Compute Batch Stream
305+
///
306+
/// We can lazily compute a RecordBatch stream when the returned stream is polled
307+
/// flattening the result into a single stream
308+
///
309+
/// ```
310+
/// # use std::sync::Arc;
311+
/// # use arrow_array::RecordBatch;
312+
/// # use arrow_schema::SchemaRef;
313+
/// # use futures::TryStreamExt;
314+
/// # use datafusion_common::Result;
315+
/// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
316+
/// # use datafusion_physical_plan::memory::MemoryStream;
317+
/// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
318+
/// struct MyPlan {
319+
/// schema: SchemaRef,
320+
/// }
321+
///
322+
/// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
323+
/// todo!()
324+
/// }
325+
///
326+
/// impl MyPlan {
327+
/// fn execute(
328+
/// &self,
329+
/// partition: usize,
330+
/// context: Arc<TaskContext>
331+
/// ) -> Result<SendableRecordBatchStream> {
332+
/// // A future that yields a stream
333+
/// let fut = get_batch_stream();
334+
/// // Use TryStreamExt::try_flatten to flatten the stream of streams
335+
/// let stream = futures::stream::once(fut).try_flatten();
336+
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
337+
/// }
338+
/// }
339+
/// ```
340+
///
341+
/// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further
342+
/// combinators that can be used with streams
239343
fn execute(
240344
&self,
241345
partition: usize,

0 commit comments

Comments
 (0)