
I'm seeking advice about my approach to the following problem. I have a constant stream of incoming data that I need to add to a buffer, and at every iteration, I need to pass the buffered data to a function that accepts a C-style array through a pointer.

I'm worried about efficiency, so I pondered how I could store and manage the data in some sort of circular buffer, but also get it as sequential raw data to pass to the said function.
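For illustration, the kind of index-based circular buffer I have in mind would look something like the sketch below (names are mine; the copy-out on every call is exactly the part I'd like to avoid):

```cpp
#include <vector>
#include <algorithm>
#include <cstddef>

// Sketch of a plain index-based circular buffer: O(1) insert, but the
// data must be copied into a scratch array whenever a contiguous
// C-style view is needed.
struct RingBuffer {
    std::vector<double> buf;     // fixed-size storage
    std::vector<double> scratch; // contiguous view for the C-style consumer
    std::size_t head = 0;        // index of the oldest element

    explicit RingBuffer(std::size_t n) : buf(n, 0.0), scratch(n, 0.0) {}

    void push(double v) {        // overwrite oldest, advance head: O(1)
        buf[head] = v;
        head = (head + 1) % buf.size();
    }

    const double* contiguous() { // O(n) copy to linearize the ring
        std::copy(buf.begin() + head, buf.end(), scratch.begin());
        std::copy(buf.begin(), buf.begin() + head,
                  scratch.begin() + (buf.size() - head));
        return scratch.data();
    }
};
```

So the insert gets cheap, but producing the contiguous view is still O(n), which seems to just move the cost around.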

My current approach can be summarized in the following example:

#include <iostream>
#include <array>
#include <algorithm>

void foo(double* arr, int size)
{
  for (int k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;
}

int main()
{
  const int size = 20;
  std::array<double, size> buffer{};

  for (double data = 0.0; data < 50.0; data += 1.0)
  {
      std::move(std::next(std::begin(buffer)), std::end(buffer), std::begin(buffer));
      buffer.back() = data;

      foo(buffer.data(), size);
  }
}

In the real use case, the buffer also needs to be padded to a "const" size of data at the beginning (I use quotes here because the size may or may not be known at compile time, but once it is known, it will never change).

I store the data in a std::array (or in a std::vector if the size is not known at compile time) since the data is then sequential in memory. When I need to insert new data, I use a forward std::move to shift everything, and then I manually replace the last item. Finally, I just pass std::array::data() and its size to the function.

While at first glance this should work efficiently, reason tells me that because the data is stored sequentially, the whole buffer will still be copied by std::move (moving a double is just a copy), so each insert will be O(n).

The real buffer size will probably be only in the hundreds, and data arrives at 100 Hz at most, but the problem is that I need the result of the called function as soon as possible, so I don't want to lose time on buffer management (even if we are talking a few, or even less than, milliseconds). I have many questions about this, but the short list is the following:

  • Is my approach too naive?
  • Is my reasoning about O(n) correct?
  • Are there any other pitfalls with this approach?
  • Do you have suggestions for some other approach that I should look into?

3 Answers


Thank you for the answer, Werner. When I run this solution on Repl.it, I get:

it took an average of 21us and a max of 57382us

For comparison, my original idea with the same buffer size has the following result:

it took an average of 19us and a max of 54129us

This means that my initial approach indeed was naive :)

In the meantime, while waiting for an answer, I came up with the following solution:

#include <iostream>
#include <array>
#include <algorithm>
#include <chrono>

void foo(double* arr, int size)
{
  for (int k = 0; k < size; k++)
    std::cout << arr[k] << ", ";

  std::cout << std::endl;
}

int main()
{
  const int buffer_size = 20;
  std::array<double, buffer_size*2> buffer{};
  int buffer_idx = buffer_size;

  for (double data = 0.0; data < 100.0; data += 1.0)
  {
    buffer.at(buffer_idx - buffer_size) = data;
    buffer.at(buffer_idx++) = data;

    foo(buffer.data() + buffer_idx - buffer_size, buffer_size);

    buffer_idx -= buffer_size * (buffer_idx == buffer_size * 2);
  }
}

Since the size of the buffer is not a problem, I allocate twice the memory needed and insert each value in two places, offset by the buffer size. When I reach the end, I just go back like a typewriter. The idea is that I fake the circular buffer by storing one extra copy of the data, so it can be read as if it had wrapped around the full circle.
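The same trick can be wrapped in a small helper class so the double-write and the typewriter return live in one place (just a sketch; the names are mine):

```cpp
#include <vector>
#include <cstddef>

// The double-write trick wrapped up: every value is stored twice,
// `size` slots apart, so the latest `size` values are always available
// as one contiguous block, with no shifting or copying.
class MirroredBuffer {
    std::vector<double> buf; // 2 * size slots, zero-initialized (padding)
    std::size_t idx;         // next write position in the upper half
    std::size_t size;

public:
    explicit MirroredBuffer(std::size_t n) : buf(2 * n, 0.0), idx(n), size(n) {}

    void push(double v)
    {
        buf[idx - size] = v;             // lower copy
        buf[idx++] = v;                  // upper copy
        if (idx == 2 * size) idx = size; // typewriter return
    }

    // Pointer to the oldest of the last `size` values, contiguous.
    const double* data() const { return buf.data() + idx - size; }
    std::size_t length() const { return size; }
};
```

Usage is then just `mb.push(x); foo(mb.data(), mb.length());` per iteration.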

For a buffer size of 50000, this gives me the following result, which is exactly what I wanted:

it took an average of 0us and a max of 23us

2 Comments

If you really want performance, you should replace the .at() with the [] operator; see cplusplus.com/reference/vector/vector/at. In my measurements of your code it gives somewhere between a 20% and 40% speedup.
This sounds like the virtual ring buffer G. Sliepen mentioned below ;) Nice, I learned something.

Besides the answer by stribor14, I have two other suggestions. These are based purely on performance, so you won't really find readable or maintainable code here.

My first idea when reading the problem was also to allocate twice the amount of storage, but write each value only once. When all places in the second half have been written, that half is copied over to the first half. My first instinct said this could perform better: the same total number of writes happens, but all of the writes are sequential (instead of every second write jumping to another place in the array).

#include <cstddef>
#include <cstring>
#include <array>

const size_t buffer_size = 50'000;

void foo(double *arr, size_t size); // the consumer from the question, defined elsewhere

int main()
{
    std::array<double, 2 * buffer_size> buffer{};
    double *index = buffer.data();
    double *mid = index + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        if (index == mid)
        {
            index = buffer.data();
            std::memcpy(index, mid, buffer_size * sizeof(double));
        }

        *(index++ + buffer_size) = data;

        foo(index, buffer_size);
    }
}

Alternatively, I thought it would be possible to optimize OP's own answer by removing the array accesses. The idea is that buffer[buffer_idx - buffer_size] takes two additions to calculate the location of that value, namely *(buffer + buffer_idx - buffer_size). If buffer_idx holds a pointer instead, only one addition is needed. This gives the following code:

#include <cstddef>
#include <array>

const size_t buffer_size = 50'000;

void foo(double *arr, size_t size); // the consumer from the question, defined elsewhere

int main()
{
    std::array<double, buffer_size * 2> buffer{};
    double *index = buffer.data();
    double *mid = buffer.data() + buffer_size;

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        *index = data;
        *(index + buffer_size) = data;
        ++index;

        index -= buffer_size * (index == mid);

        foo(index, buffer_size);
    }
}

It was at this point that I noticed I was going down the rabbit hole of C++ optimization, so we couldn't stop there. To choose which implementation to use, I wanted to run a benchmark, and Werner Pirkl's answer gave a good starting point. But running it on our optimized code is nonsensical, because the measured times are 0 µs. So let's change it a bit: I wrapped the benchmark in an extra loop to give it some runtime, and came up with:

const int repeats = 1000;
volatile double *ptr;
int duration = 0;
const size_t buffer_size = 50'000;

// ... Set up of the buffers and indices

for (int i = 0; i < repeats; ++i)
{
    auto t1 = std::chrono::high_resolution_clock::now();

    for (double data = 0.0; data < 10 * buffer_size; data += 1.0)
    {
        // ... add data to circular buffer

        ptr = // ... the start of the array
    }

    auto t2 = std::chrono::high_resolution_clock::now();
    duration += std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count();
}

(Note the use of a volatile double * to ensure that the raw pointer to the contiguous array is not optimized out.)

While running these tests I noticed they are very dependent on compiler flags (-O2, -O3, -march=native, ...). I will give some results, but like all C++ benchmarks, take them with a grain of salt and run your own with a real-world workload. (The reported times are average ns per insertion.)

                     with `memcpy`   stribor14   `operator[]`   with pointers 
                   |---------------|-----------|--------------|---------------|
               -O2 |         1.38  |     1.57  |        1.41  |         1.15  |
               -O3 |         1.37  |     1.63  |        1.36  |         1.09  |
 -O3 -march=native |         1.35  |     1.61  |        1.34  |         1.09  |

Needless to say, I was quite disappointed that what I thought should perform best did not. But as stated earlier, this benchmark is in no way representative of any real-world performance.

1 Comment

I compile with -O3 -march=native because the rest of the code this is part of uses vectorization. Thanks for your answer nonetheless. Also, I later measured the average time for my answer to be around 130 ns.

You'll always have to copy your data, as a "continuous" ring buffer doesn't exist (maybe in some fancy silicon it does).

Also, you can't instantiate an std::array with a size defined at runtime.

You could use a vector to achieve this:

#include <iostream>
#include <chrono>
#include <vector>

int main() {

    std::vector<double> v;

    // pre fill it a little
    for(double data = 0.0; data > -50000.0; data -= 1.0) {
        v.push_back(data);
    }

    size_t cnt = 0;
    int duration = 0;
    int max = 0;

    for(double data = 0.0; data < 50000.0; data += 1.0, ++cnt) {

        auto t1 = std::chrono::high_resolution_clock::now();

        v.push_back(data);
        v.erase(v.begin());

        // foo(v.data(), v.size());

        auto t2 = std::chrono::high_resolution_clock::now();
        auto delta = std::chrono::duration_cast<std::chrono::microseconds>( t2 - t1 ).count();
        duration += delta;

        if(max == 0 || max < delta) {
            max = delta;
        }

    }

    std::cout << "it took an average of " << duration / cnt << "us and a max of " << max << " us" << std::endl;

    return 0;
}

Output:

it took an average of 11us and a max of 245 us

2 Comments

It turns out most computers do have the necessary fancy silicon to implement a continuous ring buffer, as long as the buffer's size is a multiple of the page size. See for example: vrb.sourceforge.net
The exact approach I implemented in the end
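The page-mapping trick mentioned in the comment above can be sketched as follows. This is my own illustration, not code from the linked library, and it assumes Linux (memfd_create needs glibc >= 2.27); the function name is made up:

```cpp
#include <sys/mman.h>
#include <unistd.h>
#include <cstddef>

// Maps the same physical pages twice, back to back, so every window of
// `bytes` is contiguous without any copying. `bytes` must be a multiple
// of the page size. Returns a pointer to 2*bytes of address space where
// [0, bytes) and [bytes, 2*bytes) alias the same memory, or nullptr on
// failure.
char* make_mirror(std::size_t bytes)
{
    int fd = memfd_create("ring", 0);               // anonymous backing file
    if (fd == -1) return nullptr;
    if (ftruncate(fd, bytes) != 0) { close(fd); return nullptr; }

    // Reserve a 2*bytes region, then map the fd over both halves.
    char* base = static_cast<char*>(mmap(nullptr, 2 * bytes, PROT_NONE,
                                         MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
    if (base == MAP_FAILED) { close(fd); return nullptr; }
    void* lo = mmap(base, bytes, PROT_READ | PROT_WRITE,
                    MAP_SHARED | MAP_FIXED, fd, 0);
    void* hi = mmap(base + bytes, bytes, PROT_READ | PROT_WRITE,
                    MAP_SHARED | MAP_FIXED, fd, 0);
    close(fd); // the mappings keep the memory alive
    if (lo == MAP_FAILED || hi == MAP_FAILED) return nullptr;
    return base;
}
```

A write at offset i is then visible at offset i + bytes (and vice versa), so a reader can always take a plain pointer to any window of `bytes` without the producer ever shifting or duplicating data.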
