
I am using multithreading to insert data into a database, but it does not produce the result I expect. The code is below:

class MongoInsertThread(threading.Thread):
    def __init__(self,  queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        print(self.thread_id,': ', self.queue.get())

def save_to_mongo_with_thread():
    q = queue.Queue()

    for e in range(3):
        for i in range(10):
            q.put([i], block=False)
        threads = []
        for i in range(5): ##(1)
            threads.append(MongoInsertThread(q, i))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

But the output generated by the code is:

0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
+++++++++++++++++++++++
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
+++++++++++++++++++++++

which is not what I wanted. I would like the output to be:

0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++

Maybe something is wrong in the for loop, but I cannot find a solution. What is wrong with my code, and how can I fix it? Also, when I replace 5 with 11 at ##(1), the program hangs. How can I deal with that?

2 Answers

3

How about using a thread pool? Here is my approach:

  • Change your MongoInsertThread.run() method so that it loops until it sees a stop signal (e.g. a None job).
  • Create a thread pool of MongoInsertThread threads.
  • Update save_to_mongo_with_threads: start the thread pool -> put jobs on the queue -> stop the thread pool.

Update: a bit more explanation of the thread pool in this solution:

  • A thread pool is a collection of multiple threads.
  • Threads within a thread pool share the same queue of jobs.
  • Each thread loops forever (in the MongoInsertThread.run() method):
    • (1) get a job from the shared queue.
    • (2) if the job is None -> break out of the loop (i.e. stop the current thread).
    • (3) else (the job is not None) -> process the job.
    • (4) go to (1).

Code:

import threading
import queue
import time


class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        while True:
            job = self.queue.get()
            if job is None:
                return
            print(self.thread_id, ": ", job)


class ThreadPool:
    def __init__(self, queue, thread_count):
        self._queue = queue
        self._thread_count = thread_count
        self._workers = []
        for thread_id in range(thread_count):
            worker_thrd = MongoInsertThread(queue, thread_id)
            self._workers.append(worker_thrd)

    def start(self):
        for worker_thrd in self._workers:
            worker_thrd.start()

    def stop(self):
        # put None job, each worker thread picks one then stops
        for worker_thrd in self._workers:
            self._queue.put(None)
        # wait for worker threads
        for worker_thrd in self._workers:
            worker_thrd.join()


def save_to_mongo_with_threads():
    q = queue.Queue()

    pool = ThreadPool(q, 5)
    pool.start()

    time.sleep(1.0)

    for e in range(3):
        for i in range(10):
            q.put([e, i])
        print("+++++++++++++++++++++++")

    time.sleep(1.0)

    pool.stop()


save_to_mongo_with_threads()

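Note: jobs might not be evenly distributed among threads. Also, the main thread prints each separator line as soon as it has finished queuing a batch, not when the batch has been processed, so the separators do not line up with the batches. One possible output: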

+++++++++++++++++++++++
0 :  [0, 0]
2 :  [0, 1]
4 :  [0, 3]
1 :  [0, 4]
0 :  [0, 5]
2 :  [0, 6]
4 :  [0, 7]
3 :  [0, 2]
+++++++++++++++++++++++
2 :  [0, 9]
0 :  [0, 8]
+++++++++++++++++++++++
2 :  [1, 2]
0 :  [1, 3]
4 :  [1, 0]
4 :  [1, 7]
1 :  [1, 4]
2 :  [1, 5]
4 :  [1, 8]
1 :  [1, 9]
2 :  [2, 0]
4 :  [2, 1]
1 :  [2, 2]
2 :  [2, 3]
4 :  [2, 4]
3 :  [1, 1]
2 :  [2, 6]
4 :  [2, 7]
3 :  [2, 8]
2 :  [2, 9]
1 :  [2, 5]
0 :  [1, 6]
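
If the separator should only appear once a whole batch has been processed, one option is the queue's own bookkeeping: each worker calls task_done() after handling an item, and the producer calls join() on the queue before printing the separator. The following is a minimal sketch under that assumption, not a drop-in replacement. The names worker and run_batches are hypothetical, and appending to a list stands in for the real database insert.

```python
import queue
import threading


def worker(q, thread_id, results, lock):
    """Process jobs until a None sentinel arrives."""
    while True:
        job = q.get()
        try:
            if job is None:
                return
            with lock:
                results.append((thread_id, job))  # stand-in for the real insert
        finally:
            q.task_done()  # mark this item as handled (sentinel included)


def run_batches(batches=3, jobs_per_batch=10, thread_count=5):
    q = queue.Queue()
    results, lock = [], threading.Lock()
    workers = [threading.Thread(target=worker, args=(q, i, results, lock))
               for i in range(thread_count)]
    for w in workers:
        w.start()

    processed_after_batch = []
    for e in range(batches):
        for i in range(jobs_per_batch):
            q.put([e, i])
        q.join()  # blocks until every queued job has been marked done
        processed_after_batch.append(len(results))
        print("+++++++++++++++++++++++")

    for _ in workers:
        q.put(None)  # one sentinel per worker
    for w in workers:
        w.join()
    return results, processed_after_batch
```

With this arrangement the separator is printed only after all jobs of the current batch have been handled. Which thread handles which job is still up to the scheduler.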

2 Comments

Thank you! But it is a little hard to understand.
When you initialize the ThreadPool, you pass in an empty queue and then start it. No data has been sent to the threads at that point, so why doesn't it go wrong? @duong_dajgja
1

The output you are seeing is a result of how you populate and drain the queue. Consider what the block

for i in range(10):
    q.put([i], block=False)

is doing: it queues 10 new values onto q. Subsequently, the block

for i in range(5):
    threads.append(MongoInsertThread(q, i))

passes the queue to 5 threads, each of which calls get() once, for a total of 5 calls. So once the puts have finished in the first iteration of e (line 16 of your code, threads = []), q contains

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

After the threads complete it contains

[5], [6], [7], [8], [9]

After which 10 more values are queued, yielding

[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

5 values are then again removed leaving

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

After which another 10 values are queued,

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

Followed by the last removal which leaves

[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
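
The trace above can be checked without any threads. The sketch below replays the same sequence of puts and gets on a plain collections.deque, which has the same first-in, first-out order as queue.Queue. The helper name trace_queue and its parameters are made up for illustration.

```python
from collections import deque


def trace_queue(rounds=3, puts_per_round=10, gets_per_round=5):
    """Replay the put/get pattern and record what each round removes and leaves."""
    q = deque()
    history = []
    for _ in range(rounds):
        # The inner put loop: queue [0] .. [puts_per_round - 1]
        q.extend([i] for i in range(puts_per_round))
        # One get() per thread, taken from the front of the queue
        removed = [q.popleft() for _ in range(gets_per_round)]
        history.append((removed, list(q)))
    return history


for removed, remaining in trace_queue():
    print("removed:  ", removed)
    print("remaining:", remaining)
```

The removed items of the three rounds are [0]..[4], [5]..[9] and [0]..[4] again, which is exactly the output shown in the question.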

If you want 10 elements removed every time, you must make 10 calls to get(). With your current setup this requires 10 threads, i.e. changing the thread loop to

for i in range(10):
  threads.append(MongoInsertThread(q, i))

However, this isn't necessarily the most efficient way to do it, since for 100 elements you would have to create 100 threads. It is better to create a smaller number of threads and have each of them call get() multiple times, like so:

import queue
import sys
import threading


class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id, m):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue
        self.m = m  # number of items this thread takes from the queue

    def run(self):
        for i in range(self.m):
            # Using sys.stdout.write keeps the output lines from getting garbled
            sys.stdout.write(str(self.thread_id)+': '+str(self.queue.get())+"\n")

def save_to_mongo_with_thread():
    q = queue.Queue()
    n1 = 11  # items queued per iteration
    n2 = 5   # number of threads

    for e in range(3):
        for i in range(n1):
            q.put([i], block=False)
        threads = []
        # Note: if n1 < n2, n1 // n2 is 0 and only the last thread does any work
        for i in range(n2-1):
            # Integer division: range() needs an int in Python 3
            threads.append(MongoInsertThread(q, i, n1 // n2))
        threads.append(MongoInsertThread(q, n2-1, (n1 // n2)+(n1 % n2))) # Handles (n1 % n2 != 0)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

Sample output (using n1 = 11 rather than n1 = 10):

0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
2: [3]
1: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
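
On the hang with 11 threads in the original code: each thread makes one blocking get() call, so with only 10 items queued the eleventh thread waits forever and its join() never returns. One way to avoid this is to have each thread take items with get_nowait() and exit when queue.Empty is raised. The sketch below assumes that all items are queued before the threads start, as in the question. The names drain and run_round are hypothetical, and appending to a list stands in for the real insert.

```python
import queue
import threading


def drain(q, thread_id, seen, lock):
    """Take items until the queue is empty, then exit instead of blocking."""
    while True:
        try:
            item = q.get_nowait()  # raises queue.Empty rather than waiting
        except queue.Empty:
            return
        with lock:
            seen.append((thread_id, item))  # stand-in for the real insert


def run_round(item_count=10, thread_count=11):
    q = queue.Queue()
    for i in range(item_count):
        q.put([i])  # everything is queued before any thread starts

    seen, lock = [], threading.Lock()
    threads = [threading.Thread(target=drain, args=(q, t, seen, lock))
               for t in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns even when there are more threads than items
    return seen
```

Because a thread exits as soon as it finds the queue empty, this pattern does not suit a producer that is still adding items while the workers run. In that case a sentinel value or a timeout on get() is the safer choice.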
