Using staging tables for faster bulk upserts with Python and PostgreSQL
When bulk loading larger amounts of data, a simple approach like the one below using SQLAlchemy can quickly become slow, especially for wide tables with many columns. This is because of the bound parameter limit of 32767, which forces the data to be split into small batches.
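from sqlalchemy.dialects.postgresql import insert

# Build one multi-row INSERT; every value becomes a bound parameter
stmt = insert(my_table).values(data)
upsert_stmt = stmt.on_conflict_do_update(
    index_elements=["id"],
    set_={"name": stmt.excluded.name, "value": stmt.excluded.value},
)
# engine.begin() commits on success and rolls back on error
with engine.begin() as conn:
    conn.execute(upsert_stmt)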
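To get a feel for how small those batches get, here is a rough sketch of the arithmetic, assuming every column of every row takes one bound parameter. The helper names `max_rows_per_batch` and `batched` are made up for illustration, and the limit is simply the 32767 figure mentioned above.

```python
from typing import Iterator, Sequence

PARAM_LIMIT = 32767  # bound parameter limit quoted above


def max_rows_per_batch(num_columns: int, param_limit: int = PARAM_LIMIT) -> int:
    """Largest number of rows whose values fit in one statement."""
    if num_columns < 1:
        raise ValueError("num_columns must be at least 1")
    return param_limit // num_columns


def batched(rows: Sequence[dict], batch_size: int) -> Iterator[Sequence[dict]]:
    """Yield consecutive slices of rows with at most batch_size rows each."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# A hypothetical 50-column table allows only 655 rows per statement
rows_per_batch = max_rows_per_batch(50)
```

At that width, loading 1,000,000 rows would take 1,527 separate statements, each with its own round trip.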
Instead, we can use the underlying PostgreSQL driver connection to copy the data into a staging table first. Using copy_to_table(), this moves large amounts of data much faster, because COPY does not use bound parameters and so is not subject to the limit. The upsert then runs inside the database within a transaction, ensuring a rollback if anything unexpected happens.
PostgreSQL also supports temporary tables, which can be dropped automatically at the end of the transaction by using the ON COMMIT DROP clause.
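If the staging table should mirror an existing table, PostgreSQL's LIKE clause can copy the column names, types and NOT NULL constraints instead of repeating them by hand. The sketch below only builds the statement text. `staging_table_ddl` and both table names are hypothetical, and the identifiers are interpolated as-is, so they must come from trusted code rather than user input.

```python
def staging_table_ddl(staging_table: str, target_table: str) -> str:
    """Build a CREATE TEMP TABLE statement that mirrors target_table.

    The table is dropped automatically when the surrounding
    transaction ends, thanks to ON COMMIT DROP.
    """
    return (
        f"CREATE TEMP TABLE {staging_table} "
        f"(LIKE {target_table}) ON COMMIT DROP;"
    )


# Hypothetical names, matching the ones used in this post
ddl = staging_table_ddl("staging_table_name", "main_table")
```

Indexes and the primary key are not copied by a plain LIKE, which suits a staging table that only receives a bulk load.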
Here is a quick example using the driver connection directly with the asyncpg library.
async with asyncpg_conn.transaction():
    # Temporary staging table, dropped automatically when the transaction ends
    await asyncpg_conn.execute("""
        CREATE TEMP TABLE staging_table_name (
            id INTEGER NOT NULL,
            name VARCHAR(255) NOT NULL,
            value INTEGER NOT NULL
        ) ON COMMIT DROP;
    """)
    cols = ["id", "name", "value"]
    # Bulk load via COPY, which does not use bound parameters
    await asyncpg_conn.copy_to_table(
        "staging_table_name",
        source=csv_serialized_data,
        format="csv",
        delimiter=",",
        columns=cols,
    )
    cols_serialized = ",".join(cols)
    # Upsert from the staging table into the main table inside the database
    await asyncpg_conn.execute(f"""
        INSERT INTO main_table ({cols_serialized})
        SELECT {cols_serialized}
        FROM staging_table_name
        ON CONFLICT (id)
        DO UPDATE SET value = EXCLUDED.value, name = EXCLUDED.name;
    """)
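The example leaves open how csv_serialized_data is produced. One possible way, assuming the rows are plain tuples in the same order as cols, is the standard library csv module. `rows_to_csv_bytes` is a hypothetical helper, not part of asyncpg, and the sample rows are made up.

```python
import csv
import io
from typing import Any, Iterable, Sequence


def rows_to_csv_bytes(rows: Iterable[Sequence[Any]]) -> bytes:
    """Serialize rows to UTF-8 encoded CSV, one line per row."""
    buffer = io.StringIO()
    # csv.writer quotes fields that contain the delimiter, quotes or newlines
    writer = csv.writer(buffer, delimiter=",", lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


# Made-up sample rows in (id, name, value) order
csv_bytes = rows_to_csv_bytes([(1, "alice", 10), (2, "bob, jr.", 20)])

# A file-like object that can be handed over as the copy source
csv_serialized_data = io.BytesIO(csv_bytes)
```

Note that csv.writer emits both None and empty strings as empty fields, which COPY in CSV format reads as NULL by default, so columns declared NOT NULL will reject such rows.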
Depending on the table you're working with, this approach can make your upserts several times faster. In my use case, it improved upsert speed by around 5-6x.