1- #!/usr/bin/env python
1+ #!/usr/bin/env python3
22import sys
33import time
44import argparse
55import psycopg2 as pg
6+ import os
67import row_processor as Processor
78import six
89import json
1213 ('Posts' , 'ViewCount' ): "NULLIF(%(ViewCount)s, '')::int"
1314}
1415
# Progress (in percent) last reported for the download in flight.
# None means that no download is being tracked, i.e. the size header
# has not been printed yet.
file_part = None


def show_progress(block_num, block_size, total_size):
    """Display the total size of the file to download and the progress in percent.

    The signature matches the ``reporthook`` callback of
    ``urllib.request.urlretrieve``.

    block_num  -- number of blocks transferred so far (0 on the first call)
    block_size -- size of one block in bytes
    total_size -- total size of the file in bytes; negative when the size
                  is not known
    """
    global file_part

    if block_num == 0:
        # First call of a new transfer: forget any state left over from a
        # previous download so that the header is printed again.
        file_part = None

    if total_size < 0:
        # Without a total size no percentage can be computed. Say so once
        # instead of printing a bogus negative size on every block.
        if file_part is None:
            six.print_('Total file size is unknown, downloading ...')
            file_part = 0
        return

    if file_part is None:
        suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
        suffixIndex = 0
        pp_size = total_size
        # Stop at the last suffix so that huge sizes cannot run off the list.
        while pp_size >= 1024 and suffixIndex < len(suffixes) - 1:
            suffixIndex += 1
            pp_size = pp_size / 1024.0
        six.print_('Total file size is: {0:.1f} {1}'.format(pp_size, suffixes[suffixIndex]))
        six.print_("0 % of the file downloaded ...\r", end="", flush=True)
        file_part = 0

    downloaded = block_num * block_size
    if downloaded < total_size:
        percent = 100 * downloaded / total_size
        # Only redraw when the progress moved by more than one point.
        if percent - file_part > 1:
            file_part = percent
            six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True)
    else:
        # Download finished: reset the state and terminate the progress line.
        file_part = None
        six.print_("")
42+
def _quoteConnectionValue(value):
    """Return value as text usable in a libpq 'keyword=value' connection string.

    Plain values are returned unchanged. Empty values and values containing
    whitespace, single quotes or backslashes are wrapped in single quotes,
    with embedded single quotes and backslashes escaped by a backslash.
    """
    text = str(value)
    needsQuoting = text == '' or any(ch.isspace() or ch in "'\\" for ch in text)
    if not needsQuoting:
        return text
    # Backslashes must be doubled first, otherwise the backslashes added
    # in front of the quotes would be doubled as well.
    escaped = text.replace('\\', '\\\\').replace("'", "\\'")
    return "'" + escaped + "'"


def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
    """Build the connection string handed to pg.connect.

    dbname is always emitted; port, host, user and password are only
    added (in that order) when the corresponding argument is not None.
    Every value is quoted/escaped when needed so that, for instance, a
    password containing a space or a quote does not corrupt the string.
    """
    optionalParams = (
        ('port', mbPort),
        ('host', mbHost),
        ('user', mbUsername),
        ('password', mbPassword),
    )

    parts = ['dbname=' + _quoteConnectionValue(dbname)]
    for keyword, value in optionalParams:
        if value is not None:
            parts.append('{}={}'.format(keyword, _quoteConnectionValue(value)))
    return ' '.join(parts)
60+
def _makeDefValues(keys):
    """Map every key in keys to None and return the resulting dictionary."""
    return dict.fromkeys(keys)
@@ -150,7 +196,7 @@ def _getTableKeys(table):
150196 ]
151197 return keys
152198
153- def handleTable (table , insertJson , createFk , dbname , mbDbFile , mbHost , mbPort , mbUsername , mbPassword ):
199+ def handleTable (table , insertJson , createFk , mbDbFile , dbConnectionParam ):
154200 """Handle the table including the post/pre processing."""
155201 keys = _getTableKeys (table )
156202 dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +211,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
165211 six .print_ ("Could not load pre/post/fk sql. Are you running from the correct path?" , file = sys .stderr )
166212 sys .exit (- 1 )
167213
168- dbConnectionParam = "dbname={}" .format (dbname )
169-
170- if mbPort is not None :
171- dbConnectionParam += ' port={}' .format (mbPort )
172-
173- if mbHost is not None :
174- dbConnectionParam += ' host={}' .format (mbHost )
175-
176- # TODO Is the escaping done here correct?
177- if mbUsername is not None :
178- dbConnectionParam += ' user={}' .format (mbUsername )
179-
180- # TODO Is the escaping done here correct?
181- if mbPassword is not None :
182- dbConnectionParam += ' password={}' .format (mbPassword )
183-
184-
185214 try :
186215 with pg .connect (dbConnectionParam ) as conn :
187216 with conn .cursor () as cur :
@@ -208,7 +237,7 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
208237 ' VALUES\n ' + valuesStr + ';'
209238 cur .execute (cmd )
210239 conn .commit ()
211- six .print_ ('Table {0} processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
240+ six .print_ ('Table \' {0}\' processing took {1:.1f} seconds' .format (table , time .time () - start_time ))
212241
213242 # Post-processing (creation of indexes)
214243 start_time = time .time ()
@@ -237,12 +266,32 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
237266 six .print_ ("Warning from the database." , file = sys .stderr )
238267 six .print_ ("pg.Warning: {0}" .format (str (w )), file = sys .stderr )
239268
269+
def moveTableToSchema(table, schemaName, dbConnectionParam):
    """Create the schema if it does not exist and move the table into it.

    table             -- name of the table to move
    schemaName        -- name of the destination schema
    dbConnectionParam -- connection string handed to pg.connect

    Database errors and warnings are reported on stderr and are not
    re-raised.

    NOTE: table and schemaName are inserted verbatim into the SQL
    statements; callers must only pass trusted, valid identifiers.
    """
    conn = None
    try:
        conn = pg.connect(dbConnectionParam)
        # 'with conn' only ends the transaction (commit/rollback); it does
        # not close the connection, hence the explicit close() below.
        with conn:
            with conn.cursor() as cur:
                # create the schema
                cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
                conn.commit()
                # move the table to the right schema
                cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';')
                conn.commit()
    except pg.Error as e:
        six.print_("Error in dealing with the database.", file=sys.stderr)
        six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
        six.print_(str(e), file=sys.stderr)
    except pg.Warning as w:
        six.print_("Warning from the database.", file=sys.stderr)
        six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
    finally:
        if conn is not None:
            conn.close()
287+
240288#############################################################
241289
242290parser = argparse .ArgumentParser ()
243- parser .add_argument ( 'table'
291+ parser .add_argument ( '-t' , '-- table'
244292 , help = 'The table to work on.'
245293 , choices = ['Users' , 'Badges' , 'Posts' , 'Tags' , 'Votes' , 'PostLinks' , 'PostHistory' , 'Comments' ]
294+ , default = None
246295 )
247296
248297parser .add_argument ( '-d' , '--dbname'
@@ -255,6 +304,22 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
255304 , default = None
256305 )
257306
307+ parser .add_argument ( '-s' , '--so-project'
308+ , help = 'stackexchange project to load.'
309+ , default = None
310+ )
311+
312+ parser .add_argument ( '--archive-url'
313+ , help = 'URL of the archive directory to retrieve.'
314+ , default = 'https://ia800107.us.archive.org/27/items/stackexchange'
315+ )
316+
317+ parser .add_argument ( '-k' , '--keep-archive'
318+ , help = 'should we keep the downloaded archive.'
319+ , action = 'store_true'
320+ , default = False
321+ )
322+
258323parser .add_argument ( '-u' , '--username'
259324 , help = 'Username for the database.'
260325 , default = None
@@ -287,6 +352,11 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
287352 , default = False
288353 )
289354
355+ parser .add_argument ( '-n' , '--schema-name'
356+ , help = 'Use specific schema.'
357+ , default = 'public'
358+ )
359+
290360parser .add_argument ( '--foreign-keys'
291361 , help = 'Create foreign keys.'
292362 , action = 'store_true'
@@ -295,22 +365,71 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
295365
296366args = parser .parse_args ()
297367
298- table = args .table
299-
300368try :
301369 # Python 2/3 compatibility
302370 input = raw_input
303371except NameError :
304372 pass
305373
dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)

# Mode 1: load the given file into the given table
if args.file and args.table:
    table = args.table

    if table == 'Posts':
        # If the user has not explicitly asked for loading the body, we replace it with NULL
        if not args.with_post_body:
            specialRules[('Posts', 'Body')] = 'NULL'

    choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table))
    if len(choice) > 0 and choice[0].lower() == 'y':
        handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
        # Only touch the schema when the load was confirmed: a cancelled
        # run must leave the database untouched.
        if args.schema_name != 'public':
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    else:
        six.print_("Cancelled.")
    sys.exit(0)

# Mode 2: download a whole project archive and load all of its tables
elif args.so_project:
    # Imported here so that single-table loads do not require libarchive.
    import urllib.request
    import libarchive

    # download the 7z archive in /tmp
    file_name = args.so_project + '.stackexchange.com.7z'
    url = '{0}/{1}'.format(args.archive_url, file_name)
    filepath = '/tmp/' + file_name
    six.print_('Downloading the archive, please be patient ...')
    try:
        urllib.request.urlretrieve(url, filepath, show_progress)
    except Exception as e:
        six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e), file=sys.stderr)
        sys.exit(1)

    try:
        libarchive.extract_file(filepath)
    except Exception as e:
        six.print_('Error: impossible to extract the {0} archive ({1})'.format(filepath, e), file=sys.stderr)
        sys.exit(1)

    tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments', 'Votes', 'PostLinks', 'PostHistory']

    for table in tables:
        six.print_('Load {0}.xml file'.format(table))
        # Pass None as the file so that handleTable falls back to
        # '<table>.xml'; forwarding args.file would load the same file
        # into every table.
        handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
        # remove file
        os.remove(table + '.xml')

    if not args.keep_archive:
        # remove archive
        os.remove(filepath)

    if args.schema_name != 'public':
        for table in tables:
            moveTableToSchema(table, args.schema_name, dbConnectionParam)
    sys.exit(0)

else:
    six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.", file=sys.stderr)
    parser.print_help()
    # Invalid invocation: report failure to the calling shell.
    sys.exit(1)
0 commit comments