
I want to determine how to limit memory usage inside a job that retrieves a blob from a local database and transfers it to a third-party web service in chunks.

Using SqlDataReader, I appear to have two options:

  1. Create a method that uses GetBytes with an offset to retrieve part of a blob, returning a byte[]. The caller of the method would then be responsible for making a web request to transfer this chunk.
  2. Create a method that uses GetStream and makes multiple calls to ReadAsync to fill a byte[] buffer, making a web request with this buffer until the document has been transferred.

I have a preference for option 1 because it limits the responsibility of the method. However, if I call GetBytes with an offset, will it load everything up to the offset into memory, or is SQL Server capable of returning only the small chunk requested? If I use option 2, the method will have two responsibilities: loading a chunk from the database and making web requests to store the document elsewhere.

// option 1
public async Task<Tuple<int, byte[]>> GetDocumentChunk(int documentId, int offset, int maxChunkSize)
{
    var buffer = new byte[maxChunkSize];

    string sql = "SELECT Data FROM Document WHERE Id = @Id";

    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        await connection.OpenAsync();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);

            using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
            {
                if (await reader.ReadAsync())
                {
                    int bytesRead = (int)reader.GetBytes(0, offset, buffer, 0, maxChunkSize);
                    return new Tuple<int, byte[]>(bytesRead, buffer);
                }
            }
        }
    }

    return new Tuple<int, byte[]>(0, buffer);
}

//option 2
public async Task<CallResult> TransferDocument(int documentId, int maxChunkSize)
{
    var buffer = new byte[maxChunkSize];

    string sql = "SELECT Data FROM Document WHERE Id = @Id";

    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        await connection.OpenAsync();

        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);

            using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
            {
                using (Stream uploadDataStream = reader.GetStream(0))
                {
                    // Stop once the stream is exhausted; avoid issuing a zero-byte request for the final read.
                    CallResult callResult = CallResult.Success;
                    int bytesRead;
                    while ((bytesRead = await uploadDataStream.ReadAsync(buffer, 0, maxChunkSize)) > 0)
                    {
                        callResult = await MyWebRequest(documentId, buffer, bytesRead);
                        if (callResult != CallResult.Success)
                        {
                            return callResult;
                        }
                    }

                    return callResult;
                }
            }
        }
    }
}
1 Answer

With option 1 you'll make many requests to the source to fetch the data, and GetBytes does not seek within the stream on the SQL Server side (I'd be surprised if it did): with each call it has to read and discard every byte before the offset, so that will be a very inefficient solution.

IAsyncEnumerable

With option 2 you get the stream and process it on demand, so you'll make a single DB request and gain all the benefits of asynchronous I/O.

With C# 8, IAsyncEnumerable will fit your problem perfectly, but it is only in the preview stage so far.
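As a rough sketch of how that could look once async streams ship (hypothetical method name; `ConnectionString` and `MyWebRequest` are assumed from your snippets, and the usual `using` directives are omitted as in your code):

```csharp
// Sketch only: requires C# 8 async streams (IAsyncEnumerable<T>), currently in preview.
public async IAsyncEnumerable<(byte[] Buffer, int Length)> GetDocumentChunks(int documentId, int maxChunkSize)
{
    var buffer = new byte[maxChunkSize];
    string sql = "SELECT Data FROM Document WHERE Id = @Id";

    using (var connection = new SqlConnection(ConnectionString))
    {
        await connection.OpenAsync();
        using (var command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);
            using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
            using (var stream = reader.GetStream(0))
            {
                int bytesRead;
                // Note: the same buffer instance is yielded each time,
                // so the caller must consume it before resuming the iteration.
                while ((bytesRead = await stream.ReadAsync(buffer, 0, maxChunkSize)) > 0)
                    yield return (buffer, bytesRead);
            }
        }
    }
}

// The caller keeps the single responsibility of uploading:
// await foreach (var (chunk, length) in GetDocumentChunks(documentId, maxChunkSize))
//     await MyWebRequest(documentId, chunk, length);
```

This separates the two responsibilities the way option 1 intended, while still making only one DB request like option 2.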

CopyToAsync

If you can get hold of a stream to upload the content to, then you can use CopyToAsync. But I assume each chunk will be uploaded in an individual request. If so, you may introduce a component which will quack like a Stream but will actually upload content to the website whenever CopyToAsync on the DB stream writes to it:

class WebSiteChunkUploader : Stream
{
    private readonly HttpClient _client = new HttpClient();

    public override bool CanWrite => true;
    public override bool CanRead => false;
    public override bool CanSeek => false;

    // CopyToAsync on the source stream only needs WriteAsync.
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        await _client.PostAsync("localhost", new ByteArrayContent(buffer, offset, count), cancellationToken);

    // Stream is abstract, so the remaining members must be overridden even though they are unused here.
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
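Wiring it up could then look like this (a sketch reusing the reader setup from your option 2; the "localhost" endpoint above is obviously a placeholder):

```csharp
using (SqlDataReader reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess))
using (Stream uploadDataStream = reader.GetStream(0))
using (var uploader = new WebSiteChunkUploader())
{
    // CopyToAsync reads from the DB stream into an internal buffer of the given
    // size and calls uploader.WriteAsync once per chunk.
    await uploadDataStream.CopyToAsync(uploader, maxChunkSize);
}
```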

Good Old IEnumerable

Unfortunately you cannot mix yield return in an IEnumerable with async/await. But if you decide to read the stream with a blocking API, e.g. Read, then you can rewrite it with the good old yield return:

public IEnumerable<Tuple<byte[], int>> TransferDocument(int documentId, int maxChunkSize)
{
    string sql = "SELECT Data FROM Document WHERE Id = @Id";
    var buffer = new byte[maxChunkSize];
    using (SqlConnection connection = new SqlConnection(ConnectionString))
    {
        connection.Open();
        using (SqlCommand command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("@Id", documentId);
            using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SequentialAccess))
            using (Stream uploadDataStream = reader.GetStream(0))
            {
                int bytesRead;
                while ((bytesRead = uploadDataStream.Read(buffer, 0, maxChunkSize)) > 0)
                    yield return Tuple.Create(buffer, bytesRead);
            }
        }
    }
}

...
async Task DoMyTransfer() 
{
  foreach (var buffer in TransferDocument(1, 10000)) {
    await moveBytes(buffer);
  }
}

In this case you won't have async I/O with the DB and fancy Tasks, but I suppose you'll need to throttle this upload operation anyway so as not to overload the DB with connections.
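If several documents are transferred concurrently, one simple way to cap the number of open DB connections is a SemaphoreSlim gate (a sketch; `TransferDocument` here refers to your option 2 method, and the limit of 4 is an arbitrary example):

```csharp
// Hypothetical throttle: at most 4 transfers (and thus DB connections) at a time.
private static readonly SemaphoreSlim TransferGate = new SemaphoreSlim(4);

public async Task<CallResult> TransferDocumentThrottled(int documentId, int maxChunkSize)
{
    await TransferGate.WaitAsync();
    try
    {
        return await TransferDocument(documentId, maxChunkSize);
    }
    finally
    {
        TransferGate.Release();
    }
}
```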
