import bigframes.core.ordering as orderings
import bigframes.session

+# A fixed number of variables to assume for overhead on some operations
+OVERHEAD_VARIABLES = 5
+

@dataclass(frozen=True)
class BigFrameNode:
@@ -107,6 +110,38 @@ def roots(self) -> typing.Set[BigFrameNode]:
    def schema(self) -> schemata.ArraySchema:
        ...

+    @property
+    @abc.abstractmethod
+    def variables_introduced(self) -> int:
+        """
+        Defines the number of variables generated by the current node. Used to estimate query planning complexity.
+        """
+        ...
+
+    @property
+    def relation_ops_created(self) -> int:
+        """
+        Defines the number of relational ops generated by the current node. Used to estimate query planning complexity.
+        """
+        return 1
+
+    @functools.cached_property
+    def total_variables(self) -> int:
+        return self.variables_introduced + sum(
+            map(lambda x: x.total_variables, self.child_nodes)
+        )
+
+    @functools.cached_property
+    def total_relational_ops(self) -> int:
+        return self.relation_ops_created + sum(
+            map(lambda x: x.total_relational_ops, self.child_nodes)
+        )
+
+    @property
+    def planning_complexity(self) -> int:
+        """Heuristic measure of planning complexity. Used to determine when to decompose overly complex computations."""
+        return self.total_variables * self.total_relational_ops
+

@dataclass(frozen=True)
class UnaryNode(BigFrameNode):
@@ -165,6 +200,10 @@ def join_mapping_to_schema_item(mapping: JoinColumnMapping):
        )
        return schemata.ArraySchema(items)

+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        return OVERHEAD_VARIABLES
+

@dataclass(frozen=True)
class ConcatNode(BigFrameNode):
@@ -193,6 +232,11 @@ def schema(self) -> schemata.ArraySchema:
        )
        return schemata.ArraySchema(items)

+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
+        return OVERHEAD_VARIABLES
+

# Input Nodex
@dataclass(frozen=True)
@@ -216,6 +260,11 @@ def roots(self) -> typing.Set[BigFrameNode]:
    def schema(self) -> schemata.ArraySchema:
        return self.data_schema

+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        """Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
+        return len(self.schema.items) + 1
+

# TODO: Refactor to take raw gbq object reference
@dataclass(frozen=True)
@@ -252,6 +301,15 @@ def schema(self) -> schemata.ArraySchema:
        )
        return schemata.ArraySchema(items)

+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        return len(self.columns) + len(self.hidden_ordering_columns)
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Assume the worst case, where readgbq actually has a baked-in analytic operation to generate the index
+        return 2
+

# Unary nodes
@dataclass(frozen=True)
@@ -275,6 +333,10 @@ def schema(self) -> schemata.ArraySchema:
            schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE)
        )

+    @functools.cached_property
+    def variables_introduced(self) -> int:
+        return 1
+

@dataclass(frozen=True)
class FilterNode(UnaryNode):
@@ -287,6 +349,10 @@ def row_preserving(self) -> bool:
    def __hash__(self):
        return self._node_hash

+    @property
+    def variables_introduced(self) -> int:
+        return 1
+

@dataclass(frozen=True)
class OrderByNode(UnaryNode):
@@ -304,6 +370,15 @@ def __post_init__(self):
    def __hash__(self):
        return self._node_hash

+    @property
+    def variables_introduced(self) -> int:
+        return 0
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Doesn't directly create any relational operations
+        return 0
+

@dataclass(frozen=True)
class ReversedNode(UnaryNode):
@@ -313,6 +388,15 @@ class ReversedNode(UnaryNode):
    def __hash__(self):
        return self._node_hash

+    @property
+    def variables_introduced(self) -> int:
+        return 0
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Doesn't directly create any relational operations
+        return 0
+

@dataclass(frozen=True)
class ProjectionNode(UnaryNode):
@@ -332,6 +416,12 @@ def schema(self) -> schemata.ArraySchema:
        )
        return schemata.ArraySchema(items)

+    @property
+    def variables_introduced(self) -> int:
+        # ignore passthrough expressions
+        new_vars = sum(1 for i in self.assignments if not i[0].is_raw_variable)
+        return new_vars
+

# TODO: Merge RowCount into Aggregate Node?
# Row count can be compute from table metadata sometimes, so it is a bit special.
@@ -351,6 +441,11 @@ def schema(self) -> schemata.ArraySchema:
            (schemata.SchemaItem("count", bigframes.dtypes.INT_DTYPE),)
        )

+    @property
+    def variables_introduced(self) -> int:
+        # ignore passthrough expressions
+        return 1
+

@dataclass(frozen=True)
class AggregateNode(UnaryNode):
@@ -388,6 +483,10 @@ def schema(self) -> schemata.ArraySchema:
        )
        return schemata.ArraySchema(tuple([*by_items, *agg_items]))

+    @property
+    def variables_introduced(self) -> int:
+        return len(self.aggregations) + len(self.by_column_ids)
+

@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
@@ -421,12 +520,31 @@ def schema(self) -> schemata.ArraySchema:
            schemata.SchemaItem(self.output_name, new_item_dtype)
        )

+    @property
+    def variables_introduced(self) -> int:
+        return 1
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Assume that if not reprojecting, there is a sequence of window operations sharing the same window
+        return 0 if self.skip_reproject_unsafe else 2
+

+# TODO: Remove this op
@dataclass(frozen=True)
class ReprojectOpNode(UnaryNode):
    def __hash__(self):
        return self._node_hash

+    @property
+    def variables_introduced(self) -> int:
+        return 0
+
+    @property
+    def relation_ops_created(self) -> int:
+        # This op is not a real transformation, just a hint to the SQL generator
+        return 0
+

@dataclass(frozen=True)
class UnpivotNode(UnaryNode):
@@ -498,6 +616,17 @@ def infer_dtype(
        ]
        return schemata.ArraySchema((*index_items, *value_items, *passthrough_items))

+    @property
+    def variables_introduced(self) -> int:
+        return (
+            len(self.schema.items) - len(self.passthrough_columns) + OVERHEAD_VARIABLES
+        )
+
+    @property
+    def relation_ops_created(self) -> int:
+        # Unpivot is essentially a cross join and a projection.
+        return 2
+

@dataclass(frozen=True)
class RandomSampleNode(UnaryNode):
@@ -513,3 +642,7 @@ def row_preserving(self) -> bool:

    def __hash__(self):
        return self._node_hash
+
+    @property
+    def variables_introduced(self) -> int:
+        return 1
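As a rough illustration of how the heuristic added in this diff composes over a node tree (this sketch is not part of the change itself, and the `Leaf` and `Project` classes below are hypothetical stand-ins for BigFrameNode subclasses): each node reports its own `variables_introduced` and `relation_ops_created`, the cached totals sum those over the subtree, and `planning_complexity` is their product.

# Minimal sketch of the planning-complexity heuristic, assuming simplified node classes.
from dataclasses import dataclass
from functools import cached_property
from typing import Tuple

OVERHEAD_VARIABLES = 5


@dataclass(frozen=True)
class Node:
    children: Tuple["Node", ...] = ()

    @property
    def variables_introduced(self) -> int:
        return 0

    @property
    def relation_ops_created(self) -> int:
        return 1

    @cached_property
    def total_variables(self) -> int:
        # Sum this node's variables with those of its whole subtree.
        return self.variables_introduced + sum(c.total_variables for c in self.children)

    @cached_property
    def total_relational_ops(self) -> int:
        return self.relation_ops_created + sum(c.total_relational_ops for c in self.children)

    @property
    def planning_complexity(self) -> int:
        # Heuristic: total variables times total relational ops over the tree.
        return self.total_variables * self.total_relational_ops


@dataclass(frozen=True)
class Leaf(Node):  # hypothetical source node
    n_columns: int = 10

    @property
    def variables_introduced(self) -> int:
        return self.n_columns + 1  # columns plus one, mirroring the len(schema.items) + 1 rule above


@dataclass(frozen=True)
class Project(Node):  # hypothetical projection node
    new_exprs: int = 3

    @property
    def variables_introduced(self) -> int:
        return self.new_exprs  # passthrough expressions would be ignored


tree = Project(children=(Leaf(),))
print(tree.planning_complexity)  # (11 + 3) * (1 + 1) = 28

A driver could compare `planning_complexity` against a threshold and decide to materialize (decompose) a subtree before continuing, which is the stated purpose of the heuristic.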