
This is a follow-up question. Below is a piece of my Python script that reads a constantly growing log file (text) and inserts data into a PostgreSQL DB. A new log file is generated each day. What I do is commit each line, which causes a huge load and really poor performance (it needs 4 hours to insert 30 minutes of the file data!). How can I improve this code to insert in bulk instead of line by line? And would this help improve the performance and reduce the load? I've read about copy_from but couldn't figure out how to use it in this situation.

    import logging
    import psycopg2 as psycopg

    try:
        connectStr = "dbname='postgis20' user='postgres' password='' host='localhost'"
        cx = psycopg.connect(connectStr)
        cu = cx.cursor()
        logging.info("connected to DB")
    except:
        logging.error("could not connect to the database")


    import time
    file = open('textfile.log', 'r')
    while 1:
        where = file.tell()
        line = file.readline()
        if not line:
            time.sleep(1)
            file.seek(where)
        else:
            print line,   # already has newline
            dodecode(line)
            ------------
    def dodecode(fields):
        global cx
        from time import strftime, gmtime
        from calendar import timegm
        import os
        msg = fields.split(',')
        part = eval(msg[2])
        msgnum = int(msg[3:6])
        print "message#:", msgnum
        print fields

        if (part==1):
            if msgnum==1:
                msg1 = msg_1.decode(bv)
                #print "message1 :",msg1
                Insert(msgnum,time,msg1)
            elif msgnum==2:
                msg2 = msg_2.decode(bv)
                #print "message2 :",msg2
                Insert(msgnum,time,msg2)
            elif msgnum==3:
                ....
                ....
                ....
        ----------------
    def Insert(msgnum,time,msg):
        global cx

        try:
            if msgnum in [1,2,3]:
                if msg['type']==0:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type']==1:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+", "+str(int(msg['UserName']))+", "+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type']==2:
                    ....
                    ....
                    ....
        except Exception, err:
            #print('ERROR: %s\n' % str(err))
            logging.error('ERROR: %s\n' % str(err))
            cx.commit()

        cx.commit()

1 Answer


doing multiple rows per transaction, and per query, will make it go faster.

when faced with a similar problem I put multiple rows in the values part of the insert query, but you have complicated insert queries, so you'll likely need a different approach.

I'd suggest creating a temporary table and inserting say 10000 rows into it with ordinary multi-row inserts

insert into temptable values ( /* row1 data */ ) ,( /* row2 data */ ) etc...

500 rows per insert is a good starting point.

then joining the temp table with the existing data to de-dupe it.

delete from temptable using livetable where /* join condition */ ;

and de-duping it against itself if that is needed too

delete from temptable where id not in 
  ( select distinct on ( /* unique columns */) id from temptable);

then using insert-select to copy the rows from the temporary table into the live table

insert into livetable ( /* columns */ )
  select /* columns */ from temptable; 

it looks like you might need an update-from too
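
for example, something like this (a sketch based on the table2 update in the question; the column names and join key are assumptions, so adapt them to your real schema):

update livetable set field3 = t.field3, time_stamp = t.time_stamp, pos = t.pos
  from temptable t
  where livetable.field1 = t.field1
    and livetable.time_stamp < t.time_stamp;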

and finally dropping the temp table and starting again.
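
that last step can be as simple as (assuming the temp table is called temptable as above):

drop table temptable;

or truncate temptable; if you'd rather keep the table and just empty it between batches.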

and as you're writing to two tables you're going to need to double up all these operations.

I'd do the insert by maintaining a count and a list of values to insert, then at insert time building the query by repeating the (%s,%s,%s,%s) part as many times as needed, passing the list of values in separately, and letting psycopg2 deal with the formatting.
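
a minimal sketch of that idea, assuming a four-column temp table (the table name, column count and batching helpers here are placeholders, not your actual schema):

    pending = []              # rows waiting to be flushed
    BATCH_SIZE = 500          # rows per multi-row INSERT

    def queue_row(row):
        # row is one tuple of values for temptable, e.g. (userid, time_stamp, lon, lat)
        pending.append(row)
        if len(pending) >= BATCH_SIZE:
            flush()

    def flush():
        if not pending:
            return
        # build "(%s,%s,%s,%s),(%s,%s,%s,%s),..." once per queued row
        placeholders = ",".join(["(%s,%s,%s,%s)"] * len(pending))
        params = [v for row in pending for v in row]   # flatten the list of tuples
        cu.execute("INSERT INTO temptable VALUES " + placeholders, params)
        del pending[:]
        # then run the de-dupe / insert-select / update-from steps and commit once per batch
        cx.commit()

the idea being that queue_row() takes the place of the per-line Insert() call, so the commit happens once per batch instead of once per line.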

I'd expect making those changes could get you a speed-up of 5 times or more.


2 Comments

Thank you Jasen! I liked this idea! I just have one question: what's the point of the second step, "then joining the temp table with the existing data to de-dupe it"? I didn't get it.
I can't see your input file, so I can't tell whether your temporary table will have rows with duplicated data or not; if it does, the insert into the live table will probably fail.
