I am trying to read a big file and export batches of CSV files using asyncio. I know asyncio does not support async I/O to the same file, so I am exporting to an individual file per task, named by the batch number. But the tasks only run synchronously, one after another.
I have main.py, which has a function start():
    def start():
        asyncio.get_event_loop().run_until_complete(processing.test_async(dictRunData))
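For reference, a self-contained sketch of the same entry point using asyncio.run, which is the newer equivalent of get_event_loop().run_until_complete (dictRunData here is just a placeholder dict with the input path, not my real run configuration):

    import asyncio

    import processing

    # Placeholder run configuration; the real values come from main.py.
    dictRunData = {"input_file": "input.txt"}

    def start():
        # asyncio.run creates the event loop, runs the coroutine to
        # completion, and closes the loop afterwards.
        asyncio.run(processing.test_async(dictRunData))

    if __name__ == "__main__":
        start()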
I have processing.py, which has a function test_async():
    import ast
    import asyncio
    import multiprocessing

    import numpy

    async def test_async(dictRunData):
        num_logical_cpus = multiprocessing.cpu_count()
        # Read the whole input file and parse it into a Python list.
        with open(dictRunData['input_file'], 'r') as infile:
            content = infile.read().replace('\n', '')
        lstcontent = ast.literal_eval(content)
        tasks = []
        chunkNum = 0
        # Split the data into one chunk per logical CPU.
        chunk_contents = numpy.array_split(numpy.array(lstcontent), num_logical_cpus)
        print(f"number of chunks: {len(chunk_contents)}")
        for chunk in chunk_contents:
            chunkNum += 1
            task = asyncio.create_task(process_chunk_async(chunk, chunkNum, dictRunData))
            tasks.append(task)
        result = await asyncio.gather(*tasks, return_exceptions=True)
        return result
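To show what I mean by "runs synchronously", here is a stripped-down, runnable toy version of the same fan-out pattern (dummy chunks, no file I/O, all names are placeholders). Because the per-chunk coroutine never awaits anything that yields control back to the event loop, the tasks execute one after another on the single event-loop thread:

    import asyncio
    import time

    async def process_chunk_async(chunk, chunkNum):
        # Pure CPU-style work with no await that yields,
        # so this coroutine blocks the event-loop thread until it finishes.
        start = time.perf_counter()
        total = sum(x * x for x in chunk)
        print(f"chunk {chunkNum} done in {time.perf_counter() - start:.2f}s (total={total})")

    async def main():
        chunks = [range(5_000_000)] * 4  # dummy chunks instead of the real file contents
        tasks = [asyncio.create_task(process_chunk_async(c, i + 1))
                 for i, c in enumerate(chunks)]
        await asyncio.gather(*tasks)

    asyncio.run(main())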
Here is the function that processes a given chunk:
    async def process_chunk_async(chunk, chunkNum, dictRunData):
        dict_results = {}
        for data in chunk:
            # ..do something..
            dict_results[data] = ...  # dict has no append(); store the result keyed by the item
        outputfile = await write_chunk_async(dict_results, chunkNum, dictRunData)
        return outputfile
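For comparison, a hedged sketch of what the per-chunk step might look like if the CPU-bound work were handed to a ProcessPoolExecutor via loop.run_in_executor instead (asyncio alone does not parallelize CPU-bound code; the names below are placeholders, not my real processing logic):

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    def process_chunk(chunk, chunkNum):
        # Ordinary blocking function: it runs inside a worker process,
        # so chunks can be processed in parallel across CPU cores.
        return {data: data for data in chunk}

    async def process_all(chunks):
        loop = asyncio.get_running_loop()
        with ProcessPoolExecutor() as pool:
            futures = [loop.run_in_executor(pool, process_chunk, chunk, i + 1)
                       for i, chunk in enumerate(chunks)]
            return await asyncio.gather(*futures)

    # Example usage with dummy chunks:
    # results = asyncio.run(process_all([["a", "b"], ["c", "d"]]))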
Here is write_chunk_async:
    async def write_chunk_async(dict_results, chunkNum, dictRunData):
        fileName = f"_{chunkNum}.csv"
        # Use a context manager so the file is always closed.
        with open(fileName, "a+") as writeFileTo:
            for data in dict_results.keys():
                writeFileTo.write(data + "\n")
        print(f"Done write_chunk_async file: {fileName}")
        return fileName