@@ -312,18 +312,36 @@ def transform_children(
312312
313313# Input Nodex
314314@dataclass (frozen = True )
315- class ReadLocalNode (BigFrameNode ):
315+ class LeafNode (BigFrameNode ):
316+ @property
317+ def roots (self ) -> typing .Set [BigFrameNode ]:
318+ return {self }
319+
320+ @property
321+ def supports_fast_head (self ) -> bool :
322+ return False
323+
324+ def transform_children (
325+ self , t : Callable [[BigFrameNode ], BigFrameNode ]
326+ ) -> BigFrameNode :
327+ return self
328+
329+ @property
330+ def row_count (self ) -> typing .Optional [int ]:
331+ """How many rows are in the data source. None means unknown."""
332+ return None
333+
334+
335+ @dataclass (frozen = True )
336+ class ReadLocalNode (LeafNode ):
316337 feather_bytes : bytes
317338 data_schema : schemata .ArraySchema
339+ n_rows : int
318340 session : typing .Optional [bigframes .session .Session ] = None
319341
320342 def __hash__ (self ):
321343 return self ._node_hash
322344
323- @property
324- def roots (self ) -> typing .Set [BigFrameNode ]:
325- return {self }
326-
327345 @functools .cached_property
328346 def schema (self ) -> schemata .ArraySchema :
329347 return self .data_schema
@@ -333,6 +351,10 @@ def variables_introduced(self) -> int:
333351 """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
334352 return len (self .schema .items ) + 1
335353
354+ @property
355+ def supports_fast_head (self ) -> bool :
356+ return True
357+
336358 @property
337359 def order_ambiguous (self ) -> bool :
338360 return False
@@ -341,20 +363,38 @@ def order_ambiguous(self) -> bool:
341363 def explicitly_ordered (self ) -> bool :
342364 return True
343365
344- def transform_children (
345- self , t : Callable [[BigFrameNode ], BigFrameNode ]
346- ) -> BigFrameNode :
347- return self
366+ @property
367+ def row_count (self ) -> typing .Optional [int ]:
368+ return self .n_rows
348369
349370
350- ## Put ordering in here or just add order_by node above?
351371@dataclass (frozen = True )
352- class ReadTableNode ( BigFrameNode ) :
372+ class GbqTable :
353373 project_id : str = field ()
354374 dataset_id : str = field ()
355375 table_id : str = field ()
356-
357376 physical_schema : Tuple [bq .SchemaField , ...] = field ()
377+ n_rows : int = field ()
378+ cluster_cols : typing .Optional [Tuple [str , ...]]
379+
380+ @staticmethod
381+ def from_table (table : bq .Table ) -> GbqTable :
382+ return GbqTable (
383+ project_id = table .project ,
384+ dataset_id = table .dataset_id ,
385+ table_id = table .table_id ,
386+ physical_schema = tuple (table .schema ),
387+ n_rows = table .num_rows ,
388+ cluster_cols = None
389+ if table .clustering_fields is None
390+ else tuple (table .clustering_fields ),
391+ )
392+
393+
394+ ## Put ordering in here or just add order_by node above?
395+ @dataclass (frozen = True )
396+ class ReadTableNode (LeafNode ):
397+ table : GbqTable
358398 # Subset of physical schema columns, with chosen BQ types
359399 columns : schemata .ArraySchema = field ()
360400
@@ -370,10 +410,10 @@ class ReadTableNode(BigFrameNode):
370410
371411 def __post_init__ (self ):
372412 # enforce invariants
373- physical_names = set (map (lambda i : i .name , self .physical_schema ))
413+ physical_names = set (map (lambda i : i .name , self .table . physical_schema ))
374414 if not set (self .columns .names ).issubset (physical_names ):
375415 raise ValueError (
376- f"Requested schema { self .columns } cannot be derived from table schemal { self .physical_schema } "
416+ f"Requested schema { self .columns } cannot be derived from table schemal { self .table . physical_schema } "
377417 )
378418 if self .order_col_is_sequential and len (self .total_order_cols ) != 1 :
379419 raise ValueError ("Sequential primary key must have only one component" )
@@ -385,10 +425,6 @@ def session(self):
385425 def __hash__ (self ):
386426 return self ._node_hash
387427
388- @property
389- def roots (self ) -> typing .Set [BigFrameNode ]:
390- return {self }
391-
392428 @property
393429 def schema (self ) -> schemata .ArraySchema :
394430 return self .columns
@@ -398,6 +434,13 @@ def relation_ops_created(self) -> int:
398434 # Assume worst case, where readgbq actually has baked in analytic operation to generate index
399435 return 3
400436
437+ @property
438+ def supports_fast_head (self ) -> bool :
439+ # Fast head is only supported when row offsets are available.
440+ # In the future, ORDER BY+LIMIT optimizations may allow fast head when
441+ # clustered and/or partitioned on ordering key
442+ return self .order_col_is_sequential
443+
401444 @property
402445 def order_ambiguous (self ) -> bool :
403446 return len (self .total_order_cols ) == 0
@@ -410,37 +453,34 @@ def explicitly_ordered(self) -> bool:
410453 def variables_introduced (self ) -> int :
411454 return len (self .schema .items ) + 1
412455
413- def transform_children (
414- self , t : Callable [[BigFrameNode ], BigFrameNode ]
415- ) -> BigFrameNode :
416- return self
456+ @property
457+ def row_count (self ) -> typing .Optional [int ]:
458+ if self .sql_predicate is None :
459+ return self .table .n_rows
460+ return None
417461
418462
419463# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
420464@dataclass (frozen = True )
421- class CachedTableNode (BigFrameNode ):
465+ class CachedTableNode (LeafNode ):
422466 # The original BFET subtree that was cached
423467 # note: this isn't a "child" node.
424468 original_node : BigFrameNode = field ()
425469 # reference to cached materialization of original_node
426- project_id : str = field ()
427- dataset_id : str = field ()
428- table_id : str = field ()
429- physical_schema : Tuple [bq .SchemaField , ...] = field ()
430-
470+ table : GbqTable
431471 ordering : typing .Optional [orderings .RowOrdering ] = field ()
432472
433473 def __post_init__ (self ):
434474 # enforce invariants
435- physical_names = set (map (lambda i : i .name , self .physical_schema ))
475+ physical_names = set (map (lambda i : i .name , self .table . physical_schema ))
436476 logical_names = self .original_node .schema .names
437477 if not set (logical_names ).issubset (physical_names ):
438478 raise ValueError (
439- f"Requested schema { logical_names } cannot be derived from table schema { self .physical_schema } "
479+ f"Requested schema { logical_names } cannot be derived from table schema { self .table . physical_schema } "
440480 )
441481 if not set (self .hidden_columns ).issubset (physical_names ):
442482 raise ValueError (
443- f"Requested hidden columns { self .hidden_columns } cannot be derived from table schema { self .physical_schema } "
483+ f"Requested hidden columns { self .hidden_columns } cannot be derived from table schema { self .table . physical_schema } "
444484 )
445485
446486 @property
@@ -450,10 +490,6 @@ def session(self):
450490 def __hash__ (self ):
451491 return self ._node_hash
452492
453- @property
454- def roots (self ) -> typing .Set [BigFrameNode ]:
455- return {self }
456-
457493 @property
458494 def schema (self ) -> schemata .ArraySchema :
459495 return self .original_node .schema
@@ -473,6 +509,13 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
473509 if col not in self .schema .names
474510 )
475511
512+ @property
513+ def supports_fast_head (self ) -> bool :
514+ # Fast head is only supported when row offsets are available.
515+ # In the future, ORDER BY+LIMIT optimizations may allow fast head when
516+ # clustered and/or partitioned on ordering key
517+ return (self .ordering is None ) or self .ordering .is_sequential
518+
476519 @property
477520 def order_ambiguous (self ) -> bool :
478521 return not isinstance (self .ordering , orderings .TotalOrdering )
@@ -483,10 +526,9 @@ def explicitly_ordered(self) -> bool:
483526 self .ordering .all_ordering_columns
484527 ) > 0
485528
486- def transform_children (
487- self , t : Callable [[BigFrameNode ], BigFrameNode ]
488- ) -> BigFrameNode :
489- return self
529+ @property
530+ def row_count (self ) -> typing .Optional [int ]:
531+ return self .table .n_rows
490532
491533
492534# Unary nodes
0 commit comments