0

I am reading files asynchronously, but I am running into an error saying "await only allowed in async function". This error occurs when the await keyword is used inside a function that was not marked as async, but I did mark read_blob() as async.

I don’t have any issues running this in an .ipynb Jupyter notebook (in which I run the code cell by cell, sequentially), but when I run it in VSCode as a .py file, it raises the error.

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):
    """Async container client with a helper that loads a blob as parquet."""

    async def read_blob(self,
                        blob_name: str,
                        add_blob_name_col=False,
                        add_blob_date_col=False,
                        preprocessing_func=None,
                        zip_regex=r'.+\.gz$',
                        csv_regex=r'.+\.csv(\.gz)?$',
                        parquet_regex=r'.+\.parquet$',
                        regex_string=None,
                        **kwargs):
        """Download ``blob_name`` and parse its bytes as a parquet file.

        Only ``blob_name`` is used by this implementation; the remaining
        parameters are accepted but currently ignored.

        Args:
            blob_name: Name of the blob to download from this container.

        Returns:
            The result of ``pd.read_parquet`` on the downloaded bytes, or
            ``0`` if ``ResourceNotFoundError`` is raised while downloading.

        Raises:
            AssertionError: If ``blob_name`` is not a ``str``.
        """
        # Imported here because the surrounding module never imports pandas,
        # which left `pd` undefined when the download succeeded.
        import pandas as pd

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = await self.download_blob(blob_name)

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                # Rewind so the parquet reader starts at the first byte.
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        except ResourceNotFoundError:
            return 0


# Container URL handed to from_container_url() to build the client.
blob_sas_url = "https://proan.blob"
acc = AsyncContainerClient.from_container_url(blob_sas_url)

# Blob names to fetch concurrently.
test_dirs = ["models1/model.parquet", "models2/model.parquet", "models3/model.parquet"]
# NOTE(review): this `await` sits at module level, outside any `async def`.
# The question reports that this runs in a notebook but fails when executed
# as a .py file; in a script it has to live inside a coroutine that is
# started with asyncio.run().
res = await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))

error_img

2
  • The line shown in the error isn’t in the code you shared. Commented Jun 27, 2022 at 13:44
  • 2
    The error fully describes the issue. You should execute that part of code inside an async function. Commented Jun 27, 2022 at 13:45

1 Answer 1

3

The entry point to an async program should be asyncio.run. Wrap your code in an async function (a coroutine), then pass a call to that function to asyncio.run:

from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO


class AsyncContainerClient(ContainerClient):
    """Async container client with a helper that loads a blob as parquet."""

    async def read_blob(self,
                        blob_name: str,
                        add_blob_name_col=False,
                        add_blob_date_col=False,
                        preprocessing_func=None,
                        zip_regex=r'.+\.gz$',
                        csv_regex=r'.+\.csv(\.gz)?$',
                        parquet_regex=r'.+\.parquet$',
                        regex_string=None,
                        **kwargs):
        """Download ``blob_name`` and parse its bytes as a parquet file.

        Only ``blob_name`` is used by this implementation; the remaining
        parameters are accepted but currently ignored.

        Args:
            blob_name: Name of the blob to download from this container.

        Returns:
            The result of ``pd.read_parquet`` on the downloaded bytes, or
            ``0`` if ``ResourceNotFoundError`` is raised while downloading.

        Raises:
            AssertionError: If ``blob_name`` is not a ``str``.
        """
        # Imported here because the surrounding module never imports pandas,
        # which left `pd` undefined when the download succeeded.
        import pandas as pd

        assert isinstance(blob_name, str), f'{blob_name} is not a string'

        try:
            blob = await self.download_blob(blob_name)

            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                # Rewind so the parquet reader starts at the first byte.
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')

        except ResourceNotFoundError:
            return 0


async def main():
    """Read the test parquet blobs concurrently and return the results.

    Returns:
        A list with one entry per name in ``test_dirs``, in the same order,
        each being whatever ``read_blob`` returned for that blob.
    """
    blob_sas_url = "https://proan.blob"
    test_dirs = [
        "models1/model.parquet",
        "models2/model.parquet",
        "models3/model.parquet",
    ]

    # `async with` closes the client once all downloads have finished, so
    # its underlying connection resources are not left open at exit.
    async with AsyncContainerClient.from_container_url(blob_sas_url) as acc:
        return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))


# asyncio.run() creates the event loop, runs main() to completion and closes
# the loop; the guard keeps an import of this module from starting downloads.
if __name__ == "__main__":
    asyncio.run(main())
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.