I wrote Python code to run the MERGE statement in Google BigQuery, which performs UPDATE, INSERT, and DELETE in a single statement. I'd appreciate help fine-tuning this code, since I believe it contains a lot of redundant code that could be improved considerably. I have tried my best but cannot improve it any further, so I need someone's help.
# Retry policy for a failed `bq` invocation: after the first failure the
# command is re-run up to 20 more times, sleeping 30 seconds before each retry.
_MERGE_RETRY_ATTEMPTS = 20
_MERGE_RETRY_DELAY_SECONDS = 30


def _split_typed(entries):
    """Turn 'column:TYPE' strings into (column, TYPE) tuples."""
    pairs = []
    for entry in entries:
        column, sep, type_ = entry.partition(':')
        if not sep:
            raise ValueError("Expected 'column:TYPE', got " + repr(entry))
        pairs.append((column, type_))
    return pairs


def _source_expr(column, type_):
    """Return the SOURCE-side expression that feeds a target column."""
    if column == 'dt':
        # The target column dt is always read from SOURCE.calendar_date.
        return "CAST(SOURCE.calendar_date as date)"
    return "CAST(SOURCE." + column + " as " + type_ + ")"


def _build_merge_query(database, table, dc, columns, key_types, non_key_types, all_types):
    """Assemble the MERGE statement text from already-typed column lists."""
    # A dc other than '-1' selects the per-dc table pair (<table>_<dc>).
    suffix = "" if dc == '-1' else "_" + dc
    head = (
        "MERGE INTO " + database + "." + table + suffix + " TARGET USING "
        + database + "_stg." + table + suffix + "_stg SOURCE "
    )
    on_clause = " ON " + " AND ".join(
        " TARGET." + key + " = CAST(SOURCE." + key + " as " + type_ + ")"
        for key, type_ in key_types
    )
    # With no non-key columns there is nothing to update; an empty
    # "UPDATE SET" would be invalid SQL, so the clause is left out.
    update_clause = ""
    if non_key_types:
        update_clause = " WHEN MATCHED THEN UPDATE SET " + ",".join(
            " TARGET." + col + " = " + _source_expr(col, type_)
            for col, type_ in non_key_types
        )
    insert_clause = (
        " WHEN NOT MATCHED BY TARGET THEN INSERT (" + columns + ") VALUES ("
        + ",".join(" " + _source_expr(col, type_) for col, type_ in all_types)
        + ")"
    )
    return head + on_clause + update_clause + insert_clause


def merge_cmd(database, table, dc, keys, columns, filter_column, new_main_column_type_list):
    """Build a BigQuery MERGE statement and run it through the `bq` CLI.

    The statement merges ``<database>_stg.<table>[_<dc>]_stg`` (SOURCE) into
    ``<database>.<table>[_<dc>]`` (TARGET): matched rows have their non-key
    columns updated, unmatched rows are inserted.

    Args:
        database: Target dataset name; the staging dataset is ``<database>_stg``.
        table: Base table name.
        dc: Table suffix, or ``'-1'`` for none. Any other value is appended
            to the table names and adds ``dc`` to the join keys. Non-string
            values are converted with ``str()``.
        keys: Comma-separated key column names used in the ON clause.
        columns: Comma-separated list of all columns; must contain every key.
        filter_column: Unused; kept so existing callers keep working.
        new_main_column_type_list: Passed through unchanged to
            ``get_key_list_type``, which is expected to return
            ``'column:TYPE'`` strings for the columns it is given.

    Raises:
        ValueError: If a key column is missing from ``columns`` or a typed
            entry has no ``':'`` separator.
        Exception: ``'Merge Failed'`` when the command still returns a
            non-zero code after all retries.
    """
    import shlex  # local import so the block does not depend on file-level imports

    dc = str(dc)
    if dc != '-1':
        keys = keys + ',dc'

    key_list = [key.lower() for key in keys.split(',')]
    # Lower-cased like the keys so the 'dt' special case and the type lookup
    # see the same spelling for every column list.
    cols_list = [col.lower() for col in columns.split(',')]

    col_no_key = list(cols_list)
    for key in key_list:
        if key not in col_no_key:
            raise ValueError("Key column " + repr(key) + " is not listed in columns")
        col_no_key.remove(key)

    key_types = _split_typed(get_key_list_type(key_list, new_main_column_type_list))
    non_key_types = _split_typed(get_key_list_type(col_no_key, new_main_column_type_list))
    all_types = _split_typed(get_key_list_type(cols_list, new_main_column_type_list))

    merge_query = _build_merge_query(
        database, table, dc, columns, key_types, non_key_types, all_types
    )

    # shlex.quote keeps the whole statement a single shell word even if it
    # ever contains a quote character.
    # NOTE(review): assumes run_sys_command hands the string to a shell (or
    # shlex-splits it) — confirm against its implementation.
    command = "bq --location=us query --use_legacy_sql=false " + shlex.quote(merge_query)
    print(command)

    return_cd, _out, _err = run_sys_command(command)
    for _ in range(_MERGE_RETRY_ATTEMPTS):
        if return_cd == 0:
            break
        time.sleep(_MERGE_RETRY_DELAY_SECONDS)
        return_cd, _out, _err = run_sys_command(command)

    if return_cd != 0:
        raise Exception('Merge Failed')
if __name__ == "__main__":
    # dc is passed as a string: merge_cmd compares it with the '-1' sentinel
    # and splices it into the table names.
    # NOTE(review): the keys and type-list arguments look like placeholders —
    # merge_cmd needs every key column to be present in `columns`; confirm
    # the real values before running this.
    merge_cmd('testdb', 'test', '8134', 'load_id | test_load_id', 'invoice_id,dt', 'last_change_ts', 'invoice_id:INT64')
database, table, dc, keys, columns, filter_column, new_main_column_type_list … that we could use?