
For a project I need to parse some pretty large CSV files. The contents of some of the entries are stored in a MySQL database. I am trying to speed this up using multithreading, but so far that only slows things down.

I parse a CSV file (up to 10GB), and some of these records (approx. 5M out of a 20M+ record CSV) need to be inserted into a MySQL database. To determine which records need to be inserted, we use a Redis server with sets that contain the correct IDs / references.

Since we process around 30 of these files at any given time, and there are some dependencies, we store each file on a Resque queue and have multiple servers handling these (prioritized) queues.

In a nutshell:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      a = ObjectA.find_or_initialize_by_reference(line[:reference])
      a.object_bs.destroy_all
      a.update_attributes(line)
    end
  end
end
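
For reference, check_line_with_redis is essentially a set-membership test against Redis. A minimal sketch of the idea (the set key name is made up for illustration):

require 'rubygems'
require 'redis'

REDIS = Redis.new(:host => 'localhost')

# Hypothetical sketch: a line only needs processing if its reference is a
# member of the pre-built Redis set of references we still have to import.
def check_line_with_redis(line)
  REDIS.sismember('references_to_import', line[:reference])
end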

This setup works and scales nicely horizontally (more CSV files = more servers), but larger CSV files pose a problem. We currently have files that take over 75 hours to parse this way. There are a number of optimizations I have already thought of:

One is cutting down on the MySQL queries: we currently instantiate AR objects, while a plain SQL insert, if we know the object's ID, is much faster. That way we could probably get rid of most of AR, and maybe even Rails, to remove that overhead. We can't use a plain MySQL LOAD DATA, since we have to map the CSV records to other entities that might have different IDs by now (we combine a dozen legacy databases into a new database).
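
For illustration, the plain SQL variant I have in mind would go straight through the AR connection instead of instantiating models; a rough sketch (table and column names are simplified, and it assumes the Rails environment is loaded):

# Hypothetical raw-SQL insert: skip AR instantiation and issue the INSERT
# directly. Column list is simplified; values are quoted via the adapter.
def insert_with_plain_sql(line)
  conn = ActiveRecord::Base.connection
  conn.execute("INSERT INTO object_as (reference, name) " +
               "VALUES (#{conn.quote(line[:reference])}, #{conn.quote(line[:name])})")
end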

The other is trying to do more at the same time. There is some IO wait time and network wait time for both Redis and MySQL, and even though MRI uses green threads, this might allow us to schedule our MySQL queries at the same time as the IO reads, etc. But using the following code:

class Worker
  def self.perform(file)
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      create_or_join_thread(line) do |myLine|
        a = ObjectA.find_or_initialize_by_reference(myLine[:reference])
        a.object_bs.destroy_all
        a.update_attributes(myLine)
      end
    end
  end

  def self.create_or_join_thread(line)
    @thread.join if @thread.present?
    @thread = Thread.new(line) do |myLine|
      yield myLine
    end
  end
end

This gradually slows the process down. When I run ps au it starts off at 100% CPU, but as time progresses it drops to just 2-3%. At that point it does not insert new records at all; it just appears to hang.

I have straced the process: at first I see MySQL queries pass by, but after a while it appears it is not executing my Ruby code at all. It could be a deadlock (it hung after parsing the last line of the CSV, but the process kept running at 5% CPU and did not quit), or something like what I read here: http://timetobleed.com/ruby-threading-bugfix-small-fix-goes-a-long-way/

I am using Rails 2.3.8, REE 1.8.7-2010.02, on Ubuntu 10.10. Any insights on how to handle large numbers of threads (or maybe why not to use threads here at all) are greatly appreciated!
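
One variant I am considering is a small, fixed pool of worker threads fed from a bounded queue instead of one short-lived thread per line; a rough sketch (the class name, pool size, and queue depth are arbitrary):

require 'thread'

# Rough sketch: a bounded queue plus a fixed pool of consumer threads instead
# of a thread per line. Sizes are arbitrary; ActiveRecord's connection pool
# would need to cover at least POOL_SIZE threads.
class PooledWorker
  POOL_SIZE  = 4
  QUEUE_SIZE = 100

  def self.perform(file)
    queue   = SizedQueue.new(QUEUE_SIZE)
    threads = (1..POOL_SIZE).map do
      Thread.new do
        while (line = queue.pop)   # a nil signals "no more work"
          a = ObjectA.find_or_initialize_by_reference(line[:reference])
          a.object_bs.destroy_all
          a.update_attributes(line)
        end
      end
    end

    CsvParser.each(file) do |line|
      queue.push(line) if check_line_with_redis(line)
    end

    POOL_SIZE.times { queue.push(nil) }
    threads.each { |t| t.join }
  end
end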

  • Wouldn't simply splitting the CSV files into smaller ones be the simplest solution? Then just throw more workers at it? Commented Mar 24, 2011 at 23:04
  • Theoretically, yes. But the CSVs contain multiline values, which means we have to parse the quotes and escaped quotes to see whether we are at the end of a row or only halfway through a quoted multiline string with quotes in it. We can't simply split every X lines (see the FasterCSV sketch after these comments). Commented Mar 24, 2011 at 23:13
  • OK. I thought that there had to be a catch ;) Commented Mar 24, 2011 at 23:17
  • Have you looked at fastercsv.rubyforge.org ? Commented Mar 24, 2011 at 23:35
  • FasterCSV is the new 'CSV' in Ruby 1.9.2, so you'd probably gain some processing speed with 1.9.2 and its CSV. I'm thinking it's more likely an index problem, but that's a wild guess without being able to see into the database. Commented Mar 25, 2011 at 0:07
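
To illustrate the multiline issue mentioned in the comments above: a CSV-compliant parser such as FasterCSV treats a quoted, embedded newline as part of a single row, so the file cannot be split on physical line boundaries. A minimal sketch (the sample data is made up):

require 'rubygems'
require 'fastercsv'

# One logical CSV row spanning two physical lines because of the quoted,
# embedded newline; the data here is made up.
data = %Q{reference,comment\n42,"first line\nsecond line"\n}

rows = FasterCSV.parse(data, :headers => true)
puts rows.size                      # => 1
puts rows[0]['comment'].inspect     # => "first line\nsecond line"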

2 Answers


Do you have any indexes on these tables?

Could you temporarily disable these indexes during your bulk inserts?

Before we do bulk inserts we disable index keys:

ALTER TABLE foo DISABLE KEYS

After we're finished, we enable index keys:

ALTER TABLE foo ENABLE KEYS

From the docs:

ALTER TABLE ... DISABLE KEYS tells MySQL to stop updating non-unique indexes. ALTER TABLE ... ENABLE KEYS then should be used to re-create missing indexes. MySQL does this with a special algorithm that is much faster than inserting keys one by one, so disabling keys before performing bulk insert operations should give a considerable speedup. Using ALTER TABLE ... DISABLE KEYS requires the INDEX privilege in addition to the privileges mentioned earlier. While the non-unique indexes are disabled, they are ignored for statements such as SELECT and EXPLAIN that otherwise would use them.
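
If you drive this from Ruby, a rough way to wrap the import so the keys are always re-enabled might look like the sketch below (the helper name and table name are just examples, and concurrent workers would need to coordinate their ALTERs on the same table):

# Rough sketch: disable non-unique index maintenance around the bulk work and
# re-enable it even if the import fails. Helper and table names are examples.
def with_keys_disabled(table)
  conn = ActiveRecord::Base.connection
  conn.execute("ALTER TABLE #{table} DISABLE KEYS")
  begin
    yield
  ensure
    conn.execute("ALTER TABLE #{table} ENABLE KEYS")
  end
end

with_keys_disabled('object_bs') do
  Worker.perform('/path/to/file.csv')   # example invocation
end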

3 Comments

That can make a big difference, unless there's a unique key that suddenly finds non-unique data. Then it gets unhappy.
I will give this a try. It's a bit complicated, since we also perform lookups and joins while inserting, so I have to disable indexes only on the tables I am inserting into. But then I need to make sure that if we process a lot of files at the same time there are no conflicting ALTERs on the same table. Worth a shot to see if this is a path to pursue.
This did not influence the speed at all. I monitored the same process with the indexes both on and off, and both took the same amount of time. There are as many selects as there are inserts in most cases, so I think the selects take longer while the inserts are faster; I have not confirmed that yet though.

You could try wrapping the whole thing up in a single transaction - that would probably make quite a difference:

class Worker
  def self.perform(file)
    ObjectA.transaction do 
      CsvParser.each(file) do |line|
        next unless check_line_with_redis(line)
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
  end
end

Otherwise every save will be wrapped in its own transaction. Although, for a 10GB file, you probably want to break it into chunks of, say, 1000 inserts per transaction.
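
A rough sketch of the chunked version (the flush helper and the 1000-row threshold are illustrative):

class Worker
  def self.perform(file)
    buffer = []
    CsvParser.each(file) do |line|
      next unless check_line_with_redis(line)
      buffer << line
      flush(buffer) if buffer.size >= 1000   # arbitrary chunk size
    end
    flush(buffer)   # commit whatever is left over
  end

  # Illustrative helper: commit one chunk of lines in a single transaction.
  def self.flush(buffer)
    return if buffer.empty?
    ObjectA.transaction do
      buffer.each do |line|
        a = ObjectA.find_or_initialize_by_reference(line[:reference])
        a.object_bs.destroy_all
        a.update_attributes(line)
      end
    end
    buffer.clear
  end
end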

1 Comment

This is interesting. I might try this if I have a chance, thanks!
