Python Packing Streams of Arbitrary Length

When starting out with IO and learning to efficiently read and write binary structures in Python, most people run into the problem of packing a string or list of unknown length into a structure.

Variations of this question come up again and again on Stack Overflow. Here are two quick examples:
https://stackoverflow.com/questions/3753589/packing-and-unpacking-variable-length-array-string-using-the-struct-module-in-py
https://stackoverflow.com/questions/34117620/how-to-unpack-variable-length-data-in-python-struct

I have a rather simple solution I decided to share:

When packing data, we write a mini header before each variable-length sequence of bytes, declaring how long the following piece of data is.
If we want to handle truly arbitrary lengths, no matter how large, the size of the header itself also needs to vary.

The header is created by the aptly named function make_dynamic_length_header(Length_In_Bytes). It takes a single argument: the size, in bytes, of the byte stream you intend to pack.

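Next we calculate how many symbols (bytes) it takes to represent this number in base 256. Mathematically this is the ceiling of log256(length_in_bytes+1). In code it is safer to compute the count with integer arithmetic than with a floating-point log, which can be off by one for very large lengths.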
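As a quick check of the digit count, here is a minimal sketch that computes it with integer arithmetic only. The helper name base256_digits is my own for illustration and is not part of the code in this post. It relies on int.bit_length(), which is exact for arbitrarily large integers.

```python
def base256_digits(n):
    """Number of bytes needed to write the non-negative integer n in base 256."""
    # Each base-256 digit holds 8 bits, so round the bit count up to whole bytes.
    # 0 has a bit length of 0 and therefore needs no digits at all.
    return (n.bit_length() + 7) // 8

for n in (0, 1, 255, 256, 65535, 65536, 2**56):
    print(n, base256_digits(n))
```

The boundary cases are the interesting ones: 255 still fits in one byte, while 256 needs two.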

Next we pack that count, the ceiling of log256(length_in_bytes+1), into an unsigned long long (8 bytes, big endian). These eight bytes are the start of the output we return at the end, so store them in a bytes variable.

Now that we know how many bytes it will take to write down our length, create a variable for the part of the length that still needs to be encoded. At first it equals the argument length_in_bytes. Then iterate through the following loop until the remainder equals 0:

while remainder:
    pack remainder mod 256 into a byte and append it to the variable you’ll return at the end
    set remainder to the floor of remainder divided by 256 (use integer floor division, //, rather than casting a float quotient to int: the float quotient loses precision once the length exceeds 2**53)

Once this is done, return your header.

The whole thing looks like this:

from struct import pack,unpack

def make_dynamic_length_header(Length_In_Bytes):
    #number of base-256 digits, i.e. ceil(log256(Length_In_Bytes+1)), computed with integers so large lengths are not hit by float rounding
    number_of_characters=(Length_In_Bytes.bit_length()+7)//8
    packedhead=pack(">Q",number_of_characters)  #first eight bytes encode how many more bytes are needed to write down the length of your stream. (Packed big endian)
    remainder=Length_In_Bytes
    while remainder:#loop to write down the length of the stream in base 256, least significant byte first
        val=pack("B",remainder%256)
        packedhead+=val
        remainder//=256  #floor division stays exact for arbitrarily large ints
    return packedhead

And to use it to pack a bytes object, we’ll want this little helper function. It takes bytes as input and returns a header describing their length, followed by the bytes themselves:

def pack_bytes_with_header(b):
    return make_dynamic_length_header(len(b))+b
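To make the layout concrete, here is a small self-contained sketch that repeats the two functions (written with integer arithmetic, and with lowercase names of my choosing) and packs a five-byte payload. It is only an illustration of the format described above.

```python
from struct import pack

def make_dynamic_length_header(length_in_bytes):
    # How many base-256 digits the length needs (0 for an empty payload).
    count = (length_in_bytes.bit_length() + 7) // 8
    header = pack(">Q", count)  # 8-byte big-endian digit count
    remainder = length_in_bytes
    while remainder:
        header += pack("B", remainder % 256)  # least significant digit first
        remainder //= 256
    return header

def pack_bytes_with_header(b):
    return make_dynamic_length_header(len(b)) + b

packed = pack_bytes_with_header(b"hello")
print(packed[:8])   # the digit count: b'\x00\x00\x00\x00\x00\x00\x00\x01'
print(packed[8:9])  # the length itself: b'\x05'
print(packed[9:])   # the payload: b'hello'
```

A payload of 1024 bytes would get a digit count of 2 followed by the two length bytes 0x00 0x04, since 1024 = 0 + 4*256.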

Complete Code

from struct import pack,unpack

def make_dynamic_length_header(Length_In_Bytes):
    #number of base-256 digits, i.e. ceil(log256(Length_In_Bytes+1)), computed with integers so large lengths are not hit by float rounding
    number_of_characters=(Length_In_Bytes.bit_length()+7)//8
    packedhead=pack(">Q",number_of_characters)  #first eight bytes encode how many more bytes are needed to write down the length of your stream. (Packed big endian)
    remainder=Length_In_Bytes
    while remainder:#loop to write down the length of the stream in base 256, least significant byte first
        test=pack("B",remainder%256)
        packedhead+=test
        remainder//=256  #floor division stays exact for arbitrarily large ints
    return packedhead

def read_dynamic_length_header(inbytes):
    length=unpack(">Q",inbytes[0:8])[0] ##how many bytes did it take to write down the length of the data (8 bytes, big endian, max size larger than an exbibyte)
    lib=0#length_in_bytes
    mag=1
    for i in range(length): ##load that many bytes, magnitude of byte increases by 256 each time, smallest first.
        test=unpack("B",inbytes[8+i:9+i])[0]
        lib+=test*mag
        mag*=256
    ##return the decoded payload length and everything after the header (8+length also works when length is 0, i.e. an empty payload)
    return lib,inbytes[8+length:]

def pack_bytes_with_header(b):
    return make_dynamic_length_header(len(b))+b
def unpack_bytes_with_header(b):
    return read_dynamic_length_header(b)

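def random_megabyte():
    from random import randrange
    #build the megabyte in one go; appending to a bytes object one byte at a time copies the whole buffer on every step
    return bytes(randrange(256) for i in range(1024**2))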

"""
KB=1024
MB=1024**2
GB=1024**3
TB=1024**4

####TESTS#######
TestSize1=1*KB
TestSize2=1*MB+50*KB
TestSize3=1*TB+50*GB
a=make_dynamic_length_header(TestSize1)
b=make_dynamic_length_header(TestSize2)
c=make_dynamic_length_header(TestSize3)

print(a)
print(read_dynamic_length_header(a))
print(b)
print(read_dynamic_length_header(b))
print(c)
print(read_dynamic_length_header(c))
testin=random_megabyte()
packed=pack_bytes_with_header(testin)
print(packed)
unpacked=unpack_bytes_with_header(packed)
print(unpacked)
print(testin==unpacked[1])
##################
"""

Applications:

The applications are practically endless. Any time you want to send several pieces of data over one stream, you can simply concatenate all your byte streams together without obscuring where one ends and the next begins.
Any long section of bytes will always be preceded by a statement of its length.
Another advantage is that if you (or your code) determine that a byte stream is irrelevant, you know exactly how many bytes to skip to get to the next stream.
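Here is a minimal sketch of that idea: several payloads are concatenated into one stream and then walked record by record. The names pack_record and iter_records are hypothetical helpers of mine, and I use int.to_bytes and int.from_bytes with "little" byte order as a shortcut that should produce the same bytes as the base-256 loop above.

```python
from struct import pack, unpack

def pack_record(payload):
    n = len(payload)
    count = (n.bit_length() + 7) // 8  # base-256 digits in the length
    # 8-byte big-endian digit count, then the length (smallest byte first), then the data
    return pack(">Q", count) + n.to_bytes(count, "little") + payload

def iter_records(stream):
    pos = 0
    while pos < len(stream):
        (count,) = unpack(">Q", stream[pos:pos + 8])
        pos += 8
        length = int.from_bytes(stream[pos:pos + count], "little")
        pos += count
        yield stream[pos:pos + length]
        pos += length  # skipping a record is just this jump, no need to inspect the payload

stream = pack_record(b"first") + pack_record(b"") + pack_record(b"x" * 300)
for record in iter_records(stream):
    print(len(record))
```

Because each record announces its own size, a reader that only cares about the third record never has to look inside the first two.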

I will add links to more examples here, since I have built on this method, but most of that code needs to be scrubbed of private information first.
My cipher and a few of my networking protocols are built on this method. I will make sure to upload them soon.
