
I am trying to read a big file and export batches of CSV files using asyncio. I know asyncio does not support asynchronous I/O on the same file, so I am exporting to an individual file per task, named by the batch number. But it only runs synchronously.

I have main.py, which has a function start():

def start():
    # Drive the async pipeline to completion on the event loop
    asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))

I have processing.py, which has a function test_async():

import ast
import asyncio
import multiprocessing

import numpy

async def test_async(dictRunData):
    num_logical_cpus = multiprocessing.cpu_count()
    with open(dictRunData['input_file'], 'r') as infile:
        content = infile.read().replace('\n', '')
        lstcontent = ast.literal_eval(content)

    tasks = []
    chunkNum = 0
    # Split the parsed list into one chunk per logical CPU
    chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
    print(f"number of chunks: {len(chunk_contents)}")
    for chunk in chunk_contents:
        chunkNum += 1
        # process_chunk_async takes dictRunData too, so pass it through
        task = asyncio.create_task(process_chunk_async(chunk, chunkNum, dictRunData))
        tasks.append(task)

    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

Here is the function that processes a given chunk:

async def process_chunk_async(chunk, chunkNum, dictRunData):
    dict_results = {}
    for data in chunk:
        # ..do something..
        dict_results[data] = None  # a dict has no .append(); store each result as a key

    outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)
    return outputfile

Here is write_chunk_async:

async def write_chunk_async(dict_results, chunkNum, dictRunData):
    fileName = f"_{chunkNum}.csv"
    # Note: this is still ordinary blocking file I/O, despite the async def
    with open(fileName, "a+") as writeFileTo:
        for data in dict_results.keys():
            writeFileTo.write(data + "\n")

    print(f"Done write_chunk_async file: {fileName}")
    return fileName
  • I don't think asyncio does what you want. I think you want multithreading? Commented Mar 2, 2020 at 21:27
  • What does write_chunk_async look like? Commented Mar 2, 2020 at 21:38
  • @dano I just added write_chunk_async Commented Mar 2, 2020 at 21:43

1 Answer


asyncio only provides concurrency if you are using its APIs to do asynchronous I/O. In your sample code, all of your I/O (reading/writing files) is done using synchronous, blocking APIs, so using asyncio doesn't add any value. Now, asyncio actually doesn't provide any APIs for asynchronous reading/writing of files, because it is not well-supported at the Operating System level. See this explanation from the Python wiki.
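
As an aside, if you do want to keep the asyncio structure, the usual workaround is to push the blocking file I/O onto the event loop's default thread pool with loop.run_in_executor. A minimal sketch, assuming Python 3.7+ (write_chunk_blocking is an illustrative name, not part of the question's code):

import asyncio

def write_chunk_blocking(fileName, rows):
    # Ordinary blocking I/O; safe to run in a worker thread
    with open(fileName, "a+") as f:
        for row in rows:
            f.write(row + "\n")

async def write_chunk_async(fileName, rows):
    loop = asyncio.get_running_loop()
    # Hand the blocking call to the default ThreadPoolExecutor so the
    # event loop stays free while a worker thread does the writing
    await loop.run_in_executor(None, write_chunk_blocking, fileName, rows)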

There is a third-party library, aiofiles, which provides an asyncio-friendly API for file I/O, but it just delegates all the work to background threads under the covers, so there's really no reason to use it if you're not trying to integrate file I/O into an application that already uses asyncio. If all your application does is read/write files, just use threads directly. Keep in mind, though, that if all your threads are reading/writing files on the same disk, multithreading may not help much either, since ultimately all the threads will contend for access to that single disk.
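
If you drop asyncio and use threads directly, a rough sketch with concurrent.futures could look like this (process_chunk here is a stand-in for the question's per-chunk work, not a fixed API):

from concurrent.futures import ThreadPoolExecutor

def process_chunk(chunk, chunkNum):
    fileName = f"_{chunkNum}.csv"
    # Each chunk writes to its own file, so no locking is needed
    with open(fileName, "a+") as f:
        for data in chunk:
            f.write(str(data) + "\n")
    return fileName

def process_all(chunks):
    # One worker per chunk; the threads spend most of their time blocked on disk I/O
    with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
        futures = [pool.submit(process_chunk, chunk, i + 1)
                   for i, chunk in enumerate(chunks)]
        return [f.result() for f in futures]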
