
I have a large legacy application that has one function that is a prime candidate to be executed async. It's IO bound (network and disk) and doesn't return anything. This is a very simple similar implementation:

import random
import time
import requests

def fetch_urls(site):
    wait = random.randint(0, 5)
    filename = site.split("/")[2].replace(".", "_")
    print(f"Will fetch {site} in {wait} seconds")
    time.sleep(wait)
    r = requests.get(site)
    with open(filename, "w") as fd:
        fd.write(r.text)

def something(sites):
    for site in sites:
        fetch_urls(site)
    return True

def main():
    sites = ["https://www.google.com", "https://www.reddit.com", "https://www.msn.com"]
    start = time.perf_counter()
    something(sites)
    total_time = time.perf_counter() - start
    print(f"Finished in {total_time}")

if __name__ == "__main__":
    main()

My end goal would be updating the something function to run fetch_urls async. I cannot change fetch_urls.

All the documentation and tutorials I can find assume my entire application is async (starting from async def main()), but this is not the case. It's a huge application spanning multiple modules, and refactoring everything for a single function doesn't look right.

From what I understand, I need to create an event loop, add tasks to it, and dispatch it somehow, but I have tried many different things and everything still runs one after another, as opposed to concurrently.
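For illustration, here is a self-contained sketch (with a stand-in blocking function, since it mirrors rather than reuses fetch_urls) of why attempts like mine stay sequential: wrapping a blocking call in a coroutine does not make it concurrent, because nothing inside it ever awaits, so each call blocks the event loop:

```python
import asyncio
import time

def blocking_io(n):
    # Stand-in for fetch_urls: blocks the calling thread.
    time.sleep(0.2)
    return n

async def wrapper(n):
    # No await inside: the blocking call monopolizes the event loop,
    # so gather() still runs these strictly one after another.
    return blocking_io(n)

async def runner():
    start = time.perf_counter()
    results = await asyncio.gather(*(wrapper(i) for i in range(3)))
    elapsed = time.perf_counter() - start
    print(f"{elapsed:.1f}s for {results}")  # roughly 0.6s: the sleeps ran back to back
    return results

asyncio.run(runner())
```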

I would appreciate any assistance. Thanks!

1 Answer


Replying to myself: it seems there is no easy way to do this with async without refactoring, so I ended up using concurrent.futures instead.

import time
import requests
import concurrent.futures


def fetch_urls(url, name):
    wait = 5
    filename = url.split("/")[2].replace(".", "_")
    print(f"Will fetch {name} in {wait} seconds")
    time.sleep(wait)
    r = requests.get(url)
    with open(filename, "w") as fd:
        fd.write(r.text)

def something(sites):
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        future_to_url = {
            executor.submit(fetch_urls, url["url"], url["name"]): url
            for url in sites["children"]
        }
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                future.result()  # re-raises any exception from the worker
            except Exception as exc:
                print(f"{url!r} generated an exception: {exc}")
    return True


def main():
    sites = {
        "parent": "https://stackoverflow.com",
        "children": [
            {"name": "google", "url": "https://google.com"},
            {"name": "reddit", "url": "https://reddit.com"},
        ],
    }
    start = time.perf_counter()
    something(sites)
    total_time = time.perf_counter() - start
    print(f"Finished in {total_time}")


if __name__ == "__main__":
    # The guard is required with ProcessPoolExecutor: worker processes
    # re-import this module and must not re-run main().
    main()

1 Comment

Indeed, I realized the same thing independently. I also tried async, but decided to use threads (instead of processes), which should be more lightweight for IO-bound work. That would be concurrent.futures.ThreadPoolExecutor.
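A minimal sketch of that variant, assuming a stand-in for the original blocking fetch_urls; since threads share memory and start cheaply, it is a drop-in swap of ThreadPoolExecutor for ProcessPoolExecutor, and it does not need the __main__ guard:

```python
import concurrent.futures
import time

def fetch_urls(site):
    # Stand-in for the question's blocking function (sleep + HTTP + file write).
    time.sleep(0.2)
    print(f"fetched {site}")

def something(sites):
    # Threads suit IO-bound work: each blocked thread releases the GIL
    # while sleeping or waiting on the network.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(fetch_urls, site) for site in sites]
        for future in concurrent.futures.as_completed(futures):
            future.result()  # re-raises any exception from the worker
    return True

something(["https://www.google.com", "https://www.reddit.com"])
```

With two sites and two threads, the wall-clock time is roughly one sleep (about 0.2s) rather than the 0.4s the sequential version would take.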
