This has been a long time coming. I work with DuckDB nearly every day to test and process small and medium datasets. I also work a lot with Apache Iceberg. Where these two generally don't jibe, though, is when I need to run incremental merges into an Iceberg table, a.k.a. an "upsert".
Upsert - An atomic operation on a table that updates and inserts data in a single transaction. Either it all succeeds or it all fails. ANSI SQL's implementation of upsert is the MERGE command.
Until recently, my only options were Spark SQL's MERGE command or a lot of extra elbow grease and coding on the pyiceberg side.
Well, lo and behold, we have finally arrived at a new way to upsert data into Iceberg without Spark. I recently wrapped up my first PR to an open source project 🎉, and it was to bring upsert capabilities to pyiceberg. Here are some interesting links on the PR:
The final PR, which a maintainer of the project had to push through due to infrastructure issues on my end
This PR will get rolled into PyIceberg 0.9.0, which should be coming soon. But if you can’t wait and want it now, you can run this pip command:
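Installing straight from the repository's main branch does the trick (pin whatever ref suits you):

```bash
# Grab pyiceberg from the main branch, ahead of the 0.9.0 release
pip install "git+https://github.com/apache/iceberg-python.git@main"
```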
If you peruse the PR, you will see some major shifts along the way. When I originally wrote it, I used Apache DataFusion as the processing engine to identify, based on the incoming dataframe, which rows had changed and which rows needed to be appended to the target Iceberg table. As the community worked through the PR, though, we eventually coalesced around just using pyarrow to identify the deltas and new records, since pyarrow was already heavily integrated into the pyiceberg project. This shift removed the need for yet another dependency for pyiceberg to run, which, TBH, I think worked out really well.
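To give a flavor of the idea (a simplified sketch of my own, not the actual pyiceberg internals), pyarrow's join support is enough to split an incoming batch into brand-new rows and rows that match existing keys:

```python
import pyarrow as pa


def split_source(source: pa.Table, target: pa.Table, key: str):
    """Sketch: partition incoming rows into plain inserts vs. candidate updates."""
    target_keys = target.select([key])
    # Rows whose key already exists in the target are candidate updates
    updates = source.join(target_keys, keys=key, join_type="inner")
    # Rows whose key is absent from the target are plain inserts
    inserts = source.join(target_keys, keys=key, join_type="left anti")
    return inserts, updates
```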
Down the road, the community might still consider biting the DataFusion apple to boost performance if edge cases pop up that warrant it.
Enough of the Background, Let’s Get Crackin’
Alright, so time to actually see this new upsert functionality in action. First, I will generate a dataset with DuckDB and turn it into an Iceberg table, using the following code:
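A minimal, self-contained version looks something like this (the SQLite-backed local catalog, the warehouse path, and the table names are just illustrative choices):

```python
import os

import duckdb
from pyiceberg.catalog.sql import SqlCatalog

# A throwaway local catalog backed by SQLite; the paths are illustrative
os.makedirs("warehouse", exist_ok=True)
catalog = SqlCatalog(
    "default",
    uri="sqlite:///warehouse/catalog.db",
    warehouse="file://warehouse",
)

# Build a tiny orders dataset in DuckDB and hand it over as an Arrow table
con = duckdb.connect()
initial_data = con.sql("""
    SELECT order_id, item, price::DOUBLE AS price
    FROM (VALUES
        (1, 'widget', 19.99),
        (2, 'gadget', 34.99)
    ) AS t(order_id, item, price)
""").arrow()

# Create the Iceberg table from the Arrow schema and load the initial rows
catalog.create_namespace("db")
orders = catalog.create_table("db.orders", schema=initial_data.schema)
orders.append(initial_data)
```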
Now that we have our Iceberg table, let's take a quick peek at how it looks with a print statement:
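Assuming the illustrative names from the sketch above, a table scan back into Arrow does the job:

```python
# Read the Iceberg table back into Arrow and print it
print(orders.scan().to_arrow())
```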
Build a new dataset to simulate incoming updates
Now we will build another simple Arrow dataframe, again leveraging DuckDB.
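Sticking with the same illustrative schema, the incoming batch changes order 2's price and introduces order 3:

```python
# Incoming changes: order 2 gets a new price, order 3 is brand new
source_data = con.sql("""
    SELECT order_id, item, price::DOUBLE AS price
    FROM (VALUES
        (2, 'gadget',    27.81),
        (3, 'doohickey', 22.00)
    ) AS t(order_id, item, price)
""").arrow()
```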
Pro Tip - Why am I using Arrow dataframes for all this, you might ask? Because that's how the data moves under the hood in pyiceberg. The same is true of many other data processing engines, as mentioned in this previous article.
Time To Upsert
And now the moment we have been waiting for. We have our Iceberg table, and we have our new incoming source data. What should happen after the upsert is run?
We should see order id 2 change its price from $34.99 to $27.81
We should see order id 3 get appended to the Iceberg table
To run the upsert, we simply run the command below. The second argument is the list of column(s) we want the two datasets to join on:
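With the illustrative tables from above, that looks like this (the call also hands back a small result with update and insert counts):

```python
# Match on order_id: update rows that changed, append rows that are new
result = orders.upsert(source_data, join_cols=["order_id"])
print(f"rows updated: {result.rows_updated}, rows inserted: {result.rows_inserted}")
```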
I think we have hit on something special here. We have gotten something as complex as an upsert down to a one-line abstraction. 🏆🏆🏆
And now let’s take a look at our Iceberg table to see if the changes we expected actually worked:
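Scanning the table again, same pattern as before:

```python
# Order 2 should now show 27.81, and order 3 should be present
print(orders.scan().to_arrow())
```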
Winner Winner Chicken Dinner!
Conclusion
This article demonstrated, with a simple example, how one can leverage DuckDB and pyiceberg to upsert data. It is an alternative to Spark and should work well for small and medium datasets.
If you see the need to enhance this function in pyiceberg, feel free to submit a PR to the iceberg-python project. The contributors are great to work with, are very smart, and you will learn a lot from them.
Thanks,
Matt