I have the following code:

        with ThreadPoolExecutor(max_workers=num_of_pages) as executor:
            futh = [executor.submit(self.getdata2, page, hed, data, apifolder, additional)
                    for page in pages]
            for fut in as_completed(futh):
                datarALL = datarALL + fut.result()
        return datarALL

The num_of_pages isn't fixed, but it's usually around 250. The getdata2 func creates the GET request and returns each page's results.

The problem is that all 250 pages (threads) are created together, which means 250 GET requests fired at the same time. This overloads the server, so I get a lot of retries: the delayed server response shuts down the GET call and it gets retried. I want to avoid that.

I thought of creating some sort of lock which will prevent a thread/page from issuing the GET request if there are more than 10 active requests. In such a case it will wait until a slot is available.

Something like:

from time import sleep

executing_now = []

def getdata2(...):
    ...
    while len(executing_now) > 10:
        sleep(10)
    executing_now.append(page)
    response = requests.get(url, data=data, headers=hed, verify=False)
    ...
    executing_now.remove(page)
    return ...

Is there an existing mechanism for this in Python? This requires the threads to check shared memory... I want to avoid multithreading problems such as deadlocks, etc.

Basically, wrap the GET call with a limit on how many threads can execute it at the same time.
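
For illustration, the standard library's threading.Semaphore is the usual primitive for this kind of bounded access; a minimal sketch of the idea above (the url/hed/data names simply mirror the snippet) might look like:

from threading import Semaphore

import requests

# At most 10 GET requests may be in flight at once; acquire() blocks
# until a slot frees up, and the context manager releases the slot
# even if the request raises.
request_slots = Semaphore(10)

def getdata2(url, hed, data):
    with request_slots:
        return requests.get(url, data=data, headers=hed, verify=False)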

2 Comments
  • Are you aware of the GIL in most implementations of Python? Commented Aug 26, 2018 at 9:12
  • @BasileStarynkevitch yes, but it doesn't matter here. My issue is when GET is called and then there is a context switch to another thread. I want to narrow this possibility by allowing only 10 threads to run at a time. Commented Aug 26, 2018 at 9:15

1 Answer


We can use a queue to "prepare" all your pages, and then limit your thread pool to any number of threads, since each worker thread will fetch the next page it needs from the queue:

import queue

# prepare all your page objects here
pages_queue = queue.Queue()
for page in pages:
    pages_queue.put(page)

# ThreadPool - each thread takes one page from the queue and, when done,
# fetches the next one until the queue is empty
datarALL = []
with ThreadPoolExecutor(max_workers=10) as executor:
    # submit one task per worker; each task drains pages from the queue
    futh = [executor.submit(self.getdata2, pages_queue, hed, data, apifolder, additional)
            for _ in range(10)]
    for fut in as_completed(futh):
        datarALL = datarALL + fut.result()
return datarALL

def getdata2(self, pages_queue, hed, data, apifolder, additional):
    ...
    results = []
    try:
        # non-blocking get raises queue.Empty once all pages are taken
        while True:
            page = pages_queue.get_nowait()
            response = requests.get(page.url, data=data, headers=hed, verify=False)
            ...
            results.append(...)
    except queue.Empty:
        pass
    return results
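
Note that queue.Queue is already thread-safe, so the workers need no extra locking around get_nowait(); the pool size alone (max_workers=10) caps the number of concurrent GET requests, and each worker simply keeps pulling pages until the queue is drained.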

1 Comment

Thank you. Actually, your answer helped me find a much simpler solution... The max_workers=num_of_pages was wrong. Changing it to cpu_count() solved my issue. Only 4 threads are created at a time; when one finishes, another is created.
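
For completeness, a minimal sketch of that simpler fix, assuming the same method context and call signature as in the question (os.cpu_count() stands in for the commenter's cpu_count()):

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

# Submit one task per page; the small fixed pool guarantees that at
# most cpu_count() requests run at the same time.
datarALL = []
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    futh = [executor.submit(self.getdata2, page, hed, data, apifolder, additional)
            for page in pages]
    for fut in as_completed(futh):
        datarALL = datarALL + fut.result()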
