Delta Lake with Polars: A Hands-On Walkthrough
Read, write, merge, and time-travel Delta Lake tables from Polars without Spark. Includes a MinIO setup so you can run a local S3-backed lakehouse on your laptop.
TL;DR. Polars speaks Delta natively: pl.read_delta and pl.scan_delta for reads; DataFrame.write_delta and LazyFrame.sink_delta for writes, including mode="merge" for upserts. Drop into the deltalake package directly for OPTIMIZE, VACUUM, and other maintenance. Point all of it at S3, MinIO, or a local folder by changing the table path and, for object stores, passing storage_options. This post walks through the operations you’ll actually use, and ends with a docker-compose for running MinIO so you can have an S3-backed Delta lakehouse on your laptop in five minutes.
Setup
Two packages, no JVM:
pip install "polars>=1.20" "deltalake>=0.20"
polars ships the read_delta / write_delta family. deltalake is the Python binding for delta-rs, the Rust implementation of Delta Lake; you’ll want it directly for OPTIMIZE, VACUUM, and anything else that touches the transaction log. Polars treats it as an optional dependency and imports it only when you call a Delta function, so install it explicitly and pin a version you can rely on.
Write your first table
import polars as pl
df = pl.DataFrame({
"order_id": [1, 2, 3],
"customer": ["Alice", "Bob", "Carol"],
"total": [120.0, 75.5, 240.0],
})
df.write_delta("./lake/orders")
Look in ./lake/orders/ and you’ll see one or more .parquet files plus a _delta_log/ subfolder containing 00000000000000000000.json. That JSON is the entire transaction history of your table so far — open it, it’s readable.
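To make "it’s readable" concrete, here is a minimal sketch that parses a commit file with nothing but the standard library. A commit is newline-delimited JSON: one action per line, each a single-key object such as add, metaData, or commitInfo. The helper name summarize_commit and the hand-written sample commit are assumptions for illustration, not output captured from a real table.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path


def summarize_commit(commit_path):
    """Count action types and collect added file paths from one commit file."""
    kinds = Counter()
    added = []
    for line in Path(commit_path).read_text().splitlines():
        if not line.strip():
            continue
        action = json.loads(line)  # each line holds exactly one action object
        for kind, body in action.items():
            kinds[kind] += 1
            if kind == "add":
                added.append(body["path"])
    return dict(kinds), added


# Hand-written stand-in for a first commit (fields trimmed for brevity).
sample_actions = [
    {"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}},
    {"metaData": {"id": "example-id", "partitionColumns": []}},
    {"add": {"path": "part-00000.parquet", "size": 1024, "dataChange": True}},
    {"commitInfo": {"operation": "WRITE"}},
]

with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp) / "_delta_log"
    log_dir.mkdir()
    commit = log_dir / f"{0:020d}.json"  # versions are zero-padded to 20 digits
    commit.write_text("\n".join(json.dumps(a) for a in sample_actions))
    kinds, added = summarize_commit(commit)
    commit_name = commit.name

print(commit_name, kinds, added)
```

Pointing summarize_commit at the real ./lake/orders/_delta_log/00000000000000000000.json should work the same way, though a real commit carries more fields than this sample.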
If your input is too big to materialise, use sink_delta on a LazyFrame instead:
(
pl.scan_csv("orders_dump.csv")
.filter(pl.col("status") == "paid")
.sink_delta("./lake/orders", mode="overwrite")
)
sink_delta streams: it never loads the full result into memory.
Append, overwrite, and schema evolution
The three forms you’ll use 99% of the time:
# Add new rows
new_rows.write_delta("./lake/orders", mode="append")
# Replace the table
df.write_delta("./lake/orders", mode="overwrite")
# Add rows AND allow new columns to be added
new_rows.write_delta(
"./lake/orders",
mode="append",
delta_write_options={"schema_mode": "merge"},
)
The third form is the one to know. By default, Delta refuses appends whose schema doesn’t match — which is what you want in production. When the schema should evolve (a new column landed in the source), you opt in with schema_mode="merge". The previous rows simply get null for the new column.
Flowfile’s shared write helper does almost exactly this — see shared/delta_utils.py:
import polars as pl

def write_delta(df, output_path, mode="overwrite") -> bool:
    # Derive schema_mode from the write mode so callers never pass it.
    delta_write_options = {}
    if mode == "overwrite":
        delta_write_options["schema_mode"] = "overwrite"
    elif mode == "append":
        delta_write_options["schema_mode"] = "merge"
    # Stream LazyFrames; write DataFrames eagerly.
    if isinstance(df, pl.LazyFrame):
        df.sink_delta(output_path, mode=mode,
                      delta_write_options=delta_write_options)
    else:
        df.write_delta(output_path, mode=mode,
                       delta_write_options=delta_write_options)
    return True
Two things worth stealing from that snippet: prefer sink_delta whenever the input is a LazyFrame, and pick the schema_mode from the write mode so callers don’t have to think about it.
Read, scan, and time-travel
Eager read returns a DataFrame. Lazy scan returns a LazyFrame that pushes filters and projections down into the Delta scan — meaning Polars reads only the row groups and columns you need:
# Eager — loads everything
orders = pl.read_delta("./lake/orders")
# Lazy — Polars pushes the filter into the scan
big = (
pl.scan_delta("./lake/orders")
.filter(pl.col("total") > 100)
.select("customer", "total")
.collect()
)
Swap .collect() for .explain() and the printed query plan shows the filter and projection living inside the Delta scan, not above it. That’s the difference between reading a 50 GB table and reading 200 MB of it.
Time travel is one extra argument:
from datetime import datetime, timezone
# By version number
v0 = pl.read_delta("./lake/orders", version=0)
# By timestamp (datetime — must be timezone-aware)
yesterday = pl.read_delta(
"./lake/orders",
version=datetime(2026, 4, 27, 12, 0, tzinfo=timezone.utc),
)
The first time you use this to debug a downstream report that broke (“which write changed this row?”) is the moment Delta clicks.
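Under the hood, reading version N means replaying the log up to commit N and keeping only the files that are still live. The sketch below models that replay in plain Python. It is a simplification: the function name active_files_at and the toy log are assumptions, and real readers also use checkpoints rather than replaying every commit from zero.

```python
def active_files_at(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} is not in the log")
    live = set()
    for actions in commits[: version + 1]:
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                # A remove action tombstones the file; it stays on disk.
                live.discard(action["remove"]["path"])
    return live


# Toy log: v0 writes a file, v1 appends one, v2 overwrites both.
commits = [
    [{"add": {"path": "a.parquet"}}],
    [{"add": {"path": "b.parquet"}}],
    [
        {"remove": {"path": "a.parquet"}},
        {"remove": {"path": "b.parquet"}},
        {"add": {"path": "c.parquet"}},
    ],
]

for v in range(len(commits)):
    print(v, sorted(active_files_at(commits, v)))
```

Version 1 still resolves to a.parquet and b.parquet even after the overwrite in version 2, which is why old versions stay readable as long as their files remain on disk.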
MERGE: upserts without rewriting the table
Polars exposes MERGE directly through write_delta(mode="merge", ...). The pattern looks like SQL MERGE:
source = pl.DataFrame({
"order_id": [2, 4],
"customer": ["Bob", "Dan"],
"total": [80.0, 410.0],
})
(
source.write_delta(
"./lake/orders",
mode="merge",
delta_merge_options={
"predicate": "s.order_id = t.order_id",
"source_alias": "s",
"target_alias": "t",
},
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute()
)
write_delta(mode="merge") returns a TableMerger from the underlying deltalake package — that’s the object you’re chaining .when_matched_update_all() and .when_not_matched_insert_all() on. .execute() commits the whole thing as one atomic Delta transaction. Order 2’s total is updated from 75.5 to 80.0; order 4 is inserted as a new row; orders 1 and 3 are untouched.
LazyFrame.sink_delta accepts the same mode="merge" and delta_merge_options, so you can stream a larger-than-RAM source into a merge without ever materialising it.
The TableMerger also supports .when_matched_delete() and per-column updates if you need finer control. If you’d rather call the merger directly — say, because you’re already holding a DeltaTable for other reasons — DeltaTable("./lake/orders").merge(source=source.to_arrow(), ...) returns the same object. Flowfile wraps these patterns into one merge_into_delta helper with merge_mode in {"upsert", "update", "delete"} — useful if you find yourself writing the same boilerplate twice.
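If the matched/not-matched vocabulary is new, this plain-Python model of the upsert above may help. It only mirrors the row-level outcome of when_matched_update_all plus when_not_matched_insert_all on a single key, and it assumes the key is unique on both sides. The upsert function is hypothetical and says nothing about how deltalake executes a merge.

```python
def upsert(target_rows, source_rows, key):
    """Row-level model of update-all-on-match plus insert-all-otherwise."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        # Matched keys are replaced wholesale; unmatched keys are inserted.
        merged[row[key]] = dict(row)
    return [merged[k] for k in sorted(merged)]


target = [
    {"order_id": 1, "customer": "Alice", "total": 120.0},
    {"order_id": 2, "customer": "Bob", "total": 75.5},
    {"order_id": 3, "customer": "Carol", "total": 240.0},
]
source = [
    {"order_id": 2, "customer": "Bob", "total": 80.0},
    {"order_id": 4, "customer": "Dan", "total": 410.0},
]

result = upsert(target, source, "order_id")
for row in result:
    print(row)
```

The result has four rows: orders 1 and 3 unchanged, order 2 with the new total, and order 4 appended.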
Housekeeping: OPTIMIZE and VACUUM
Streaming or hourly writes produce many small Parquet files. Two operations clean that up:
from deltalake import DeltaTable
dt = DeltaTable("./lake/orders")
# Compact small files into fewer, larger ones (tune the size with target_size)
dt.optimize.compact()
# (Optional) physically sort by a frequently-filtered column
dt.optimize.z_order(["customer"])
# Delete tombstoned files older than 7 days
dt.vacuum(retention_hours=168, dry_run=False)
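You don’t run these on every write. You run them on a schedule; daily compact, weekly vacuum is a reasonable starting point. Compaction is safe to run alongside readers: it writes new, larger files and swaps them in with a single commit, so a query that started earlier keeps seeing the pre-compact version until the new commit lands. Vacuum is different in kind: it physically deletes files the log no longer references, so once it runs you can no longer time-travel to versions that depended on those files.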
A useful diagnostic: “how big is this table really?” reads the active file list from the log rather than scanning the directory, which correctly excludes tombstones from old versions:
dt = DeltaTable("./lake/orders")
size_bytes = sum(
v for v in dt.get_add_actions(flatten=True).column("size_bytes").to_pylist()
if v is not None
)
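The directory size on disk can be much bigger than this number. Most of the difference is data files no longer referenced by the current version, which is what vacuum frees once they are older than the retention window.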
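To estimate that gap without running vacuum, you can compare the Parquet files on disk against the active paths from the log. The sketch below is one way to do it with the standard library. The function name unreferenced_parquet_bytes is an assumption, and so is the idea that active_paths holds plain table-relative paths (for example, taken from the path column of the same get_add_actions result) with no percent-encoding.

```python
import tempfile
from pathlib import Path


def unreferenced_parquet_bytes(table_dir, active_paths):
    """Sum the size of data files on disk that the current version does not use."""
    root = Path(table_dir)
    total = 0
    for file in root.rglob("*.parquet"):
        rel = file.relative_to(root)
        if "_delta_log" in rel.parts:
            continue  # checkpoint files live in the log folder; they are not data
        if rel.as_posix() not in active_paths:
            total += file.stat().st_size
    return total


# Toy table directory: one live file, one stale file, one log checkpoint.
with tempfile.TemporaryDirectory() as tmp:
    table = Path(tmp)
    (table / "_delta_log").mkdir()
    checkpoint = table / "_delta_log" / "00000000000000000010.checkpoint.parquet"
    checkpoint.write_bytes(b"x" * 5)
    (table / "old.parquet").write_bytes(b"x" * 10)
    (table / "live.parquet").write_bytes(b"x" * 20)
    stale_bytes = unreferenced_parquet_bytes(table, {"live.parquet"})

print(stale_bytes)
```

Treat the number as an upper bound: files younger than the retention window are still protected from vacuum.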
How Flowfile uses all of this
The snippets above aren’t toy examples — they’re pulled from real Flowfile code. The catalog stores every table as Delta on disk; everything else builds on the same primitives. Three places where Delta features carry real weight:
Virtual tables and the staleness check. A flow output can live in the catalog as a lazy Polars plan rather than a materialised table. When you read a virtual table, Flowfile collects the plan on the fly — but only if the upstream Delta sources haven’t changed since the plan was serialised. The check is a single DeltaTable(path, without_files=True).version() call per source, compared against the recorded version. If anything moved, the catalog falls back to the materialised snapshot. The full implementation lives in flowfile_core/catalog/delta_utils.py::check_source_versions_current — it’s about 20 lines, including the comment that calls out the TOCTOU race honestly.
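The shape of that check is simple enough to sketch. What follows is not Flowfile’s implementation: sources_are_current and its current_version parameter are hypothetical names, with current_version standing in for something like lambda p: DeltaTable(p, without_files=True).version().

```python
from typing import Callable, Mapping


def sources_are_current(
    recorded: Mapping[str, int],
    current_version: Callable[[str], int],
) -> bool:
    """True only if every source is still at the version recorded with the plan."""
    return all(
        current_version(path) == version for path, version in recorded.items()
    )


# Versions captured when the lazy plan was serialised (made-up values).
recorded = {"lake/orders": 3, "lake/customers": 7}

# A dict lookup stands in for the real per-source version call.
live_versions = {"lake/orders": 3, "lake/customers": 7}
fresh = sources_are_current(recorded, live_versions.__getitem__)

live_versions["lake/customers"] = 8  # someone wrote to a source
stale = sources_are_current(recorded, live_versions.__getitem__)

print(fresh, stale)
```

Injecting the version lookup keeps the comparison testable without a real table. It does not close the race: a writer can still commit between the check and the read.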
Cheap previews. Clicking a catalog table shouldn’t load 50 GB. The preview reads the first N rows via to_pyarrow_dataset().head(n_rows), which uses the Delta log to pick which files to touch and Arrow’s lazy dataset to stop early:
import pyarrow as pa
from deltalake import DeltaTable

def read_delta_preview(path: str, n_rows: int = 100) -> pa.Table:
    dt = DeltaTable(str(path))
    dataset = dt.to_pyarrow_dataset()  # lazy, no data loaded yet
    return dataset.head(n_rows)
Format-aware reads. Older flows wrote single Parquet files; newer ones write Delta directories. The catalog still has to read both. The dispatch is a one-liner each — is_delta_table checks for a _delta_log/ subfolder, is_legacy_parquet checks for a .parquet extension — and the rest of the code stays format-agnostic.
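A dispatch along those lines might look like the sketch below. The two predicate names come from the paragraph above, but the bodies are a guess at the idea rather than Flowfile’s code; detect_format and the extra is_file check are assumptions.

```python
import tempfile
from pathlib import Path


def is_delta_table(path) -> bool:
    """A Delta table is a directory containing a _delta_log/ subfolder."""
    return (Path(path) / "_delta_log").is_dir()


def is_legacy_parquet(path) -> bool:
    """A legacy table is a single file with a .parquet extension."""
    p = Path(path)
    return p.is_file() and p.suffix == ".parquet"


def detect_format(path) -> str:
    # Check Delta first: a Delta directory also contains .parquet files.
    if is_delta_table(path):
        return "delta"
    if is_legacy_parquet(path):
        return "parquet"
    raise ValueError(f"unrecognised table format: {path}")


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    delta_dir = root / "orders"
    (delta_dir / "_delta_log").mkdir(parents=True)
    legacy_file = root / "orders_old.parquet"
    legacy_file.write_bytes(b"")
    formats = [detect_format(delta_dir), detect_format(legacy_file)]

print(formats)
```

Callers can then branch once on the returned string and pick pl.scan_delta or pl.scan_parquet accordingly.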
You don’t need to know any of this to use Flowfile. But if you’re building something on top of Delta yourself, those three patterns (version-pinned lazy reads, log-based previews, format-aware dispatch) come up immediately.
Run a local lakehouse with MinIO
Local Delta on a folder is great for development. The minute you want to share tables with someone else, or run two writers at once, you want object storage. MinIO is an S3-compatible server you can run in Docker. Here’s a minimal docker-compose.yml:
services:
  minio:
    image: minio/minio:latest
    ports:
      - "9000:9000" # S3 API
      - "9001:9001" # web console
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - ./minio-data:/data
  createbucket:
    image: minio/mc:latest
    depends_on: [minio]
    entrypoint: >
      /bin/sh -c "
      until (mc alias set local http://minio:9000 minioadmin minioadmin) do sleep 1; done;
      mc mb -p local/lakehouse;
      mc anonymous set download local/lakehouse;
      exit 0;
      "
Run docker compose up -d and you have S3 on http://localhost:9000, a web console on http://localhost:9001, and a bucket called lakehouse ready to use.
Now point Polars at it:
import polars as pl
storage_options = {
"AWS_ACCESS_KEY_ID": "minioadmin",
"AWS_SECRET_ACCESS_KEY": "minioadmin",
"AWS_ENDPOINT_URL": "http://localhost:9000",
"AWS_ALLOW_HTTP": "true",
"AWS_REGION": "us-east-1",
}
pl.DataFrame({"x": [1, 2, 3]}).write_delta(
"s3://lakehouse/orders",
storage_options=storage_options,
)
print(pl.read_delta("s3://lakehouse/orders", storage_options=storage_options))
The same dict works for scan_delta, sink_delta, and any DeltaTable(...) call, because deltalake accepts identical storage_options. Moving from MinIO to real AWS S3 means dropping the endpoint URL and the AWS_ALLOW_HTTP flag and swapping in your real credentials and region; the rest of the code stays the same.
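One way to keep the MinIO and AWS variants from drifting apart is to build the dict in a single helper. This is a sketch: s3_storage_options is a hypothetical function, and the AWS credentials and region shown are placeholders.

```python
def s3_storage_options(access_key, secret_key, region="us-east-1", endpoint_url=None):
    """Build storage_options for AWS S3 or an S3-compatible endpoint."""
    options = {
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_REGION": region,
    }
    if endpoint_url is not None:
        # Custom endpoints (MinIO and friends) need the URL spelled out.
        options["AWS_ENDPOINT_URL"] = endpoint_url
        if endpoint_url.startswith("http://"):
            # Plain HTTP has to be allowed explicitly.
            options["AWS_ALLOW_HTTP"] = "true"
    return options


minio_options = s3_storage_options(
    "minioadmin", "minioadmin", endpoint_url="http://localhost:9000"
)
aws_options = s3_storage_options("EXAMPLE_KEY", "EXAMPLE_SECRET", region="eu-west-1")

print(sorted(minio_options))
print(sorted(aws_options))
```

minio_options matches the dict used above key for key, while aws_options carries only credentials and region.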
If you’d rather not retype the dict in every script, Flowfile’s Cloud Storage connections save the credentials once and let any Cloud Storage Reader / Writer node reference them by name. The MinIO custom-endpoint and “allow HTTP” toggles are first-class fields, because that’s by far the most common local setup.
Standing on a lot of shoulders
Almost everything in this post is enabled by people most readers will never meet, working on libraries they don’t get paid enough for. Polars (Ritchie Vink and team) added native Delta as a first-class feature, not an afterthought. delta-rs — the Rust implementation of the Delta protocol — is what makes “Delta without Spark” actually work. Apache Arrow is the columnar format underneath all of it. MinIO is the reason this post has a local-lakehouse section at all.
A concrete illustration of how this ecosystem moves: in March 2026 a contributor landed read support for Delta deletion vectors in scan_delta. Deletion vectors are how modern Delta tables represent row-level deletes without rewriting whole files; before that PR, any table using them would silently return wrong results from Polars. After it merged, the same pl.scan_delta(...) call you’ve been writing throughout this post just started handling them correctly. No changes to your code, no migration, just a newer Polars. One PR, one merge, and the entire Polars + Delta surface got better at the same time.
That’s the flywheel — a small group of maintainers and a long tail of contributors, building primitives that compose. Flowfile sits on top of all of it; so does this post.
Closing
Delta Lake without Spark used to be a hack. It isn’t anymore — and “isn’t anymore” is not a passive thing that happened, it’s the cumulative output of people deciding the answer should be “yes, you can do this from any language” instead of “use the JVM.” deltalake plus Polars now covers everything most teams actually do with a lakehouse: append, overwrite, merge, time-travel, optimize, vacuum. Point it at a folder for development, MinIO for shared local work, S3 for production — same code.
The whole surface fits in a single Python file. That’s a different relationship with your data layer than the one most of us inherited, and worth a thank-you to the people who made it possible.
Related reads: Demystifying Delta Lake for what’s happening under the hood, Polars vs Pandas in 2026 for why Polars is the natural Delta companion, and Why Your Data Should Stay on Your Laptop for the local-first angle on all of this.
Frequently asked questions
- Can Polars read and write Delta tables natively?
- Yes. Polars ships `read_delta`, `scan_delta`, `write_delta`, and `sink_delta`. They delegate to the `deltalake` Python package, which binds the Rust delta-rs implementation, so you can read, write, merge, and time-travel without Spark or a JVM. Install `deltalake` alongside Polars.
- What's the difference between scan_delta and read_delta?
- `scan_delta` returns a LazyFrame — Polars pushes filters and projections down into the Delta scan, so only the columns and row groups you need are read. `read_delta` is eager: it loads the full table into memory immediately. Prefer `scan_delta` for anything bigger than a toy dataset.
- How do I do an UPSERT (MERGE) into a Delta table from Polars?
- Use `df.write_delta(target, mode='merge', delta_merge_options={'predicate': 's.id = t.id', 'source_alias': 's', 'target_alias': 't'})`. The call returns a `TableMerger` from the underlying `deltalake` package — chain `.when_matched_update_all().when_not_matched_insert_all().execute()` to commit. `LazyFrame.sink_delta` accepts the same `mode='merge'` and options.
- Can I use Delta Lake with S3 or MinIO from Polars?
- Yes. Pass `storage_options={'AWS_ACCESS_KEY_ID': ..., 'AWS_SECRET_ACCESS_KEY': ..., 'AWS_ENDPOINT_URL': 'http://localhost:9000', 'AWS_ALLOW_HTTP': 'true', 'AWS_REGION': 'us-east-1'}` to any Polars Delta function. The same keys work for MinIO, R2, and other S3-compatible stores; for real AWS S3, drop `AWS_ENDPOINT_URL` and `AWS_ALLOW_HTTP`.
- How do I time-travel to a previous version of a Delta table in Polars?
- Pass `version=N` (an integer commit number) or a timezone-aware `datetime` to `pl.read_delta` or `pl.scan_delta`. Both forms read the table exactly as it existed at that point.
- Why are my Delta reads getting slower over time?
- Lots of small commits accumulate lots of small Parquet files. Run `DeltaTable(path).optimize.compact()` to consolidate them, and `DeltaTable(path).vacuum(retention_hours=168, dry_run=False)` to delete tombstoned files (without `dry_run=False`, `vacuum` only lists what it would remove). Both are housekeeping you run periodically, not on every write.