
I have a 5 GB CSV of IP addresses that I need to parse into a MySQL database.

Currently I am reading rows from the CSV and inserting them into MySQL. It works, but I would love to make it faster.

Could I parallelize the reading and writing somehow? Or perhaps chunk the CSV and spawn processes to read and write each split file?

from csv import reader
import mysql.connector

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()
i = 1

with open('iplist.csv', 'r') as read_obj:
    csv_reader = reader(read_obj)
    for row in csv_reader:
        query = """INSERT INTO ips (ip_start,ip_end,continent) VALUES ('%s','%s','%s')""" % (row[0],row[1],row[2])
        print (query)
        cursor.execute(query)
        cursor.execute('COMMIT')
        print(i)
        i = i + 1
cnx.close()

Any help is appreciated.

  • Can you check out stackoverflow.com/questions/44950893/… and see if it helps you? Commented Sep 7, 2021 at 6:23
  • 1
    Don't commit after every insert Commented Sep 7, 2021 at 6:44
  • @RajeshYogeshwar Going to test. Having an issue loading multiprocessing module. Commented Sep 7, 2021 at 7:12
  • 1
    @RajeshYogeshwar Got that sorted :) Commented Sep 7, 2021 at 7:29
  • 1
    I have a solution that will insert 33 million rows in around 3 minutes. I'll post it as an answer shortly Commented Sep 7, 2021 at 9:14

3 Answers


Use cursor.executemany to increase speed:

# Tested with:
# docker run --rm -e MYSQL_ALLOW_EMPTY_PASSWORD=y -p 3306:3306 mysql
#
# CREATE DATABASE ips;
# USE ips;
# CREATE TABLE ips (id INT PRIMARY KEY NOT NULL AUTO_INCREMENT, ip_start VARCHAR(15), ip_end VARCHAR(15), continent VARCHAR(20));

import mysql.connector
import csv
import itertools

CHUNKSIZE = 1000  # Number of lines

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

with open('iplist.csv', 'r') as csvfile:
    reader = csv.reader(csvfile)
    while True:
        records = list(itertools.islice(reader, CHUNKSIZE))
        if not records:
            break
        query = """INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)"""
        cursor.executemany(query, records)
        cursor.execute('COMMIT')

2 Comments

This works perfectly! Do you have any ideas about the ideal chunk size? Not limited by RAM (64GB).
Glad to read that. It depends on your max_allowed_packet parameter; check dev.mysql.com/doc/refman/8.0/en/packet-too-large.html. Note that I use CHUNKSIZE as a number of lines, not as a buffer size. A quick way to check the server's current value is sketched below.
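
A minimal sketch of that check, reusing the connection settings from the question (root on 127.0.0.1, database ips; adjust to your own setup):

import mysql.connector

# Ask the server for its current max_allowed_packet value.
# Connection parameters are copied from the question; adjust as needed.
cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()
cursor.execute("SHOW VARIABLES LIKE 'max_allowed_packet'")
print(cursor.fetchone())  # e.g. ('max_allowed_packet', '67108864')
cnx.close()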

I created a pseudo-random CSV file where each row is of the form "111.222.333.444,555.666.777.888,A continent". The file contains 33 million rows. The following code was able to insert all rows into a MySQL database table in ~3 minutes:

import mysql.connector
import time
import concurrent.futures
import csv
import itertools

CSVFILE='/Users/Andy/iplist.csv'
CHUNK=10_000


def doBulkInsert(rows):
    with mysql.connector.connect(user='andy', password='monster', host='localhost', database='andy') as connection:
        connection.cursor().executemany('INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)', rows)
        connection.commit()


def main():
    with open(CSVFILE) as csvfile:
        csvdata = csv.reader(csvfile)
        _s = time.perf_counter()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            while (data := list(itertools.islice(csvdata, CHUNK))):
                executor.submit(doBulkInsert, data)
            executor.shutdown(wait=True)
            print(f'Duration = {time.perf_counter()-_s}')    

if __name__ == '__main__':
    main()

4 Comments

Works well. Just a quick question: does using with mysql.connector.connect(user='andy', password='monster', host='localhost', database='andy') as connection: instead of connection = mysql.connector.connect(...) improve anything? For some reason the with version doesn't store any data for me. If I assign the connection to a variable and use that instead, it does, but it seems a bit slower.
with is the keyword used with context managers. When the with block ends, the class's exit method is called, which will typically release all of its resources (see the sketch after these comments). If you're having a problem with that code, I suspect you copied something incorrectly.
Turns out I had an issue with my mysql-connector pip package. Reinstalling that fixed my issue.
Adding another 13 columns of similar sized data has upped my total processing time to 688s for 33m rows. I would assume that sounds about right considering the increase in data?
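
To expand on the context-manager comment above: the with block in the answer is roughly equivalent to the explicit version below (a sketch only; query and rows are placeholders standing in for the INSERT statement and the chunk of rows passed to doBulkInsert):

import mysql.connector

query = 'INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)'
rows = [('1.2.3.4', '1.2.3.255', 'Europe')]  # placeholder chunk of rows

connection = mysql.connector.connect(user='andy', password='monster', host='localhost', database='andy')
try:
    connection.cursor().executemany(query, rows)
    connection.commit()
finally:
    connection.close()  # roughly what the context manager's exit method does here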

My recommendation would be to chunk your list. Break it down into chunks of 5,000 rows (or similar) and iterate through those. This will reduce the number of queries you are making; query volume seems to be your biggest bottleneck. A minimal sketch of the idea follows the link below.

https://medium.com/code-85/two-simple-algorithms-for-chunking-a-list-in-python-dc46bc9cc1a2
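
A minimal sketch of that idea, reusing the file name, table, and connection settings from the question (it loads the whole file into memory first, which the 64 GB of RAM mentioned in the comments should comfortably allow):

import csv
import mysql.connector

def chunked(items, size=5000):
    # Yield successive fixed-size chunks from a list.
    for start in range(0, len(items), size):
        yield items[start:start + size]

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

with open('iplist.csv') as f:
    rows = list(csv.reader(f))

query = "INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)"
for chunk in chunked(rows):
    cursor.executemany(query, chunk)  # one multi-row insert per chunk
    cnx.commit()                      # one commit per chunk instead of per row

cnx.close()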

