From fff9408a01069e0183d59c8bbc04fd2b15ea37f2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 1 Aug 2024 21:13:06 +0000 Subject: [PATCH 1/5] perf: Generate SQL with fewer CTEs --- bigframes/core/compile/compiled.py | 69 ++++++++++++++++++++++-------- bigframes/core/compile/compiler.py | 1 - bigframes/core/window_spec.py | 14 +++++- 3 files changed, 63 insertions(+), 21 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index c822dd331c..d0441aa049 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -24,6 +24,7 @@ import ibis.backends.bigquery as ibis_bigquery import ibis.common.deferred # type: ignore import ibis.expr.datatypes as ibis_dtypes +import ibis.expr.operations as ibis_ops import ibis.expr.types as ibis_types import pandas @@ -71,19 +72,16 @@ def __init__( # Allow creating a DataFrame directly from an Ibis table expression. # TODO(swast): Validate that each column references the same table (or # no table for literal values). - self._columns = tuple(columns) + self._columns = tuple( + column.resolve(table) + # TODO(https://github.com/ibis-project/ibis/issues/7613): use + # public API to refer to Deferred type. + if isinstance(column, ibis.common.deferred.Deferred) else column + for column in columns + ) # To allow for more efficient lookup by column name, create a # dictionary mapping names to column values. - self._column_names = { - ( - column.resolve(table) - # TODO(https://github.com/ibis-project/ibis/issues/7613): use - # public API to refer to Deferred type. 
- if isinstance(column, ibis.common.deferred.Deferred) - else column - ).get_name(): column - for column in self._columns - } + self._column_names = {column.get_name(): column for column in self._columns} @property def columns(self) -> typing.Tuple[ibis_types.Value, ...]: @@ -139,10 +137,6 @@ def projection( for expression, id in expression_id_pairs ] result = self._select(tuple(values)) # type: ignore - - # Need to reproject to convert ibis Scalar to ibis Column object - if any(exp_id[0].is_const for exp_id in expression_id_pairs): - result = result._reproject_to_table() return result @abc.abstractmethod @@ -166,6 +160,13 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value: ), ) + def is_scalar_expr(self, key: str) -> bool: + # sometimes need to determine if column expression is a scalar expression. + # For instance, cannot filter on an analytic expression, or nest analytic expressions. + # Literals are excluded because ibis itself doesn't work well with them, not because of sql limitations. + ibis_expr = self._get_ibis_column(key) + return not is_literal(ibis_expr) and not is_window(ibis_expr) + def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: ibis_type = typing.cast( bigframes.core.compile.ibis_types.IbisDtype, @@ -355,6 +356,9 @@ def _to_ibis_expr( return table def filter(self, predicate: ex.Expression) -> UnorderedIR: + if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): + # ibis doesn't support qualify syntax + return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) @@ -806,7 +810,6 @@ def project_window_op( output_name=None, *, never_skip_nulls=False, - skip_reproject_unsafe: bool = False, ) -> OrderedIR: """ Creates a new expression based on this expression with unary operation applied to one column. 
@@ -815,8 +818,18 @@ def project_window_op( window_spec: a specification of the window over which to apply the operator output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided never_skip_nulls: will disable null skipping for operators that would otherwise do so - skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ + used_vars = [column_name, *window_spec.all_referenced_columns] + # Cannot nest analytic expressions, so reproject to cte first if needed. + if not all(map(self.is_scalar_expr, used_vars)): + return self._reproject_to_table().project_window_op( + column_name, + op, + window_spec, + output_name, + never_skip_nulls=never_skip_nulls, + ) + column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) window = self._ibis_window_from_spec( window_spec, require_total_order=op.uses_total_row_ordering @@ -861,8 +874,7 @@ def project_window_op( window_op = case_statement result = self._set_or_replace_by_id(output_name or column_name, window_op) - # TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation. 
- return result._reproject_to_table() if not skip_reproject_unsafe else result + return result def _reproject_to_table(self) -> OrderedIR: table = self._to_ibis_expr( @@ -1034,6 +1046,9 @@ def _to_ibis_expr( return table def filter(self, predicate: ex.Expression) -> OrderedIR: + if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): + # ibis doesn't support qualify syntax + return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings) return self._filter(condition) @@ -1328,6 +1343,22 @@ def build(self) -> OrderedIR: ) +def is_literal(column: ibis_types.Value) -> bool: + # Unfortunately, Literals in ibis are not "Columns"s and therefore can't be aggregated. + return not isinstance(column, ibis_types.Column) + + +def is_window(column: ibis_types.Value) -> bool: + matches = ( + (column) + .op() + .find_topmost( + lambda x: isinstance(x, (ibis_ops.WindowFunction, ibis_ops.Relation)) + ) + ) + return any(isinstance(op, ibis_ops.WindowFunction) for op in matches) + + def _reduce_predicate_list( predicate_list: typing.Collection[ibis_types.BooleanValue], ) -> ibis_types.BooleanValue: diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index c7f8c5ab59..8fb1f7ab3a 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True): node.window_spec, node.output_name, never_skip_nulls=node.never_skip_nulls, - skip_reproject_unsafe=node.skip_reproject_unsafe, ) return result if ordered else result.to_unordered() diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 57c57b451a..a18dec638a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -14,7 +14,8 @@ from __future__ import annotations from dataclasses import dataclass 
-from typing import Optional, Tuple, Union +import itertools +from typing import Optional, Set, Tuple, Union import bigframes.core.ordering as orderings @@ -162,3 +163,14 @@ def row_bounded(self): to calculate deterministically. """ return isinstance(self.bounds, RowsWindowBounds) + + @property + def all_referenced_columns(self) -> Set[str]: + """ + Return the set of all variables referenced in the window. + """ + ordering_vars = itertools.chain.from_iterable( + item.scalar_expression.unbound_variables for item in self.ordering + ) + itertools.chain(self.grouping_keys, ordering_vars) + return set(itertools.chain(self.grouping_keys, ordering_vars)) From 2378e443730f10ef7869b99ff60e477cf553591d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 20:15:51 +0000 Subject: [PATCH 2/5] don't generate invalid row_number() expressions --- bigframes/core/compile/compiled.py | 80 +++++++++++++----------------- bigframes/core/window_spec.py | 1 - bigframes/session/__init__.py | 2 +- 3 files changed, 35 insertions(+), 48 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index d0441aa049..1a6d3bffb9 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -37,7 +37,6 @@ from bigframes.core.ordering import ( ascending_over, encode_order_string, - IntegerEncoding, join_orderings, OrderingExpression, RowOrdering, @@ -160,13 +159,6 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value: ), ) - def is_scalar_expr(self, key: str) -> bool: - # sometimes need to determine if column expression is a scalar expression. - # For instance, cannot filter on an analytic expression, or nest analytic expressions. - # Literals are excluded because ibis itself doesn't work well with them, not because of sql limitations. 
- ibis_expr = self._get_ibis_column(key) - return not is_literal(ibis_expr) and not is_window(ibis_expr) - def get_column_type(self, key: str) -> bigframes.dtypes.Dtype: ibis_type = typing.cast( bigframes.core.compile.ibis_types.IbisDtype, @@ -301,8 +293,6 @@ def _to_ibis_expr( ArrayValue objects are sorted, so the following options are available to reflect this in the ibis expression. - * "offset_col": Zero-based offsets are generated as a column, this will - not sort the rows however. * "string_encoded": An ordered string column is provided in output table. * "unordered": No ordering information will be provided in output. Only value columns are projected. @@ -789,15 +779,32 @@ def promote_offsets(self, col_id: str) -> OrderedIR: """ # Special case: offsets already exist ordering = self._ordering + # Case 1, already have offsets, just create column from them + if ordering.is_sequential and (ordering.total_order_col is not None): + expr_builder = self.builder() + expr_builder.columns = [ + self._compile_expression( + ordering.total_order_col.scalar_expression + ).name(col_id), + *self.columns, + ] + return expr_builder.build() + # Cannot nest analytic expressions, so reproject to cte first if needed. 
+ # Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + can_directly_window = not any( + map(lambda x: is_literal(x) or is_window(x), self._ibis_order) + ) + if not can_directly_window: + return self._reproject_to_table().promote_offsets(col_id) - if (not ordering.is_sequential) or (not ordering.total_order_col): - return self._project_offsets().promote_offsets(col_id) + window = ibis.window(order_by=self._ibis_order) + if self._predicates: + window = window.group_by(self._reduced_predicate) + offsets = ibis.row_number().over(window) expr_builder = self.builder() expr_builder.columns = [ - self._compile_expression(ordering.total_order_col.scalar_expression).name( - col_id - ), *self.columns, + offsets.name(col_id), ] return expr_builder.build() @@ -819,9 +826,15 @@ def project_window_op( output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided never_skip_nulls: will disable null skipping for operators that would otherwise do so """ - used_vars = [column_name, *window_spec.all_referenced_columns] # Cannot nest analytic expressions, so reproject to cte first if needed. 
- if not all(map(self.is_scalar_expr, used_vars)): + # Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + used_exprs = map( + self._get_any_column, [column_name, *window_spec.all_referenced_columns] + ) + can_directly_window = not any( + map(lambda x: is_literal(x) or is_window(x), used_exprs) + ) + if not can_directly_window: return self._reproject_to_table().project_window_op( column_name, op, @@ -956,7 +969,7 @@ def _to_ibis_expr( expose_hidden_cols: bool = False, fraction: Optional[float] = None, col_id_overrides: typing.Mapping[str, str] = {}, - ordering_mode: Literal["string_encoded", "offset_col", "unordered"], + ordering_mode: Literal["string_encoded", "unordered"], order_col_name: Optional[str] = ORDER_ID_COLUMN, ): """ @@ -965,8 +978,7 @@ def _to_ibis_expr( ArrayValue objects are sorted, so the following options are available to reflect this in the ibis expression. - * "offset_col": Zero-based offsets are generated as a column, this will - not sort the rows however. + * "string_encoded": An ordered string column is provided in output table. * "unordered": No ordering information will be provided in output. Only value columns are projected. @@ -993,10 +1005,9 @@ def _to_ibis_expr( """ assert ordering_mode in ( "string_encoded", - "offset_col", "unordered", ) - if expose_hidden_cols and ordering_mode in ("ordered_col", "offset_col"): + if expose_hidden_cols and ordering_mode in ("ordered_col"): raise ValueError( f"Cannot expose hidden ordering columns with ordering_mode {ordering_mode}" ) @@ -1189,27 +1200,6 @@ def _bake_ordering(self) -> OrderedIR: predicates=self._predicates, ) - def _project_offsets(self) -> OrderedIR: - """Create a new expression that contains offsets. Should only be executed when - offsets are needed for an operations. 
Has no effect on expression semantics.""" - if self._ordering.is_sequential: - return self - table = self._to_ibis_expr( - ordering_mode="offset_col", order_col_name=ORDER_ID_COLUMN - ) - columns = [table[column_name] for column_name in self._column_names] - ordering = TotalOrdering( - ordering_value_columns=tuple([ascending_over(ORDER_ID_COLUMN)]), - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(True, is_sequential=True), - ) - return OrderedIR( - table, - columns=columns, - hidden_ordering_columns=[table[ORDER_ID_COLUMN]], - ordering=ordering, - ) - def _create_order_columns( self, ordering_mode: str, @@ -1217,9 +1207,7 @@ def _create_order_columns( expose_hidden_cols: bool, ) -> typing.Sequence[ibis_types.Value]: # Generate offsets if current ordering id semantics are not sufficiently strict - if ordering_mode == "offset_col": - return (self._create_offset_column().name(order_col_name),) - elif ordering_mode == "string_encoded": + if ordering_mode == "string_encoded": return (self._create_string_ordering_column().name(order_col_name),) elif expose_hidden_cols: return self._hidden_ordering_columns diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index a18dec638a..f011e2848d 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -172,5 +172,4 @@ def all_referenced_columns(self) -> Set[str]: ordering_vars = itertools.chain.from_iterable( item.scalar_expression.unbound_variables for item in self.ordering ) - itertools.chain(self.grouping_keys, ordering_vars) return set(itertools.chain(self.grouping_keys, ordering_vars)) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 98cba867f2..02c1020729 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1919,7 +1919,7 @@ def _start_query( new_message = "Computation is too complex to execute as a single query. 
Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." raise bigframes.exceptions.QueryComplexityError(new_message) from e else: - raise + raise ValueError(sql) def _start_query_ml_ddl( self, From 424e84a237c4f5b7423c4328d5da38569987501d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 20:34:10 +0000 Subject: [PATCH 3/5] fixed unit test by naming index --- tests/unit/session/test_session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 31029abd67..2f7eaa567a 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -246,7 +246,8 @@ def test_default_index_warning_not_raised_by_read_gbq_index_col_sequential_int64 index_col=bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64, ) - # We expect a window operation because we specificaly requested a sequential index. + # We expect a window operation because we specificaly requested a sequential index and named it. + df.index.name = "named_index" generated_sql = df.sql.casefold() assert "OVER".casefold() in generated_sql assert "ROW_NUMBER()".casefold() in generated_sql From 731a65299a93840b23fffa0ce40d6ba37f6c96cf Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 2 Aug 2024 21:04:27 +0000 Subject: [PATCH 4/5] revert unintended query error handling change --- bigframes/session/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 02c1020729..98cba867f2 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1919,7 +1919,7 @@ def _start_query( new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." 
raise bigframes.exceptions.QueryComplexityError(new_message) from e else: - raise ValueError(sql) + raise def _start_query_ml_ddl( self, From 11802b6c055b7cb731560c03e042a5e67f25c009 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 5 Aug 2024 23:28:08 +0000 Subject: [PATCH 5/5] add ibis issue references --- bigframes/core/compile/compiled.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 1a6d3bffb9..b4b0d57054 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -347,7 +347,8 @@ def _to_ibis_expr( def filter(self, predicate: ex.Expression) -> UnorderedIR: if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): - # ibis doesn't support qualify syntax + # ibis doesn't support qualify syntax, so create CTE if filtering over window expression + # https://github.com/ibis-project/ibis/issues/9775 return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings) @@ -791,6 +792,7 @@ def promote_offsets(self, col_id: str) -> OrderedIR: return expr_builder.build() # Cannot nest analytic expressions, so reproject to cte first if needed. # Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + # See: https://github.com/ibis-project/ibis/issues/9773 can_directly_window = not any( map(lambda x: is_literal(x) or is_window(x), self._ibis_order) ) @@ -828,6 +830,7 @@ def project_window_op( """ # Cannot nest analytic expressions, so reproject to cte first if needed. 
# Also ibis cannot window literals, so need to reproject those (even though this is legal in googlesql) + # See: https://github.com/ibis-project/ibis/issues/9773 used_exprs = map( self._get_any_column, [column_name, *window_spec.all_referenced_columns] ) @@ -1058,7 +1061,8 @@ def _to_ibis_expr( def filter(self, predicate: ex.Expression) -> OrderedIR: if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))): - # ibis doesn't support qualify syntax + # ibis doesn't support qualify syntax, so create CTE if filtering over window expression + # https://github.com/ibis-project/ibis/issues/9775 return self._reproject_to_table().filter(predicate) bindings = {col: self._get_ibis_column(col) for col in self.column_ids} condition = op_compiler.compile_expression(predicate, bindings)