import threading

threads = []
for n in range(0, 60000):
    t = threading.Thread(target=function,args=(x, n))
    t.start()
    threads.append(t)
for t in threads:
    t.join()

This works for ranges up to about 800 on my laptop, but if I increase the range beyond 800 I get the error: can't create new thread.

How can I limit the number of threads that get created, or is there another way to make this work, such as a timeout? I tried using threading.BoundedSemaphore, but that doesn't seem to work properly.

  • Don't create so many threads at once, of course. There is a relatively finite limit after which more threads will actually hurt performance, and 60k is well above that limit: remember that each thread consumes resources. Thankfully the system prevents this (either via soft or hard limit). Search for "thread pool" - e.g. stackoverflow.com/questions/3033952/… , docs.python.org/3/library/concurrent.futures.html - for one approach to solve this problem. Commented Sep 5, 2013 at 22:55
  • This smells very strongly of an XY problem. Why do you want (or think you want) 60000 threads? What are you trying to do? The right answer may be "use a threadpool of 8 threads with 60000 tasks", or "use 8 threads that each do a numpy vector operation on 7500 values", or "use greenlets/coroutines/explicit cooperative threads", or "there is no benefit to parallelizing this code in the first place", but it's almost certainly not going to be a direct answer to your question. Commented Sep 5, 2013 at 23:15
  • Also, in the future, don't just describe the error; post the actual traceback. In this case, people can guess what happened, but in general, it's better if you don't make us guess. Similarly, don't just say "I tried using threading.BoundedSemaphore function but that doesn't seem to work properly"; show the code you tried, and what it did wrong. Commented Sep 6, 2013 at 0:52
  • @abarnert But that slows things down a lot. Commented Sep 6, 2013 at 8:00
  • @Lady: I'm not sure what "that" you're referring to as decreasing speed a lot. But running more threads than you have cores definitely decreases speed far more than anything else you might consider. There's scheduler overhead (which is super-linear on most platforms), context switching, lost cache/paging/DMA benefits, etc., all of which waste time, and there's no compensating benefit at all. Commented Sep 6, 2013 at 17:57

1 Answer


The problem is that no major platform (as of mid-2013) will let you create anywhere near this number of threads. There are a wide variety of different limitations you could run into, and without knowing your platform, its configuration, and the exact error you got, it's impossible to know which one you ran into. But here are two examples:

  • On 32-bit Windows, the default thread stack is 1MB, and all of your thread stacks have to fit into the same 2GB of virtual memory space as everything else in your program, so you will run out long before 60000.
  • On 64-bit linux, you will likely exhaust one of your session's soft ulimit values before you get anywhere near running out of page space. (Linux has a variety of different limits beyond the ones required by POSIX.)

So, how can i control number to threads to get created or any other way to make it work like timeout or whatever?

Using as many threads as possible is very unlikely to be what you actually want to do. Running 800 threads on an 8-core machine means that you're spending a whole lot of time context-switching between the threads, and the cache keeps getting flushed before it ever gets primed, and so on.

Most likely, what you really want is one of the following:

  • One thread per CPU, serving a pool of 60000 tasks.
    • Maybe processes instead of threads (if the primary work is in Python, or in C code that doesn't explicitly release the GIL).
    • Maybe a fixed number of threads (e.g., a web browser may do, say, 12 concurrent requests at a time, whether you have 1 core or 64).
    • Maybe a pool of, say, 600 batches of 100 tasks apiece, instead of 60000 single tasks.
  • 60000 cooperatively-scheduled fibers/greenlets/microthreads all sharing one real thread.
    • Maybe explicit coroutines instead of a scheduler.
    • Or "magic" cooperative greenlets via, e.g. gevent.
    • Maybe one thread per CPU, each running 1/Nth of the fibers.

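That said, doing what you literally asked for (creating threads until you hit the limit, then backing off) is certainly possible.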

Once you've hit whichever limit you're hitting, it's very likely that trying again will fail until a thread has finished its job and been joined, and it's pretty likely that trying again will succeed after that happens. So, given that you're apparently getting an exception, you could handle this the same way as anything else in Python: with a try/except block. For example, something like this:

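import threading

threads = []
for n in range(0, 60000):
    while True:
        t = threading.Thread(target=function, args=(x, n))
        try:
            t.start()
            threads.append(t)
        # Python 3 raises RuntimeError ("can't start new thread");
        # Python 2 raises thread.error instead.
        except RuntimeError:
            if threads:
                # Wait for the oldest thread to finish, then retry.
                threads[0].join()
                del threads[0]
            else:
                raise
        else:
            break
for t in threads:
    t.join()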

Of course, this assumes that the first task launched is likely to be one of the first tasks to finish. If that is not true, you'll need some way to explicitly signal doneness (condition, semaphore, queue, etc.), or you'll need to use some lower-level (platform-specific) library that gives you a way to wait on a whole list until at least one thread is finished.
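One way to signal doneness explicitly is a semaphore that each thread releases when it finishes. The following is only a minimal sketch: `run_bounded`, `task`, and the `max_live` cap are hypothetical names, and the cap of 8 is an arbitrary assumption rather than a recommended value.

```python
import threading

def run_bounded(task, items, max_live=8):
    """Run task(item) in one thread per item, with at most max_live alive at once."""
    slots = threading.BoundedSemaphore(max_live)
    threads = []

    def wrapper(item):
        try:
            task(item)
        finally:
            # Free the slot even if the task raised.
            slots.release()

    for item in items:
        # Blocks here until some running thread releases a slot.
        slots.acquire()
        t = threading.Thread(target=wrapper, args=(item,))
        try:
            t.start()
        except Exception:
            # The thread never ran, so give its slot back before re-raising.
            slots.release()
            raise
        threads.append(t)

    for t in threads:
        t.join()
```

The acquire happens in the launching loop, not inside the thread, so thread creation itself is throttled. This still creates and destroys one thread per task, which is why a pool is usually the better fit.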

Also, note that on some platforms (e.g., Windows XP), you can get bizarre behavior just getting near the limits.


On top of being a lot better, doing the right thing will probably be a lot simpler as well. For example, here's a process-per-CPU pool:

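import concurrent.futures

# On Windows (and any platform that spawns rather than forks worker processes),
# this must run under an `if __name__ == "__main__":` guard.
with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)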

… and a fixed-thread-count pool:

with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    concurrent.futures.wait(fs)

… and a balancing-CPU-parallelism-with-numpy-vectorization batching pool:

import os
import numpy as np

# np.vector_function is a placeholder for whatever vectorized NumPy routine does the work.
with concurrent.futures.ThreadPoolExecutor() as executor:
    batchsize = 60000 // os.cpu_count()
    fs = [executor.submit(np.vector_function, x,
                          np.arange(n, min(n+batchsize, 60000)))
          for n in range(0, 60000, batchsize)]
    concurrent.futures.wait(fs)
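The "600 batches of 100 tasks" idea from the list earlier can also be sketched without NumPy. This is an illustration under assumptions: `function` here is a trivial stand-in for the real task, and `run_batch`, `run_in_batches`, and the worker count of 12 are hypothetical.

```python
import concurrent.futures

def function(x, n):
    # Hypothetical stand-in for the real per-item work.
    return x + n

def run_batch(x, start, stop):
    # One pool task handles a whole slice of n values.
    return [function(x, n) for n in range(start, stop)]

def run_in_batches(x, total=60000, batchsize=100, workers=12):
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        fs = [executor.submit(run_batch, x, start, min(start + batchsize, total))
              for start in range(0, total, batchsize)]
        concurrent.futures.wait(fs)
    # fs is in submission order, so the flattened results are ordered by n.
    return [value for f in fs for value in f.result()]
```

Batching reduces the per-task overhead of submitting and scheduling 60000 tiny jobs; the right batch size depends on how long each item takes.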

In the examples above, I used a list comprehension to submit all of the jobs and gather their futures, because we're not doing anything else inside the loop. But from your comments, it sounds like you do have other stuff you want to do inside the loop. So, let's convert it back into an explicit for statement:

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = []
    for n in range(60000):
        fs.append(executor.submit(function, x, n))
    concurrent.futures.wait(fs)

And now, whatever you want to add inside that loop, you can.


However, I don't think you actually want to add anything inside that loop. The loop just submits all the jobs as fast as possible; it's the wait function that sits around waiting for them all to finish, and it's probably there that you want to exit early.

To do this, you can use wait with the FIRST_COMPLETED flag, but it's much simpler to use as_completed.
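For comparison, here is a rough sketch of both approaches side by side. `square` is a hypothetical stand-in task, and the two helper functions are illustrative names, not part of any library.

```python
import concurrent.futures

def square(n):
    # Hypothetical stand-in task.
    return n * n

def results_via_wait(executor, numbers):
    pending = {executor.submit(square, n) for n in numbers}
    results = []
    while pending:
        # Returns as soon as at least one future is done; loop over the rest.
        done, pending = concurrent.futures.wait(
            pending, return_when=concurrent.futures.FIRST_COMPLETED)
        results.extend(f.result() for f in done)
    return results

def results_via_as_completed(executor, numbers):
    fs = [executor.submit(square, n) for n in numbers]
    # as_completed does the same bookkeeping internally.
    return [f.result() for f in concurrent.futures.as_completed(fs)]
```

Both return results in completion order rather than submission order; the as_completed version simply hides the loop over the pending set.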

Also, I'm assuming error is some kind of value that gets set by the tasks. In that case, you will need to put a Lock around it, as with any other mutable value shared between threads. (This is one place where there's slightly more than a one-line difference between a ProcessPoolExecutor and a ThreadPoolExecutor—if you use processes, you need multiprocessing.Lock instead of threading.Lock.)

So:

error_lock = threading.Lock()
error = []

def function(x, n):
    # blah blah
    try:
        pass  # blah blah
    except Exception as e:
        with error_lock:
            error.append(e)
    # blah blah

# Threads share the error list; worker processes would each get their own copy.
with concurrent.futures.ThreadPoolExecutor(12) as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    for f in concurrent.futures.as_completed(fs):
        do_something_with(f.result())
        with error_lock:
            if len(error) > 1: exit()

However, you might want to consider a different design. In general, if you can avoid sharing between threads, your life gets a lot easier. And futures are designed to make that easy, by letting you return a value or raise an exception, just like a regular function call. That f.result() will give you the returned value or raise the raised exception. So, you can rewrite that code as:

def function(x, n):
    # blah blah
    # don't bother to catch exceptions here, let them propagate out
    pass

with concurrent.futures.ProcessPoolExecutor() as executor:
    fs = [executor.submit(function, x, n) for n in range(60000)]
    error = []
    for f in concurrent.futures.as_completed(fs):
        try:
            result = f.result()
        except Exception as e:
            error.append(e)
            if len(error) > 1: exit()
        else:
            do_something_with(result)

Notice how similar this looks to the ThreadPoolExecutor Example in the docs. This simple pattern is enough to handle almost anything without locks, as long as the tasks don't need to interact with each other.
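If you would rather stop early without exiting the whole program, one option is to cancel the futures that have not started yet. The sketch below is an assumption-laden illustration: it uses threads, `function` is a made-up task that fails on every tenth n, and `run_until_too_many_errors` and `max_errors` are hypothetical names. Note that `Future.cancel()` only prevents tasks that have not begun running; tasks already in progress run to completion.

```python
import concurrent.futures

def function(x, n):
    # Hypothetical task: fails on every tenth n.
    if n % 10 == 0:
        raise ValueError("bad n: %d" % n)
    return x + n

def run_until_too_many_errors(x, count, max_errors=1, workers=4):
    results, errors = [], []
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        fs = [executor.submit(function, x, n) for n in range(count)]
        for f in concurrent.futures.as_completed(fs):
            try:
                results.append(f.result())
            except Exception as e:
                errors.append(e)
                if len(errors) > max_errors:
                    # Cancel whatever has not started; running tasks still finish.
                    for pending in fs:
                        pending.cancel()
                    break
    return results, errors
```

Leaving the with block waits only for the tasks that were already running, so the function returns soon after the error threshold is crossed.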


9 Comments

Your first solution, which uses exception handling, does work; I had already tried that. The problem is that once the process reaches the maximum number of threads it can create, everything slows down a lot. The same threads take 0.02 seconds to complete range(0, 1000), but range(0, 60000) takes almost 80 seconds, when at the earlier rate it should take no more than 1.5 seconds. The output also shows that things slow down a lot after 1000.
The process pool executor is not working for me. After I start the program there is no output at all; it just keeps waiting. The thread pool executor did work, but it has the same problem as the first solution: it takes far too long, even after giving 800 to ThreadPoolExecutor.
For the 4th solution, what is np? Also, a parenthesis is missing in the third line. np raises a NameError, i.e., it's not defined. Does it need to be declared as a variable somewhere first? I'm making the program compatible with Python 2.7, not 3.x. Also, I did get concurrent for Python 2.7!
@Lady: For the try/except solution, I explained why that will be very slow, and other reasons it's a bad idea. The details of how it slows down depend heavily on your platform. For example, with 32-bit Windows XP SP1, the first 1000 or so threads can be created very quickly, but then they get slower and slower, and around 1024 you finally get a failure that can take 15 seconds. With 64-bit OS X 10.8, the stack space is pegged, so on a 2GB machine, 1000 threads could easily be enough to cause swap thrashing. And so on.
@Lady: For the executor solutions, I can't debug your code unless I can see it. But if you read the docs, there are some simple examples that should help. (Meanwhile, it looks like you figured this out yourself, but for future readers, to use Python 2.7, you need to install the backport from PyPI.) Also, you do not want to give 800 to ThreadPoolExecutor; again, you almost certainly want either the default, or a smallish static number like 12.