
I have a binary file which contains time series from sensors. The data format is as follows:

#1(t0) #2(t0) #3(t0) ... #n(t0) #1(t1) #2(t1) #3(t1) ... #n(t1) ...

At each time step, the measured data from all n sensors are stored consecutively in the file in binary format. I would like to reconstruct the time series of a single sensor, such that

#1(t0) #1(t1) #1(t2) ...

The distance from #1(t0) to #1(t1), the stride, is fixed and known, and the number of sensors is also known. The following code is my implementation. It fetches a single sample at a time and is not very fast. Is there any way to improve the speed of reading non-contiguous data, similar to collective I/O in MPI?

def collect_signal(fp, channel_no, stride, dtype):
    byteSize = np.dtype(dtype).itemsize
    fp.seek(0,2) # go to the file end
    eof = fp.tell() # get the eof address
    fp.seek(0,0) # rewind

    fp.seek(0 + channel_no,0) # starting point per each channel
    signal = []
    while True:
        start = fp.tell()
        sample = np.frombuffer(fp.read(byteSize), dtype=dtype)
        signal.append(sample[0])
        if fp.tell() == eof or fp.tell() + stride > eof:
            break
        else:
            fp.seek(start + stride, 0)

    return signal
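
For reference, I call it roughly like this (the file name and sensor count here are only for illustration):

import numpy as np

n_sensors = 8                                      # illustrative sensor count
dtype = np.float64
stride = n_sensors * np.dtype(dtype).itemsize      # bytes from #1(t0) to #1(t1)

with open('sensors.bin', 'rb') as fp:              # hypothetical file name
    signal = collect_signal(fp, 0, stride, dtype)  # time series of sensor #1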

2 Answers


This simpler code may be faster. You might also want to look into using mmap to map the file into your process's address space, which lets you bypass a layer of kernel I/O calls.

def collect_signal(fp, channel_no, stride, dtype):
    byte_size = np.dtype(dtype).itemsize
    fp.seek(channel_no, 0)

    signal = []
    # Assuming that each read will always return an entire sample
    # or an empty bytes object (b'') at end of file.
    for sample in iter(lambda: fp.read(byte_size), b''):
        sample = np.frombuffer(sample, dtype=dtype)
        signal.append(sample[0])
        fp.seek(stride, 1)

    return signal
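
As for the mmap suggestion above, here is a minimal sketch of that idea (untested; it assumes the same arguments as the function above, with channel_no again used as a byte offset):

import mmap
import numpy as np

def collect_signal_mmap(fp, channel_no, stride, dtype):
    byte_size = np.dtype(dtype).itemsize
    signal = []
    # map the whole file read-only into this process's address space
    with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as m:
        # pull one sample out of the mapping every stride bytes
        for pos in range(channel_no, len(m) - byte_size + 1, stride):
            signal.append(np.frombuffer(m[pos:pos + byte_size], dtype=dtype)[0])
    return signal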

Another option might be to let frombuffer handle the stride for you, if you know how many channels there are. This involves reading slightly more data into memory at each step, but if the input is buffered, the file object is probably already pulling more data into its buffer than fp.read actually returns.

def collect_signal(fp, num_channels, channel_no, dtype):
    byte_size = np.dtype(dtype).itemsize
    offset = channel_no * byte_size
    signal = []
    while True:
        # read one full record (all channels for one time step)
        sample = fp.read(byte_size * num_channels)
        if not sample:
            break
        sample = np.frombuffer(sample, dtype=dtype, count=1, offset=offset)
        signal.append(sample[0])
    return signal
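
For the layout in the question (4 channels of float64, extracting the first channel) it would be used roughly like this; the file name is hypothetical:

with open('sensors.bin', 'rb') as fp:
    signal = collect_signal(fp, 4, 0, np.float64)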

2 Comments

Thanks for the code. AFAIK mmap does not make much difference when sequential I/O is used. Is that not true?
The idea with mmap, IIUC, isn't just a change in the interface: you are transferring data directly into your process's address space, bypassing any buffers in the file-system layer. I've also observed non-trivial differences in speed between m.read(d) and m[x:x+d], where m is my mmap object.

Following chepner's advice, I ran some tests. The test signal is generated by the following C++ code:

#include <fstream>
#include <iostream>
#include <cmath>

int main(int argc, char** argv) {
    std::ofstream of("signal.bin", std::ios::binary); // binary mode so the raw doubles are written unmodified

    double omega = 10;
    double delt = 0.1;
    double phi = 1;
    int number_of_channel = 4;
    int nsize = 100000000;

    for (auto i=0; i<nsize; ++i) {
        for (auto j=0; j<number_of_channel; ++j) {
            double data = sin(omega*delt*i + phi*j);
            of.write(reinterpret_cast<char*>(&data), sizeof(double));
        }
    }
    of.close();
    return 0;
}
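
For reference only (not part of the original test), roughly the same file can be produced from Python with NumPy; the chunked writing below is just to keep memory use modest:

import numpy as np

omega, delt, phi = 10.0, 0.1, 1.0
number_of_channel = 4
nsize = 100000000
chunk = 1000000

with open('signal.bin', 'wb') as f:
    for start in range(0, nsize, chunk):
        i = np.arange(start, min(start + chunk, nsize))[:, None]  # time indices
        j = np.arange(number_of_channel)[None, :]                 # channel indices
        # row-major order writes all channels of one time step consecutively,
        # matching the interleaved layout of the C++ generator
        np.sin(omega * delt * i + phi * j).astype(np.float64).tofile(f)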

The C++ program generates a 3.2 GB binary file. Then I wrote Python code that reads the data non-contiguously:

import numpy as np
import time
import os

def collect_signal_rev0(fp, channel_no, stride, dtype):
    byteSize = np.dtype(dtype).itemsize
    fp.seek(0,2) # go to the file end
    eof = fp.tell() # get the eof address
    fp.seek(0,0) # rewind

    fp.seek(0 + channel_no,0) # starting point per each channel
    signal = []
    while True:
        start = fp.tell()
        sample = np.frombuffer(fp.read(byteSize), dtype=dtype)
        signal.append(sample[0])
        if fp.tell() == eof or fp.tell() + stride > eof:
            break
        else:
            fp.seek(start + stride, 0)

    return signal

def collect_signal_rev1(fp, channel_no, stride, dtype):
    byte_size = np.dtype(dtype).itemsize
    fp.seek(channel_no, 0)

    signal = []
    sentinel = bytes()
    for sample in iter(lambda: fp.read(byte_size), sentinel):
        sample = np.frombuffer(sample, dtype=dtype)
        signal.append(sample[0])
        # after the read, the file pointer already sits one byte_size past the sample,
        # so the relative seek must be stride minus one byte_size
        fp.seek(stride-byte_size, 1)

    return signal

def collect_signal_rev2(fp, num_channels, channel_no, stride, dtype):
    byte_size = np.dtype(dtype).itemsize
    offset = channel_no * byte_size

    signal = []
    while True:
        sample = fp.read(byte_size * num_channels)
        if not sample:
            break
        sample = np.frombuffer(sample, dtype=dtype, count=1, offset=offset)
        signal.append(sample[0])

    return signal

def collect_signal_rev3(fp, channel_no, stride, dtype):
    # fp is a numpy memmap here, and stride is counted in elements
    # (i.e. the number of channels), not in bytes
    return fp[channel_no::stride]


if __name__ == '__main__':

    channels = 4
    byte_size = np.dtype(np.float64).itemsize
    stride = channels*byte_size
    file_size = os.path.getsize('signal.bin')
    data_count = file_size//byte_size//channels

    print('file size: {0} bytes'.format(file_size))
    print('# of data count per channel: {0}'.format(data_count))

    start = time.perf_counter()
    with open('signal.bin', 'rb') as fp:
        signal0 = collect_signal_rev0(fp, 0, stride, np.float64)

    print('rev0 elapsed time = {0:.3f}sec'.format(time.perf_counter() - start))

    start = time.perf_counter()
    with open('signal.bin', 'rb') as fp:
        signal1 = collect_signal_rev1(fp, 0, stride, np.float64)

    print('rev1 elapsed time = {0:.3f}sec'.format(time.perf_counter() - start))

    start = time.perf_counter()
    with open('signal.bin', 'rb') as fp:
        signal2 = collect_signal_rev2(fp, channels, 0, stride, np.float64)

    print('rev2 elapsed time = {0:.3f}sec'.format(time.perf_counter() - start))

    start = time.perf_counter()
    fp = np.memmap('signal.bin', dtype=np.float64, mode='r', shape=(data_count*channels,1))
    signal3 = collect_signal_rev3(fp, 0, channels, np.float64)

    print('rev3 elapsed time = {0:.3f}sec'.format(time.perf_counter() - start))

The above code shows the following results:

file size: 3200000000 bytes
# of data count per channel: 100000000
rev0 elapsed time = 266.535sec
rev1 elapsed time = 116.474sec
rev2 elapsed time = 109.224sec
rev3 elapsed time = 0.001sec

So I found that the numpy.memmap version is far faster than the other three functions. Many thanks again.
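
As a small follow-up sketch (same assumptions as above): the memmap can also be given a 2D (time, channel) shape, which makes the channel extraction a plain column slice. Note that slicing a memmap is lazy, so the data is only read from disk once the values are actually accessed:

import numpy as np
import os

channels = 4
dtype = np.float64
file_size = os.path.getsize('signal.bin')
data_count = file_size // np.dtype(dtype).itemsize // channels

# view the file as a (time, channel) matrix without loading it into memory
mm = np.memmap('signal.bin', dtype=dtype, mode='r', shape=(data_count, channels))

channel0_view = mm[:, 0]            # lazy view of the first channel, no disk I/O yet
channel0 = np.array(channel0_view)  # copy into memory, forcing the actual read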

