1

I'd like to use the ON DUPLICATE KEY UPDATE optionality provided by SQLAlchemy to upsert a bunch of records.

These records have been sucessfully inserted with python using the following (where connection is engine.connect() object and table is a Table object)

record_list = [{'col1': 'name1', 'col2': '2015-01-31', 'col3': 27.2},
               {'col1': 'name1', 'col2': '2016-01-31', 'col3': 25.2}]
query = insert(table)
results = connection.execute(query, record_list)

Looking at the docs at https://docs.sqlalchemy.org/en/13/dialects/mysql.html#insert-on-duplicate-key-update-upsert as well as a number of SO questions (including the suggestion it's possible under the comments on SQLAlchemy ON DUPLICATE KEY UPDATE ) I've tried a number of different examples, but there were none that I could see that address multiple records with the upsert statement using this method.

I'm trying along the lines of

query = insert(table).values(record_list)
upsert_query = query.on_duplicate_key_update()
results = connection.execute(upsert_query)

but either get the issue that the .on_duplicate_key_update() requires cant be empty or that the SQL syntax is wrong.

If anyone has sucessfully managed and could help me with the code structure here I'd really appreciate it.

4 Answers 4

11

I just ran into a similar problem and creating a dictionary out of query.inserted solved it for me.

query = insert(table).values(record_list)
update_dict = {x.name: x for x in query.inserted}
upsert_query = query.on_duplicate_key_update(update_dict)
Sign up to request clarification or add additional context in comments.

1 Comment

It's better to filterout unique and primary keys: table_columns = t.columns._all_columns if isinstance(t, Table) else t._sa_class_manager.mapper.columns._all_columns update_dict = {x.name: x for x in stmt.inserted for c in table_columns if x.name == c.name and c.unique is not True and c.primary_key is not True} . Otherwise, the error "Duplicate entry '...' for key '...'" will raise.
3

Thanks to Federico Caselli of the SQLAlchemy project for explaining how to use on_duplicate_key_update in a discussion https://github.com/sqlalchemy/sqlalchemy/discussions/9328

Here's a Python3 script that demonstrates how to use SQLAlchemy version 2 to implement upsert using on_duplicate_key_update in the MySQL dialect:

import sqlalchemy as db
import sqlalchemy.dialects.mysql as mysql
from sqlalchemy import delete, select, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "foo"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))


engine = db.create_engine('mysql+mysqlconnector://USER-NAME-HERE:PASS-WORD-HERE@localhost/SCHEMA-NAME-HERE')
conn = engine.connect()

# setup step 0 - ensure the table exists
Base().metadata.create_all(bind=engine)

# setup step 1 - clean out rows with id 1..5
del_stmt = delete(User).where(User.id.in_([1, 2, 3, 4, 5]))
conn.execute(del_stmt)
conn.commit()
sel_stmt = select(User)
users = list(conn.execute(sel_stmt))
print(f'Table size after cleanout: {len(users)}')

# setup step 2 - insert 4 rows
ins_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "x"},
        {"id": 2, "name": "y"},
        {"id": 3, "name": "w"},
        {"id": 4, "name": "z"},
    ]
)
conn.execute(ins_stmt)
conn.commit()
users = list(conn.execute(sel_stmt))
print(f'Table size after insert: {len(users)}')

# demonstrate upsert
ups_stmt = mysql.insert(User).values(
    [
        {"id": 1, "name": "xx"},
        {"id": 2, "name": "yy"},
        {"id": 3, "name": "ww"},
        {"id": 5, "name": "new"},
    ]
)
ups_stmt = ups_stmt.on_duplicate_key_update(name=ups_stmt.inserted.name)
# if you want to see the compiled result
# x = ups_stmt.compile(dialect=mysql.dialect())
# print(x.string, x.construct_params())
conn.execute(ups_stmt)
conn.commit()

users = list(conn.execute(sel_stmt))
print(f'Table size after upsert: {len(users)}')

Comments

2

@user12730260’s answer is great! but has a little bug, the correct code is:

query = insert(table).values(record_list)   # each record is a dict
update_dict = {x.name: x for x in query.inserted}  # specify columns for update, u can filter some of it
upsert_query = query.on_duplicate_key_update(**update_dict) # here's the modification: u should expand the columns dict

Comments

0

Your on_duplicate_key_update function requires arguments that define the data to be inserted in the update. Please have a look at the example in the documentation that you have already found.

insert().on_duplicate_key_update({"key": "value"})

9 Comments

Thanks for your reply. I tried passing in the record_list again (which is a list of dictionaries) but this didnt work - I think as a dictionary is expected. Any idea what would be required here where multiple records are being updated? Thx
just to clarify the above, I tried insert.on_duplicate_key_update(record_list)
The documentation gives an example of multiple updates, using a dict as well as a list: on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( {"data": "some data", "updated_at": func.current_timestamp()}, ) or on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( [ ("data", "some data"), ("updated_at", func.current_timestamp()), ], )
@kowpow Did you manage to use on_duplicate_key_update with a list of dictionaries?
@clumdee I added an answer above that demonstrates on_duplicate_key_update using a MySQL server via the SQLAlchemy MySQL dialect, please see stackoverflow.com/a/75538576/1630244
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.