For a project I need to parse some pretty large CSV files. The contents of some of the entries are stored in a MySQL database. I am trying to speed this up using multithreading, but so far this only slows things down.
I parse a CSV file (up to 10 GB), and some of its records (approx. 5M out of a 20M+ record CSV) need to be inserted into a MySQL database. To determine which records need to be inserted, we use a Redis server with sets that contain the correct IDs/references.
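The filtering step can be sketched as follows, assuming each record is a hash with a `:reference` key (as `line[:reference]` in the Worker code suggests). A local `Set` stands in for the Redis sets so the sketch runs on its own; `check_line`, `KNOWN_REFERENCES` and the sample records are hypothetical. If the real `check_line_with_redis` issues one `SISMEMBER` per line (an assumption), that is one Redis round trip per CSV record.

```ruby
require 'set'

# Hypothetical stand-in for the Redis sets: in production this lookup
# would go to Redis, here a local Set plays that role.
KNOWN_REFERENCES = Set.new(%w[ref-1 ref-3])

# Hypothetical equivalent of check_line_with_redis: keep a record only
# if its reference is a member of the set.
def check_line(line, known = KNOWN_REFERENCES)
  known.include?(line[:reference])
end

records = [
  { :reference => 'ref-1', :name => 'a' },
  { :reference => 'ref-2', :name => 'b' },
  { :reference => 'ref-3', :name => 'c' }
]

selected = records.select { |line| check_line(line) }
puts selected.map { |line| line[:reference] }.inspect
# prints ["ref-1", "ref-3"]
```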
Since we process around 30 of these files at any given time, and there are some dependencies, we store each file on a Resque queue and have multiple servers handling these (prioritized) queues.
In a nutshell:
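    class Worker
      def self.perform(file)
        CsvParser.each(file) do |line|
          # skip records whose reference is not in the Redis sets
          next unless check_line_with_redis(line)
          # load the existing record by its reference, or build a new one
          a = ObjectA.find_or_initialize_by_reference(line[:reference])
          # remove its associated ObjectBs, then assign the CSV values and save
          a.object_bs.destroy_all
          a.update_attributes(line)
        end
      end
    end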
This works and scales nicely horizontally (more CSV files = more servers), but larger CSV files pose a problem: we currently have files that take over 75 hours to parse this way. I have already thought of a couple of optimizations:
One is cutting down on the MySQL queries: we instantiate AR objects, whereas an insert with plain SQL (if we know the object's ID) is much faster. This way we can probably get rid of most of AR, and maybe even Rails, to remove overhead. We can't use a plain MySQL LOAD DATA INFILE, since we have to map the CSV records to other entities that might have different IDs by now (we are combining a dozen legacy databases into a new database).
The other is trying to do more at the same time. There is some IO wait time and network wait time for both Redis and MySQL, and even though MRI uses green threads, this might allow us to schedule our MySQL queries at the same time as the IO reads, etc. But using the following code:
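A minimal sketch of the plain-SQL route, under stated assumptions: the table `object_as`, its columns and the `LEGACY_TO_NEW_ID` hash are hypothetical, and `sql_quote` is a simplified stand-in for the connection's own quoting (`ActiveRecord::Base.connection.quote`). It maps legacy IDs to new ones in Ruby (the step that rules out `LOAD DATA`) and builds one multi-row `INSERT` per batch of records instead of one AR save per record.

```ruby
# Hypothetical mapping from legacy IDs (as found in the CSV) to the IDs
# the same entities have in the new, merged database.
LEGACY_TO_NEW_ID = { 17 => 1001, 42 => 1002 }

# Simplified quoting for illustration only (backslash-escapes \ and ');
# the real job should use ActiveRecord::Base.connection.quote instead.
def sql_quote(value)
  case value
  when nil     then 'NULL'
  when Integer then value.to_s
  else "'" + value.to_s.gsub(/[\\']/) { |c| "\\" + c } + "'"
  end
end

# Build one multi-row INSERT for a batch of records. Table and column
# names are hypothetical; fetch raises if a legacy ID has no mapping.
def build_insert(rows)
  values = rows.map do |row|
    parent_id = LEGACY_TO_NEW_ID.fetch(row[:legacy_parent_id])
    "(#{sql_quote(row[:id])}, #{sql_quote(row[:reference])}, #{sql_quote(parent_id)})"
  end
  "INSERT INTO object_as (id, reference, parent_id) VALUES " + values.join(', ')
end

rows = [
  { :id => 1, :reference => 'A-1', :legacy_parent_id => 17 },
  { :id => 2, :reference => "O'Brien", :legacy_parent_id => 42 }
]
sql = build_insert(rows)
puts sql
# prints INSERT INTO object_as (id, reference, parent_id) VALUES (1, 'A-1', 1001), (2, 'O\'Brien', 1002)
# In the real job: ActiveRecord::Base.connection.execute(sql)
```

This only covers new rows; the `find_or_initialize_by_reference` / `destroy_all` path in the Worker would still need its own `DELETE`, or an `INSERT ... ON DUPLICATE KEY UPDATE` for existing records.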
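    class Worker
      def self.perform(file)
        CsvParser.each(file) do |line|
          next unless check_line_with_redis(line)
          # hand the database work for this line to a background thread
          create_or_join_thread(line) do |myLine|
            a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
            a.object_bs.destroy_all
            a.update_attributes(myLine)
          end
        end
        # note: the thread started for the last line is never joined here
      end

      # Waits for the previous thread (so at most one background thread runs
      # alongside the parsing), then starts a new one for this line.
      def self.create_or_join_thread(line)
        @thread.join if @thread.present?
        @thread = Thread.new(line) do |myLine|
          yield myLine
        end
      end
    end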
This gradually slows the process down. When I run ps au, it starts off at 100% CPU, but as time progresses it drops to just 2-3%. At that point it does not insert any new records at all; it just appears to hang.
I have straced the process: at first I see MySQL queries pass by, but after a while it appears not to be executing my Ruby code at all. It could be a deadlock (it hung after parsing the last line of the CSV, but the process kept running at 5% CPU and did not quit), or something I read here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/
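For comparison, a common alternative to starting (and joining) one thread per line is a fixed pool of worker threads fed by a bounded queue, so reading the file can overlap with the database work without the number of threads growing. The sketch below uses only Ruby's standard library; `run_pipeline`, its parameters and the squaring block are hypothetical stand-ins for the Redis/MySQL work. If the block uses ActiveRecord, each worker thread checks out its own connection, so the worker count should probably stay within the connection pool size.

```ruby
require 'thread'

# Overlap reading with processing using a fixed number of worker threads.
# `process` stands in for the per-record work (Redis check + MySQL writes).
def run_pipeline(lines, worker_count = 4, queue_size = 100, &process)
  queue = SizedQueue.new(queue_size)   # producer blocks when the queue is full
  stop = Object.new                    # sentinel telling a worker to finish

  workers = (1..worker_count).map do
    Thread.new do
      loop do
        item = queue.pop
        break if item.equal?(stop)
        process.call(item)
      end
    end
  end

  lines.each { |line| queue.push(line) }   # reading side
  worker_count.times { queue.push(stop) }  # one sentinel per worker
  workers.each(&:join)                     # wait until the last records are done
end

# Usage: square ten numbers with three workers, collecting results safely.
results = Queue.new
run_pipeline((1..10).to_a, 3) { |n| results.push(n * n) }
collected = []
collected << results.pop until results.empty?
puts collected.sort.inspect
# prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

The explicit `join` on every worker at the end waits for the final records, which `create_or_join_thread` does not do for the thread started on the last line.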
I am using Rails 2.3.8 and REE 1.8.7-2010.02 on Ubuntu 10.10. Any insights on how to handle large numbers of threads (or why not to use threads here at all) are greatly appreciated!