Feature: MERGE/Upsert Support #1534
base: main
Conversation
@kevinjqliu - I'm doing this work on behalf of my company, and when I ran my tests I used a standard Python virtual environment (venv); I haven't quite figured out yet how to get poetry to work inside my company's firewall. So I'm not sure if those are errors I can address or if someone else can pitch in here.
@mattmartin14, what's going on man!? Thanks for working on this and most impressively thanks for the comprehensive description. Out of curiosity, did you discuss your approach with anyone before putting this together? This is good but a few flags for OSS contributions to lower the upfront back and forth:
Contributing to OSS has a different focus than internal code, so hopefully these help. This does look well thought out in terms of implementation, but performance should come in a second or third pass, in favor of having a code history that everyone in the community can wrap their heads around. I'd suggest getting these addressed before Fokko and Kevin scan it. I'll be happy to do a quick glance once the tests are running and there's some consensus around datafusion. PR number one, yeah!
Thanks @bitsondatadev for all this great feedback. I'll get working on your suggestions and push an update next week and will address all your concerns.
Thanks @mattmartin14 for the PR! And thanks @bitsondatadev on the tips on working in OSS. I certainly had to learn a lot of these over the years. A couple things I think we can address first.
This has been a much-anticipated and frequently requested feature in the community. Issue #402 has been tracking it with many eyes on it. I think we still need to figure out the best approach to support this feature. Like you mentioned in the description, as we're building out more and more engine-like features, it becomes harder to support complex and data-intensive workloads such as MERGE INTO. We have been able to use pyarrow for query processing, but it has its own limitations. For more compute-intensive workloads, such as the Bucket and Truncate transforms, we were able to leverage rust (iceberg-rust) to handle the computation. Looking at #402, I don't see any concrete plans on how we can support MERGE INTO. I've added this as an agenda item for the monthly pyiceberg sync and will post the update. Please join us if you have time!
I'm very interested in exploring datafusion and ways we can leverage it for this project. As I mentioned above, we currently use pyarrow to handle most of the compute. It'll be interesting to evaluate datafusion as an alternative. Datafusion has its own ecosystem of expression API, dataframe API, and runtime, all of which are good complements to pyiceberg. It has integrations with the rust side as well, something I have started exploring in apache/iceberg-rust#865. That said, I think we need a wider discussion and alignment on how to integrate with datafusion. It's a good time to start thinking about it! I've added this as another discussion item on the monthly sync.
Compute intensive workloads are generally a bottleneck in python. I am excited for future pyiceberg <> iceberg-rust integration where we can leverage rust to perform those computations.
This is an interesting observation, and I think I've seen someone else run into this issue before. We'd want to address this separately. This is something we might want to explore using datafusion's expression API to replace our own parser.
@kevinjqliu @Fokko @bitsondatadev - the issues should be resolved. I got poetry working inside my company's firewall; I've also removed the dead code and added the license headers to each file. Please take a look.
Also - I added datafusion to the poetry toml file and lock, and it appears that you all need to resolve the conflict here, as it's not letting me.
Also @kevinjqliu - to address your question on datafusion: when I looked into this feature, I explored these 3 options for an arrow processing engine:
I ultimately decided that datafusion would make the most sense, given these things it had going:
Hope this helps explain how I arrived at that conclusion. Just using native pyarrow to try to process the data would be a very large uphill battle, as we would effectively have to build our own data processing engine with it, e.g. hash joins, sorting, optimizations, etc. I figured it does not make sense to reinvent the wheel, and instead we should use an engine that is already out there (datafusion) and put it to good use. I took a look at the attachment you posted for any upcoming pyiceberg sync meetings, but did not see any 2025 meetings listed. I'd be glad to attend to discuss this further, if needed. Thanks,
Thanks for working on this @mattmartin14. There is some work to be done here, mostly because we pull the table into memory, and then perform the operation, which defeats the purpose of Iceberg because we don't use the statistics to optimize the query. I left a few comments
# register both source and target tables so we can find the deltas to update/append
ctx.register_dataset(source_table_name, ds.dataset(df))
ctx.register_dataset(target_table_name, ds.dataset(self.scan().to_arrow()))
This is something we want to avoid. This will materialize the whole table into memory, which does not scale well.
Instead, I think it makes much more sense to re-use the existing delete(delete_filter: BooleanExpression) to delete existing rows. This will only touch the files relevant to the query, and avoid a lot of unnecessary compute and IO.
Hi @Fokko - from our meeting yesterday, I kind of have to load all this into a datafusion table in order to identify the deltas. I'm not sure of any other way around this; if you have some examples, I'd be glad to take a look, but I think what I'm doing is in the best spirit of what a MERGE command should do, considering it has to check for rows matching on the primary key as well as identify which rows are actually different based on the non-key columns.
Sorry, I jumped a bit to conclusions in my previous comment. Allow me to elaborate.
Even with the to_arrow() that we have above, we pull in the whole table. Based on the Iceberg statistics, we can jump over data that's not needed at all. Instead, I would suggest constructing the predicate first, and then pulling in the data:
unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])

if len(join_cols) == 1:
    pred = In(join_cols[0], unique_keys[0].to_pylist())
else:
    pred = Or(*[
        And(*[
            EqualTo(col, row[col])
            for col in join_cols
        ])
        for row in unique_keys.to_pylist()
    ])

ctx.register_dataset(target_table_name, ds.dataset(self.scan(row_filter=pred).to_arrow()))
This way we only load in Parquet files that are relevant to the set of keys that we're looking for.
values = [row[join_col] for row in update_recs.to_pylist()]
# if strings are in the filter, we encapsulate with tick marks
formatted_values = [f"'{value}'" if col_type == 'string' else str(value) for value in values]
overwrite_filter = f"{join_col} IN ({', '.join(formatted_values)})"
I see that we're constructing the overwrite filter as a string. The overwrite(..) method will take this string and parse it into a BooleanExpression. How about constructing a BooleanExpression right away:
overwrite_filter = f"{join_col} IN ({', '.join(formatted_values)})"
overwrite_filter = In(join_col, values)
Can you explain more on this? I thought I was building my overwrite filter expression correctly, given I have to provide a list of keys or composite key columns to scan for.
Happy to elaborate. The SQL-based syntax is supported mostly to make it easier for humans. If you follow the predicate in the code, first through txn.overwrite(update_recs, overwrite_filter), we then come to the .delete() method:
if isinstance(delete_filter, str):
    delete_filter = _parse_row_filter(delete_filter)
What we do here is take the string and turn it into a BooleanExpression. As an example, this converts the string into an In('col', [1,2,3]) expression. Since we don't need to use SQL here, we can optimize this by pushing down the In(..) directly, and avoid the conversion back and forth.
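To make the round-trip concrete, here is a minimal sketch (not from the PR) that assumes the string parser backing _parse_row_filter lives at pyiceberg.expressions.parser.parse; the column name and values are made up:

from pyiceberg.expressions import In
from pyiceberg.expressions.parser import parse

# String form: goes through the SQL-like parser before it can be used.
parsed = parse("col IN (1, 2, 3)")

# Direct form: the equivalent BooleanExpression, no parsing step needed.
direct = In("col", [1, 2, 3])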
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would this work for a situation where I have a composite primary key?
As an example, the table below has a primary key on cust_id and line_item:
cust_id  line_item  cost
1        1001       30.29
2        2001       20.99
My overwrite_filter, from what I understand, would have to be:
(cust_id = 1 and line_item = 1001) or (cust_id = 2 and line_item = 2001) ...and so forth, as more rows need to be compared.
Would your method above with _parse_row_filter work for composite keys, or only for single-key tables?
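For what it's worth, the composite-key predicate for the two example rows can also be built directly as a BooleanExpression, following the And/Or pattern from the earlier snippet. A sketch (not from the PR; the table and column names come from the example above):

from pyiceberg.expressions import And, Or, EqualTo

# One And(...) per source row, chained together with Or(...).
overwrite_filter = Or(
    And(EqualTo("cust_id", 1), EqualTo("line_item", 1001)),
    And(EqualTo("cust_id", 2), EqualTo("line_item", 2001)),
)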
Args:
    df: The input dataframe to merge with the table's data.
    join_cols: The columns to join on.
We can also do this later, but I think we should make join_cols optional. Instead, we want to consider rows equal that have the same identifier-field-ids, see: https://iceberg.apache.org/spec/?column-projection#identifier-field-ids
I think the join columns need to be provided in order to run a proper merge statement; from what I've seen, you cannot declare a primary key on an Iceberg table, thus the user would need to supply what rows should be matched on.
The primary-key equivalent in Iceberg is the identifier fields, so we could also get it from the table like this:
if join_cols is None:
    identifier_field_ids = self.schema().identifier_field_ids
    if len(identifier_field_ids) > 0:
        join_cols = [
            self.schema().find_column_name(identifier_field_id)
            for identifier_field_id in identifier_field_ids
        ]
    else:
        raise ValueError("The table doesn't have identifier fields, please set join_cols.")
We can also do this in a follow-up PR.
So, what if a user had constructed an Iceberg table and not specified the identifier fields? Additionally, the incoming dataset to merge is not in the Iceberg format, so it would not have identifier fields available yet to compare.
Does that make sense?
Hi all, I think if we tackle the basic merge/upsert pattern (when matched update all, when not matched insert all), that would cover 90% of merge use cases. For things that require a more involved upsert using multiple matched predicates, we should direct users to Spark SQL, since that is already baked into the platform. If anyone disagrees with that directional statement, please let me know.
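For context, the basic pattern discussed in this thread boils down to two existing pyiceberg operations inside one transaction: overwrite the matched rows with a key predicate, then append the unmatched rows. A rough sketch under that assumption (update_recs, insert_recs, join_col, and values are hypothetical inputs produced by the source-vs-target join):

from pyiceberg.expressions import In

# Both operations commit together, so the upsert stays atomic.
with table.transaction() as txn:
    # matched rows: replace the old versions of the keys being updated
    txn.overwrite(update_recs, overwrite_filter=In(join_col, values))
    # unmatched rows: brand-new keys are simply appended
    txn.append(insert_recs)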
@@ -1064,6 +1064,119 @@ def name_mapping(self) -> Optional[NameMapping]:
        """Return the table's field-id NameMapping."""
        return self.metadata.name_mapping()

    def merge_rows(self, df: pa.Table, join_cols: list
What do you think of calling this merge instead? I think this aligns better with append and overwrite.
def merge_rows(self, df: pa.Table, join_cols: list
def merge(self, df: pa.Table, join_cols: list
@Fokko I chose "merge_rows" because when I looked through the table __init__.py file, I saw a lot of areas that already had "merge" in them, e.g. "manifest_merge_enabled", "merge_append()", etc.
I figured it would be better to be more verbose in this case and say "merge_rows" vs. just "merge", as that might confuse others later when they look through the code.
Do you agree with this, or do you still think we should change the function name to "merge"?
Hi,
This is my first PR to the pyiceberg project, so I hope it is well received, and I'm open to any feedback. This feature has long been requested by the pyiceberg community: the ability to perform an "upsert" on an Iceberg table, a.k.a. the MERGE command. There is a lot to unpack in this PR, so I'll start with some high-level items and go into more detail.
For basic terminology, an upsert/merge is where you take a new dataset and merge it into the target Iceberg table by doing an update on the target table for rows that have changed and an insert for new rows. This is all performed atomically as one transaction, meaning both the update and the insert succeed together or they fail together; this ensures the table is not left in an unknown state if one of the actions succeeded but the other errored out.
Over the last decade, the ANSI SQL MERGE command has evolved a lot to handle more than just updating existing rows and inserting new rows. This PR aims to cover just the BASIC upsert pattern; it has been architected to allow more of the ANSI MERGE command features (such as "WHEN NOT MATCHED BY SOURCE THEN DELETE") to be added over time via the merge_options parameter, if others would like to contribute.
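For readers who want to see the shape of the API, here is a hedged usage sketch based on the merge_rows signature shown in the diff; the catalog, table, and column names are illustrative, and any parameters beyond df and join_cols are not shown:

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("db.orders")

# New snapshot of source data: existing order_ids get updated, new ones get inserted.
source = pa.table({"order_id": [1, 2, 3], "status": ["shipped", "pending", "new"]})

tbl.merge_rows(source, join_cols=["order_id"])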
In order to efficiently process an upsert on the Iceberg table, an engine is required to detect which rows have changed and which rows are new. The engine I have chosen for this new feature is datafusion, a Rust-based, high-performance data processing engine that operates on Arrow data; I realize this introduces a new dependency for the pyiceberg project, which I've flagged as optional in the pyproject.toml file. My rationale for choosing datafusion is that it is also the library being used by the iceberg-rust project. Thus, down the road, migrating this merge command to the more modern iceberg-rust implementation should require minimal effort.
As far as test coverage is concerned, my unit tests cover performing upserts on both single-key and composite-key Iceberg tables (meaning the table has more than one key field it needs to join on). For performance testing, single-key tables scale just fine. My unit tests do a 10k-row update/insert for one of the test series, but on my local workstation (Mac M2 Pro) I have run a test of 500k rows of updates/inserts on a single-key join and it scales just fine.
Where I am hitting a wall on performance, and where I need others' help, is when you want to use a composite key. The composite-key code builds an overwrite filter, and once that filter gets too lengthy (in my testing, more than 200 rows), the visitor "OR" function in pyiceberg hits a recursion depth error. I took a hard look at the visitor code in the expressions folder and was hesitant to try to change any of those functions because I don't have a clear understanding of how they work, and this is where I need others' help. If you want to see this scaling concern, simply update the parameter on my test_merge_scenario_3_composite_key def to generate a target table with 1000 rows and a source table with 2000 rows, and you will see the error surface. I don't think it's smart or practical to try to change the Python default recursion depth limit, because we will still hit the wall at some point unless the visitor "OR" function gets reworked to avoid recursion.
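To illustrate the scaling concern, here is a sketch (not from the PR) of the kind of predicate the composite-key path produces; it assumes Or(...) nests one binary Or per additional argument, so the expression tree gets as deep as the number of source rows and the recursive visitor eventually exceeds Python's default recursion limit:

from pyiceberg.expressions import And, Or, EqualTo

# One And(...) per source row, all chained under Or(...).
rows = [{"cust_id": i, "line_item": 1000 + i} for i in range(5000)]
pred = Or(*[
    And(EqualTo("cust_id", r["cust_id"]), EqualTo("line_item", r["line_item"]))
    for r in rows
])
# Visiting `pred` (e.g. when binding it during overwrite/delete) recurses once per
# nested Or node, which is where the RecursionError surfaces for large key sets.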
Some other ideas I've kicked around to try to mitigate this: have the merge_rows code temporarily modify the source dataframe and the target Iceberg table to build a concatenated single key out of all the composite keys to join on, e.g. "col1_col2_col[n]...". But that would require modifying the entire Iceberg target table with a full overwrite, which defeats the purpose of an upsert: being able to run incremental updates on a table without having to overwrite the entire table.
I think this PR is a good first step toward finally realizing MERGE/upsert support for pyiceberg and puts a first chink in the armor. Again, I welcome others' feedback on this topic and look forward to partnering with you all on this journey.
Thanks,
Matt Martin