diff --git a/README.md b/README.md
index 26ccbf4..36e9b2b 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,8 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
 ## Dependencies
 
  - [`lxml`](http://lxml.de/installation.html)
- - [`psychopg2`](http://initd.org/psycopg/docs/install.html)
+ - [`psycopg2`](http://initd.org/psycopg/docs/install.html)
+ - [`libarchive-c`](https://pypi.org/project/libarchive-c/)
 
 ## Usage
 
@@ -18,14 +19,14 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
    `Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`.
 - In some old dumps, the cases in the filenames are different.
 - Execute in the current folder (in parallel, if desired):
-   - `python load_into_pg.py Badges`
-   - `python load_into_pg.py Posts`
-   - `python load_into_pg.py Tags` (not present in earliest dumps)
-   - `python load_into_pg.py Users`
-   - `python load_into_pg.py Votes`
-   - `python load_into_pg.py PostLinks`
-   - `python load_into_pg.py PostHistory`
-   - `python load_into_pg.py Comments`
+   - `python load_into_pg.py -t Badges`
+   - `python load_into_pg.py -t Posts`
+   - `python load_into_pg.py -t Tags` (not present in earliest dumps)
+   - `python load_into_pg.py -t Users`
+   - `python load_into_pg.py -t Votes`
+   - `python load_into_pg.py -t PostLinks`
+   - `python load_into_pg.py -t PostHistory`
+   - `python load_into_pg.py -t Comments`
 - Finally, after all the initial tables have been created:
   - `psql stackoverflow < ./sql/final_post.sql`
 - If you used a different database name, make sure to use that instead of
@@ -34,7 +35,25 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch
   - `psql stackoverflow < ./sql/optional_post.sql`
 - Again, remember to user the correct database name here, if not `stackoverflow`.
 
-## Caveats
+## Loading a complete stackexchange project
+
+You can use the script to download a given stackexchange compressed file from
+[archive.org](https://ia800107.us.archive.org/27/items/stackexchange/) and load
+all the tables at once, using the `-s` switch.
+
+You will need the `urllib` and `libarchive` (`libarchive-c`) modules.
+
+If you give a schema name using the `-n` switch, all the tables will be moved
+to the given schema. The schema will be created by the script if it does not exist.
+
+To load the _dba.stackexchange.com_ project into the `dba` schema, you would execute:
+`./load_into_pg.py -s dba -n dba`
+
+The paths are not changed in the final scripts `sql/final_post.sql` and
+`sql/optional_post.sql`. To run them, first set the _search_path_ to your
+schema name: `SET search_path TO <schema_name>;`
+
+## Caveats and TODOs
 
  - It prepares some indexes and views which may not be necessary for your analysis.
 - The `Body` field in `Posts` table is NOT populated by default. You have to use `--with-post-body` argument to include it.
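Not part of the diff: a minimal sketch of the full-project workflow the new README section describes, reusing the `dba` project/schema from the example above and the default `stackoverflow` database. Because `sql/final_post.sql` and `sql/optional_post.sql` are not schema-qualified, the `SET search_path` is prepended so it applies to the same psql session that runs the script.

```bash
# Download the dba.stackexchange.com dump and load every table into the "dba" schema
# (the schema is created by the script if it does not already exist).
./load_into_pg.py -s dba -n dba

# Run the final/optional scripts with search_path pointing at that schema.
( echo 'SET search_path TO dba;'; cat ./sql/final_post.sql )    | psql stackoverflow
( echo 'SET search_path TO dba;'; cat ./sql/optional_post.sql ) | psql stackoverflow
```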
diff --git a/load_into_pg.py b/load_into_pg.py
index 66b651d..3fedb7a 100755
--- a/load_into_pg.py
+++ b/load_into_pg.py
@@ -3,6 +3,7 @@
 import time
 import argparse
 import psycopg2 as pg
+import os
 import row_processor as Processor
 import six
 import json
@@ -12,6 +13,51 @@
     ('Posts', 'ViewCount'): "NULLIF(%(ViewCount)s, '')::int"
 }
+# part of the file already downloaded
+file_part = None
+
+def show_progress(block_num, block_size, total_size):
+    """Display the total size of the file to download and the progress in percent"""
+    global file_part
+    if file_part is None:
+        suffixes=['B','KB','MB','GB','TB']
+        suffixIndex = 0
+        pp_size = total_size
+        while pp_size > 1024:
+            suffixIndex += 1 #increment the index of the suffix
+            pp_size = pp_size/1024.0 #apply the division
+        six.print_('Total file size is: {0:.1f} {1}'.format(pp_size,suffixes[suffixIndex]))
+        six.print_("0 % of the file downloaded ...\r", end="", flush=True)
+        file_part = 0
+
+    downloaded = block_num * block_size
+    if downloaded < total_size:
+        percent = 100 * downloaded / total_size
+        if percent - file_part > 1:
+            file_part = percent
+            six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True)
+    else:
+        file_part = None
+        six.print_("")
+
+def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword):
+    dbConnectionParam = "dbname={}".format(dbname)
+
+    if mbPort is not None:
+        dbConnectionParam += ' port={}'.format(mbPort)
+
+    if mbHost is not None:
+        dbConnectionParam += ' host={}'.format(mbHost)
+
+    # TODO Is the escaping done here correct?
+    if mbUsername is not None:
+        dbConnectionParam += ' user={}'.format(mbUsername)
+
+    # TODO Is the escaping done here correct?
+    if mbPassword is not None:
+        dbConnectionParam += ' password={}'.format(mbPassword)
+    return dbConnectionParam
+
 
 def _makeDefValues(keys):
     """Returns a dictionary containing None for all keys."""
     return dict(( (k, None) for k in keys ))
@@ -150,7 +196,7 @@ def _getTableKeys(table):
         ]
     return keys
 
-def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword):
+def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam):
     """Handle the table including the post/pre processing."""
     keys = _getTableKeys(table)
     dbFile = mbDbFile if mbDbFile is not None else table + '.xml'
@@ -165,23 +211,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
         six.print_("Could not load pre/post/fk sql. Are you running from the correct path?", file=sys.stderr)
         sys.exit(-1)
 
-    dbConnectionParam = "dbname={}".format(dbname)
-
-    if mbPort is not None:
-        dbConnectionParam += ' port={}'.format(mbPort)
-
-    if mbHost is not None:
-        dbConnectionParam += ' host={}'.format(mbHost)
-
-    # TODO Is the escaping done here correct?
-    if mbUsername is not None:
-        dbConnectionParam += ' user={}'.format(mbUsername)
-
-    # TODO Is the escaping done here correct?
-    if mbPassword is not None:
-        dbConnectionParam += ' password={}'.format(mbPassword)
-
-
     try:
         with pg.connect(dbConnectionParam) as conn:
             with conn.cursor() as cur:
@@ -208,7 +237,7 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                               ' VALUES\n' + valuesStr + ';'
                         cur.execute(cmd)
                         conn.commit()
-                six.print_('Table {0} processing took {1:.1f} seconds'.format(table, time.time() - start_time))
+                six.print_('Table \'{0}\' processing took {1:.1f} seconds'.format(table, time.time() - start_time))
 
                 # Post-processing (creation of indexes)
                 start_time = time.time()
@@ -216,7 +245,7 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                 if post != '':
                     cur.execute(post)
                     conn.commit()
-                six.print_('Post processing took {} seconds'.format(time.time() - start_time))
+                six.print_('Post processing took {0:.1f} seconds'.format(time.time() - start_time))
                 if createFk:
                     # fk-processing (creation of foreign keys)
                     start_time = time.time()
@@ -224,7 +253,7 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                     if post != '':
                         cur.execute(fk)
                         conn.commit()
-                    six.print_('fk processing took {} seconds'.format(time.time() - start_time))
+                    six.print_('fk processing took {0:.1f} seconds'.format(time.time() - start_time))
 
     except IOError as e:
         six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr)
@@ -237,12 +266,32 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
         six.print_("Warning from the database.", file=sys.stderr)
         six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
 
+
+def moveTableToSchema(table, schemaName, dbConnectionParam):
+    try:
+        with pg.connect(dbConnectionParam) as conn:
+            with conn.cursor() as cur:
+                # create the schema
+                cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';')
+                conn.commit()
+                # move the table to the right schema
+                cur.execute('ALTER TABLE '+table+' SET SCHEMA ' + schemaName + ';')
+                conn.commit()
+    except pg.Error as e:
+        six.print_("Error in dealing with the database.", file=sys.stderr)
+        six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr)
+        six.print_(str(e), file=sys.stderr)
+    except pg.Warning as w:
+        six.print_("Warning from the database.", file=sys.stderr)
+        six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr)
+
 #############################################################
 
 parser = argparse.ArgumentParser()
-parser.add_argument( 'table'
+parser.add_argument( '-t', '--table'
                    , help = 'The table to work on.'
                    , choices = ['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments']
+                   , default = None
                    )
 
 parser.add_argument( '-d', '--dbname'
@@ -255,6 +304,22 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                    , default = None
                    )
+parser.add_argument( '-s', '--so-project'
+                   , help = 'StackExchange project to load.'
+                   , default = None
+                   )
+
+parser.add_argument( '--archive-url'
+                   , help = 'URL of the archive directory to retrieve.'
+                   , default = 'https://ia800107.us.archive.org/27/items/stackexchange'
+                   )
+
+parser.add_argument( '-k', '--keep-archive'
+                   , help = 'Will preserve the downloaded archive instead of deleting it.'
+                   , action = 'store_true'
+                   , default = False
+                   )
+
 parser.add_argument( '-u', '--username'
                    , help = 'Username for the database.'
                    , default = None
                    )
@@ -287,6 +352,11 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
                    , default = False
                    )
 
+parser.add_argument( '-n', '--schema-name'
+                   , help = 'Use specific schema.'
+                   , default = 'public'
+                   )
+
 parser.add_argument( '--foreign-keys'
                    , help = 'Create foreign keys.'
                    , action = 'store_true'
@@ -295,22 +365,83 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m
 
 args = parser.parse_args()
 
-table = args.table
-
 try:
     # Python 2/3 compatibility
     input = raw_input
 except NameError:
     pass
 
+dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password)
+
+# load given file in table
+if args.file and args.table:
+    table = args.table
+
+    if table == 'Posts':
+        # If the user has not explicitly asked for loading the body, we replace it with NULL
+        if not args.with_post_body:
+            specialRules[('Posts', 'Body')] = 'NULL'
-if table == 'Posts':
-    # If the user has not explicitly asked for loading the body, we replace it with NULL
-    if not args.with_post_body:
-        specialRules[('Posts', 'Body')] = 'NULL'
 
+    choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table))
+    if len(choice) > 0 and choice[0].lower() == 'y':
+        handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam)
+    else:
+        six.print_("Cancelled.")
+    if args.schema_name != 'public':
+        moveTableToSchema(table, args.schema_name, dbConnectionParam)
+    exit(0)
+
+# load a project
+elif args.so_project:
+    import libarchive
+    import tempfile
+
+    filepath = None
+    temp_dir = None
+    if args.file:
+        filepath = args.file
+        url = filepath
+    else:
+        # download the 7z archive in tempdir
+        file_name = args.so_project + '.stackexchange.com.7z'
+        url = '{0}/{1}'.format(args.archive_url, file_name)
+        temp_dir = tempfile.mkdtemp(prefix='so_')
+        filepath = os.path.join(temp_dir, file_name)
+        six.print_('Downloading the archive in {0}'.format(filepath))
+        six.print_('please be patient ...')
+        try:
+            six.moves.urllib.request.urlretrieve(url, filepath, show_progress)
+        except Exception as e:
+            six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e))
+            exit(1)
+
+    try:
+        libarchive.extract_file(filepath)
+    except Exception as e:
+        six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e))
+        exit(1)
+
+    tables = [ 'Tags', 'Users', 'Badges', 'Posts', 'Comments', 'Votes', 'PostLinks', 'PostHistory' ]
+
+    for table in tables:
+        six.print_('Load {0}.xml file'.format(table))
+        handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam)
+        # remove file
+        os.remove(table+'.xml')
+
+    if not args.keep_archive:
+        os.remove(filepath)
+        if temp_dir:
+            # remove the archive and the temporary directory
+            os.rmdir(temp_dir)
+        else:
+            six.print_("Archive '{0}' deleted".format(filepath))
+
+    if args.schema_name != 'public':
+        for table in tables:
+            moveTableToSchema(table, args.schema_name, dbConnectionParam)
+    exit(0)
-choice = input('This will drop the {} table. Are you sure [y/n]? '.format(table))
-if len(choice) > 0 and choice[0].lower() == 'y':
-    handleTable(table, args.insert_json, args.foreign_keys, args.dbname, args.file, args.host, args.port, args.username, args.password)
 else:
-    six.print_("Cancelled.")
+    six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.")
+    parser.print_help()
 
diff --git a/sql/Votes_pre.sql b/sql/Votes_pre.sql
index 29aebe0..3ed0b53 100644
--- a/sql/Votes_pre.sql
+++ b/sql/Votes_pre.sql
@@ -1,7 +1,7 @@
 DROP TABLE IF EXISTS Votes CASCADE;
 CREATE TABLE Votes (
     Id              int PRIMARY KEY ,
-    PostId          int not NULL ,
+    PostId          int , -- not NULL ,
     VoteTypeId      int not NULL ,
     UserId          int ,
     CreationDate    timestamp not NULL ,
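A side note on the `sql/Votes_pre.sql` change above: dropping `not NULL` on `PostId` presumably accommodates dumps that contain votes without a post reference (an assumption, not stated in the diff). A quick post-load check could look like this:

```bash
# Hypothetical sanity check: count votes loaded with an empty PostId.
psql stackoverflow -c 'SELECT count(*) FROM Votes WHERE PostId IS NULL;'
```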