
I need to process a large remote CSV line by line without downloading it entirely.

Below is the closest I got. I iterate over byte chunks from Azure and have some code to handle lines truncated at chunk boundaries. But this cannot work if CSV values contain a newline, as I am not able to distinguish between newlines inside values and the newlines that terminate CSV records.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://learn.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # prepend the previous truncated line to the next chunk
        # (note: decoding chunk by chunk can also split a multi-byte
        # utf-8 character at a chunk boundary)
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        for line in lines[:-1]: # every element but the last is a complete line
            yield line
        truncated_line = lines[-1] # keep the trailing partial line

    # whatever remains after the last chunk is the final line
    if truncated_line:
        yield truncated_line
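To make the problem concrete, here is a minimal, self-contained illustration (with made-up data) of why splitting on '\n' cannot work: a naive split breaks a record in two, while the csv module keeps a quoted newline inside the value.

import csv
import io

data = 'id;comment\n1;"first line\nsecond line"\n2;simple\n'

# naive splitting breaks the quoted field across two "lines"
print(data.split('\n'))
# ['id;comment', '1;"first line', 'second line"', '2;simple', '']

# the csv module parses the embedded newline as part of the value
for row in csv.DictReader(io.StringIO(data), delimiter=';'):
    print(row)
# {'id': '1', 'comment': 'first line\nsecond line'}
# {'id': '2', 'comment': 'simple'}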

Ideally I would use csv.DictReader() but I was not able to do so, as it downloads the file entirely.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader
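As an aside, even ignoring the full download, this snippet would not run as written: readinto() leaves the buffer positioned at the end, and csv.DictReader expects a text stream, not bytes. A corrected (but still fully-downloading) sketch, reusing client and file_path from above:

import csv
import io

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
buffer = io.BytesIO()
file_handle.readinto(buffer)  # STILL downloads the file entirely
buffer.seek(0)                # rewind before reading
text = io.TextIOWrapper(buffer, encoding='utf-8', newline='')
csvreader = csv.DictReader(text, delimiter=';')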

Here is an update using some hints from @H.Leger.

Please note that this still does not work

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
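As far as I can tell, the cause is that codecs.iterdecode() yields decoded chunks, not lines; the csv reader treats each yielded string as one line, so the first embedded '\n' ends the record and the leftover text in the same string triggers the error. A local reproduction with made-up data:

import codecs
import csv

chunks = iter([b'id;comment\n1;hello\n2;world\n'])  # one big chunk, like .chunks()
stream = codecs.iterdecode(chunks, 'utf-8')
for row in csv.DictReader(stream, delimiter=';'):
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to
#    open the file in universal-newline mode?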

EDIT: Final solution based on @paiv answer

EDIT: Updated solution to use io instead of codecs for faster parsing

import io
import csv
import ctypes as ct

# byte-chunk iterator to Python stream adapter
# https://stackoverflow.com/a/67547597/2523414

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res



# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='') 

# update csv field limit to handle large fields
# https://stackoverflow.com/a/54517228/2523414
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
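A quick local sanity check (made-up data, not part of the original solution), reusing the ChunksAdapter defined above: feed it artificially small chunks, including ones that split a record and a multi-byte character, and verify that quoted newlines survive.

import io
import csv

data = 'id;comment\n1;"héllo\nworld"\n2;plain\n'.encode('utf-8')
# simulate downloader.chunks() with awkward 7-byte chunks
fake_chunks = (data[i:i+7] for i in range(0, len(data), 7))

text_stream = io.TextIOWrapper(ChunksAdapter(fake_chunks), encoding='utf-8', newline='')
for row in csv.DictReader(text_stream, delimiter=';'):
    print(row)
# {'id': '1', 'comment': 'héllo\nworld'}
# {'id': '2', 'comment': 'plain'}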
5 Comments
  • Have you tried to use pandas: kite.com/python/answers/…? We can define how many lines we read at one time. Commented May 13, 2021 at 2:51
  • @Jim Xu I couldn't find a way to use this without downloading the entire file first Commented May 14, 2021 at 6:16
  • Hi, pandas.read_csv supports HTTP URLs. I think you can try it with a file URL that contains a token. Commented May 14, 2021 at 6:39
  • @Jim Xu I don't know if I can use a URL for this. I use the Python Azure Data Lake storage client with cert auth. Commented May 14, 2021 at 21:19
  • @ClémentPrévost Can you provide a small sample of the csv you're trying to parse? Commented May 15, 2021 at 13:49

2 Answers


Disclaimer: I know little about Azure specifics. Ultimately, you would want to stream separate chunks too.

In Python, given a file object, you can set up CSV streaming this way:

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read from file_object in a streaming fashion.

Edit: as @Martijn Pieters suggested, we can gain performance with TextIOWrapper instead of codecs:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

Check the note in the csv module documentation on the newline parameter.
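In short, that note says a file passed to the csv reader should be opened with newline='', otherwise newlines embedded inside quoted fields will not be interpreted correctly. For a plain local file (filename and delimiter illustrative):

import csv

# newline='' lets the csv module do its own newline handling
with open('data.csv', newline='', encoding='utf-8') as f:
    for row in csv.DictReader(f, delimiter=';'):
        print(row)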

But Azure's StorageStreamDownloader does not provide Python's file-object interface. It has a .chunks() generator (which I assume will issue a separate HTTP request to retrieve each chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

And use it like this:

downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())

Be sure to configure DictReader for the appropriate CSV dialect.

And set appropriate values for max_single_get_size, max_chunk_get_size on the blob client.
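For reference, a hedged sketch of where those knobs go. The question uses the Data Lake client; I believe these keyword arguments are forwarded to the underlying blob client, but treat the exact constructor and the values as assumptions:

from azure.storage.filedatalake import DataLakeServiceClient

service = DataLakeServiceClient(
    account_url="https://<account>.dfs.core.windows.net",  # placeholder account
    credential=credential,                 # whatever auth you already use
    max_single_get_size=4 * 1024 * 1024,   # size of the initial GET
    max_chunk_get_size=4 * 1024 * 1024,    # size of each subsequent chunk
)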


3 Comments

Why use codecs at all? io.TextIOWrapper() would be more robust.
Hello, I just tested the solution with io.TextIOWrapper() and the ChunksAdapter is missing some methods to make it work straight away: AttributeError: 'ChunksAdapter' object has no attribute 'readable'.
OK, I added the necessary methods to support io.TextIOWrapper and it's way faster.

I believe the requests package can be useful for you. Using the stream option while getting your file and the Response.iter_lines() function should do what you need:

import codecs
import csv
import requests

url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)  # using the stream option to avoid loading everything

try:
    buffer = r.iter_lines()  # iter_lines() will feed you the remote file line by line
    reader = csv.DictReader(codecs.iterdecode(buffer, 'utf-8'), delimiter=';')
    for row in reader:
        print(row)  # Do stuff here
finally:
    r.close()

3 Comments

Thank you for your answer. I have 2 concerns here: I only have access to the Azure lib, which feeds me a byte-chunk iterator, so I cannot use requests. And even if that were possible, do iter_lines and DictReader understand newlines inside column values?
Oops, I didn't realize Azure was so special. For your second point, there should be no issue with newlines inside string columns if the file is properly formatted.
I updated the question to use codecs.iterdecode and I have a newline-related error.
