diff --git a/CHANGELOG.md b/CHANGELOG.md index 633f9930ea..b27da8d301 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,30 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.11.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.10.0...v1.11.0) (2024-07-01) + + +### Features + +* Add .agg support for size ([#792](https://github.com/googleapis/python-bigquery-dataframes/issues/792)) ([87e6018](https://github.com/googleapis/python-bigquery-dataframes/commit/87e60182c964c369079165e87ce73dd0c0481a5a)) +* Add `bigframes.bigquery.json_set` ([#782](https://github.com/googleapis/python-bigquery-dataframes/issues/782)) ([1b613e0](https://github.com/googleapis/python-bigquery-dataframes/commit/1b613e00eddf18fa40ed1d08ff19c4ebeeac2197)) +* Add `bigframes.streaming.to_pubsub` method to create continuous query that writes to Pub/Sub ([#801](https://github.com/googleapis/python-bigquery-dataframes/issues/801)) ([b47f32d](https://github.com/googleapis/python-bigquery-dataframes/commit/b47f32d74a0c9eb908be690b2dd56b0f5579b133)) +* Add `DataFrame.to_arrow` to create Arrow Table from DataFrame ([#807](https://github.com/googleapis/python-bigquery-dataframes/issues/807)) ([1e3feda](https://github.com/googleapis/python-bigquery-dataframes/commit/1e3feda9e8fe9d08a0e3838066f6414f8015197d)) +* Add `PolynomialFeatures` support to `to_gbq` and pipelines ([#805](https://github.com/googleapis/python-bigquery-dataframes/issues/805)) ([57d98b9](https://github.com/googleapis/python-bigquery-dataframes/commit/57d98b9e3298583ec40c04665ab84e6ad2b948fb)) +* Add Series.peek to preview data efficiently ([#727](https://github.com/googleapis/python-bigquery-dataframes/issues/727)) ([580e1b9](https://github.com/googleapis/python-bigquery-dataframes/commit/580e1b9e965d883a67f91a6db8311c2416ca8fe5)) +* Expose gcf memory param in `remote_function` ([#803](https://github.com/googleapis/python-bigquery-dataframes/issues/803)) ([014765c](https://github.com/googleapis/python-bigquery-dataframes/commit/014765c22410a0b4559896d163c440f46f7ce98f)) +* More informative error when query plan too complex ([#811](https://github.com/googleapis/python-bigquery-dataframes/issues/811)) ([136dc24](https://github.com/googleapis/python-bigquery-dataframes/commit/136dc24e160339d27f6335e7b28f08cd95d2c67d)) + + +### Bug Fixes + +* Include internally required packages in `remote_function` hash ([#799](https://github.com/googleapis/python-bigquery-dataframes/issues/799)) ([4b8fc15](https://github.com/googleapis/python-bigquery-dataframes/commit/4b8fc15ec2c126566269f84d75289198fee2c655)) + + +### Documentation + +* Document dtype limitation on row processing `remote_function` ([#800](https://github.com/googleapis/python-bigquery-dataframes/issues/800)) ([487dff6](https://github.com/googleapis/python-bigquery-dataframes/commit/487dff6ac147683aef529e1ff8c197dce3fb437c)) + ## [1.10.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.9.0...v1.10.0) (2024-06-21) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index 85a9010a7d..ec26d14f33 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -36,6 +36,10 @@ import bigframes.series as series +# Array functions defined from +# https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions + + def array_length(series: series.Series) -> series.Series: """Compute the length of each array element in the Series. @@ -154,6 +158,56 @@ def array_to_string(series: series.Series, delimiter: str) -> series.Series: return series._apply_unary_op(ops.ArrayToStringOp(delimiter=delimiter)) +# JSON functions defined from +# https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions + + +def json_set( + series: series.Series, + json_path_value_pairs: typing.Sequence[typing.Tuple[str, typing.Any]], +) -> series.Series: + """Produces a new JSON value within a Series by inserting or replacing values at + specified paths. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import bigframes.bigquery as bbq + >>> import numpy as np + >>> bpd.options.display.progress_bar = None + + >>> s = bpd.read_gbq("SELECT JSON '{\\\"a\\\": 1}' AS data")["data"] + >>> bbq.json_set(s, json_path_value_pairs=[("$.a", 100), ("$.b", "hi")]) + 0 {"a":100,"b":"hi"} + Name: data, dtype: string + + Args: + series (bigframes.series.Series): + The Series containing JSON data (as native JSON objects or JSON-formatted strings). + json_path_value_pairs (Sequence[Tuple[str, typing.Any]]): + Pairs of JSON path and the new value to insert/replace. + + Returns: + bigframes.series.Series: A new Series with the transformed JSON data. + + """ + # SQLGlot parser does not support the "create_if_missing => true" syntax, so + # create_if_missing is not currently implemented. + + for json_path_value_pair in json_path_value_pairs: + if len(json_path_value_pair) != 2: + raise ValueError( + "Incorrect format: Expected (, ), but found: " + + f"{json_path_value_pair}" + ) + + json_path, json_value = json_path_value_pair + series = series._apply_binary_op( + json_value, ops.JSONSet(json_path=json_path), alignment="left" + ) + return series + + def vector_search( base_table: str, column_to_search: str, diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 89ef5f525e..00a36b9c05 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -344,10 +344,12 @@ def project_window_op( never_skip_nulls: will disable null skipping for operators that would otherwise do so skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection """ - if not self.session._strictly_ordered: - # TODO: Support unbounded windows with aggregate ops and some row-order-independent analytic ops - # TODO: Support non-deterministic windowing - raise ValueError("Windowed ops not supported in unordered mode") + # TODO: Support non-deterministic windowing + if window_spec.row_bounded or not op.order_independent: + if not self.session._strictly_ordered: + raise ValueError( + "Order-dependent windowed ops not supported in unordered mode" + ) return ArrayValue( nodes.WindowOpNode( child=self.node, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 598c32670e..379c661179 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -467,6 +467,36 @@ def _validate_result_schema(self, result_df: pd.DataFrame): f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}" ) + def to_arrow( + self, + *, + ordered: bool = True, + ) -> Tuple[pa.Table, bigquery.QueryJob]: + """Run query and download results as a pyarrow Table.""" + # pa.Table.from_pandas puts index columns last, so update the expression to match. + expr = self.expr.select_columns( + list(self.value_columns) + list(self.index_columns) + ) + + _, query_job = self.session._query_to_destination( + self.session._to_sql(expr, ordered=ordered), + list(self.index_columns), + api_name="cached", + do_clustering=False, + ) + results_iterator = query_job.result() + pa_table = results_iterator.to_arrow() + + pa_index_labels = [] + for index_level, index_label in enumerate(self._index_labels): + if isinstance(index_label, str): + pa_index_labels.append(index_label) + else: + pa_index_labels.append(f"__index_level_{index_level}__") + + pa_table = pa_table.rename_columns(list(self.column_labels) + pa_index_labels) + return pa_table, query_job + def to_pandas( self, max_download_size: Optional[int] = None, @@ -995,7 +1025,7 @@ def filter(self, predicate: scalars.Expression): def aggregate_all_and_stack( self, - operation: agg_ops.UnaryAggregateOp, + operation: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp], *, axis: int | str = 0, value_col_id: str = "values", @@ -1004,7 +1034,12 @@ def aggregate_all_and_stack( axis_n = utils.get_axis_number(axis) if axis_n == 0: aggregations = [ - (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id) + ( + ex.UnaryAggregation(operation, ex.free_var(col_id)) + if isinstance(operation, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(operation), + col_id, + ) for col_id in self.value_columns ] index_id = guid.generate_guid() @@ -1033,6 +1068,11 @@ def aggregate_all_and_stack( (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id) for col_id in [*self.index_columns] ] + # TODO: may need add NullaryAggregation in main_aggregation + # when agg add support for axis=1, needed for agg("size", axis=1) + assert isinstance( + operation, agg_ops.UnaryAggregateOp + ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)." main_aggregation = ( ex.UnaryAggregation(operation, ex.free_var(value_col_id)), value_col_id, @@ -1125,7 +1165,11 @@ def remap_f(x): def aggregate( self, by_column_ids: typing.Sequence[str] = (), - aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (), + aggregations: typing.Sequence[ + typing.Tuple[ + str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ] + ] = (), *, dropna: bool = True, ) -> typing.Tuple[Block, typing.Sequence[str]]: @@ -1139,7 +1183,9 @@ def aggregate( """ agg_specs = [ ( - ex.UnaryAggregation(operation, ex.free_var(input_id)), + ex.UnaryAggregation(operation, ex.free_var(input_id)) + if isinstance(operation, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(operation), guid.generate_guid(), ) for input_id, operation in aggregations @@ -1175,18 +1221,32 @@ def aggregate( output_col_ids, ) - def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp): + def get_stat( + self, + column_id: str, + stat: typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp], + ): """Gets aggregates immediately, and caches it""" if stat.name in self._stats_cache[column_id]: return self._stats_cache[column_id][stat.name] # TODO: Convert nonstandard stats into standard stats where possible (popvar, etc.) # if getting a standard stat, just go get the rest of them - standard_stats = self._standard_stats(column_id) + standard_stats = typing.cast( + typing.Sequence[ + typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ], + self._standard_stats(column_id), + ) stats_to_fetch = standard_stats if stat in standard_stats else [stat] aggregations = [ - (ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name) + ( + ex.UnaryAggregation(stat, ex.free_var(column_id)) + if isinstance(stat, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(stat), + stat.name, + ) for stat in stats_to_fetch ] expr = self.expr.aggregate(aggregations) @@ -1231,13 +1291,20 @@ def get_binary_stat( def summarize( self, column_ids: typing.Sequence[str], - stats: typing.Sequence[agg_ops.UnaryAggregateOp], + stats: typing.Sequence[ + typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ], ): """Get a list of stats as a deferred block object.""" label_col_id = guid.generate_guid() labels = [stat.name for stat in stats] aggregations = [ - (ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}") + ( + ex.UnaryAggregation(stat, ex.free_var(col_id)) + if isinstance(stat, agg_ops.UnaryAggregateOp) + else ex.NullaryAggregation(stat), + f"{col_id}-{stat.name}", + ) for stat in stats for col_id in column_ids ] @@ -2286,13 +2353,13 @@ def to_sql_query( idx_labels, ) - def cached(self, *, optimize_offsets=False, force: bool = False) -> None: + def cached(self, *, force: bool = False, session_aware: bool = False) -> None: """Write the block to a session table.""" # use a heuristic for whether something needs to be cached if (not force) and self.session._is_trivially_executable(self.expr): return - if optimize_offsets: - self.session._cache_with_offsets(self.expr) + elif session_aware: + self.session._cache_with_session_awareness(self.expr) else: self.session._cache_with_cluster_cols( self.expr, cluster_cols=self.index_columns diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 6b8e60434e..0bc9f2e370 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -894,6 +894,26 @@ def array_to_string_op_impl(x: ibis_types.Value, op: ops.ArrayToStringOp): return typing.cast(ibis_types.ArrayValue, x).join(op.delimiter) +# JSON Ops +@scalar_op_compiler.register_binary_op(ops.JSONSet, pass_op=True) +def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet): + if x.type().is_json(): + return json_set( + json_obj=x, + json_path=op.json_path, + json_value=y, + ).to_expr() + else: + # Enabling JSON type eliminates the need for less efficient string conversions. + return vendored_ibis_ops.ToJsonString( + json_set( + json_obj=parse_json(x), + json_path=op.json_path, + json_value=y, + ) + ).to_expr() + + ### Binary Ops def short_circuit_nulls(type_override: typing.Optional[ibis_dtypes.DataType] = None): """Wraps a binary operator to generate nulls of the expected type if either input is a null scalar.""" @@ -1469,3 +1489,15 @@ def float_floor(a: float) -> float: def float_ceil(a: float) -> float: """Convert string to timestamp.""" return 0 # pragma: NO COVER + + +@ibis.udf.scalar.builtin(name="parse_json") +def parse_json(a: str) -> ibis_dtypes.JSON: + """Converts a JSON-formatted STRING value to a JSON value.""" + + +@ibis.udf.scalar.builtin(name="json_set") +def json_set( + json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.str, json_value +) -> ibis_dtypes.JSON: + """Produces a new SQL JSON value with the specified JSON data inserted or replaced.""" diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 9c2bf18caa..11a5d43ba0 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -27,6 +27,7 @@ import bigframes.core.blocks as blocks import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window as windows import bigframes.core.window_spec as window_specs import bigframes.dataframe as df @@ -72,6 +73,10 @@ def __init__( if col_id not in self._by_col_ids ] + @property + def _session(self) -> core.Session: + return self._block.session + def __getitem__( self, key: typing.Union[ @@ -229,20 +234,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) + @validations.requires_strict_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) + @validations.requires_strict_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) + @validations.requires_strict_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, numeric_only=True) + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -251,6 +261,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -259,6 +270,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -274,6 +286,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -286,10 +299,10 @@ def expanding(self, min_periods: int = 1) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) - def agg(self, func=None, **kwargs) -> df.DataFrame: + def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]: if func: if isinstance(func, str): - return self._agg_string(func) + return self.size() if func == "size" else self._agg_string(func) elif utils.is_dict_like(func): return self._agg_dict(func) elif utils.is_list_like(func): @@ -315,7 +328,11 @@ def _agg_string(self, func: str) -> df.DataFrame: return dataframe if self._as_index else self._convert_index(dataframe) def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: - aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = [] + aggregations: typing.List[ + typing.Tuple[ + str, typing.Union[agg_ops.UnaryAggregateOp, agg_ops.NullaryAggregateOp] + ] + ] = [] column_labels = [] want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values()) @@ -510,6 +527,10 @@ def __init__( self._value_name = value_name self._dropna = dropna # Applies to aggregations but not windowing + @property + def _session(self) -> core.Session: + return self._block.session + def head(self, n: int = 5) -> series.Series: block = self._block if self._dropna: @@ -627,26 +648,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg + @validations.requires_strict_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) + @validations.requires_strict_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.product_op, ) + @validations.requires_strict_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) + @validations.requires_strict_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) + @validations.requires_strict_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -656,6 +682,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) + @validations.requires_strict_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -665,6 +692,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) + @validations.requires_strict_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -673,6 +701,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -692,6 +721,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index cfb22929c8..696742180b 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -30,6 +30,7 @@ import bigframes.core.expression as ex import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops @@ -114,6 +115,10 @@ def from_frame( index._linked_frame = frame return index + @property + def _session(self): + return self._block.session + @property def name(self) -> blocks.Label: names = self.names @@ -179,6 +184,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. @@ -192,6 +198,7 @@ def is_monotonic_increasing(self) -> bool: ) @property + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. @@ -341,6 +348,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -353,6 +361,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -424,6 +433,8 @@ def dropna(self, how: typing.Literal["all", "any"] = "any") -> Index: return Index(result) def drop_duplicates(self, *, keep: str = "first") -> Index: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates") block = block_ops.drop_duplicates(self._block, self._block.index_columns, keep) return Index(block) diff --git a/bigframes/core/pruning.py b/bigframes/core/pruning.py new file mode 100644 index 0000000000..55165a616c --- /dev/null +++ b/bigframes/core/pruning.py @@ -0,0 +1,77 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bigframes.core.expression as ex +import bigframes.core.schema as schemata +import bigframes.dtypes +import bigframes.operations as ops + +LOW_CARDINALITY_TYPES = [bigframes.dtypes.BOOL_DTYPE] + +COMPARISON_OP_TYPES = tuple( + type(i) + for i in ( + ops.eq_op, + ops.eq_null_match_op, + ops.ne_op, + ops.gt_op, + ops.ge_op, + ops.lt_op, + ops.le_op, + ) +) + + +def cluster_cols_for_predicate( + predicate: ex.Expression, schema: schemata.ArraySchema +) -> list[str]: + """Try to determine cluster col candidates that work with given predicates.""" + # TODO: Prioritize based on predicted selectivity (eg. equality conditions are probably very selective) + if isinstance(predicate, ex.UnboundVariableExpression): + cols = [predicate.id] + elif isinstance(predicate, ex.OpExpression): + op = predicate.op + # TODO: Support geo predicates, which support pruning if clustered (other than st_disjoint) + # https://cloud.google.com/bigquery/docs/reference/standard-sql/geography_functions + if isinstance(op, COMPARISON_OP_TYPES): + cols = cluster_cols_for_comparison(predicate.inputs[0], predicate.inputs[1]) + elif isinstance(op, (type(ops.invert_op))): + cols = cluster_cols_for_predicate(predicate.inputs[0], schema) + elif isinstance(op, (type(ops.and_op), type(ops.or_op))): + left_cols = cluster_cols_for_predicate(predicate.inputs[0], schema) + right_cols = cluster_cols_for_predicate(predicate.inputs[1], schema) + cols = [*left_cols, *[col for col in right_cols if col not in left_cols]] + else: + cols = [] + else: + # Constant + cols = [] + return [ + col for col in cols if bigframes.dtypes.is_clusterable(schema.get_type(col)) + ] + + +def cluster_cols_for_comparison( + left_ex: ex.Expression, right_ex: ex.Expression +) -> list[str]: + # TODO: Try to normalize expressions such that one side is a single variable. + # eg. Convert -cola>=3 to cola<-3 and colb+3 < 4 to colb < 1 + if left_ex.is_const: + # There are some invertible ops that would also be ok + if isinstance(right_ex, ex.UnboundVariableExpression): + return [right_ex.id] + elif right_ex.is_const: + if isinstance(left_ex, ex.UnboundVariableExpression): + return [left_ex.id] + return [] diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index 2847a8f7f1..846cf50d77 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -15,7 +15,7 @@ import functools import itertools -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Sequence import bigframes.core.nodes as nodes @@ -91,6 +91,43 @@ def _node_counts_inner( ) +def count_nodes(forest: Sequence[nodes.BigFrameNode]) -> dict[nodes.BigFrameNode, int]: + """ + Counts the number of instances of each subtree present within a forest. + + Memoizes internally to accelerate execution, but cache not persisted (not reused between invocations). + + Args: + forest (Sequence of BigFrameNode): + The roots of each tree in the forest + + Returns: + dict[BigFramesNode, int]: The number of occurences of each subtree. + """ + + def _combine_counts( + left: Dict[nodes.BigFrameNode, int], right: Dict[nodes.BigFrameNode, int] + ) -> Dict[nodes.BigFrameNode, int]: + return { + key: left.get(key, 0) + right.get(key, 0) + for key in itertools.chain(left.keys(), right.keys()) + } + + empty_counts: Dict[nodes.BigFrameNode, int] = {} + + @functools.cache + def _node_counts_inner( + subtree: nodes.BigFrameNode, + ) -> Dict[nodes.BigFrameNode, int]: + """Helper function to count occurences of duplicate nodes in a subtree. Considers only nodes in a complexity range""" + child_counts = [_node_counts_inner(child) for child in subtree.child_nodes] + node_counts = functools.reduce(_combine_counts, child_counts, empty_counts) + return _combine_counts(node_counts, {subtree: 1}) + + counts = [_node_counts_inner(root) for root in forest] + return functools.reduce(_combine_counts, counts, empty_counts) + + def replace_nodes( root: nodes.BigFrameNode, replacements: dict[nodes.BigFrameNode, nodes.BigFrameNode], diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py new file mode 100644 index 0000000000..dc22047e3b --- /dev/null +++ b/bigframes/core/validations.py @@ -0,0 +1,51 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""DataFrame is a two dimensional data structure.""" + +from __future__ import annotations + +import functools +from typing import Protocol, TYPE_CHECKING + +import bigframes.constants +import bigframes.exceptions + +if TYPE_CHECKING: + from bigframes import Session + + +class HasSession(Protocol): + @property + def _session(self) -> Session: + ... + + +def requires_strict_ordering(): + def decorator(meth): + @functools.wraps(meth) + def guarded_meth(object: HasSession, *args, **kwargs): + enforce_ordered(object, meth.__name__) + return meth(object, *args, **kwargs) + + return guarded_meth + + return decorator + + +def enforce_ordered(object: HasSession, opname: str) -> None: + if not object._session._strictly_ordered: + raise bigframes.exceptions.OrderRequiredError( + f"Op {opname} not supported when strict ordering is disabled. {bigframes.constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 71e88a4c3d..57c57b451a 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -152,3 +152,13 @@ class WindowSpec: ordering: Tuple[orderings.OrderingExpression, ...] = tuple() bounds: Union[RowsWindowBounds, RangeWindowBounds, None] = None min_periods: int = 0 + + @property + def row_bounded(self): + """ + Whether the window is bounded by row offsets. + + This is relevant for determining whether the window requires a total order + to calculate deterministically. + """ + return isinstance(self.bounds, RowsWindowBounds) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 75420ca957..4dcc4414ed 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -44,6 +44,7 @@ import numpy import pandas import pandas.io.formats.format +import pyarrow import tabulate import bigframes @@ -61,6 +62,7 @@ import bigframes.core.indexes as indexes import bigframes.core.ordering as order import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec as window_spec import bigframes.dtypes @@ -277,10 +279,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property + @validations.requires_strict_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property + @validations.requires_strict_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -337,10 +341,12 @@ def _has_index(self) -> bool: return len(self._block.index_columns) > 0 @property + @validations.requires_strict_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index + @validations.requires_strict_ordering() def transpose(self) -> DataFrame: return self.T @@ -1183,6 +1189,34 @@ def cov(self, *, numeric_only: bool = False) -> DataFrame: return DataFrame(frame._block.calculate_pairwise_metric(agg_ops.CovOp())) + def to_arrow( + self, + *, + ordered: Optional[bool] = None, + ) -> pyarrow.Table: + """Write DataFrame to an Arrow table / record batch. + + Args: + ordered (bool, default None): + Determines whether the resulting Arrow table will be deterministically ordered. + In some cases, unordered may result in a faster-executing query. If set to a value + other than None, will override Session default. + + Returns: + pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame. + """ + warnings.warn( + "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future.", + category=bigframes.exceptions.PreviewWarning, + ) + + self._optimize_query_complexity() + pa_table, query_job = self._block.to_arrow( + ordered=ordered if ordered is not None else self._session._strictly_ordered, + ) + self._set_internal_query_job(query_job) + return pa_table + def to_pandas( self, max_download_size: Optional[int] = None, @@ -1264,6 +1298,7 @@ def copy(self) -> DataFrame: def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:]) @@ -1306,6 +1341,8 @@ def nlargest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nlargest(self._block, n, column_ids, keep=keep)) @@ -1317,6 +1354,8 @@ def nsmallest( ) -> DataFrame: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nlargest") column_ids = self._sql_names(columns) return DataFrame(block_ops.nsmallest(self._block, n, column_ids, keep=keep)) @@ -1499,6 +1538,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) + @validations.requires_strict_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1896,6 +1936,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) + @validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1921,10 +1962,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) + @validations.requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2190,13 +2233,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index + @validations.requires_strict_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) @requires_index + @validations.requires_strict_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) + @validations.requires_strict_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2301,6 +2347,7 @@ def _pivot( return DataFrame(pivot_block) @requires_index + @validations.requires_strict_ordering() def pivot( self, *, @@ -2315,6 +2362,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index + @validations.requires_strict_ordering() def pivot_table( self, values: typing.Optional[ @@ -2414,6 +2462,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index + @validations.requires_strict_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2624,6 +2673,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. window_def = window_spec.rows( @@ -2633,6 +2683,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2735,6 +2786,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) + @validations.requires_strict_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2747,6 +2799,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2759,18 +2812,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) + @validations.requires_strict_ordering() def shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2778,6 +2834,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2785,6 +2842,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2802,6 +2860,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, @@ -3457,6 +3516,8 @@ def drop_duplicates( *, keep: str = "first", ) -> DataFrame: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates(keep != False)") if subset is None: column_ids = self._block.value_columns elif utils.is_list_like(subset): @@ -3470,6 +3531,8 @@ def drop_duplicates( return DataFrame(block) def duplicated(self, subset=None, keep: str = "first") -> bigframes.series.Series: + if keep is not False: + validations.enforce_ordered(self, "duplicated(keep != False)") if subset is None: column_ids = self._block.value_columns else: @@ -3563,6 +3626,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") + @validations.requires_strict_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index ced1c215e5..160802ded9 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -74,52 +74,95 @@ class SimpleDtypeInfo: logical_bytes: int = ( 8 # this is approximate only, some types are variably sized, also, compression ) + orderable: bool = False + clusterable: bool = False # TODO: Missing BQ types: INTERVAL, JSON, RANGE # TODO: Add mappings to python types SIMPLE_TYPES = ( SimpleDtypeInfo( - dtype=INT_DTYPE, arrow_dtype=pa.int64(), type_kind=("INT64", "INTEGER") + dtype=INT_DTYPE, + arrow_dtype=pa.int64(), + type_kind=("INT64", "INTEGER"), + orderable=True, + clusterable=True, ), SimpleDtypeInfo( - dtype=FLOAT_DTYPE, arrow_dtype=pa.float64(), type_kind=("FLOAT64", "FLOAT") + dtype=FLOAT_DTYPE, + arrow_dtype=pa.float64(), + type_kind=("FLOAT64", "FLOAT"), + orderable=True, ), SimpleDtypeInfo( dtype=BOOL_DTYPE, arrow_dtype=pa.bool_(), type_kind=("BOOL", "BOOLEAN"), logical_bytes=1, + orderable=True, + clusterable=True, ), - SimpleDtypeInfo(dtype=STRING_DTYPE, arrow_dtype=pa.string(), type_kind=("STRING",)), SimpleDtypeInfo( - dtype=DATE_DTYPE, arrow_dtype=pa.date32(), type_kind=("DATE",), logical_bytes=4 + dtype=STRING_DTYPE, + arrow_dtype=pa.string(), + type_kind=("STRING",), + orderable=True, + clusterable=True, ), - SimpleDtypeInfo(dtype=TIME_DTYPE, arrow_dtype=pa.time64("us"), type_kind=("TIME",)), SimpleDtypeInfo( - dtype=DATETIME_DTYPE, arrow_dtype=pa.timestamp("us"), type_kind=("DATETIME",) + dtype=DATE_DTYPE, + arrow_dtype=pa.date32(), + type_kind=("DATE",), + logical_bytes=4, + orderable=True, + clusterable=True, + ), + SimpleDtypeInfo( + dtype=TIME_DTYPE, + arrow_dtype=pa.time64("us"), + type_kind=("TIME",), + orderable=True, + ), + SimpleDtypeInfo( + dtype=DATETIME_DTYPE, + arrow_dtype=pa.timestamp("us"), + type_kind=("DATETIME",), + orderable=True, + clusterable=True, ), SimpleDtypeInfo( dtype=TIMESTAMP_DTYPE, arrow_dtype=pa.timestamp("us", tz="UTC"), type_kind=("TIMESTAMP",), + orderable=True, + clusterable=True, + ), + SimpleDtypeInfo( + dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",), orderable=True ), - SimpleDtypeInfo(dtype=BYTES_DTYPE, arrow_dtype=pa.binary(), type_kind=("BYTES",)), SimpleDtypeInfo( dtype=NUMERIC_DTYPE, arrow_dtype=pa.decimal128(38, 9), type_kind=("NUMERIC",), logical_bytes=16, + orderable=True, + clusterable=True, ), SimpleDtypeInfo( dtype=BIGNUMERIC_DTYPE, arrow_dtype=pa.decimal256(76, 38), type_kind=("BIGNUMERIC",), logical_bytes=32, + orderable=True, + clusterable=True, ), # Geo has no corresponding arrow dtype SimpleDtypeInfo( - dtype=GEO_DTYPE, arrow_dtype=None, type_kind=("GEOGRAPHY",), logical_bytes=40 + dtype=GEO_DTYPE, + arrow_dtype=None, + type_kind=("GEOGRAPHY",), + logical_bytes=40, + clusterable=True, ), ) @@ -197,6 +240,17 @@ def is_struct_like(type: ExpressionType) -> bool: ) +def is_json_like(type: ExpressionType) -> bool: + # TODO: Add JSON type support + return type == STRING_DTYPE + + +def is_json_encoding_type(type: ExpressionType) -> bool: + # Types can be converted into JSON. + # https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings + return type != GEO_DTYPE + + def is_numeric(type: ExpressionType) -> bool: return type in NUMERIC_BIGFRAMES_TYPES_PERMISSIVE @@ -209,9 +263,25 @@ def is_comparable(type: ExpressionType) -> bool: return (type is not None) and is_orderable(type) +_ORDERABLE_SIMPLE_TYPES = set( + mapping.dtype for mapping in SIMPLE_TYPES if mapping.orderable +) + + def is_orderable(type: ExpressionType) -> bool: # On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable - return not is_array_like(type) and not is_struct_like(type) and (type != GEO_DTYPE) + return type in _ORDERABLE_SIMPLE_TYPES + + +_CLUSTERABLE_SIMPLE_TYPES = set( + mapping.dtype for mapping in SIMPLE_TYPES if mapping.clusterable +) + + +def is_clusterable(type: ExpressionType) -> bool: + # https://cloud.google.com/bigquery/docs/clustered-tables#cluster_column_types + # This is based on default database type mapping, could in theory represent in non-default bq type to cluster. + return type in _CLUSTERABLE_SIMPLE_TYPES def is_bool_coercable(type: ExpressionType) -> bool: diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index bae239b6da..1d31749760 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -47,5 +47,13 @@ class NullIndexError(ValueError): """Object has no index.""" +class OrderRequiredError(ValueError): + """Operation requires total row ordering to be enabled.""" + + +class QueryComplexityError(RuntimeError): + """Query plan is too complex to execute.""" + + class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 920dc7c039..c1878b6c31 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -102,6 +102,24 @@ def _get_hash(def_, package_requirements=None): return hashlib.md5(def_repr).hexdigest() +def _get_updated_package_requirements(package_requirements, is_row_processor): + requirements = [f"cloudpickle=={cloudpickle.__version__}"] + if is_row_processor: + # bigframes remote function will send an entire row of data as json, + # which would be converted to a pandas series and processed + # Ensure numpy versions match to avoid unpickling problems. See + # internal issue b/347934471. + requirements.append(f"numpy=={numpy.__version__}") + requirements.append(f"pandas=={pandas.__version__}") + requirements.append(f"pyarrow=={pyarrow.__version__}") + + if package_requirements: + requirements.extend(package_requirements) + + requirements = sorted(requirements) + return requirements + + def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str: return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}" @@ -112,13 +130,22 @@ class IbisSignature(NamedTuple): output_type: IbisDataType -def get_cloud_function_name(def_, uniq_suffix=None, package_requirements=None): +def get_cloud_function_name( + def_, uniq_suffix=None, package_requirements=None, is_row_processor=False +): "Get a name for the cloud function for the given user defined function." + + # Augment user package requirements with any internal package + # requirements + package_requirements = _get_updated_package_requirements( + package_requirements, is_row_processor + ) + cf_name = _get_hash(def_, package_requirements) cf_name = f"bigframes-{cf_name}" # for identification if uniq_suffix: cf_name = f"{cf_name}-{uniq_suffix}" - return cf_name + return cf_name, package_requirements def get_remote_function_name(def_, uniq_suffix=None, package_requirements=None): @@ -277,21 +304,10 @@ def generate_cloud_function_code( """ # requirements.txt - requirements = ["cloudpickle >= 2.1.0"] - if is_row_processor: - # bigframes remote function will send an entire row of data as json, - # which would be converted to a pandas series and processed - # Ensure numpy versions match to avoid unpickling problems. See - # internal issue b/347934471. - requirements.append(f"numpy=={numpy.__version__}") - requirements.append(f"pandas=={pandas.__version__}") - requirements.append(f"pyarrow=={pyarrow.__version__}") if package_requirements: - requirements.extend(package_requirements) - requirements = sorted(requirements) - requirements_txt = os.path.join(directory, "requirements.txt") - with open(requirements_txt, "w") as f: - f.write("\n".join(requirements)) + requirements_txt = os.path.join(directory, "requirements.txt") + with open(requirements_txt, "w") as f: + f.write("\n".join(package_requirements)) # main.py entry_point = bigframes.functions.remote_function_template.generate_cloud_function_main_code( @@ -315,6 +331,7 @@ def create_cloud_function( max_instance_count=None, is_row_processor=False, vpc_connector=None, + memory_mib=1024, ): """Create a cloud function from the given user defined function. @@ -394,7 +411,8 @@ def create_cloud_function( self._cloud_function_docker_repository ) function.service_config = functions_v2.ServiceConfig() - function.service_config.available_memory = "1024M" + if memory_mib is not None: + function.service_config.available_memory = f"{memory_mib}Mi" if timeout_seconds is not None: if timeout_seconds > 1200: raise ValueError( @@ -457,6 +475,7 @@ def provision_bq_remote_function( cloud_function_max_instance_count, is_row_processor, cloud_function_vpc_connector, + cloud_function_memory_mib, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -469,9 +488,10 @@ def provision_bq_remote_function( ) # Derive the name of the cloud function underlying the intended BQ - # remote function - cloud_function_name = get_cloud_function_name( - def_, uniq_suffix, package_requirements + # remote function, also collect updated package requirements as + # determined in the name resolution + cloud_function_name, package_requirements = get_cloud_function_name( + def_, uniq_suffix, package_requirements, is_row_processor ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) @@ -487,6 +507,7 @@ def provision_bq_remote_function( max_instance_count=cloud_function_max_instance_count, is_row_processor=is_row_processor, vpc_connector=cloud_function_vpc_connector, + memory_mib=cloud_function_memory_mib, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -650,6 +671,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -800,6 +822,15 @@ def remote_function( function. This is useful if your code needs access to data or service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. + cloud_function_memory_mib (int, Optional): + The amounts of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. This also dictates a corresponding + amount of allocated CPU for the function. By default a memory of + 1024 MiB is set for the cloud functions created to support + BigQuery DataFrames remote function. If you want to let the + default memory of cloud functions be allocated, pass `None`. See + for more details + https://cloud.google.com/functions/docs/configuring/memory. """ # Some defaults may be used from the session if not provided otherwise import bigframes.exceptions as bf_exceptions @@ -1010,6 +1041,7 @@ def try_delattr(attr): cloud_function_max_instance_count=cloud_function_max_instances, is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py index 6c81b66e55..70854a36e9 100644 --- a/bigframes/ml/base.py +++ b/bigframes/ml/base.py @@ -184,6 +184,16 @@ class BaseTransformer(BaseEstimator): def __init__(self): self._bqml_model: Optional[core.BqmlModel] = None + @abc.abstractmethod + def _keys(self): + pass + + def __eq__(self, other) -> bool: + return type(self) is type(other) and self._keys() == other._keys() + + def __hash__(self) -> int: + return hash(self._keys()) + _T = TypeVar("_T", bound="BaseTransformer") def to_gbq(self: _T, model_name: str, replace: bool = False) -> _T: diff --git a/bigframes/ml/compose.py b/bigframes/ml/compose.py index abf1a662b9..7f1bfe8d55 100644 --- a/bigframes/ml/compose.py +++ b/bigframes/ml/compose.py @@ -21,7 +21,7 @@ import re import types import typing -from typing import cast, List, Optional, Tuple, Union +from typing import cast, Iterable, List, Optional, Set, Tuple, Union import bigframes_vendored.sklearn.compose._column_transformer from google.cloud import bigquery @@ -40,6 +40,7 @@ "ML.BUCKETIZE": preprocessing.KBinsDiscretizer, "ML.QUANTILE_BUCKETIZE": preprocessing.KBinsDiscretizer, "ML.LABEL_ENCODER": preprocessing.LabelEncoder, + "ML.POLYNOMIAL_EXPAND": preprocessing.PolynomialFeatures, "ML.IMPUTER": impute.SimpleImputer, } ) @@ -56,21 +57,24 @@ class ColumnTransformer( def __init__( self, - transformers: List[ + transformers: Iterable[ Tuple[ str, Union[preprocessing.PreprocessingType, impute.SimpleImputer], - Union[str, List[str]], + Union[str, Iterable[str]], ] ], ): # TODO: if any(transformers) has fitted raise warning - self.transformers = transformers + self.transformers = list(transformers) self._bqml_model: Optional[core.BqmlModel] = None self._bqml_model_factory = globals.bqml_model_factory() # call self.transformers_ to check chained transformers self.transformers_ + def _keys(self): + return (self.transformers, self._bqml_model) + @property def transformers_( self, @@ -107,13 +111,13 @@ def _extract_from_bq_model( """Extract transformers as ColumnTransformer obj from a BQ Model. Keep the _bqml_model field as None.""" assert "transformColumns" in bq_model._properties - transformers: List[ + transformers_set: Set[ Tuple[ str, Union[preprocessing.PreprocessingType, impute.SimpleImputer], Union[str, List[str]], ] - ] = [] + ] = set() def camel_to_snake(name): name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) @@ -134,7 +138,7 @@ def camel_to_snake(name): for prefix in _BQML_TRANSFROM_TYPE_MAPPING: if transform_sql.startswith(prefix): transformer_cls = _BQML_TRANSFROM_TYPE_MAPPING[prefix] - transformers.append( + transformers_set.add( ( camel_to_snake(transformer_cls.__name__), *transformer_cls._parse_from_sql(transform_sql), # type: ignore @@ -148,7 +152,7 @@ def camel_to_snake(name): f"Unsupported transformer type. {constants.FEEDBACK_LINK}" ) - transformer = cls(transformers=transformers) + transformer = cls(transformers=list(transformers_set)) transformer._output_names = output_names return transformer @@ -159,23 +163,37 @@ def _merge( ColumnTransformer, Union[preprocessing.PreprocessingType, impute.SimpleImputer] ]: """Try to merge the column transformer to a simple transformer. Depends on all the columns in bq_model are transformed with the same transformer.""" - transformers = self.transformers_ + transformers = self.transformers assert len(transformers) > 0 _, transformer_0, column_0 = transformers[0] + feature_columns_sorted = sorted( + [ + cast(str, feature_column.name) + for feature_column in bq_model.feature_columns + ] + ) + + if ( + len(transformers) == 1 + and isinstance(transformer_0, preprocessing.PolynomialFeatures) + and sorted(column_0) == feature_columns_sorted + ): + transformer_0._output_names = self._output_names + return transformer_0 + + if not isinstance(column_0, str): + return self columns = [column_0] for _, transformer, column in transformers[1:]: + if not isinstance(column, str): + return self # all transformers are the same if transformer != transformer_0: return self columns.append(column) # all feature columns are transformed - if sorted( - [ - cast(str, feature_column.name) - for feature_column in bq_model.feature_columns - ] - ) == sorted(columns): + if sorted(columns) == feature_columns_sorted: transformer_0._output_names = self._output_names return transformer_0 @@ -197,12 +215,12 @@ def _compile_to_sql( Returns: a list of tuples of (sql_expression, output_name)""" - return [ - transformer._compile_to_sql([column], X=X)[0] - for column in columns - for _, transformer, target_column in self.transformers_ - if column == target_column - ] + result = [] + for _, transformer, target_columns in self.transformers: + if isinstance(target_columns, str): + target_columns = [target_columns] + result += transformer._compile_to_sql(target_columns, X=X) + return result def fit( self, diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index 168bc584f7..ee4d8a8c27 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -112,6 +112,15 @@ def __init__(self, session: bigframes.Session, model: bigquery.Model): self.model_name ) + def _keys(self): + return (self._session, self._model) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self._keys() == other._keys() + + def __hash__(self): + return hash(self._keys()) + @property def session(self) -> bigframes.Session: """Get the BigQuery DataFrames session that this BQML model wrapper is tied to""" diff --git a/bigframes/ml/impute.py b/bigframes/ml/impute.py index d21fcbb1ad..ae71637aa5 100644 --- a/bigframes/ml/impute.py +++ b/bigframes/ml/impute.py @@ -18,7 +18,7 @@ from __future__ import annotations import typing -from typing import Any, List, Literal, Optional, Tuple, Union +from typing import Iterable, List, Literal, Optional, Tuple, Union import bigframes_vendored.sklearn.impute._base @@ -44,17 +44,12 @@ def __init__( self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return ( - type(other) is SimpleImputer - and self.strategy == other.strategy - and self._bqml_model == other._bqml_model - ) + def _keys(self): + return (self._bqml_model, self.strategy) def _compile_to_sql( self, - columns: List[str], + columns: Iterable[str], X=None, ) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in diff --git a/bigframes/ml/pipeline.py b/bigframes/ml/pipeline.py index 03e5688453..04b8d73cf5 100644 --- a/bigframes/ml/pipeline.py +++ b/bigframes/ml/pipeline.py @@ -64,6 +64,7 @@ def __init__(self, steps: List[Tuple[str, base.BaseEstimator]]): preprocessing.MinMaxScaler, preprocessing.KBinsDiscretizer, preprocessing.LabelEncoder, + preprocessing.PolynomialFeatures, impute.SimpleImputer, ), ): diff --git a/bigframes/ml/preprocessing.py b/bigframes/ml/preprocessing.py index f3621d3a33..07fdc171cf 100644 --- a/bigframes/ml/preprocessing.py +++ b/bigframes/ml/preprocessing.py @@ -18,7 +18,7 @@ from __future__ import annotations import typing -from typing import Any, cast, List, Literal, Optional, Tuple, Union +from typing import cast, Iterable, List, Literal, Optional, Tuple, Union import bigframes_vendored.sklearn.preprocessing._data import bigframes_vendored.sklearn.preprocessing._discretization @@ -43,11 +43,10 @@ def __init__(self): self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return type(other) is StandardScaler and self._bqml_model == other._bqml_model + def _keys(self): + return (self._bqml_model,) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -125,11 +124,10 @@ def __init__(self): self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return type(other) is MaxAbsScaler and self._bqml_model == other._bqml_model + def _keys(self): + return (self._bqml_model,) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -207,11 +205,10 @@ def __init__(self): self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return type(other) is MinMaxScaler and self._bqml_model == other._bqml_model + def _keys(self): + return (self._bqml_model,) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -301,18 +298,12 @@ def __init__( self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return ( - type(other) is KBinsDiscretizer - and self.n_bins == other.n_bins - and self.strategy == other.strategy - and self._bqml_model == other._bqml_model - ) + def _keys(self): + return (self._bqml_model, self.n_bins, self.strategy) def _compile_to_sql( self, - columns: List[str], + columns: Iterable[str], X: bpd.DataFrame, ) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in @@ -446,17 +437,10 @@ def __init__( self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return ( - type(other) is OneHotEncoder - and self._bqml_model == other._bqml_model - and self.drop == other.drop - and self.min_frequency == other.min_frequency - and self.max_categories == other.max_categories - ) + def _keys(self): + return (self._bqml_model, self.drop, self.min_frequency, self.max_categories) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -572,16 +556,10 @@ def __init__( self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return ( - type(other) is LabelEncoder - and self._bqml_model == other._bqml_model - and self.min_frequency == other.min_frequency - and self.max_categories == other.max_categories - ) + def _keys(self): + return (self._bqml_model, self.min_frequency, self.max_categories) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -672,18 +650,17 @@ class PolynomialFeatures( ) def __init__(self, degree: int = 2): + if degree not in range(1, 5): + raise ValueError(f"degree has to be [1, 4], input is {degree}.") self.degree = degree self._bqml_model: Optional[core.BqmlModel] = None self._bqml_model_factory = globals.bqml_model_factory() self._base_sql_generator = globals.base_sql_generator() - # TODO(garrettwu): implement __hash__ - def __eq__(self, other: Any) -> bool: - return ( - type(other) is PolynomialFeatures and self._bqml_model == other._bqml_model - ) + def _keys(self): + return (self._bqml_model, self.degree) - def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: + def _compile_to_sql(self, columns: Iterable[str], X=None) -> List[Tuple[str, str]]: """Compile this transformer to a list of SQL expressions that can be included in a BQML TRANSFORM clause @@ -705,17 +682,18 @@ def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]: ] @classmethod - def _parse_from_sql(cls, sql: str) -> tuple[PolynomialFeatures, str]: - """Parse SQL to tuple(PolynomialFeatures, column_label). + def _parse_from_sql(cls, sql: str) -> tuple[PolynomialFeatures, tuple[str, ...]]: + """Parse SQL to tuple(PolynomialFeatures, column_labels). Args: sql: SQL string of format "ML.POLYNOMIAL_EXPAND(STRUCT(col_label0, col_label1, ...), degree)" Returns: tuple(MaxAbsScaler, column_label)""" - col_label = sql[sql.find("STRUCT(") + 7 : sql.find(")")] + col_labels = sql[sql.find("STRUCT(") + 7 : sql.find(")")].split(",") + col_labels = [label.strip() for label in col_labels] degree = int(sql[sql.rfind(",") + 1 : sql.rfind(")")]) - return cls(degree), col_label + return cls(degree), tuple(col_labels) def fit( self, @@ -762,8 +740,6 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame: df[self._output_names], ) - # TODO(garrettwu): to_gbq() - PreprocessingType = Union[ OneHotEncoder, @@ -772,4 +748,5 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame: MinMaxScaler, KBinsDiscretizer, LabelEncoder, + PolynomialFeatures, ] diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index c10b743631..145c415ca0 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -707,6 +707,30 @@ def output_type(self, *input_types): strconcat_op = StrConcatOp() +## JSON Ops +@dataclasses.dataclass(frozen=True) +class JSONSet(BinaryOp): + name: typing.ClassVar[str] = "json_set" + json_path: str + + def output_type(self, *input_types): + left_type = input_types[0] + right_type = input_types[1] + if not dtypes.is_json_like(left_type): + raise TypeError( + "Input type must be an valid JSON object or JSON-formatted string type." + + f" Received type: {left_type}" + ) + if not dtypes.is_json_encoding_type(right_type): + raise TypeError( + "The value to be assigned must be a type that can be encoded as JSON." + + f"Received type: {right_type}" + ) + + # After JSON type implementation, ONLY return JSON data. + return left_type + + # Ternary Ops @dataclasses.dataclass(frozen=True) class WhereOp(TernaryOp): diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 783abfd788..675ead1188 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -42,6 +42,15 @@ def uses_total_row_ordering(self): def can_order_by(self): return False + @property + def order_independent(self): + """ + True if the output of the operator does not depend on the ordering of input rows. + + Navigation functions are a notable case that are not order independent. + """ + return False + @abc.abstractmethod def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: ... @@ -78,6 +87,15 @@ def name(self) -> str: def arguments(self) -> int: ... + @property + def order_independent(self): + """ + True if results don't depend on the order of the input. + + Almost all aggregation functions are order independent, excepting ``array_agg`` and ``string_agg``. + """ + return not self.can_order_by + @dataclasses.dataclass(frozen=True) class NullaryAggregateOp(AggregateOp, NullaryWindowOp): @@ -294,6 +312,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ) return pd.ArrowDtype(pa_type) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class QcutOp(UnaryWindowOp): @@ -312,6 +334,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class NuniqueOp(UnaryAggregateOp): @@ -349,6 +375,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class DenseRankOp(UnaryWindowOp): @@ -361,6 +391,10 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT dtypes.is_orderable, dtypes.INT_DTYPE, "orderable" ).output_type(input_types[0]) + @property + def order_independent(self): + return True + @dataclasses.dataclass(frozen=True) class FirstOp(UnaryWindowOp): @@ -487,7 +521,9 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT # TODO: Alternative names and lookup from numpy function objects -_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = { +_AGGREGATIONS_LOOKUP: typing.Dict[ + str, typing.Union[UnaryAggregateOp, NullaryAggregateOp] +] = { op.name: op for op in [ sum_op, @@ -506,10 +542,14 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT ApproxQuartilesOp(2), ApproxQuartilesOp(3), ] + + [ + # Add size_op separately to avoid Mypy type inference errors. + size_op, + ] } -def lookup_agg_func(key: str) -> UnaryAggregateOp: +def lookup_agg_func(key: str) -> typing.Union[UnaryAggregateOp, NullaryAggregateOp]: if callable(key): raise NotImplementedError( "Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)." diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index f6f9aec800..faba0f3aa3 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -666,6 +666,7 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -683,6 +684,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) diff --git a/bigframes/series.py b/bigframes/series.py index eda95fa1e8..c325783e96 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -43,6 +43,7 @@ import bigframes.core.ordering as order import bigframes.core.scalar as scalars import bigframes.core.utils as utils +import bigframes.core.validations as validations import bigframes.core.window import bigframes.core.window_spec import bigframes.dataframe @@ -92,10 +93,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property + @validations.requires_strict_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property + @validations.requires_strict_ordering() def iat(self) -> bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @@ -160,6 +163,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property + @validations.requires_strict_ordering() def T(self) -> Series: return self.transpose() @@ -171,6 +175,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session + @validations.requires_strict_ordering() def transpose(self) -> Series: return self @@ -266,6 +271,7 @@ def equals( return False return block_ops.equals(self._block, other._block) + @validations.requires_strict_ordering() def reset_index( self, *, @@ -454,11 +460,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) + @validations.requires_strict_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -466,25 +474,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) + @validations.requires_strict_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) + @validations.requires_strict_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) + @validations.requires_strict_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -492,6 +505,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) + @validations.requires_strict_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -499,11 +513,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) + @validations.requires_strict_ordering() def pct_change(self, periods: int = 1) -> Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return Series(block_ops.pct_change(series._block, periods=periods)) + @validations.requires_strict_ordering() def rank( self, axis=0, @@ -595,6 +611,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) + @validations.requires_strict_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -617,15 +634,53 @@ def dropna( result = result.reset_index() return Series(result) + @validations.requires_strict_ordering() def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) + @validations.requires_strict_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) + def peek(self, n: int = 5, *, force: bool = True) -> pandas.DataFrame: + """ + Preview n arbitrary elements from the series without guarantees about row selection or ordering. + + ``Series.peek(force=False)`` will always be very fast, but will not succeed if data requires + full data scanning. Using ``force=True`` will always succeed, but may be perform queries. + Query results will be cached so that future steps will benefit from these queries. + + Args: + n (int, default 5): + The number of rows to select from the series. Which N rows are returned is non-deterministic. + force (bool, default True): + If the data cannot be peeked efficiently, the series will instead be fully materialized as part + of the operation if ``force=True``. If ``force=False``, the operation will throw a ValueError. + Returns: + pandas.Series: A pandas Series with n rows. + + Raises: + ValueError: If force=False and data cannot be efficiently peeked. + """ + maybe_result = self._block.try_peek(n) + if maybe_result is None: + if force: + self._cached() + maybe_result = self._block.try_peek(n, force=True) + assert maybe_result is not None + else: + raise ValueError( + "Cannot peek efficiently when data has aggregates, joins or window functions applied. Use force=True to fully compute dataframe." + ) + as_series = maybe_result.squeeze(axis=1) + as_series.name = self.name + return as_series + def nlargest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nlargest(keep != 'all')") return Series( block_ops.nlargest(self._block, n, [self._value_column], keep=keep) ) @@ -633,6 +688,8 @@ def nlargest(self, n: int = 5, keep: str = "first") -> Series: def nsmallest(self, n: int = 5, keep: str = "first") -> Series: if keep not in ("first", "last", "all"): raise ValueError("'keep must be one of 'first', 'last', or 'all'") + if keep != "all": + validations.enforce_ordered(self, "nsmallest(keep != 'all')") return Series( block_ops.nsmallest(self._block, n, [self._value_column], keep=keep) ) @@ -934,7 +991,6 @@ def agg(self, func: str | typing.Sequence[str]) -> scalars.Scalar | Series: ) ) else: - return self._apply_aggregation( agg_ops.lookup_agg_func(typing.cast(str, func)) ) @@ -1083,6 +1139,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) + @validations.requires_strict_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1095,6 +1152,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) + @validations.requires_strict_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1160,12 +1218,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property + @validations.requires_strict_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property + @validations.requires_strict_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1212,7 +1272,9 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scal values, index = self._align_n([other1, other2], how) return (values[0], values[1], values[2], index) - def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any: + def _apply_aggregation( + self, op: agg_ops.UnaryAggregateOp | agg_ops.NullaryAggregateOp + ) -> Any: return self._block.get_stat(self._value_column, op) def _apply_window_op( @@ -1271,6 +1333,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) + @validations.requires_strict_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = bigframes.core.window_spec.rows( @@ -1280,6 +1343,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) + @validations.requires_strict_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1419,7 +1483,7 @@ def apply( # return Series with materialized result so that any error in the remote # function is caught early - materialized_series = result_series._cached() + materialized_series = result_series._cached(session_aware=False) return materialized_series def combine( @@ -1544,13 +1608,18 @@ def reindex_like(self, other: Series, *, validate: typing.Optional[bool] = None) return self.reindex(other.index, validate=validate) def drop_duplicates(self, *, keep: str = "first") -> Series: + if keep is not False: + validations.enforce_ordered(self, "drop_duplicates(keep != False)") block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) + @validations.requires_strict_ordering() def unique(self) -> Series: return self.drop_duplicates() def duplicated(self, keep: str = "first") -> Series: + if keep is not False: + validations.enforce_ordered(self, "duplicated(keep != False)") block, indicator = block_ops.indicate_duplicates( self._block, (self._value_column,), keep ) @@ -1716,6 +1785,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] + @validations.requires_strict_ordering() def sample( self, n: Optional[int] = None, @@ -1794,10 +1864,11 @@ def cache(self): Returns: Series: Self """ - return self._cached(force=True) + # Do not use session-aware cashing if user-requested + return self._cached(force=True, session_aware=False) - def _cached(self, *, force: bool = True) -> Series: - self._block.cached(force=force) + def _cached(self, *, force: bool = True, session_aware: bool = True) -> Series: + self._block.cached(force=force, session_aware=session_aware) return self def _optimize_query_complexity(self): diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b0b2a3c418..867bdedf1c 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -16,7 +16,6 @@ from __future__ import annotations -import collections.abc import copy import datetime import itertools @@ -84,6 +83,7 @@ import bigframes.core.guid import bigframes.core.nodes as nodes import bigframes.core.ordering as order +import bigframes.core.pruning import bigframes.core.schema as schemata import bigframes.core.tree_properties as traversals import bigframes.core.tree_properties as tree_properties @@ -100,6 +100,7 @@ import bigframes.session._io.bigquery as bf_io_bigquery import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table import bigframes.session.clients +import bigframes.session.planner import bigframes.version # Avoid circular imports. @@ -342,13 +343,15 @@ def session_id(self): @property def objects( self, - ) -> collections.abc.Set[ + ) -> Iterable[ Union[ bigframes.core.indexes.Index, bigframes.series.Series, dataframe.DataFrame ] ]: + still_alive = [i for i in self._objects if i() is not None] + self._objects = still_alive # Create a set with strong references, be careful not to hold onto this needlessly, as will prevent garbage collection. - return set(i() for i in self._objects if i() is not None) # type: ignore + return tuple(i() for i in self._objects if i() is not None) # type: ignore @property def _project(self): @@ -1534,12 +1537,15 @@ def remote_function( cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, + cloud_function_memory_mib: Optional[int] = 1024, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. .. note:: - ``input_types=Series`` scenario is in preview. + ``input_types=Series`` scenario is in preview. It currently only + supports dataframe with column types ``Int64``/``Float64``/``boolean``/ + ``string``/``binary[pyarrow]``. .. note:: Please make sure following is setup before using this API: @@ -1665,6 +1671,15 @@ def remote_function( function. This is useful if your code needs access to data or service(s) that are on a VPC network. See for more details https://cloud.google.com/functions/docs/networking/connecting-vpc. + cloud_function_memory_mib (int, Optional): + The amounts of memory (in mebibytes) to allocate for the cloud + function (2nd gen) created. This also dictates a corresponding + amount of allocated CPU for the function. By default a memory of + 1024 MiB is set for the cloud functions created to support + BigQuery DataFrames remote function. If you want to let the + default memory of cloud functions be allocated, pass `None`. See + for more details + https://cloud.google.com/functions/docs/configuring/memory. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. The cloud assets can be @@ -1690,6 +1705,7 @@ def remote_function( cloud_function_timeout=cloud_function_timeout, cloud_function_max_instances=cloud_function_max_instances, cloud_function_vpc_connector=cloud_function_vpc_connector, + cloud_function_memory_mib=cloud_function_memory_mib, ) def read_gbq_function( @@ -1817,14 +1833,22 @@ def _start_query( Starts BigQuery query job and waits for results. """ job_config = self._prepare_query_job_config(job_config) - return bigframes.session._io.bigquery.start_query_with_client( - self, - sql, - job_config, - max_results, - timeout, - api_name=api_name, - ) + try: + return bigframes.session._io.bigquery.start_query_with_client( + self, + sql, + job_config, + max_results, + timeout, + api_name=api_name, + ) + except google.api_core.exceptions.BadRequest as e: + # Unfortunately, this error type does not have a separate error code or exception type + if "Resources exceeded during query execution" in e.message: + new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." + raise bigframes.exceptions.QueryComplexityError(new_message) from e + else: + raise def _start_query_ml_ddl( self, @@ -1874,21 +1898,34 @@ def _cache_with_offsets(self, array_value: core.ArrayValue): raise ValueError( "Caching with offsets only supported in strictly ordered mode." ) + offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") sql = bigframes.core.compile.compile_unordered( self._with_cached_executions( - array_value.promote_offsets("bigframes_offsets").node + array_value.promote_offsets(offset_column).node ) ) tmp_table = self._sql_to_temp_table( - sql, cluster_cols=["bigframes_offsets"], api_name="cached" + sql, cluster_cols=[offset_column], api_name="cached" ) cached_replacement = array_value.as_cached( cache_table=self.bqclient.get_table(tmp_table), - ordering=order.ExpressionOrdering.from_offset_col("bigframes_offsets"), + ordering=order.ExpressionOrdering.from_offset_col(offset_column), ).node self._cached_executions[array_value.node] = cached_replacement + def _cache_with_session_awareness(self, array_value: core.ArrayValue) -> None: + # this is the occurence count across the whole session + forest = [obj._block.expr.node for obj in self.objects] + # These node types are cheap to re-compute + target, cluster_cols = bigframes.session.planner.session_aware_cache_plan( + array_value.node, forest + ) + if len(cluster_cols) > 0: + self._cache_with_cluster_cols(core.ArrayValue(target), cluster_cols) + else: + self._cache_with_offsets(core.ArrayValue(target)) + def _simplify_with_caching(self, array_value: core.ArrayValue): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first diff --git a/bigframes/session/planner.py b/bigframes/session/planner.py new file mode 100644 index 0000000000..2a74521b43 --- /dev/null +++ b/bigframes/session/planner.py @@ -0,0 +1,74 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import itertools +from typing import Sequence, Tuple + +import bigframes.core.expression as ex +import bigframes.core.nodes as nodes +import bigframes.core.pruning as predicate_pruning +import bigframes.core.tree_properties as traversals + + +def session_aware_cache_plan( + root: nodes.BigFrameNode, session_forest: Sequence[nodes.BigFrameNode] +) -> Tuple[nodes.BigFrameNode, list[str]]: + """ + Determines the best node to cache given a target and a list of object roots for objects in a session. + + Returns the node to cache, and optionally a clustering column. + """ + node_counts = traversals.count_nodes(session_forest) + # These node types are cheap to re-compute, so it makes more sense to cache their children. + de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode) + caching_target = cur_node = root + caching_target_refs = node_counts.get(caching_target, 0) + + filters: list[ + ex.Expression + ] = [] # accumulate filters into this as traverse downwards + clusterable_cols: set[str] = set() + while isinstance(cur_node, de_cachable_types): + if isinstance(cur_node, nodes.FilterNode): + # Filter node doesn't define any variables, so no need to chain expressions + filters.append(cur_node.predicate) + elif isinstance(cur_node, nodes.ProjectionNode): + # Projection defines the variables that are used in the filter expressions, need to substitute variables with their scalar expressions + # that instead reference variables in the child node. + bindings = {name: expr for expr, name in cur_node.assignments} + filters = [i.bind_all_variables(bindings) for i in filters] + else: + raise ValueError(f"Unexpected de-cached node: {cur_node}") + + cur_node = cur_node.child + cur_node_refs = node_counts.get(cur_node, 0) + if cur_node_refs > caching_target_refs: + caching_target, caching_target_refs = cur_node, cur_node_refs + schema = cur_node.schema + # Cluster cols only consider the target object and not other sesssion objects + clusterable_cols = set( + itertools.chain.from_iterable( + map( + lambda f: predicate_pruning.cluster_cols_for_predicate( + f, schema + ), + filters, + ) + ) + ) + # BQ supports up to 4 cluster columns, just prioritize by alphabetical ordering + # TODO: Prioritize caching columns by estimated filter selectivity + return caching_target, sorted(list(clusterable_cols))[:4] diff --git a/bigframes/streaming/__init__.py b/bigframes/streaming/__init__.py index 16da677ef5..0b6fd18561 100644 --- a/bigframes/streaming/__init__.py +++ b/bigframes/streaming/__init__.py @@ -16,6 +16,7 @@ import json from typing import Optional +import warnings from google.cloud import bigquery @@ -24,9 +25,11 @@ def to_bigtable( query: str, + *, instance: str, table: str, - bq_client: Optional[bigquery.Client] = None, + service_account_email: Optional[str] = None, + session: Optional[bigframes.Session] = None, app_profile: Optional[str] = None, truncate: bool = False, overwrite: bool = False, @@ -53,10 +56,15 @@ def to_bigtable( The name of the bigtable instance to export to. table (str): The name of the bigtable table to export to. - bq_client (str, default None): - The Client object to use for the query. This determines + service_account_email (str): + Full name of the service account to run the continuous query. + Example: accountname@projectname.gserviceaccounts.com + If not provided, the user account will be used, but this + limits the lifetime of the continuous query. + session (bigframes.Session, default None): + The session object to use for the query. This determines the project id and location of the query. If None, will - default to the bigframes global session default client. + default to the bigframes global session. app_profile (str, default None): The bigtable app profile to export to. If None, no app profile will be used. @@ -90,9 +98,16 @@ def to_bigtable( For example, the job can be cancelled or its error status can be examined. """ + warnings.warn( + "The bigframes.streaming module is a preview feature, and subject to change.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + # get default client if not passed - if bq_client is None: - bq_client = bigframes.get_global_session().bqclient + if session is None: + session = bigframes.get_global_session() + bq_client = session.bqclient # build export string from parameters project = bq_client.project @@ -123,7 +138,117 @@ def to_bigtable( # override continuous http parameter job_config = bigquery.job.QueryJobConfig() - job_config_filled = job_config.from_api_repr({"query": {"continuous": True}}) + + job_config_dict: dict = {"query": {"continuous": True}} + if service_account_email is not None: + job_config_dict["query"]["connectionProperties"] = { + "key": "service_account", + "value": service_account_email, + } + job_config_filled = job_config.from_api_repr(job_config_dict) + job_config_filled.labels = {"bigframes-api": "streaming_to_bigtable"} + + # begin the query job + query_job = bq_client.query( + sql, + job_config=job_config_filled, # type:ignore + # typing error above is in bq client library + # (should accept abstract job_config, only takes concrete) + job_id=job_id, + job_id_prefix=job_id_prefix, + ) + + # return the query job to the user for lifetime management + return query_job + + +def to_pubsub( + query: str, + *, + topic: str, + service_account_email: str, + session: Optional[bigframes.Session] = None, + job_id: Optional[str] = None, + job_id_prefix: Optional[str] = None, +) -> bigquery.QueryJob: + """Launches a BigQuery continuous query and returns a + QueryJob object for some management functionality. + + This method requires an existing pubsub topic. For instructions + on creating a pubsub topic, see + https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en + + Note that a service account is a requirement for continuous queries + exporting to pubsub. + + Args: + query (str): + The sql statement to execute as a continuous function. + For example: "SELECT * FROM dataset.table" + This will be wrapped in an EXPORT DATA statement to + launch a continuous query writing to pubsub. + topic (str): + The name of the pubsub topic to export to. + For example: "taxi-rides" + service_account_email (str): + Full name of the service account to run the continuous query. + Example: accountname@projectname.gserviceaccounts.com + session (bigframes.Session, default None): + The session object to use for the query. This determines + the project id and location of the query. If None, will + default to the bigframes global session. + job_id (str, default None): + If specified, replace the default job id for the query, + see job_id parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + job_id_prefix (str, default None): + If specified, a job id prefix for the query, see + job_id_prefix parameter of + https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query + + Returns: + google.cloud.bigquery.QueryJob: + See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob + The ongoing query job can be managed using this object. + For example, the job can be cancelled or its error status + can be examined. + """ + warnings.warn( + "The bigframes.streaming module is a preview feature, and subject to change.", + stacklevel=1, + category=bigframes.exceptions.PreviewWarning, + ) + + # get default client if not passed + if session is None: + session = bigframes.get_global_session() + bq_client = session.bqclient + + # build export string from parameters + sql = ( + "EXPORT DATA\n" + "OPTIONS (\n" + "format = 'CLOUD_PUBSUB',\n" + f'uri = "https://pubsub.googleapis.com/projects/%7Bbq_client.project%7D/topics/%7Btopic%7D"\n' + ")\n" + "AS (\n" + f"{query});" + ) + + # override continuous http parameter + job_config = bigquery.job.QueryJobConfig() + job_config_filled = job_config.from_api_repr( + { + "query": { + "continuous": True, + "connectionProperties": { + "key": "service_account", + "value": service_account_email, + }, + } + } + ) + job_config_filled.labels = {"bigframes-api": "streaming_to_pubsub"} # begin the query job query_job = bq_client.query( diff --git a/bigframes/version.py b/bigframes/version.py index 014b064071..75f3ffb361 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.10.0" +__version__ = "1.11.0" diff --git a/samples/polars/create_polars_df_with_to_arrow_test.py b/samples/polars/create_polars_df_with_to_arrow_test.py new file mode 100644 index 0000000000..acb79f23c8 --- /dev/null +++ b/samples/polars/create_polars_df_with_to_arrow_test.py @@ -0,0 +1,40 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_create_polars_df() -> None: + # [START bigquery_dataframes_to_polars] + import polars + + import bigframes.enums + import bigframes.pandas as bpd + + bf_df = bpd.read_gbq_table( + "bigquery-public-data.usa_names.usa_1910_current", + # Setting index_col to either a unique column or NULL will give the + # best performance. + index_col=bigframes.enums.DefaultIndexKind.NULL, + ) + # TODO(developer): Do some analysis using BigQuery DataFrames. + # ... + + # Run the query and download the results as an Arrow table to convert into + # a Polars DataFrame. Use ordered=False if your polars analysis is OK with + # non-deterministic ordering. + arrow_table = bf_df.to_arrow(ordered=False) + polars_df = polars.from_arrow(arrow_table) + # [END bigquery_dataframes_to_polars] + + assert polars_df.shape == bf_df.shape + assert polars_df["number"].sum() == bf_df["number"].sum() diff --git a/samples/polars/noxfile.py b/samples/polars/noxfile.py new file mode 100644 index 0000000000..c36d5f2d81 --- /dev/null +++ b/samples/polars/noxfile.py @@ -0,0 +1,292 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import glob +import os +from pathlib import Path +import sys +from typing import Callable, Dict, Optional + +import nox + +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING +# DO NOT EDIT THIS FILE EVER! +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING + +BLACK_VERSION = "black==22.3.0" +ISORT_VERSION = "isort==5.10.1" + +# Copy `noxfile_config.py` to your directory and modify it instead. + +# `TEST_CONFIG` dict is a configuration hook that allows users to +# modify the test configurations. The values here should be in sync +# with `noxfile_config.py`. Users will copy `noxfile_config.py` into +# their directory and modify it. + +TEST_CONFIG = { + # You can opt out from the test for specific Python versions. + "ignored_versions": [], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": False, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} + + +try: + # Ensure we can import noxfile_config in the project's directory. + sys.path.append(".") + from noxfile_config import TEST_CONFIG_OVERRIDE +except ImportError as e: + print("No user noxfile_config found: detail: {}".format(e)) + TEST_CONFIG_OVERRIDE = {} + +# Update the TEST_CONFIG with the user supplied values. +TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) + + +def get_pytest_env_vars() -> Dict[str, str]: + """Returns a dict for pytest invocation.""" + ret = {} + + # Override the GCLOUD_PROJECT and the alias. + env_key = TEST_CONFIG["gcloud_project_env"] + # This should error out if not set. + ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] + + # Apply user supplied envs. + ret.update(TEST_CONFIG["envs"]) + return ret + + +# DO NOT EDIT - automatically generated. +# All versions used to test samples. +ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + +# Any default versions that should be ignored. +IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] + +TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) + +INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in ( + "True", + "true", +) + +# Error if a python version is missing +nox.options.error_on_missing_interpreters = True + +# +# Style Checks +# + + +# Linting with flake8. +# +# We ignore the following rules: +# E203: whitespace before ‘:’ +# E266: too many leading ‘#’ for block comment +# E501: line too long +# I202: Additional newline in a section of imports +# +# We also need to specify the rules which are ignored by default: +# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] +FLAKE8_COMMON_ARGS = [ + "--show-source", + "--builtin=gettext", + "--max-complexity=20", + "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--max-line-length=88", +] + + +@nox.session +def lint(session: nox.sessions.Session) -> None: + if not TEST_CONFIG["enforce_type_hints"]: + session.install("flake8") + else: + session.install("flake8", "flake8-annotations") + + args = FLAKE8_COMMON_ARGS + [ + ".", + ] + session.run("flake8", *args) + + +# +# Black +# + + +@nox.session +def blacken(session: nox.sessions.Session) -> None: + """Run black. Format code to uniform standard.""" + session.install(BLACK_VERSION) + python_files = [path for path in os.listdir(".") if path.endswith(".py")] + + session.run("black", *python_files) + + +# +# format = isort + black +# + + +@nox.session +def format(session: nox.sessions.Session) -> None: + """ + Run isort to sort imports. Then run black + to format code to uniform standard. + """ + session.install(BLACK_VERSION, ISORT_VERSION) + python_files = [path for path in os.listdir(".") if path.endswith(".py")] + + # Use the --fss option to sort imports using strict alphabetical order. + # See https://pycqa.github.io/isort/docs/configuration/options.html#force-sort-within-sections + session.run("isort", "--fss", *python_files) + session.run("black", *python_files) + + +# +# Sample Tests +# + + +PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] + + +def _session_tests( + session: nox.sessions.Session, post_install: Callable = None +) -> None: + # check for presence of tests + test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob( + "**/test_*.py", recursive=True + ) + test_list.extend(glob.glob("**/tests", recursive=True)) + + if len(test_list) == 0: + print("No tests found, skipping directory.") + return + + if TEST_CONFIG["pip_version_override"]: + pip_version = TEST_CONFIG["pip_version_override"] + session.install(f"pip=={pip_version}") + """Runs py.test for a particular project.""" + concurrent_args = [] + if os.path.exists("requirements.txt"): + if os.path.exists("constraints.txt"): + session.install("-r", "requirements.txt", "-c", "constraints.txt") + else: + session.install("-r", "requirements.txt") + with open("requirements.txt") as rfile: + packages = rfile.read() + + if os.path.exists("requirements-test.txt"): + if os.path.exists("constraints-test.txt"): + session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") + else: + session.install("-r", "requirements-test.txt") + with open("requirements-test.txt") as rtfile: + packages += rtfile.read() + + if INSTALL_LIBRARY_FROM_SOURCE: + session.install("-e", _get_repo_root()) + + if post_install: + post_install(session) + + if "pytest-parallel" in packages: + concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"]) + elif "pytest-xdist" in packages: + concurrent_args.extend(["-n", "auto"]) + + session.run( + "pytest", + *(PYTEST_COMMON_ARGS + session.posargs + concurrent_args), + # Pytest will return 5 when no tests are collected. This can happen + # on travis where slow and flaky tests are excluded. + # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html + success_codes=[0, 5], + env=get_pytest_env_vars(), + ) + + +@nox.session(python=ALL_VERSIONS) +def py(session: nox.sessions.Session) -> None: + """Runs py.test for a sample using the specified version of Python.""" + if session.python in TESTED_VERSIONS: + _session_tests(session) + else: + session.skip( + "SKIPPED: {} tests are disabled for this sample.".format(session.python) + ) + + +# +# Readmegen +# + + +def _get_repo_root() -> Optional[str]: + """Returns the root folder of the project.""" + # Get root of this repository. Assume we don't have directories nested deeper than 10 items. + p = Path(os.getcwd()) + for i in range(10): + if p is None: + break + if Path(p / ".git").exists(): + return str(p) + # .git is not available in repos cloned via Cloud Build + # setup.py is always in the library's root, so use that instead + # https://github.com/googleapis/synthtool/issues/792 + if Path(p / "setup.py").exists(): + return str(p) + p = p.parent + raise Exception("Unable to detect repository root.") + + +GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) + + +@nox.session +@nox.parametrize("path", GENERATED_READMES) +def readmegen(session: nox.sessions.Session, path: str) -> None: + """(Re-)generates the readme for a sample.""" + session.install("jinja2", "pyyaml") + dir_ = os.path.dirname(path) + + if os.path.exists(os.path.join(dir_, "requirements.txt")): + session.install("-r", os.path.join(dir_, "requirements.txt")) + + in_file = os.path.join(dir_, "README.rst.in") + session.run( + "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file + ) diff --git a/samples/polars/noxfile_config.py b/samples/polars/noxfile_config.py new file mode 100644 index 0000000000..91238e9e2f --- /dev/null +++ b/samples/polars/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be inported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7", "3.7", "3.8"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT", + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} diff --git a/samples/polars/requirements-test.txt b/samples/polars/requirements-test.txt new file mode 100644 index 0000000000..beca2e44d9 --- /dev/null +++ b/samples/polars/requirements-test.txt @@ -0,0 +1,3 @@ +# samples/snippets should be runnable with no "extras" +google-cloud-testutils==1.4.0 +pytest==8.2.0 diff --git a/samples/polars/requirements.txt b/samples/polars/requirements.txt new file mode 100644 index 0000000000..e3f886e7e3 --- /dev/null +++ b/samples/polars/requirements.txt @@ -0,0 +1,3 @@ +bigframes==1.6.0 +polars==0.20.31 +pyarrow==15.0.0 diff --git a/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q10.py b/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q10.py new file mode 100644 index 0000000000..83d5d4ee14 --- /dev/null +++ b/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q10.py @@ -0,0 +1,16 @@ +# Contains code from https://github.com/duckdblabs/db-benchmark/blob/master/pandas/groupby-pandas.py + +import bigframes.pandas as bpd + +print("Groupby benchmark 10: sum v3 count by id1:id6") + +x = bpd.read_gbq("bigframes-dev-perf.dbbenchmark.G1_1e9_1e2_5_0") + +ans = x.groupby( + ["id1", "id2", "id3", "id4", "id5", "id6"], as_index=False, dropna=False +).agg({"v3": "sum", "v1": "size"}) +print(ans.shape) +chk = [ans["v3"].sum(), ans["v1"].sum()] +print(chk) + +bpd.reset_session() diff --git a/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q8.py b/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q8.py index 4bbad0048f..7a57d03efe 100644 --- a/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q8.py +++ b/scripts/benchmark/db-benchmark/groupby/G1_1e9_1e2_5_0/q8.py @@ -12,6 +12,7 @@ .groupby("id6", as_index=False, dropna=False) .head(2) ) +ans = ans.reset_index(drop=True) print(ans.shape) chk = [ans["v3"].sum()] print(chk) diff --git a/scripts/create_bigtable.py b/scripts/create_bigtable.py index 655e4b31ab..f81bb8a013 100644 --- a/scripts/create_bigtable.py +++ b/scripts/create_bigtable.py @@ -16,13 +16,10 @@ # bigframes.streaming testing if they don't already exist import os -import pathlib import sys import google.cloud.bigtable as bigtable -REPO_ROOT = pathlib.Path(__file__).parent.parent - PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") if not PROJECT_ID: diff --git a/scripts/create_pubsub.py b/scripts/create_pubsub.py new file mode 100644 index 0000000000..5d25398983 --- /dev/null +++ b/scripts/create_pubsub.py @@ -0,0 +1,49 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script create the bigtable resources required for +# bigframes.streaming testing if they don't already exist + +import os +import sys + +from google.cloud import pubsub_v1 + +PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT") + +if not PROJECT_ID: + print( + "Please set GOOGLE_CLOUD_PROJECT environment variable before running.", + file=sys.stderr, + ) + sys.exit(1) + + +def create_topic(topic_id): + # based on + # https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(PROJECT_ID, topic_id) + + topic = publisher.create_topic(request={"name": topic_path}) + print(f"Created topic: {topic.name}") + + +def main(): + create_topic("penguins") + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index dbd9ce5fc2..79baf1fb23 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ "geopandas >=0.12.2", "google-auth >=2.15.0,<3.0dev", "google-cloud-bigtable >=2.24.0", + "google-cloud-pubsub >=2.21.4", "google-cloud-bigquery[bqstorage,pandas] >=3.16.0", "google-cloud-functions >=1.12.0", "google-cloud-bigquery-connection >=1.12.0", diff --git a/testing/constraints-3.9.txt b/testing/constraints-3.9.txt index bbd7bf0069..5a76698576 100644 --- a/testing/constraints-3.9.txt +++ b/testing/constraints-3.9.txt @@ -5,6 +5,7 @@ gcsfs==2023.3.0 geopandas==0.12.2 google-auth==2.15.0 google-cloud-bigtable==2.24.0 +google-cloud-pubsub==2.21.4 google-cloud-bigquery==3.16.0 google-cloud-functions==1.12.0 google-cloud-bigquery-connection==1.12.0 diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a41e6dc6b7..df4ff9aff0 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -149,9 +149,7 @@ def unordered_session() -> Generator[bigframes.Session, None, None]: @pytest.fixture(scope="session") def session_tokyo(tokyo_location: str) -> Generator[bigframes.Session, None, None]: - context = bigframes.BigQueryOptions( - location=tokyo_location, - ) + context = bigframes.BigQueryOptions(location=tokyo_location) session = bigframes.Session(context=context) yield session session.close() # close generated session at cleanup type diff --git a/tests/system/large/ml/test_compose.py b/tests/system/large/ml/test_compose.py index 45322e78dd..59c5a1538f 100644 --- a/tests/system/large/ml/test_compose.py +++ b/tests/system/large/ml/test_compose.py @@ -123,7 +123,7 @@ def test_columntransformer_save_load(new_penguins_df, dataset_id): ("standard_scaler", preprocessing.StandardScaler(), "culmen_length_mm"), ("standard_scaler", preprocessing.StandardScaler(), "flipper_length_mm"), ] - assert reloaded_transformer.transformers_ == expected + assert set(reloaded_transformer.transformers) == set(expected) assert reloaded_transformer._bqml_model is not None result = transformer.fit_transform( diff --git a/tests/system/large/ml/test_pipeline.py b/tests/system/large/ml/test_pipeline.py index 6e18248e0f..84a6b11ff2 100644 --- a/tests/system/large/ml/test_pipeline.py +++ b/tests/system/large/ml/test_pipeline.py @@ -487,6 +487,11 @@ def test_pipeline_columntransformer_fit_predict(session, penguins_df_default_ind preprocessing.LabelEncoder(), "species", ), + ( + "poly_feats", + preprocessing.PolynomialFeatures(), + ["culmen_length_mm", "flipper_length_mm"], + ), ] ), ), @@ -567,6 +572,11 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id impute.SimpleImputer(), ["culmen_length_mm", "flipper_length_mm"], ), + ( + "polynomial_features", + preprocessing.PolynomialFeatures(), + ["culmen_length_mm", "flipper_length_mm"], + ), ( "label", preprocessing.LabelEncoder(), @@ -589,7 +599,7 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id ) assert isinstance(pl_loaded._transform, compose.ColumnTransformer) - transformers = pl_loaded._transform.transformers_ + transformers = pl_loaded._transform.transformers expected = [ ( "one_hot_encoder", @@ -629,9 +639,14 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id impute.SimpleImputer(), "flipper_length_mm", ), + ( + "polynomial_features", + preprocessing.PolynomialFeatures(), + ("culmen_length_mm", "flipper_length_mm"), + ), ] - assert transformers == expected + assert set(transformers) == set(expected) assert isinstance(pl_loaded._estimator, linear_model.LinearRegression) assert pl_loaded._estimator.fit_intercept is False @@ -849,3 +864,36 @@ def test_pipeline_simple_imputer_to_gbq(penguins_df_default_index, dataset_id): assert isinstance(pl_loaded._estimator, linear_model.LinearRegression) assert pl_loaded._estimator.fit_intercept is False + + +def test_pipeline_poly_features_to_gbq(penguins_df_default_index, dataset_id): + pl = pipeline.Pipeline( + [ + ( + "transform", + preprocessing.PolynomialFeatures(degree=3), + ), + ("estimator", linear_model.LinearRegression(fit_intercept=False)), + ] + ) + + df = penguins_df_default_index.dropna() + X_train = df[ + [ + "culmen_length_mm", + "flipper_length_mm", + ] + ] + y_train = df[["body_mass_g"]] + pl.fit(X_train, y_train) + + pl_loaded = pl.to_gbq( + f"{dataset_id}.test_penguins_pipeline_poly_features", replace=True + ) + assert isinstance(pl_loaded._transform, preprocessing.PolynomialFeatures) + + poly_features = pl_loaded._transform + assert poly_features.degree == 3 + + assert isinstance(pl_loaded._estimator, linear_model.LinearRegression) + assert pl_loaded._estimator.fit_intercept is False diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 6bfc9f0da3..ef8b9811df 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -590,7 +590,7 @@ def add_one(x): add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one) # Expected cloud function name for the unique udf - add_one_uniq_cf_name = get_cloud_function_name(add_one_uniq) + add_one_uniq_cf_name, _ = get_cloud_function_name(add_one_uniq) # There should be no cloud function yet for the unique udf cloud_functions = list( @@ -1800,3 +1800,63 @@ def float_parser(row): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, float_parser_remote ) + + +@pytest.mark.parametrize( + ("memory_mib_args", "expected_memory"), + [ + pytest.param({}, "1024Mi", id="no-set"), + pytest.param({"cloud_function_memory_mib": None}, "256M", id="set-None"), + pytest.param({"cloud_function_memory_mib": 128}, "128Mi", id="set-128"), + pytest.param({"cloud_function_memory_mib": 1024}, "1024Mi", id="set-1024"), + pytest.param({"cloud_function_memory_mib": 4096}, "4096Mi", id="set-4096"), + pytest.param({"cloud_function_memory_mib": 32768}, "32768Mi", id="set-32768"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory( + session, scalars_dfs, memory_mib_args, expected_memory +): + try: + + def square(x: int) -> int: + return x * x + + square_remote = session.remote_function(reuse=False, **memory_mib_args)(square) + + # Assert that the GCF is created with the intended memory + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.available_memory == expected_memory + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.parametrize( + ("memory_mib",), + [ + pytest.param(127, id="127-too-low"), + pytest.param(32769, id="set-32769-too-high"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_memory_unsupported(session, memory_mib): + with pytest.raises( + google.api_core.exceptions.InvalidArgument, + match="Invalid value specified for container memory", + ): + + @session.remote_function(reuse=False, cloud_function_memory_mib=memory_mib) + def square(x: int) -> int: + return x * x diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 48db61e5bf..c125fde15a 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -22,11 +22,12 @@ def test_streaming_to_bigtable(): job_id_prefix = "test_streaming_" sql = """SELECT body_mass_g, island as rowkey - FROM birds.penguins""" + FROM birds.penguins_bigtable_streaming""" query_job = bigframes.streaming.to_bigtable( sql, - "streaming-testing-instance", - "table-testing", + instance="streaming-testing-instance", + table="table-testing", + service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", app_profile=None, truncate=True, overwrite=True, @@ -46,3 +47,29 @@ def test_streaming_to_bigtable(): assert str(query_job.job_id).startswith(job_id_prefix) finally: query_job.cancel() + + +def test_streaming_to_pubsub(): + # launch a continuous query + job_id_prefix = "test_streaming_pubsub_" + sql = """SELECT + island + FROM birds.penguins_pubsub_streaming""" + query_job = bigframes.streaming.to_pubsub( + sql, + topic="penguins", + service_account_email="streaming-testing@bigframes-load-testing.iam.gserviceaccount.com", + job_id=None, + job_id_prefix=job_id_prefix, + ) + + try: + # wait 100 seconds in order to ensure the query doesn't stop + # (i.e. it is continuous) + time.sleep(100) + assert query_job.error_result is None + assert query_job.errors is None + assert query_job.running() + assert str(query_job.job_id).startswith(job_id_prefix) + finally: + query_job.cancel() diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py new file mode 100644 index 0000000000..ff759b8fda --- /dev/null +++ b/tests/system/small/bigquery/test_json.py @@ -0,0 +1,119 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +import geopandas as gpd # type: ignore +import pandas as pd +import pytest + +import bigframes.bigquery as bbq +import bigframes.pandas as bpd + + +def _get_series_from_json(json_data): + sql = " UNION ALL ".join( + [ + f"SELECT {id} AS id, JSON '{json.dumps(data)}' AS data" + for id, data in enumerate(json_data) + ] + ) + df = bpd.read_gbq(sql).set_index("id").sort_index() + return df["data"] + + +@pytest.mark.parametrize( + ("json_path", "expected_json"), + [ + pytest.param("$.a", [{"a": 10}], id="simple"), + pytest.param("$.a.b.c", [{"a": {"b": {"c": 10, "d": []}}}], id="nested"), + ], +) +def test_json_set_at_json_path(json_path, expected_json): + s = _get_series_from_json([{"a": {"b": {"c": "tester", "d": []}}}]) + actual = bbq.json_set(s, json_path_value_pairs=[(json_path, 10)]) + + expected = _get_series_from_json(expected_json) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +@pytest.mark.parametrize( + ("json_value", "expected_json"), + [ + pytest.param(10, [{"a": {"b": 10}}, {"a": {"b": 10}}], id="int"), + pytest.param(0.333, [{"a": {"b": 0.333}}, {"a": {"b": 0.333}}], id="float"), + pytest.param("eng", [{"a": {"b": "eng"}}, {"a": {"b": "eng"}}], id="string"), + pytest.param([1, 2], [{"a": {"b": 1}}, {"a": {"b": 2}}], id="series"), + ], +) +def test_json_set_at_json_value_type(json_value, expected_json): + s = _get_series_from_json([{"a": {"b": "dev"}}, {"a": {"b": [1, 2]}}]) + actual = bbq.json_set(s, json_path_value_pairs=[("$.a.b", json_value)]) + + expected = _get_series_from_json(expected_json) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +def test_json_set_w_more_pairs(): + s = _get_series_from_json([{"a": 2}, {"b": 5}, {"c": 1}]) + actual = bbq.json_set( + s, json_path_value_pairs=[("$.a", 1), ("$.b", 2), ("$.a", [3, 4, 5])] + ) + expected = _get_series_from_json( + [{"a": 3, "b": 2}, {"a": 4, "b": 2}, {"a": 5, "b": 2, "c": 1}] + ) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +@pytest.mark.parametrize( + ("series", "json_path_value_pairs"), + [ + pytest.param( + _get_series_from_json([{"a": 10}]), + [("$.a", 1, 100)], + id="invalid_json_path_value_pairs", + marks=pytest.mark.xfail(raises=ValueError), + ), + pytest.param( + _get_series_from_json([{"a": 10}]), + [ + ( + "$.a", + bpd.read_pandas( + gpd.GeoSeries.from_wkt(["POINT (1 2)", "POINT (2 1)"]) + ), + ) + ], + id="invalid_json_value_type", + marks=pytest.mark.xfail(raises=TypeError), + ), + pytest.param( + bpd.Series([1, 2]), + [("$.a", 1)], + id="invalid_series_type", + marks=pytest.mark.xfail(raises=TypeError), + ), + ], +) +def test_json_set_w_invalid(series, json_path_value_pairs): + bbq.json_set(series, json_path_value_pairs=json_path_value_pairs) diff --git a/tests/system/small/ml/test_preprocessing.py b/tests/system/small/ml/test_preprocessing.py index 73b1855e09..16b153ab45 100644 --- a/tests/system/small/ml/test_preprocessing.py +++ b/tests/system/small/ml/test_preprocessing.py @@ -907,3 +907,35 @@ def test_poly_features_params(new_penguins_df): ], [1633, 1672, 1690], ) + + +def test_poly_features_save_load(new_penguins_df, dataset_id): + transformer = preprocessing.PolynomialFeatures(degree=3) + transformer.fit(new_penguins_df[["culmen_length_mm", "culmen_depth_mm"]]) + + reloaded_transformer = transformer.to_gbq( + f"{dataset_id}.temp_configured_model", replace=True + ) + assert isinstance(reloaded_transformer, preprocessing.PolynomialFeatures) + assert reloaded_transformer.degree == 3 + assert reloaded_transformer._bqml_model is not None + + result = reloaded_transformer.transform( + new_penguins_df[["culmen_length_mm", "culmen_depth_mm"]] + ).to_pandas() + + utils.check_pandas_df_schema_and_index( + result, + [ + "poly_feat_culmen_length_mm", + "poly_feat_culmen_length_mm_culmen_length_mm", + "poly_feat_culmen_length_mm_culmen_length_mm_culmen_length_mm", + "poly_feat_culmen_length_mm_culmen_length_mm_culmen_depth_mm", + "poly_feat_culmen_length_mm_culmen_depth_mm", + "poly_feat_culmen_length_mm_culmen_depth_mm_culmen_depth_mm", + "poly_feat_culmen_depth_mm", + "poly_feat_culmen_depth_mm_culmen_depth_mm", + "poly_feat_culmen_depth_mm_culmen_depth_mm_culmen_depth_mm", + ], + [1633, 1672, 1690], + ) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 0aac9e2578..625b920763 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2485,12 +2485,19 @@ def test_dataframe_agg_single_string(scalars_dfs): ) -def test_dataframe_agg_int_single_string(scalars_dfs): +@pytest.mark.parametrize( + ("agg",), + ( + ("sum",), + ("size",), + ), +) +def test_dataframe_agg_int_single_string(scalars_dfs, agg): numeric_cols = ["int64_col", "int64_too", "bool_col"] scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() - pd_result = scalars_pandas_df[numeric_cols].agg("sum") + bf_result = scalars_df[numeric_cols].agg(agg).to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg(agg) assert bf_result.dtype == "Int64" pd.testing.assert_series_equal( @@ -2537,6 +2544,7 @@ def test_dataframe_agg_int_multi_string(scalars_dfs): "sum", "nunique", "count", + "size", ] scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas() @@ -4464,13 +4472,26 @@ def test_recursion_limit(scalars_df_index): scalars_df_index.to_pandas() +def test_query_complexity_error(scalars_df_index): + # This test requires automatic caching/query decomposition to be turned off + bf_df = scalars_df_index + for _ in range(8): + bf_df = bf_df.merge(bf_df, on="int64_col").head(30) + bf_df = bf_df[bf_df.columns[:20]] + + with pytest.raises( + bigframes.exceptions.QueryComplexityError, match=r"Try using DataFrame\.cache" + ): + bf_df.to_pandas() + + def test_query_complexity_repeated_joins( scalars_df_index, scalars_pandas_df_index, with_multiquery_execution ): pd_df = scalars_pandas_df_index bf_df = scalars_df_index - for _ in range(6): - # recursively join, resuling in 2^6 - 1 = 63 joins + for _ in range(8): + # recursively join, resuling in 2^8 - 1 = 255 joins pd_df = pd_df.merge(pd_df, on="int64_col").head(30) pd_df = pd_df[pd_df.columns[:20]] bf_df = bf_df.merge(bf_df, on="int64_col").head(30) diff --git a/tests/system/small/test_dataframe_io.py b/tests/system/small/test_dataframe_io.py index 8adbea88e4..ab1fdceae5 100644 --- a/tests/system/small/test_dataframe_io.py +++ b/tests/system/small/test_dataframe_io.py @@ -132,6 +132,67 @@ def test_sql_executes_and_includes_named_multiindex( ) +def test_to_arrow(scalars_df_default_index, scalars_pandas_df_default_index): + """Verify to_arrow() APIs returns the expected data.""" + expected = pa.Table.from_pandas( + scalars_pandas_df_default_index.drop(columns=["geography_col"]) + ) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="to_arrow", + ): + actual = scalars_df_default_index.drop(columns=["geography_col"]).to_arrow() + + # Make string_col match type. Otherwise, pa.Table.from_pandas uses + # LargeStringArray. LargeStringArray is unnecessary because our strings are + # less than 2 GB. + expected = expected.set_column( + expected.column_names.index("string_col"), + pa.field("string_col", pa.string()), + expected["string_col"].cast(pa.string()), + ) + + # Note: the final .equals assertion covers all these checks, but these + # finer-grained assertions are easier to debug. + assert actual.column_names == expected.column_names + for column in actual.column_names: + assert actual[column].equals(expected[column]) + assert actual.equals(expected) + + +def test_to_arrow_multiindex(scalars_df_index, scalars_pandas_df_index): + scalars_df_multiindex = scalars_df_index.set_index(["string_col", "int64_col"]) + scalars_pandas_df_multiindex = scalars_pandas_df_index.set_index( + ["string_col", "int64_col"] + ) + expected = pa.Table.from_pandas( + scalars_pandas_df_multiindex.drop(columns=["geography_col"]) + ) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="to_arrow", + ): + actual = scalars_df_multiindex.drop(columns=["geography_col"]).to_arrow() + + # Make string_col match type. Otherwise, pa.Table.from_pandas uses + # LargeStringArray. LargeStringArray is unnecessary because our strings are + # less than 2 GB. + expected = expected.set_column( + expected.column_names.index("string_col"), + pa.field("string_col", pa.string()), + expected["string_col"].cast(pa.string()), + ) + + # Note: the final .equals assertion covers all these checks, but these + # finer-grained assertions are easier to debug. + assert actual.column_names == expected.column_names + for column in actual.column_names: + assert actual[column].equals(expected[column]) + assert actual.equals(expected) + + def test_to_pandas_w_correct_dtypes(scalars_df_default_index): """Verify to_pandas() APIs returns the expected dtypes.""" actual = scalars_df_default_index.to_pandas().dtypes diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index 960dc10948..8e3baff4c2 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -140,11 +140,23 @@ def test_dataframe_groupby_agg_string( ) +def test_dataframe_groupby_agg_size_string(scalars_df_index, scalars_pandas_df_index): + col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] + bf_result = scalars_df_index[col_names].groupby("string_col").agg("size") + pd_result = scalars_pandas_df_index[col_names].groupby("string_col").agg("size") + + pd.testing.assert_series_equal(pd_result, bf_result.to_pandas(), check_dtype=False) + + def test_dataframe_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): col_names = ["int64_too", "float64_col", "int64_col", "bool_col", "string_col"] - bf_result = scalars_df_index[col_names].groupby("string_col").agg(["count", "min"]) + bf_result = ( + scalars_df_index[col_names].groupby("string_col").agg(["count", "min", "size"]) + ) pd_result = ( - scalars_pandas_df_index[col_names].groupby("string_col").agg(["count", "min"]) + scalars_pandas_df_index[col_names] + .groupby("string_col") + .agg(["count", "min", "size"]) ) bf_result_computed = bf_result.to_pandas() @@ -161,8 +173,8 @@ def test_dataframe_groupby_agg_list_w_column_multi_index( pd_df = scalars_pandas_df_index[columns].copy() pd_df.columns = multi_columns - bf_result = bf_df.groupby(level=0).agg(["count", "min"]) - pd_result = pd_df.groupby(level=0).agg(["count", "min"]) + bf_result = bf_df.groupby(level=0).agg(["count", "min", "size"]) + pd_result = pd_df.groupby(level=0).agg(["count", "min", "size"]) bf_result_computed = bf_result.to_pandas() pd.testing.assert_frame_equal(pd_result, bf_result_computed, check_dtype=False) @@ -182,12 +194,12 @@ def test_dataframe_groupby_agg_dict_with_list( bf_result = ( scalars_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count"}) + .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) ) pd_result = ( scalars_pandas_df_index[col_names] .groupby("string_col", as_index=as_index) - .agg({"int64_too": ["mean", "max"], "string_col": "count"}) + .agg({"int64_too": ["mean", "max"], "string_col": "count", "bool_col": "size"}) ) bf_result_computed = bf_result.to_pandas() @@ -413,16 +425,21 @@ def test_dataframe_groupby_nonnumeric_with_mean(): # ============== -def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index): +@pytest.mark.parametrize( + ("agg"), + [ + ("count"), + ("size"), + ], +) +def test_series_groupby_agg_string(scalars_df_index, scalars_pandas_df_index, agg): bf_result = ( - scalars_df_index["int64_col"] - .groupby(scalars_df_index["string_col"]) - .agg("count") + scalars_df_index["int64_col"].groupby(scalars_df_index["string_col"]).agg(agg) ) pd_result = ( scalars_pandas_df_index["int64_col"] .groupby(scalars_pandas_df_index["string_col"]) - .agg("count") + .agg(agg) ) bf_result_computed = bf_result.to_pandas() @@ -435,12 +452,12 @@ def test_series_groupby_agg_list(scalars_df_index, scalars_pandas_df_index): bf_result = ( scalars_df_index["int64_col"] .groupby(scalars_df_index["string_col"]) - .agg(["sum", "mean"]) + .agg(["sum", "mean", "size"]) ) pd_result = ( scalars_pandas_df_index["int64_col"] .groupby(scalars_pandas_df_index["string_col"]) - .agg(["sum", "mean"]) + .agg(["sum", "mean", "size"]) ) bf_result_computed = bf_result.to_pandas() diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index 5838ad75b0..d84d520988 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -742,6 +742,109 @@ def test_read_gbq_function_enforces_explicit_types( ) +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1(session, scalars_dfs): + columns = [ + "bool_col", + "int64_col", + "int64_too", + "float64_col", + "string_col", + "bytes_col", + ] + scalars_df, scalars_pandas_df = scalars_dfs + + def add_ints(row): + return row["int64_col"] + row["int64_too"] + + with pytest.warns( + bigframes.exceptions.PreviewWarning, + match="input_types=Series is in preview.", + ): + add_ints_remote = session.remote_function( + bigframes.series.Series, + int, + )(add_ints) + + with pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." + ): + bf_result = scalars_df[columns].apply(add_ints_remote, axis=1).to_pandas() + + pd_result = scalars_pandas_df[columns].apply(add_ints, axis=1) + + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=False + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_ordering(session, scalars_dfs): + columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] + ordering_columns = ["bool_col", "int64_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + def add_ints(row): + return row["int64_col"] + row["int64_too"] + + add_ints_remote = session.remote_function(bigframes.series.Series, int)(add_ints) + + bf_result = ( + scalars_df[columns] + .sort_values(ordering_columns) + .apply(add_ints_remote, axis=1) + .to_pandas() + ) + pd_result = ( + scalars_pandas_df[columns].sort_values(ordering_columns).apply(add_ints, axis=1) + ) + + # bf_result.dtype is 'Int64' while pd_result.dtype is 'object', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.to_numpy() produces an array of numpy.float64's + # (in system_prerelease tests), while pd_result.to_numpy() produces an + # array of ints, ignore this mismatch by using check_exact=False. + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_exact=False + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_multiindex(session): + pd_df = pd.DataFrame( + {"x": [1, 2, 3], "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"]}, + index=pd.MultiIndex.from_tuples([("a", 100), ("a", 200), ("b", 300)]), + ) + bf_df = session.read_pandas(pd_df) + + def add_numbers(row): + return row["x"] + row["y"] + + add_numbers_remote = session.remote_function(bigframes.series.Series, float)( + add_numbers + ) + + bf_result = bf_df.apply(add_numbers_remote, axis=1).to_pandas() + pd_result = pd_df.apply(add_numbers, axis=1) + + # bf_result.dtype is 'Float64' while pd_result.dtype is 'float64', ignore this + # mismatch by using check_dtype=False. + # + # bf_result.index[0].dtype is 'string[pyarrow]' while + # pd_result.index[0].dtype is 'object', ignore this mismatch by using + # check_index_type=False. + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + def test_df_apply_axis_1_unsupported_callable(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 3e21418f2f..10fcec63ce 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -506,15 +506,32 @@ def test_series_dropna(scalars_dfs, ignore_index): pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False) -def test_series_agg_single_string(scalars_dfs): +@pytest.mark.parametrize( + ("agg",), + ( + ("sum",), + ("size",), + ), +) +def test_series_agg_single_string(scalars_dfs, agg): scalars_df, scalars_pandas_df = scalars_dfs - bf_result = scalars_df["int64_col"].agg("sum") - pd_result = scalars_pandas_df["int64_col"].agg("sum") + bf_result = scalars_df["int64_col"].agg(agg) + pd_result = scalars_pandas_df["int64_col"].agg(agg) assert math.isclose(pd_result, bf_result) def test_series_agg_multi_string(scalars_dfs): - aggregations = ["sum", "mean", "std", "var", "min", "max", "nunique", "count"] + aggregations = [ + "sum", + "mean", + "std", + "var", + "min", + "max", + "nunique", + "count", + "size", + ] scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df["int64_col"].agg(aggregations).to_pandas() pd_result = scalars_pandas_df["int64_col"].agg(aggregations) @@ -1970,6 +1987,70 @@ def test_head_then_series_operation(scalars_dfs): ) +def test_series_peek(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + peek_result = scalars_df["float64_col"].peek(n=3, force=False) + pd.testing.assert_series_equal( + peek_result, + scalars_pandas_df["float64_col"].reindex_like(peek_result), + ) + + +def test_series_peek_multi_index(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + bf_series = scalars_df.set_index(["string_col", "bool_col"])["float64_col"] + bf_series.name = ("2-part", "name") + pd_series = scalars_pandas_df.set_index(["string_col", "bool_col"])["float64_col"] + pd_series.name = ("2-part", "name") + peek_result = bf_series.peek(n=3, force=False) + pd.testing.assert_series_equal( + peek_result, + pd_series.reindex_like(peek_result), + ) + + +def test_series_peek_filtered(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + peek_result = scalars_df[scalars_df.int64_col > 0]["float64_col"].peek( + n=3, force=False + ) + pd_result = scalars_pandas_df[scalars_pandas_df.int64_col > 0]["float64_col"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + +@skip_legacy_pandas +def test_series_peek_force(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + cumsum_df = scalars_df[["int64_col", "int64_too"]].cumsum() + df_filtered = cumsum_df[cumsum_df.int64_col > 0]["int64_too"] + peek_result = df_filtered.peek(n=3, force=True) + pd_cumsum_df = scalars_pandas_df[["int64_col", "int64_too"]].cumsum() + pd_result = pd_cumsum_df[pd_cumsum_df.int64_col > 0]["int64_too"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + +@skip_legacy_pandas +def test_series_peek_force_float(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + cumsum_df = scalars_df[["int64_col", "float64_col"]].cumsum() + df_filtered = cumsum_df[cumsum_df.float64_col > 0]["float64_col"] + peek_result = df_filtered.peek(n=3, force=True) + pd_cumsum_df = scalars_pandas_df[["int64_col", "float64_col"]].cumsum() + pd_result = pd_cumsum_df[pd_cumsum_df.float64_col > 0]["float64_col"] + pd.testing.assert_series_equal( + peek_result, + pd_result.reindex_like(peek_result), + ) + + def test_shift(scalars_df_index, scalars_pandas_df_index): col_name = "int64_col" bf_result = scalars_df_index[col_name].shift().to_pandas() diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index 5d53a5af17..2f779f337e 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -570,6 +570,18 @@ def test_read_gbq_with_custom_global_labels( assert len(bigframes.options.compute.extra_query_labels) == 0 +def test_read_gbq_external_table(session: bigframes.Session): + # Verify the table is external to ensure it hasn't been altered + external_table_id = "bigframes-dev.bigframes_tests_sys.parquet_external_table" + external_table = session.bqclient.get_table(external_table_id) + assert external_table.table_type == "EXTERNAL" + + df = session.read_gbq(external_table_id) + + assert list(df.columns) == ["idx", "s1", "s2", "s3", "s4", "i1", "f1", "i2", "f2"] + assert df["i1"].max() == 99 + + def test_read_gbq_model(session, penguins_linear_model_name): model = session.read_gbq_model(penguins_linear_model_name) assert isinstance(model, bigframes.ml.linear_model.LinearRegression) diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index d555cedcc0..36bf2a2585 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -13,7 +13,9 @@ # limitations under the License. import pandas as pd import pyarrow as pa +import pytest +import bigframes.exceptions import bigframes.pandas as bpd from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas @@ -59,3 +61,52 @@ def test_unordered_mode_read_gbq(unordered_session): ) # Don't need ignore_order as there is only 1 row assert_pandas_df_equal(df.to_pandas(), expected) + + +@pytest.mark.parametrize( + ("keep"), + [ + pytest.param( + "first", + marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), + ), + pytest.param( + False, + ), + ], +) +def test_unordered_drop_duplicates(unordered_session, keep): + pd_df = pd.DataFrame({"a": [1, 1, 3], "b": [4, 4, 6]}, dtype=pd.Int64Dtype()) + bf_df = bpd.DataFrame(pd_df, session=unordered_session) + + bf_result = bf_df.drop_duplicates(keep=keep) + pd_result = pd_df.drop_duplicates(keep=keep) + + assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) + + +@pytest.mark.parametrize( + ("function"), + [ + pytest.param( + lambda x: x.cumsum(), + id="cumsum", + ), + pytest.param( + lambda x: x.idxmin(), + id="idxmin", + ), + pytest.param( + lambda x: x.a.iloc[1::2], + id="series_iloc", + ), + ], +) +def test_unordered_mode_blocks_windowing(unordered_session, function): + pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + with pytest.raises( + bigframes.exceptions.OrderRequiredError, + match=r"Op.*not supported when strict ordering is disabled", + ): + function(df) diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py new file mode 100644 index 0000000000..2e276d0f1a --- /dev/null +++ b/tests/unit/test_planner.py @@ -0,0 +1,121 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import unittest.mock as mock + +import google.cloud.bigquery +import pandas as pd + +import bigframes.core as core +import bigframes.core.expression as ex +import bigframes.core.schema +import bigframes.operations as ops +import bigframes.session.planner as planner + +TABLE_REF = google.cloud.bigquery.TableReference.from_string("project.dataset.table") +SCHEMA = ( + google.cloud.bigquery.SchemaField("col_a", "INTEGER"), + google.cloud.bigquery.SchemaField("col_b", "INTEGER"), +) +TABLE = google.cloud.bigquery.Table( + table_ref=TABLE_REF, + schema=SCHEMA, +) +FAKE_SESSION = mock.create_autospec(bigframes.Session, instance=True) +type(FAKE_SESSION)._strictly_ordered = mock.PropertyMock(return_value=True) +LEAF: core.ArrayValue = core.ArrayValue.from_table( + session=FAKE_SESSION, + table=TABLE, + schema=bigframes.core.schema.ArraySchema.from_bq_table(TABLE), +) + + +def test_session_aware_caching_project_filter(): + """ + Test that if a node is filtered by a column, the node is cached pre-filter and clustered by the filter column. + """ + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + ops.gt_op.as_expr("col_a", ex.const(3)) + ) + result, cluster_cols = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_cols == ["col_a"] + + +def test_session_aware_caching_project_multi_filter(): + """ + Test that if a node is filtered by multiple columns, all of them are in the cluster cols + """ + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + predicate_1a = ops.gt_op.as_expr("col_a", ex.const(3)) + predicate_1b = ops.lt_op.as_expr("col_a", ex.const(55)) + predicate_1 = ops.and_op.as_expr(predicate_1a, predicate_1b) + predicate_3 = ops.eq_op.as_expr("col_b", ex.const(1)) + target = ( + LEAF.filter(predicate_1) + .assign_constant("col_c", 4, pd.Int64Dtype()) + .filter(predicate_3) + ) + result, cluster_cols = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_cols == ["col_a", "col_b"] + + +def test_session_aware_caching_unusable_filter(): + """ + Test that if a node is filtered by multiple columns in the same comparison, the node is cached pre-filter and not clustered by either column. + + Most filters with multiple column references cannot be used for scan pruning, as they cannot be converted to fixed value ranges. + """ + session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + ops.gt_op.as_expr("col_a", "col_b") + ) + result, cluster_cols = planner.session_aware_cache_plan( + target.node, [obj.node for obj in session_objects] + ) + assert result == LEAF.node + assert cluster_cols == [] + + +def test_session_aware_caching_fork_after_window_op(): + """ + Test that caching happens only after an windowed operation, but before filtering, projecting. + + Windowing is expensive, so caching should always compute the window function, in order to avoid later recomputation. + """ + other = LEAF.promote_offsets("offsets_col").assign_constant( + "col_d", 5, pd.Int64Dtype() + ) + target = ( + LEAF.promote_offsets("offsets_col") + .assign_constant("col_c", 4, pd.Int64Dtype()) + .filter( + ops.eq_op.as_expr("col_a", ops.add_op.as_expr(ex.const(4), ex.const(3))) + ) + ) + result, cluster_cols = planner.session_aware_cache_plan( + target.node, + [ + other.node, + ], + ) + assert result == LEAF.promote_offsets("offsets_col").node + assert cluster_cols == ["col_a"] diff --git a/third_party/bigframes_vendored/ibis/expr/operations/json.py b/third_party/bigframes_vendored/ibis/expr/operations/json.py index 772c2e8ff4..1eb0554137 100644 --- a/third_party/bigframes_vendored/ibis/expr/operations/json.py +++ b/third_party/bigframes_vendored/ibis/expr/operations/json.py @@ -2,8 +2,8 @@ from __future__ import annotations import ibis.expr.datatypes as dt -from ibis.expr.operations.core import Unary +import ibis.expr.operations.core as ibis_ops_core -class ToJsonString(Unary): +class ToJsonString(ibis_ops_core.Unary): dtype = dt.string diff --git a/third_party/bigframes_vendored/sklearn/preprocessing/_polynomial.py b/third_party/bigframes_vendored/sklearn/preprocessing/_polynomial.py index 4e4624ba84..9ad43b7956 100644 --- a/third_party/bigframes_vendored/sklearn/preprocessing/_polynomial.py +++ b/third_party/bigframes_vendored/sklearn/preprocessing/_polynomial.py @@ -8,7 +8,12 @@ class PolynomialFeatures(TransformerMixin, BaseEstimator): - """Generate polynomial and interaction features.""" + """Generate polynomial and interaction features. + + Args: + degree (int): + Specifies the maximal degree of the polynomial features. Valid values [1, 4]. Default to 2. + """ def fit(self, X, y=None): """Compute number of output features.