
I have a massive table (over 100B records) to which I added an empty column. I parse strings from another (string) field: if the required substring is present, I extract an integer from it and want to write that integer into the new column for all rows containing that substring.

At the moment, after the data has been parsed and saved locally in a dataframe, I iterate over it to update the Redshift table with the clean data. This takes approx. 1 sec/iteration, which is way too long.

My current code example:

conn = psycopg2.connect(connection_details)
cur = conn.cursor()
clean_df = raw_data.apply(clean_field_to_parse)
for ind, row in clean_df.iterrows():
    update_query = build_update_query(row.id, row.clean_int_1, row.clean_int_2)
    cur.execute(update_query)

where build_update_query is a function that generates the update query:

def build_update_query(id, int1, int2):
    query = """
    UPDATE tab_tab
    SET clean_int_1 = {}::int,
        clean_int_2 = {}::int,
        updated_date = GETDATE()
    WHERE id = {};
    """
    return query.format(int1, int2, id)

and where clean_df is structured like:

id  field_to_parse    clean_int_1  clean_int_2
1   {'int_1': '2+1'}  3            np.nan
2   {'int_2': '7-0'}  np.nan       7

Is there a way to update specific table fields in bulk, so that there is no need to execute one query at a time?

I'm parsing the strings and running the update statement from Python. The database is stored on Redshift.
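
As an aside on the code above: a parameterized variant of build_update_query (a sketch only — it still runs one statement per row, but it avoids the SQL-injection and NaN-formatting problems of building the query with .format, by letting psycopg2 bind the values):

```python
def build_update_query(id_, int1, int2):
    # Returns the query template plus a parameter tuple; pass both to
    # cur.execute so the driver handles quoting and None/NaN values.
    query = """
    UPDATE tab_tab
    SET clean_int_1 = %s,
        clean_int_2 = %s,
        updated_date = GETDATE()
    WHERE id = %s;
    """
    return query, (int1, int2, id_)

# usage: cur.execute(*build_update_query(row.id, row.clean_int_1, row.clean_int_2))
```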

  • 100B is 100 billion? You really don't want to be parsing that df line by line, let alone running single queries. What does parse_single_row() do? Any solution must first address that before then doing inserts in bulk. – Commented Nov 11, 2019 at 19:58
  • The whole point of pandas is not to iterate dataframes. Batch processing alone will probably be several orders of magnitude faster, and that's before you increase the update efficiency of the table itself. You should include the function anyway; it's not possible for me to understand from just a description. Also, "this will take longer than a few days" is the understatement of the year :P – Commented Nov 11, 2019 at 20:05
  • Thanks. Also, apologies, I misread your last comment. – Commented Nov 11, 2019 at 20:20
  • So: batch the DF, process the column in the batch as a whole, convert the batch to an in-memory file, create a staging table, copy the file to the staging table, and then join across to the main table. Quite a bit of faffing, but hopefully significantly faster. – Commented Nov 11, 2019 at 20:37
  • docs.aws.amazon.com/redshift/latest/dg/json-functions.html <- You could just write a Redshift query and run it without having to do it in Python. This will be the most productive way to do it, IMHO. – Commented Nov 11, 2019 at 20:38

1 Answer


As mentioned in the comments, consider pure SQL and avoid iterating through billions of rows: push the Pandas data frame to the database as a staging table, then run one single UPDATE across both tables. With SQLAlchemy you can use DataFrame.to_sql to create a table replica of the data frame. You can even add an index on the join field, id (on plain Postgres only — Redshift does not support CREATE INDEX), and drop the very large staging table at the end.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://myuser:mypwd@myhost/mydatabase")

# PUSH TO THE DATABASE (SAME NAME AS DF)
clean_df.to_sql(name="clean_df", con=engine, if_exists="replace", index=False)

# SQL UPDATE (USING TRANSACTION)
with engine.begin() as conn:

    # Plain Postgres only -- skip this step on Redshift (no CREATE INDEX)
    conn.execute(text("CREATE INDEX idx_clean_df_id ON clean_df(id)"))

    conn.execute(text("""UPDATE tab_tab t
                         SET clean_int_1 = c.clean_int_1,
                             clean_int_2 = c.clean_int_2,
                             updated_date = GETDATE()
                         FROM clean_df c
                         WHERE c.id = t.id
                      """))

    conn.execute(text("DROP TABLE IF EXISTS clean_df"))

engine.dispose()
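
One caveat: by default to_sql issues one INSERT per row, which is very slow against Redshift; passing chunksize together with method="multi" batches rows into multi-row INSERT statements. A sketch, demonstrated against an in-memory SQLite engine so it runs stand-alone (in practice the connection string would be the Redshift one, and the frame would be your clean_df):

```python
import pandas as pd
from sqlalchemy import create_engine, text

# In-memory SQLite stands in for the real database here.
engine = create_engine("sqlite://")
demo_df = pd.DataFrame({"id": [1, 2], "clean_int_1": [3, None], "clean_int_2": [None, 7]})

# chunksize caps rows per round trip; method="multi" packs each chunk
# into a single multi-row INSERT instead of one INSERT per row.
demo_df.to_sql("clean_df", con=engine, index=False,
               if_exists="replace", chunksize=10_000, method="multi")

with engine.connect() as conn:
    n = conn.execute(text("SELECT COUNT(*) FROM clean_df")).scalar()
# n == 2
```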