From 6351066cd6ff525fd25bad2505da71943c8a500c Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 13 Feb 2025 15:51:07 -0500 Subject: [PATCH] Add table upsert support (#1660) Closes #402 This PR adds the `upsert` function to the `Table` class and supports the following upsert operations: - when matched update all - when not matched insert all This PR is a remake of #1534 due to some infrastructure issues. For additional context, please refer to that PR. --------- Co-authored-by: VAA7RQ Co-authored-by: VAA7RQ Co-authored-by: mattmartin14 <51217870+mattmartin14@users.noreply.github.com> Co-authored-by: Fokko Driesprong --- poetry.lock | 34 +++- pyiceberg/table/__init__.py | 80 +++++++++ pyiceberg/table/upsert_util.py | 118 +++++++++++++ pyproject.toml | 5 + tests/table/test_upsert.py | 309 +++++++++++++++++++++++++++++++++ 5 files changed, 539 insertions(+), 7 deletions(-) create mode 100644 pyiceberg/table/upsert_util.py create mode 100644 tests/table/test_upsert.py diff --git a/poetry.lock b/poetry.lock index 03c2918773..911db4711f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1099,6 +1099,26 @@ files = [ {file = "cython-3.0.12.tar.gz", hash = "sha256:b988bb297ce76c671e28c97d017b95411010f7c77fa6623dd0bb47eed1aee1bc"}, ] +[[package]] +name = "datafusion" +version = "44.0.0" +description = "Build and run queries against data" +optional = false +python-versions = ">=3.8" +groups = ["dev"] +files = [ + {file = "datafusion-44.0.0-cp38-abi3-macosx_10_12_x86_64.whl", hash = "sha256:4786f0a09c6b422ac18c6ea095650c14454be5af3df880b5c169688f610ab41a"}, + {file = "datafusion-44.0.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:bbad11b33c424a658edbc52db39dfe4ddc30339ffac7c43cdc1aa128c260ae76"}, + {file = "datafusion-44.0.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ca3b47fd34e1c96cf6d40a877245afd36f3ccf8b39dda1e5b6f811f273af781"}, + {file = "datafusion-44.0.0-cp38-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:22d2e3ecf5d0b1b75c8ad48c8d9af14a0ac4de1633e86d3b397614f68aa8123c"}, + {file = "datafusion-44.0.0-cp38-abi3-win_amd64.whl", hash = "sha256:b36774dca54a0e1c88c8080b8c72cc2df5e95f4340a0cdbdd18a0473401551c5"}, + {file = "datafusion-44.0.0.tar.gz", hash = "sha256:5fc3740406ff531527aa8baa5954fe0bf1f02ea72170e172746b38cffc0d8d50"}, +] + +[package.dependencies] +pyarrow = ">=11.0.0" +typing-extensions = {version = "*", markers = "python_version < \"3.13\""} + [[package]] name = "decorator" version = "5.1.1" @@ -1800,15 +1820,15 @@ requests = ["requests (>=2.18.0,<3.0.0dev)"] [[package]] name = "googleapis-common-protos" -version = "1.67.0" +version = "1.66.0" description = "Common protobufs used in Google APIs" optional = true python-versions = ">=3.7" groups = ["main"] markers = "extra == \"gcsfs\"" files = [ - {file = "googleapis_common_protos-1.67.0-py2.py3-none-any.whl", hash = "sha256:579de760800d13616f51cf8be00c876f00a9f146d3e6510e19d1f4111758b741"}, - {file = "googleapis_common_protos-1.67.0.tar.gz", hash = "sha256:21398025365f138be356d5923e9168737d94d46a72aefee4a6110a1f23463c86"}, + {file = "googleapis_common_protos-1.66.0-py2.py3-none-any.whl", hash = "sha256:d7abcd75fabb2e0ec9f74466401f6c119a0b498e27370e9be4c94cb7e382b8ed"}, + {file = "googleapis_common_protos-1.66.0.tar.gz", hash = "sha256:c3e7b33d15fdca5374cc0a7346dd92ffa847425cc4ea941d970f13680052ec8c"}, ] [package.dependencies] @@ -3776,10 +3796,9 @@ files = [ name = "pyarrow" version = "19.0.0" description = "Python library for Apache Arrow" -optional = true +optional = false python-versions = ">=3.9" 
-groups = ["main"] -markers = "extra == \"pyarrow\" or extra == \"pandas\" or extra == \"duckdb\" or extra == \"ray\" or extra == \"daft\"" +groups = ["main", "dev"] files = [ {file = "pyarrow-19.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:c318eda14f6627966997a7d8c374a87d084a94e4e38e9abbe97395c215830e0c"}, {file = "pyarrow-19.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:62ef8360ff256e960f57ce0299090fb86423afed5e46f18f1225f960e05aae3d"}, @@ -3824,6 +3843,7 @@ files = [ {file = "pyarrow-19.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:597360ffc71fc8cceea1aec1fb60cb510571a744fffc87db33d551d5de919bec"}, {file = "pyarrow-19.0.0.tar.gz", hash = "sha256:8d47c691765cf497aaeed4954d226568563f1b3b74ff61139f2d77876717084b"}, ] +markers = {main = "extra == \"pyarrow\" or extra == \"pandas\" or extra == \"duckdb\" or extra == \"ray\" or extra == \"daft\""} [package.extras] test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] @@ -5796,4 +5816,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.1" python-versions = "^3.9.2, !=3.9.7" -content-hash = "4fc6b147b3c4d213c2bafcc1c284df25bedf5b5b71dcac0cfefed343195a68fa" +content-hash = "c3587101a24b4eb668c214848042ffefc9cc8054d9c8d91b954563bcf19f152c" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f16aa28844..89e7ffa544 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -153,6 +153,14 @@ DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" +@dataclass() +class UpsertResult: + """Summary the upsert operation.""" + + rows_updated: int = 0 + rows_inserted: int = 0 + + class TableProperties: PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes" PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 # 128 MB @@ -1092,6 +1100,78 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() + def upsert( + self, df: pa.Table, join_cols: list[str], when_matched_update_all: bool = True, when_not_matched_insert_all: bool = True + ) -> UpsertResult: + """Shorthand API for performing an upsert to an iceberg table. + + Args: + + df: The input dataframe to upsert with the table's data. + join_cols: The columns to join on. 
These are essentially analogous to primary keys + when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing + when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table + + Example Use Cases: + Case 1: Both Parameters = True (Full Upsert) + Existing row found → Update it + New row found → Insert it + + Case 2: when_matched_update_all = False, when_not_matched_insert_all = True + Existing row found → Do nothing (no updates) + New row found → Insert it + + Case 3: when_matched_update_all = True, when_not_matched_insert_all = False + Existing row found → Update it + New row found → Do nothing (no inserts) + + Case 4: Both Parameters = False (No Merge Effect) + Existing row found → Do nothing + New row found → Do nothing + (Function effectively does nothing) + + + Returns: + An UpsertResult class (contains details of rows updated and inserted) + """ + from pyiceberg.table import upsert_util + + if not when_matched_update_all and not when_not_matched_insert_all: + raise ValueError("no upsert options selected...exiting") + + if upsert_util.has_duplicate_rows(df, join_cols): + raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") + + # get list of rows that exist so we don't have to load the entire target table + matched_predicate = upsert_util.create_match_filter(df, join_cols) + matched_iceberg_table = self.scan(row_filter=matched_predicate).to_arrow() + + update_row_cnt = 0 + insert_row_cnt = 0 + + with self.transaction() as tx: + if when_matched_update_all: + # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed + # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed + # this extra step avoids unnecessary IO and writes + rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols) + + update_row_cnt = len(rows_to_update) + + # build the match predicate filter + overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols) + + tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate) + + if when_not_matched_insert_all: + rows_to_insert = upsert_util.get_rows_to_insert(df, matched_iceberg_table, join_cols) + + insert_row_cnt = len(rows_to_insert) + + tx.append(rows_to_insert) + + return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt) + def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: """ Shorthand API for appending a PyArrow table to the table. diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py new file mode 100644 index 0000000000..87ff6a6a6e --- /dev/null +++ b/pyiceberg/table/upsert_util.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import functools +import operator + +import pyarrow as pa +from pyarrow import Table as pyarrow_table +from pyarrow import compute as pc + +from pyiceberg.expressions import ( + And, + BooleanExpression, + EqualTo, + In, + Or, +) + + +def create_match_filter(df: pyarrow_table, join_cols: list[str]) -> BooleanExpression: + unique_keys = df.select(join_cols).group_by(join_cols).aggregate([]) + + if len(join_cols) == 1: + return In(join_cols[0], unique_keys[0].to_pylist()) + else: + return Or(*[And(*[EqualTo(col, row[col]) for col in join_cols]) for row in unique_keys.to_pylist()]) + + +def has_duplicate_rows(df: pyarrow_table, join_cols: list[str]) -> bool: + """Check for duplicate rows in a PyArrow table based on the join columns.""" + return len(df.select(join_cols).group_by(join_cols).aggregate([([], "count_all")]).filter(pc.field("count_all") > 1)) > 0 + + +def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table: + """ + Return a table with rows that need to be updated in the target table based on the join columns. + + When a row is matched, an additional scan is done to evaluate the non-key columns to detect if an actual change has occurred. + Only matched rows that have an actual change to a non-key column value will be returned in the final output. + """ + all_columns = set(source_table.column_names) + join_cols_set = set(join_cols) + + non_key_cols = list(all_columns - join_cols_set) + + match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols]) + + matching_source_rows = source_table.filter(match_expr) + + rows_to_update = [] + + for index in range(matching_source_rows.num_rows): + source_row = matching_source_rows.slice(index, 1) + + target_filter = functools.reduce(operator.and_, [pc.field(col) == source_row.column(col)[0].as_py() for col in join_cols]) + + matching_target_row = target_table.filter(target_filter) + + if matching_target_row.num_rows > 0: + needs_update = False + + for non_key_col in non_key_cols: + source_value = source_row.column(non_key_col)[0].as_py() + target_value = matching_target_row.column(non_key_col)[0].as_py() + + if source_value != target_value: + needs_update = True + break + + if needs_update: + rows_to_update.append(source_row) + + if rows_to_update: + rows_to_update_table = pa.concat_tables(rows_to_update) + else: + rows_to_update_table = pa.Table.from_arrays([], names=source_table.column_names) + + common_columns = set(source_table.column_names).intersection(set(target_table.column_names)) + rows_to_update_table = rows_to_update_table.select(list(common_columns)) + + return rows_to_update_table + + +def get_rows_to_insert(source_table: pa.Table, target_table: pa.Table, join_cols: list[str]) -> pa.Table: + source_filter_expr = pc.scalar(True) + + for col in join_cols: + target_values = target_table.column(col).to_pylist() + expr = pc.field(col).isin(target_values) + + if source_filter_expr is None: + source_filter_expr = expr + else: + source_filter_expr = source_filter_expr & expr + + non_matching_expr = ~source_filter_expr + 
+ source_columns = set(source_table.column_names) + target_columns = set(target_table.column_names) + + common_columns = source_columns.intersection(target_columns) + + non_matching_rows = source_table.filter(non_matching_expr).select(common_columns) + + return non_matching_rows diff --git a/pyproject.toml b/pyproject.toml index 032722cd9a..63d231881a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,6 +97,7 @@ pytest-mock = "3.14.0" pyspark = "3.5.3" cython = "3.0.12" deptry = ">=0.14,<0.24" +datafusion = "^44.0.0" docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520 [tool.poetry.group.docs.dependencies] @@ -504,5 +505,9 @@ ignore_missing_imports = true module = "polars.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "datafusion.*" +ignore_missing_imports = true + [tool.coverage.run] source = ['pyiceberg/'] diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py new file mode 100644 index 0000000000..8843b13a76 --- /dev/null +++ b/tests/table/test_upsert.py @@ -0,0 +1,309 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import pytest +from datafusion import SessionContext +from pyarrow import Table as pa_table + +from pyiceberg.table import UpsertResult +from tests.catalog.test_base import InMemoryCatalog, Table + +_TEST_NAMESPACE = "test_ns" + + +def show_iceberg_table(table: Table, ctx: SessionContext) -> None: + import pyarrow.dataset as ds + + table_name = "target" + if ctx.table_exist(table_name): + ctx.deregister_table(table_name) + ctx.register_dataset(table_name, ds.dataset(table.scan().to_arrow())) + ctx.sql(f"SELECT * FROM {table_name} limit 5").show() + + +def show_df(df: pa_table, ctx: SessionContext) -> None: + import pyarrow.dataset as ds + + ctx.register_dataset("df", ds.dataset(df)) + ctx.sql("select * from df limit 10").show() + + +def gen_source_dataset(start_row: int, end_row: int, composite_key: bool, add_dup: bool, ctx: SessionContext) -> pa_table: + additional_columns = ", t.order_id + 1000 as order_line_id" if composite_key else "" + + dup_row = ( + f""" + UNION ALL + ( + SELECT t.order_id {additional_columns} + , date '2021-01-01' as order_date, 'B' as order_type + from t + limit 1 + ) + """ + if add_dup + else "" + ) + + sql = f""" + with t as (SELECT unnest(range({start_row},{end_row+1})) as order_id) + SELECT t.order_id {additional_columns} + , date '2021-01-01' as order_date, 'B' as order_type + from t + {dup_row} + """ + + df = ctx.sql(sql).to_arrow_table() + + return df + + +def gen_target_iceberg_table( + start_row: int, end_row: int, composite_key: bool, ctx: SessionContext, catalog: InMemoryCatalog, namespace: str +) -> Table: + additional_columns = ", t.order_id + 1000 as order_line_id" if composite_key else "" + + df = ctx.sql(f""" + with t as (SELECT unnest(range({start_row},{end_row+1})) as order_id) + SELECT t.order_id {additional_columns} + , date '2021-01-01' as order_date, 'A' as order_type + from t + """).to_arrow_table() + + table = catalog.create_table(f"{_TEST_NAMESPACE}.target", df.schema) + + table.append(df) + + return table + + +def assert_upsert_result(res: UpsertResult, expected_updated: int, expected_inserted: int) -> None: + assert res.rows_updated == expected_updated, f"rows updated should be {expected_updated}, but got {res.rows_updated}" + assert res.rows_inserted == expected_inserted, f"rows inserted should be {expected_inserted}, but got {res.rows_inserted}" + + +@pytest.fixture(scope="session") +def catalog_conn() -> InMemoryCatalog: + catalog = InMemoryCatalog("test") + catalog.create_namespace(namespace=_TEST_NAMESPACE) + yield catalog + + +@pytest.mark.parametrize( + "join_cols, src_start_row, src_end_row, target_start_row, target_end_row, when_matched_update_all, when_not_matched_insert_all, expected_updated, expected_inserted", + [ + (["order_id"], 1, 2, 2, 3, True, True, 1, 1), # single row + (["order_id"], 5001, 15000, 1, 10000, True, True, 5000, 5000), # 10k rows + (["order_id"], 501, 1500, 1, 1000, True, False, 500, 0), # update only + (["order_id"], 501, 1500, 1, 1000, False, True, 0, 500), # insert only + ], +) +def test_merge_rows( + catalog_conn: InMemoryCatalog, + join_cols: list[str], + src_start_row: int, + src_end_row: int, + target_start_row: int, + target_end_row: int, + when_matched_update_all: bool, + when_not_matched_insert_all: bool, + expected_updated: int, + expected_inserted: int, +) -> None: + ctx = SessionContext() + + catalog = catalog_conn + + source_df = gen_source_dataset(src_start_row, src_end_row, False, False, ctx) + ice_table = gen_target_iceberg_table(target_start_row, target_end_row, False, ctx, catalog, 
_TEST_NAMESPACE) + res = ice_table.upsert( + df=source_df, + join_cols=join_cols, + when_matched_update_all=when_matched_update_all, + when_not_matched_insert_all=when_not_matched_insert_all, + ) + + assert_upsert_result(res, expected_updated, expected_inserted) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_merge_scenario_skip_upd_row(catalog_conn: InMemoryCatalog) -> None: + """ + tests a single insert and update; skips a row that does not need to be updated + """ + + ctx = SessionContext() + + df = ctx.sql(""" + select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type + union all + select 2 as order_id, date '2021-01-01' as order_date, 'A' as order_type + """).to_arrow_table() + + catalog = catalog_conn + table = catalog.create_table(f"{_TEST_NAMESPACE}.target", df.schema) + + table.append(df) + + source_df = ctx.sql(""" + select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type + union all + select 2 as order_id, date '2021-01-01' as order_date, 'B' as order_type + union all + select 3 as order_id, date '2021-01-01' as order_date, 'A' as order_type + """).to_arrow_table() + + res = table.upsert(df=source_df, join_cols=["order_id"]) + + expected_updated = 1 + expected_inserted = 1 + + assert_upsert_result(res, expected_updated, expected_inserted) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_merge_scenario_date_as_key(catalog_conn: InMemoryCatalog) -> None: + """ + tests a single insert and update; primary key is a date column + """ + + ctx = SessionContext() + + df = ctx.sql(""" + select date '2021-01-01' as order_date, 'A' as order_type + union all + select date '2021-01-02' as order_date, 'A' as order_type + """).to_arrow_table() + + catalog = catalog_conn + table = catalog.create_table(f"{_TEST_NAMESPACE}.target", df.schema) + + table.append(df) + + source_df = ctx.sql(""" + select date '2021-01-01' as order_date, 'A' as order_type + union all + select date '2021-01-02' as order_date, 'B' as order_type + union all + select date '2021-01-03' as order_date, 'A' as order_type + """).to_arrow_table() + + res = table.upsert(df=source_df, join_cols=["order_date"]) + + expected_updated = 1 + expected_inserted = 1 + + assert_upsert_result(res, expected_updated, expected_inserted) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_merge_scenario_string_as_key(catalog_conn: InMemoryCatalog) -> None: + """ + tests a single insert and update; primary key is a string column + """ + + ctx = SessionContext() + + df = ctx.sql(""" + select 'abc' as order_id, 'A' as order_type + union all + select 'def' as order_id, 'A' as order_type + """).to_arrow_table() + + catalog = catalog_conn + table = catalog.create_table(f"{_TEST_NAMESPACE}.target", df.schema) + + table.append(df) + + source_df = ctx.sql(""" + select 'abc' as order_id, 'A' as order_type + union all + select 'def' as order_id, 'B' as order_type + union all + select 'ghi' as order_id, 'A' as order_type + """).to_arrow_table() + + res = table.upsert(df=source_df, join_cols=["order_id"]) + + expected_updated = 1 + expected_inserted = 1 + + assert_upsert_result(res, expected_updated, expected_inserted) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_merge_scenario_composite_key(catalog_conn: InMemoryCatalog) -> None: + """ + tests merging 200 rows with a composite key + """ + + ctx = SessionContext() + + catalog = catalog_conn + table = gen_target_iceberg_table(1, 200, True, ctx, catalog, _TEST_NAMESPACE) + source_df = gen_source_dataset(101, 
300, True, False, ctx) + + res = table.upsert(df=source_df, join_cols=["order_id", "order_line_id"]) + + expected_updated = 100 + expected_inserted = 100 + + assert_upsert_result(res, expected_updated, expected_inserted) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_merge_source_dups(catalog_conn: InMemoryCatalog) -> None: + """ + tests duplicate rows in source + """ + + ctx = SessionContext() + + catalog = catalog_conn + table = gen_target_iceberg_table(1, 10, False, ctx, catalog, _TEST_NAMESPACE) + source_df = gen_source_dataset(5, 15, False, True, ctx) + + with pytest.raises(Exception, match="Duplicate rows found in source dataset based on the key columns. No upsert executed"): + table.upsert(df=source_df, join_cols=["order_id"]) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target") + + +def test_key_cols_misaligned(catalog_conn: InMemoryCatalog) -> None: + """ + tests join columns missing from one of the tables + """ + + ctx = SessionContext() + + df = ctx.sql("select 1 as order_id, date '2021-01-01' as order_date, 'A' as order_type").to_arrow_table() + + catalog = catalog_conn + table = catalog.create_table(f"{_TEST_NAMESPACE}.target", df.schema) + + table.append(df) + + df_src = ctx.sql("select 1 as item_id, date '2021-05-01' as order_date, 'B' as order_type").to_arrow_table() + + with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""): + table.upsert(df=df_src, join_cols=["order_id"]) + + catalog.drop_table(f"{_TEST_NAMESPACE}.target")
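
For reviewers, a minimal usage sketch of the new API (not part of the patch). It mirrors `test_merge_scenario_skip_upd_row` above, but goes through `load_catalog`; the catalog name, namespace, and table identifier are placeholders and assume an already-configured catalog.

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")   # placeholder catalog name; requires local catalog config
catalog.create_namespace("demo")    # placeholder namespace

target = pa.table({"order_id": [1, 2], "order_type": ["A", "A"]})
tbl = catalog.create_table("demo.orders", schema=target.schema)
tbl.append(target)

# order_id 1 is unchanged, order_id 2 changes "A" -> "B", order_id 3 is new.
source = pa.table({"order_id": [1, 2, 3], "order_type": ["A", "B", "A"]})

res = tbl.upsert(source, join_cols=["order_id"])
print(res.rows_updated, res.rows_inserted)  # 1 1 -- the unchanged row is skipped
```

Setting `when_matched_update_all=False` or `when_not_matched_insert_all=False` disables the update or insert side respectively, as covered by the parametrized `test_merge_rows` cases.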
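The helpers in `pyiceberg/table/upsert_util.py` can also be exercised on their own; a small sketch, assuming only the two in-memory Arrow tables defined below (column order in the results may vary):

```python
import pyarrow as pa
from pyiceberg.table import upsert_util

target = pa.table({"order_id": [1, 2], "order_type": ["A", "A"]})
source = pa.table({"order_id": [2, 3], "order_type": ["B", "A"]})

# Predicate used to scan only the matching slice of the target table,
# roughly In('order_id', {2, 3}) for a single join column.
print(upsert_util.create_match_filter(source, ["order_id"]))

# Matched rows whose non-key values actually changed: order_id 2 ("A" -> "B").
print(upsert_util.get_rows_to_update(source, target, ["order_id"]).to_pylist())

# Source rows whose keys are absent from the target: order_id 3.
print(upsert_util.get_rows_to_insert(source, target, ["order_id"]).to_pylist())
```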