As the title says, this script migrates files (binary content and metadata) from one database to another with a different structure.
My current problem is that with a large database (between 12 and 30 GB) the script crashes at some point: with the 30 GB database it ran for 4 days and was then killed by the system.
#!/usr/bin/python
# ------------------------------
# Import standard libraries |
# ------------------------------
#
import os
import sys
import mysql.connector
import psycopg2
import shutil
import base64
# ------------------------------
# Import internal snippets |
# ------------------------------
#
from include.db_config import *
from include.functions import *
# ------------------------------
# Open database connections |
# ------------------------------
#
# Mysql connection
try:
cnx_msql = mysql.connector.connect( host=host_mysql, user=user_mysql, passwd=pswd_mysql, db=dbna_mysql )
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Mysql unbuffered connection
try:
cnx_msql_unbuffered = mysql.connector.connect( host=host_mysql, user=user_mysql, passwd=pswd_mysql, db=dbna_mysql )
except mysql.connector.Error as e:
print "MYSQL: Unable to connect!", e.msg
sys.exit(1)
# Postgresql connection
try:
cnx_psql = psycopg2.connect(**psql_param)
# enable autocommit
#cnx_psql.set_isolation_level(0)
except psycopg2.Error as e:
print('PSQL: Unable to connect!\n{0}'.format(e))
sys.exit(1)
# -----------
# FUNCTIONS |
# -----------
#
def fv_missing_records():
cur_msql = cnx_msql_unbuffered.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
qry_1 = "SELECT r.resource_id, r.author_id, r.create_date, r.visible_from, '' AS description, r.size, \
r.format, r.content_type, r.status, rep.content, rep.iv, 1 as version, NULL AS loid \
FROM resources r \
INNER JOIN repository rep ON r.RESOURCE_ID=rep.RESOURCE_ID AND rep.VERSION=1 \
LEFT JOIN versions v ON r.RESOURCE_ID=v.RESOURCE_ID \
WHERE v.RESOURCE_ID IS NULL and r.type = 4 ORDER BY r.RESOURCE_ID;"
execute_msql(cur_msql, qry_1)
qry_psql = "INSERT INTO file_versions(resource_id, author_id, create_date, publication_date, version, \
description, size, format, content_type, status, file_oid, iv) VALUES"
arg_psql = "(%(resource_id)s, %(author_id)s, %(create_date)s, %(visible_from)s, %(version)s, \
%(description)s, %(size)s, %(format)s, %(content_type)s, %(status)s, %(loid)s, %(iv)s)"
args_tmp = []
for row in cur_msql:
loid = lobject_direct_migration(cnx_psql, row['content'])  # cur_psql seems to be unused here, check it
# convert iv string to bytearray
row['iv'] = convert_iv(row['iv'])
# fix oid for the current row
row['loid'] = loid
dataset = cur_psql.mogrify(arg_psql, row)
execute_psql(cnx_psql, cur_psql, qry_psql, dataset, False)
info_sleep("Done adding missing records in file_versions!", 3)
cur_msql.close()
cur_psql.close()
def migrate_files(function_args):
# temporarily drop FK constraints
disable_fks(cnx_psql)
# here we are outside the key/values loop
fv_missing_records()
for key, values in function_args.items():
cur_msql = cnx_msql_unbuffered.cursor(dictionary=True)
cur_msql1 = cnx_msql.cursor(dictionary=True)
cur_psql = cnx_psql.cursor()
# SELECT from source db
cur_msql.execute(values[0])
for row in cur_msql:
#DEBUG#print row
if key == "files":
#print row['rid']
full_qry="SELECT resource_id, path, address, asas_id, progress \
FROM resources \
WHERE resource_id={0}".format(row['rid'])
# execute full query
cur_msql1.execute(full_qry)
for the_row in cur_msql1:
the_row = casting_repository_types(the_row)
### INSERT the row into destination db
#print the_row
dataset = cur_psql.mogrify(values[2], the_row)
#print dataset
execute_psql(cnx_psql, cur_psql, values[1], dataset, False)
elif key == "file_versions":
full_qry="SELECT v.resource_id, v.version, rep.content, v.description, v.format, v.content_type, \
v.size, v.user_id, v.timestamp, v.status, rep.iv, NULL AS loid \
FROM versions v \
INNER JOIN repository rep \
ON v.resource_id = rep.resource_id \
AND v.version = rep.version \
WHERE v.resource_id={0} AND v.version={1}".format(row['resource_id'], row['version'])
# execute full query
cur_msql1.execute(full_qry)
for the_row in cur_msql1:
loid = lobject_direct_migration(cnx_psql, the_row['content'])
# convert iv string to bytearray
the_row['iv'] = convert_iv(the_row['iv'])
# fix oid for the current row
the_row['loid'] = loid
dataset = cur_psql.mogrify(values[2], the_row)
### INSERT the row into destination db
execute_psql(cnx_psql, cur_psql, values[1], dataset, False)
elif key == "file_secview":
full_qry="SELECT resource_id, page, version, content, iv, NULL AS loid \
FROM repository_secview \
WHERE resource_id={0} AND page={1} AND version={2}".format(row['resource_id'], row['page'], row['version'])
# execute full query
cur_msql1.execute(full_qry)
for the_row in cur_msql1:
loid = lobject_direct_migration(cnx_psql, the_row['content'])
# fix page, in A3 starting from 0
the_row['page'] = the_row['page'] + 1
# convert iv string to bytearray
the_row['iv'] = convert_iv(the_row['iv'])
# fix oid for the current row
the_row['loid'] = loid
dataset = cur_psql.mogrify(values[2], the_row)
### INSERT the row into destination db
execute_psql(cnx_psql, cur_psql, values[1], dataset, False)
elif key == "file_secprint":
full_qry="SELECT resource_id, version, content, iv, NULL AS loid \
FROM repository_secprint \
WHERE resource_id={0} AND version={1}".format(row['resource_id'], row['version'])
# execute full query
cur_msql1.execute(full_qry)
for the_row in cur_msql1:
loid = lobject_direct_migration(cnx_psql, the_row['content'])
# convert iv string to bytearray
the_row['iv'] = convert_iv(the_row['iv'])
# fix oid for the current row
the_row['loid'] = loid
dataset = cur_psql.mogrify(values[2], the_row)
### INSERT the row into destination db
execute_psql(cnx_psql, cur_psql, values[1], dataset, False)
elif key == "annotations":
full_qry="SELECT a.resource_id, ifnull(MAX(v.version), 1) AS maxversion, a.attachment_number, a.author_id, \
a.content, iv, a.update_date, NULL AS loid \
FROM annotations a LEFT JOIN versions v ON a.resource_id=v.resource_id \
WHERE a.resource_id={0} GROUP BY a.resource_id".format(row['resource_id'])
# execute full query
cur_msql1.execute(full_qry)
for the_row in cur_msql1:
loid = lobject_direct_migration(cnx_psql, the_row['content'])
# convert iv string to bytearray
the_row['iv'] = convert_iv(the_row['iv'])
# fix oid for the current row
the_row['loid'] = loid
dataset = cur_psql.mogrify(values[2], the_row)
### INSERT the row into destination db
execute_psql(cnx_psql, cur_psql, values[1], dataset, False)
# Cursors close
cur_msql.close()
cur_psql.close()
# re-enable foreign key constraints
enable_fks(cnx_psql)
info_sleep("Done with files!", 3)
# --------------
# Conversions |
# --------------
#
def casting_repository_types(the_row):
# Data type conversion : smallint to boolean
if the_row['progress'] == 0:
the_row['progress'] = False
elif the_row['progress'] == 1:
the_row['progress'] = True
if the_row['path'] is None:
the_row['path'] = 0
return the_row
# ------------
# Queries |
# ------------
#
function_args={ 'files':[], 'file_versions':[], 'file_secview':[], 'file_secprint':[], 'annotations':[]}
files=[
"SELECT resource_id as rid FROM resources \
WHERE type=4 and subtype=2",
"INSERT INTO files (resource_id, reference_id, address, versions_to_kept, show_in_welcome) VALUES ",
"(%(resource_id)s, %(path)s, %(address)s, %(asas_id)s, %(progress)s)"
]
file_versions=[ # /!\ publication_date, approval_request_date & signed_file_version set to NULL
"SELECT v.resource_id, v.version \
FROM versions v \
INNER JOIN repository rep \
ON v.resource_id = rep.resource_id \
AND v.version = rep.version",
"INSERT INTO file_versions (resource_id, author_id, create_date, version, description, size, format, content_type, status, file_oid, iv) VALUES ",
"(%(resource_id)s, %(user_id)s, %(timestamp)s, %(version)s, %(description)s, %(size)s, %(format)s, %(content_type)s, %(status)s, %(loid)s, %(iv)s)"
]
file_secview=[
"SELECT resource_id, page, version FROM repository_secview",
"INSERT INTO file_secview (resource_id, version, page, file_oid, iv) VALUES ",
"(%(resource_id)s, %(version)s, %(page)s, %(loid)s, %(iv)s)"
]
file_secprint=[
"SELECT resource_id, version FROM repository_secprint",
"INSERT INTO file_secprint (resource_id, version, file_oid, iv) VALUES ",
"(%(resource_id)s, %(version)s, %(loid)s, %(iv)s)"
]
annotations=[
"SELECT a.resource_id FROM annotations a LEFT JOIN versions v ON a.resource_id=v.resource_id GROUP BY a.resource_id",
"INSERT INTO annotations (resource_id, version, author_id, file_oid, iv, update_date) VALUES ",
"(%(resource_id)s, %(maxversion)s, %(author_id)s, %(loid)s, %(iv)s, %(update_date)s)"
]
function_args['files'] = files
function_args['file_versions'] = file_versions
function_args['file_secview'] = file_secview
function_args['file_secprint'] = file_secprint
function_args['annotations'] = annotations
migrate_files(function_args)
############## END OF SCRIPT ################
#-------------------------------------------#
# Finalizing stuff & closing db connections #
#-------------------------------------------#
#############################################
## Committing
cnx_psql.commit()
## Closing database connections
cnx_msql_unbuffered.close()
cnx_msql.close()
cnx_psql.close()
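A note on transactions: psycopg2 opens a transaction implicitly and the script only calls cnx_psql.commit() at the very end, so the entire multi-day run happens inside a single transaction. A minimal sketch of committing every N rows instead (the batch size is illustrative and the loop simply reuses the cursors defined above, so treat it as an assumption rather than tested code):
ROWS_PER_COMMIT = 500  # illustrative batch size
processed = 0
for row in cur_msql:
    # ... build the dataset and INSERT it exactly as in migrate_files() above ...
    processed += 1
    if processed % ROWS_PER_COMMIT == 0:
        cnx_psql.commit()  # keep the server-side transaction bounded
cnx_psql.commit()  # commit the final partial batch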
Below are the functions called by the main script above:
### EXECUTE QUERIES ON DBs
def execute_psql(cnx_psql, cursor, query, dataset = None, direct_commit = False):
if dataset:
try:
cursor.execute( query + dataset )
print "EXECUTED QUERY : " + query + dataset
except psycopg2.Error as err:
print "/!\ Cannot execute the query on " + query + dataset, err.pgerror
cnx_psql.rollback()
sys.exit( "Rolledback! And leaving early this lucky script, find out what is wrong" )
else:
print "The dataset for " + query + " is empty, skipping..."
return cursor
def execute_msql(cursor, query):
try:
cursor.execute( query )
except mysql.connector.Error as err:
print "/!\ Cannot execute the following query:" , query
print "/!\ Error:", err
sys.exit( "leaving early this lucky script, find out what is wrong" )
return cursor
### FILES migration
def convert_iv(iv):
if iv:
iv = bytearray(base64.b64decode(iv))
return iv
def lobject_direct_migration(cnx_psql, blob):
bytearray_to_string = str(bytearray(blob))
loid = cnx_psql.lobject().oid
try:
cnx_psql.lobject(loid, mode='w').write( bytearray_to_string )
print "lo_migration | new oid : "+str(loid)
del bytearray_to_string
except psycopg2.Error as err:
print "/!\ Cannot insert large_object " + str(loid), err.pgerror
cnx_psql.rollback()
sys.exit( "Rollback! And leaving early this lucky script, find out what is wrong" )
return loid
Logging and monitoring: Additionally, in order to check the consumption of system resources, I implemented the following function and called it after each query, but the memory usage stayed very stable and well below a warning level.
import datetime
import resource

def get_log():
    now = datetime.datetime.now()
    usage = resource.getrusage(resource.RUSAGE_SELF)
    print '\n*** BEGIN LOGGING ***'
    print 'DATETIME: ' + str(now)
    print 'MEMORY USAGE: ' + str(usage.ru_maxrss) + ' Kilobyte'
    print 'TIME IN USER MODE (FLOAT): ' + str(usage.ru_utime)
    print 'TIME IN SYSTEM MODE (FLOAT): ' + str(usage.ru_stime)
    print 'SHARED MEMORY USAGE: ' + str(usage.ru_ixrss) + ' Kilobyte'
    print 'UNSHARED MEMORY USAGE: ' + str(usage.ru_idrss) + ' Kilobyte'
    print 'PAGE FAULTS NOT REQUIRING I/O: ' + str(usage.ru_minflt)
    print 'PAGE FAULTS REQUIRING I/O: ' + str(usage.ru_majflt)
    print 'NUMBER OF SWAP OUTS: ' + str(usage.ru_nswap)
    print 'BLOCK INPUT OPERATIONS: ' + str(usage.ru_inblock)
    print 'BLOCK OUTPUT OPERATIONS: ' + str(usage.ru_oublock)
    print 'MESSAGES SENT: ' + str(usage.ru_msgsnd)
    print 'MESSAGES RECEIVED: ' + str(usage.ru_msgrcv)
    print 'SIGNALS RECEIVED: ' + str(usage.ru_nsignals)
    print 'VOLUNTARY CONTEXT SWITCHES: ' + str(usage.ru_nvcsw)
    print 'INVOLUNTARY CONTEXT SWITCHES: ' + str(usage.ru_nivcsw)
    ## LIMITS ON RESOURCES USAGE
    #print str(resource.getrlimit(resource.RLIMIT_CORE))
    #print str(resource.getrlimit(resource.RLIMIT_CPU))
    #print str(resource.getrlimit(resource.RLIMIT_FSIZE))
    print '*** END LOGGING ***\n'
    return True
Excerpt of the report: Below is a piece of the report obtained. Unfortunately the datetime is rounded to the second, but it is clear that migrating each record takes less than one second; most of the time is spent preparing the record to be migrated (see the timing sketch after the excerpt).
2016-06-23 03:41:35 | SOURCE DB QUERY : SELECT resource_id, page, version, content, iv, NULL AS loid FROM repository_secview WHERE resource_id=1445333482345622 AND page=0 AND version=1
2016-06-23 03:41:35 | DESTINATION DB QUERY : INSERT INTO file_secview (resource_id, version, page, file_oid, iv) VALUES (1445333482345622, 1, 1, 2095944, NULL)
2016-06-23 03:41:38 | SOURCE DB QUERY : SELECT resource_id, page, version, content, iv, NULL AS loid FROM repository_secview WHERE resource_id=1445333501859624 AND page=0 AND version=1
2016-06-23 03:41:38 | DESTINATION DB QUERY : INSERT INTO file_secview (resource_id, version, page, file_oid, iv) VALUES (1445333501859624, 1, 1, 2095945, NULL)
2016-06-23 03:41:40 | SOURCE DB QUERY : SELECT resource_id, page, version, content, iv, NULL AS loid FROM repository_secview WHERE resource_id=1457424635012030 AND page=0 AND version=1
2016-06-23 03:41:40 | DESTINATION DB QUERY : INSERT INTO file_secview (resource_id, version, page, file_oid, iv) VALUES (1457424635012030, 1, 1, 2095946, NULL)
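To get sub-second numbers, a small timing helper could wrap each phase. This is only a sketch (the helper name timed and the labels are mine, not part of the original code):
import time

def timed(label, func, *args, **kwargs):
    # run func and print how long it took, with millisecond resolution
    start = time.time()
    result = func(*args, **kwargs)
    print('%-30s %.3f s' % (label, time.time() - start))
    return result

# e.g. inside the migration loop:
# loid = timed('lo_migration', lobject_direct_migration, cnx_psql, the_row['content'])
# dataset = timed('mogrify', cur_psql.mogrify, values[2], the_row)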
Does anyone see a bottleneck? In my opinion the issue could lie in the following conversion to a string, located in the lobject_direct_migration() function, but I don't see any other way to perform the same action:
bytearray_to_string = str(bytearray(blob))
Is there any way to tune this snippet for better performance? Any improvement tip is welcome. I would like to bring the execution time down to one day at most :)
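For reference, a minimal sketch of the large-object write without the extra copy (the isinstance() shortcut is an assumption about what mysql.connector actually returns for BLOB columns, so treat it as illustrative rather than tested):
def lobject_migration(cnx_psql, blob):
    # only convert when the blob is not already a byte string,
    # to avoid one extra full in-memory copy per file
    data = blob if isinstance(blob, str) else str(bytearray(blob))
    lobj = cnx_psql.lobject(0, 'w')  # oid 0 lets the server assign a new OID
    try:
        lobj.write(data)
    finally:
        lobj.close()
    return lobj.oid
This also opens the large object only once, instead of creating it with cnx_psql.lobject() and then reopening it by oid in write mode.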
A for in an if in a for in a for. I'm not surprised it takes forever to execute, but you indicate it currently takes over two days for 30GB? That's looong. Good idea to get this reviewed! :-)