I have binary files that I want to read in Python and then write to another file (CSV or pickle).

I have a working solution, but it takes a long time.

The binary files consist of several datasets. Each dataset has one header (4 bytes), one sequence number (4 bytes), and 100 messages (ID: 2 bytes, data: 8 bytes).

aa aa aa aa c8 05 00 00 51 02 15 04 ca 8c 00 10
28 80 94 03 00 20 00 00 ff 83 23 98 b0 02 a2 ff
00 07 5a 75 00 00 11 01 00 80 00 ff 4f 2c 0d 84
12 01 ff 50 00 00 ff 2c 0d 00 20 02 0f a4 7e 00
00 fb 0f 12 60 02 06 11 07 30 45 c8 69 20 16 03
05 11 9a 0d 11 0e 00 7f 29 03 d6 9a 81 8c 31 28
00 10 51 02 14 04 cb 50 00 0f 08 80 b0 02 a2 ff
00 07 4b a5 00 00 11 01 00 80 00 ff 4f 25 0d b8
12 01 ff a0 00 00 ff 25 0d 00 20 02 12 c4 7e 00 ...

This is an example of the file contents (hex dump).
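For reference, that layout makes each dataset 4 + 4 + 100 × (2 + 8) = 1008 bytes. A quick sketch of the sizes (the constant names are my own, chosen to match the parser's):

```python
# Layout of one dataset (sizes in bytes); constant names are assumptions
HEADER_SIZE = 4        # fixed header, e.g. aa aa aa aa
SEQUENCE_SIZE = 4      # little-endian sequence counter
MSG_ID_SIZE = 2        # message ID
MSG_DATA_SIZE = 8      # message payload
MSG_COUNT = 100

DATASET_SIZE = HEADER_SIZE + SEQUENCE_SIZE + MSG_COUNT * (MSG_ID_SIZE + MSG_DATA_SIZE)
print(DATASET_SIZE)  # 1008
```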

To parse the binary file, I wrote this:

def parse(self, bindata):
    msg_list = defaultdict(list)

    with memoryview(bindata) as mv:
        old_seq = None

        while mv:
            # header
            header = mv[:HEADER_SIZE].tobytes()
            mv = mv[HEADER_SIZE:]
            if header != HEADER:
                logging.error("invalid header")
                break

            # sequence number (little-endian)
            seq = int.from_bytes(
                mv[:SEQUENCE_SIZE].tobytes(), byteorder='little')
            mv = mv[SEQUENCE_SIZE:]
            if old_seq is not None and seq - old_seq != 1:
                logging.warning(
                    "sequence error. old=%d / current=%d", old_seq, seq)
            old_seq = seq

            # messages
            for msg_cnt in range(MSG_COUNT):
                if not mv or mv[:HEADER_SIZE] == HEADER:
                    break

                msg_id = int.from_bytes(
                    mv[:MSG_ID_SIZE].tobytes(), byteorder='little')
                mv = mv[MSG_ID_SIZE:]

                try:
                    msg = self.__db.get_message_by_frame_id(msg_id)
                except KeyError:
                    logging.exception("unknown can id. %s", hex(msg_id))
                    mv = mv[MSG_DATA_SIZE:]
                    continue

                body = mv[:MSG_DATA_SIZE].tobytes()
                mv = mv[MSG_DATA_SIZE:]

                try:
                    decoded_msg = msg.decode(body, decode_choices=False)
                    for k, v in decoded_msg.items():
                        msg_list[k].append(v)
                except Exception as e:
                    logging.exception("unpack error. %s", str(e))
    return msg_list

This code is time-consuming, presumably because it walks the buffer sequentially, reslicing and converting one field at a time.

Is there a faster method? I would appreciate any recommendations.

1 Answer

You need the struct module. So, the blocks are 1008 bytes each (4 + 4 + 100 × 10)?

    import struct

    for i in range(0, len(mv), 1008):
        hdr, seq = struct.unpack('<II', mv[i:i+8])
        for msg in range(8, 1008, 10):            # each message is 2 + 8 = 10 bytes
            id = mv[i+msg] + mv[i+msg+1] * 256    # 2-byte ID, little-endian
            code = mv[i+msg+2:i+msg+10]           # 8-byte payload
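Building on that idea, here is a sketch (the function name and format strings are my own) that precompiles the formats with `struct.Struct` and uses `struct.iter_unpack` to decode all 100 messages of a block in one pass:

```python
import struct

HDR = struct.Struct('<4sI')        # 4-byte header + little-endian sequence number
MSG = struct.Struct('<H8s')        # 2-byte little-endian ID + 8-byte payload
BLOCK = HDR.size + 100 * MSG.size  # 1008 bytes per dataset

def iter_messages(buf):
    """Yield (seq, msg_id, data) for every message in the buffer."""
    for off in range(0, len(buf), BLOCK):
        header, seq = HDR.unpack_from(buf, off)
        # decode all 100 messages of this block in one pass
        for msg_id, data in MSG.iter_unpack(buf[off + HDR.size : off + BLOCK]):
            yield seq, msg_id, data
```

Each `data` is still the raw 8-byte payload, so you can pass it to `msg.decode(...)` exactly as before; the win is avoiding a Python-level slice and `int.from_bytes` call per field.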