
I have a Python asyncio script that uses multiple tasks to upload files from an asyncio queue to Azure blob storage. It works fine, at least until it uses up all available memory on the system. I can't figure out where the memory leak is. Normally I use memory-profiler, but it doesn't seem to work with async functions.

Can someone tell me either what I'm doing wrong here, or what the best way would be to find out where the issue lies? Thanks. It's not clear to me what, if anything, is not being cleaned up.
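
For finding the leak itself, one standard-library option that does work with async code is tracemalloc, since it tracks allocations process-wide. A minimal sketch (the report_memory helper below is hypothetical, something you could schedule alongside the worker tasks):

import asyncio
import tracemalloc

tracemalloc.start(25)  # keep up to 25 stack frames per allocation

async def report_memory(interval=60):
    # Periodically print the top allocation sites seen so far.
    while True:
        await asyncio.sleep(interval)
        snapshot = tracemalloc.take_snapshot()
        for stat in snapshot.statistics("lineno")[:10]:
            print(stat)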

I put anywhere from a few hundred to a few thousand files on the work queue and usually run with 3-5 tasks. Within a couple of minutes the program uses 3-6 GB of resident memory, then starts eating into swap until, if it runs long enough, it is killed for memory starvation. This is on a Linux box with 8 GB of memory, using Python 3.6.8 and the following Azure libraries:

azure-common 1.1.25
azure-core 1.3.0
azure-identity 1.3.0
azure-nspkg 3.0.2
azure-storage-blob 12.3.0

import asyncio
import os
import traceback

from azure.identity.aio import ClientSecretCredential
from azure.storage.blob.aio import BlobClient

async def uploadBlobsTask(taskName, args, workQueue):
    # Worker: pull file names off the queue until it is empty and upload each one.
    while not workQueue.empty():
        fileName = await workQueue.get()
        blobName = fileName.replace(args.sourceDirPrefix, '')

        blobClient = BlobClient(
            "https://{}.blob.core.windows.net".format(args.accountName),
            credential = args.creds,
            container_name = args.container,
            blob_name = blobName,
        )

        async with blobClient:
            args.logger.info("Task {}: uploading {} as {}".format(taskName, fileName, blobName))
            try:
                with open(fileName, "rb") as data:
                    await blobClient.upload_blob(data, overwrite=True)
                # Leave an empty ".moved" marker file next to the source so it
                # isn't picked up again on a later run.
                fileNameMoved = fileName + '.moved'
                with open(fileNameMoved, "w") as fm:
                    fm.write("")
            except KeyboardInterrupt:
                raise
            except:
                # Log the failure and put the file back on the queue to retry.
                args.logger.error("Task {}: {}".format(taskName, traceback.format_exc()))
                await workQueue.put(fileName)
            finally:
                workQueue.task_done()


async def processFiles(args):
    workQueue = asyncio.Queue()

    # Walk the source tree and enqueue every file for upload.
    for (path, dirs, files) in os.walk(args.sourceDir):
        for f in files:
            fileName = os.path.join(path, f)
            await workQueue.put(fileName)

    creds = ClientSecretCredential(args.tenant, args.appId, args.password)
    args.creds = creds
    # Start the requested number of worker tasks and wait for them all to finish.
    tasks = [ args.loop.create_task(uploadBlobsTask(str(i), args, workQueue)) for i in range(1, args.tasks+1) ]
    await asyncio.gather(*tasks)
    await creds.close()


# args is a parsed argparse namespace built elsewhere (accountName, container,
# sourceDir, sourceDirPrefix, tenant, appId, password, tasks, logger, ...).
loop = asyncio.get_event_loop()
args.loop = loop
loop.run_until_complete(processFiles(args))
loop.close()

1 Answer


For what it's worth, I seem to have managed to fix this so that it works without memory leaks. I did this by obtaining a ContainerClient and then obtaining blob clients from it (i.e., containerClient.get_blob_client()) instead of constructing BlobClient objects directly. Now the overall memory usage tops out at a very low level rather than growing continuously as before.
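
Roughly what that change looks like, as a sketch against the code in the question (the containerClient attribute on args is my naming, and the marker-file and error-handling logic from the original worker is omitted for brevity):

import asyncio
import os

from azure.identity.aio import ClientSecretCredential
from azure.storage.blob.aio import ContainerClient

async def uploadBlobsTask(taskName, args, workQueue):
    while not workQueue.empty():
        fileName = await workQueue.get()
        blobName = fileName.replace(args.sourceDirPrefix, '')
        # Derive the per-blob client from the shared ContainerClient rather than
        # constructing a fresh BlobClient (and transport pipeline) for every file.
        blobClient = args.containerClient.get_blob_client(blobName)
        try:
            with open(fileName, "rb") as data:
                await blobClient.upload_blob(data, overwrite=True)
        finally:
            workQueue.task_done()

async def processFiles(args):
    workQueue = asyncio.Queue()
    for (path, dirs, files) in os.walk(args.sourceDir):
        for f in files:
            await workQueue.put(os.path.join(path, f))

    creds = ClientSecretCredential(args.tenant, args.appId, args.password)
    containerClient = ContainerClient(
        "https://{}.blob.core.windows.net".format(args.accountName),
        container_name=args.container,
        credential=creds,
    )
    # One ContainerClient is created and closed once; all workers share it.
    async with containerClient:
        args.containerClient = containerClient
        tasks = [ args.loop.create_task(uploadBlobsTask(str(i), args, workQueue))
                  for i in range(1, args.tasks + 1) ]
        await asyncio.gather(*tasks)
    await creds.close()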
