Extrema Compression V0.1

Introduction

ExtremaCompression is a low-loss compression method for audio.
Unlike MP3, OGG and other common standards, its results vary widely with the frequency content of the input.

Higher-frequency oscillations are encoded with minimal loss but also minimal compression, whereas low frequencies compress to a very small size but may deviate more from the original.

How does it work?

Possibly the most intuitive form of compression: the algorithm first divides the wave into segments, each running from a high peak to a low peak or from a low peak to a high peak. Second, it finds the closest-matching shape for each segment (out of 255 precomputed lines/curves). Finally, it replaces each segment with a 5-byte approximation.
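The first step (cutting the wave at its peaks and troughs) can be sketched roughly as follows. This is only an illustration: split_at_extrema is a hypothetical helper, not part of the library, and it ignores the stillness threshold that the real compressor applies.

```python
def split_at_extrema(samples):
    """Split a list of samples into runs that only rise or only fall."""
    segments = []
    current = [samples[0]]
    direction = 0  # 1 rising, -1 falling, 0 not yet known
    for value in samples[1:]:
        # equal neighbours keep the direction of the run they are in
        step = 1 if value > current[-1] else -1 if value < current[-1] else direction
        if direction in (0, step):
            current.append(value)
        else:
            # the wave turned around: close this run and start a new one
            segments.append(current)
            current = [value]
        direction = step
    segments.append(current)
    return segments

print(split_at_extrema([0, 3, 5, 4, 1, 2, 6]))
# [[0, 3, 5], [4, 1], [2, 6]]
```

Each of the three runs would then be matched against the precomputed shapes and written out as one 5-byte unit.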

The Filetype

The intermediate file is simply an encoded bytestring to be read 5 bytes at a time.
Each 5-byte record represents one of two things: a curve or silence.
If the first byte of the record is 0 it represents silence; any other value means it’s a curve.
-Silence
Byte 1: Always 0
Byte 2 to 5: A 4-byte unsigned-long representing how many samples to remain silent.*
-Curve
Byte 1: Unsigned-char, ordinal number representing which shape is used, not zero-indexed (i.e. 1 is the first item, 2 is the second.)
Byte 2 & 3: Unsigned-Short representing the length of the curve in samples*
Byte 4 & 5: Signed-Short representing the position of the speaker at the end of the curve. The start of the curve will be the end of the last curve.
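As a rough sketch of the layout above, the two kinds of 5-byte unit can be packed and read back with Python's struct module. The helper names (pack_silence, pack_curve, read_records) are made up for this example, and little-endian byte order is an assumption, since the format description does not state one.

```python
from struct import pack, unpack

def pack_silence(length):
    # byte 1 is always 0, bytes 2 to 5 hold the silent sample count
    return pack("<BL", 0, length)

def pack_curve(shape, length, end_position):
    # shape is 1-based (1..255), length is unsigned, end position is signed
    return pack("<BHh", shape, length, end_position)

def read_records(data):
    """Decode a bytestring 5 bytes at a time into tuples."""
    records = []
    for offset in range(0, len(data) - len(data) % 5, 5):
        chunk = data[offset:offset + 5]
        if chunk[0] == 0:
            records.append(("silence", unpack("<L", chunk[1:])[0]))
        else:
            shape, length, end = unpack("<BHh", chunk)
            records.append(("curve", shape, length, end))
    return records

data = pack_silence(441) + pack_curve(3, 100, -12000)
print(len(data))            # 10
print(read_records(data))   # [('silence', 441), ('curve', 3, 100, -12000)]
```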

*Sample rate is 44100. That means half a second is 22050 samples, 2 seconds is 88200 samples, and 10ms is 441 samples. So a 440Hz note should oscillate 440 times every 44100 frames (once every 100.23 frames or so.)
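The arithmetic in that note, plus a back-of-envelope size estimate, can be written out as below. The estimate assumes an idealised pure tone with exactly two segments per cycle (one rise, one fall) at 5 bytes each; real recordings will differ, and the function names are only for illustration.

```python
SAMPLE_RATE = 44100  # samples per second, as stated in the note

def seconds_to_samples(seconds):
    return round(seconds * SAMPLE_RATE)

def samples_per_cycle(frequency_hz):
    return SAMPLE_RATE / frequency_hz

def estimated_bytes_per_second(frequency_hz):
    # two segments per cycle, 5 bytes per segment (pure tone assumed)
    return 2 * frequency_hz * 5

print(seconds_to_samples(0.5))           # 22050
print(seconds_to_samples(0.01))          # 441
print(samples_per_cycle(440))            # roughly 100.23
print(estimated_bytes_per_second(440))   # 4400
```

For comparison, uncompressed mono PCM16 at this sample rate takes 88200 bytes per second, so under these assumptions a 440Hz tone shrinks by a factor of about 20, while a 4400Hz tone would only shrink by a factor of about 2.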

Compression Step by step:

First we:
-initialise everything needed for the loop,
and
-load the first frame to determine the start direction.

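from struct import pack, unpack
from wave import open as openwave  # assumption: openwave is the standard library's wave.open

def RealtimeCompress(inputFile,outputFile):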
    WaveFile=openwave(inputFile)
    samplewidth=WaveFile.getsampwidth()
    cstruct="<l" if samplewidth==4 else "<h"
    magnitude=2**(8*samplewidth-1)-1
    channels=WaveFile.getnchannels()
    print("sample width: %s"%samplewidth)
    print("channels: %s"%channels)
    print("magnitude: %s"%magnitude)


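Open the file inputFile and choose which cstruct to use depending on the sample width of the wave: signed long ("<l") for PCM32, signed short ("<h") for PCM16. magnitude is the largest positive sample value at that width and is used to scale samples to the range -1 to 1. (Optional: print the wave properties.)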
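To make the numbers concrete, here is a small stand-alone sketch of the same format selection. sample_format is a hypothetical wrapper around the two expressions used in the function; the raw bytes are made up for the example.

```python
from struct import unpack

def sample_format(samplewidth):
    """Return the struct format and full-scale value for a sample width in bytes."""
    cstruct = "<l" if samplewidth == 4 else "<h"
    magnitude = 2 ** (8 * samplewidth - 1) - 1
    return cstruct, magnitude

cstruct, magnitude = sample_format(2)
print(cstruct, magnitude)   # <h 32767

# one PCM16 sample at roughly half of full scale
raw = (16384).to_bytes(2, "little", signed=True)
print(unpack(cstruct, raw)[0] / magnitude)   # roughly 0.5
```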

    YThreshold=50/magnitude
    OutFile=open(outputFile,"wb")
    firstframe=WaveFile.readframes(1)[0:samplewidth]
    first=unpack(cstruct,firstframe)[0]/magnitude
    lastdirection=1 if first>YThreshold else -1 if first<-YThreshold else 0
    #1 is up, -1 down, 0 still

-Set YThreshold (the minimum amount of change required to be considered a movement; until this amount is surpassed the wave is considered still),
-Initialise the output file,
-Read and unpack the first sample from the input file (only the first channel is used),
-If the first frame exceeds the threshold set lastdirection to 1 or -1, else lastdirection=0 (1 is up, -1 is down, 0 is still).

    last=first
    current=[first]
    n=0
    while 1:

-Initialise the variable last (as in the last value) to the previous frame’s value (here, first).
-Make a list containing only the first value; this will be used to store the current segment.
-Initialise a counter for how many records have been written since the last flush (I called it n),
and start the loop.

The loop interior.

        frame=WaveFile.readframes(1)[0:samplewidth]
        if frame==b'':
            #END OF FILE
            break

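Load a single frame.
If no frame is available (end of file), break out of the loop. Note that, as written, the segment still being collected in current is not written out at this point.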

        else:
            currentval=unpack(cstruct,frame)[0]/magnitude
            direction=1 if currentval>last+YThreshold else -1 if currentval<last-YThreshold else 0
            #determine direction

If the frame isn’t empty, unpack it, divide by magnitude, and store the result in currentval.
If currentval exceeds last by more than YThreshold, set direction to 1; otherwise, if currentval is below last by more than YThreshold, set direction to -1. Otherwise it’s 0.
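The direction test can be pulled out into a small function to see how the threshold behaves. direction_of is a hypothetical name for this sketch; the threshold value is the one used for PCM16 in the code above (50/32767).

```python
def direction_of(currentval, last, threshold):
    """Return 1 (up), -1 (down) or 0 (still) for a move from last to currentval."""
    if currentval > last + threshold:
        return 1
    if currentval < last - threshold:
        return -1
    return 0

threshold = 50 / 32767  # about 0.0015 on the -1 to 1 scale

print(direction_of(0.5, 0.4, threshold))      # 1
print(direction_of(0.4, 0.5, threshold))      # -1
print(direction_of(0.4001, 0.4, threshold))   # 0, change is below the threshold
```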

            if direction==lastdirection:
                current.append(currentval)
                last=currentval

Now that the direction has been resolved, we compare it with the last direction.
If they’re the same, append currentval to current (the current segment),
set last to currentval, and proceed to the next iteration.

            else:
                n+=1
                ##MAKE SECTION
                if lastdirection==0:
                    #ADD SILENCE BYTES
                    #"<" forces the standard 4-byte size; native "L" is 8 bytes on some platforms
                    silencebytes=pack("B",0)+pack("<L",len(current)) #holybytes
                    OutFile.write(silencebytes)
                    del silencebytes

-In the event the direction has changed since the last sample (the else condition above):
-Increase n by 1 (there’s a new output).
-If the last direction was 0 (stillness), make some bytes representing the just-elapsed silence: first the value 0 to signify it’s silence and not a curve <1 byte unsigned-char>, then len(current) <4 byte unsigned-long>.
-Write the silence bytes to OutFile.
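Since every unit in the file must be exactly 5 bytes, it is worth checking what size a struct format really produces. Without a prefix, struct uses the platform's native C sizes, so "L" may be 4 or 8 bytes; with "<" it uses standard sizes. A quick check:

```python
from struct import calcsize, pack

print(calcsize("<L"))   # standard size: always 4
print(calcsize("L"))    # native size: 4 or 8 depending on the platform

record = pack("<BL", 0, 441)   # silence marker plus a 441-sample count
print(len(record))             # 5
```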

                else:
                    #ADD SECTION
                    curvenum,score=Curves.closest(current)#compareCurve
                    section=(curvenum,len(current),current[-1])
                    #write-out section
                    curvenum=pack("B",section[0]+1) #+1 so a curve never starts with 0
                    xval=pack("<H",int(section[1])) #explicit little-endian, 2 bytes
                    yval=pack("<h",int(section[2]*(2**15-1)))
                    curvebytes=curvenum+xval+yval
                    OutFile.write(curvebytes)
                    del curvebytes,xval,yval,curvenum,section,score

If it’s not silence it’s a curve, so run Curves.closest(current) to compare the contents of current with all 255 precomputed curves and find the closest match. (This can be very slow, so we often just check 8 to 64 evenly spaced points along the curve, depending on the length.)
Our whole section is as simple as: the closest-shaped curve, the length of the current segment, and where it ends.
These 3 values are packed as an unsigned char, an unsigned short, and a signed short respectively, in that order, and written to OutFile. The curve index has 1 added so that it never collides with the 0 that marks silence. After that the temporary variables are deleted.
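The matching step might look something like the sketch below. This is not the library's Curves.closest: the helpers (resample, normalise), the squared-error score and the three example shapes are all assumptions made for illustration, standing in for the real 255 precomputed curves.

```python
def resample(values, points):
    """Pick `points` evenly spaced values from a list (nearest index)."""
    last = len(values) - 1
    return [values[round(i * last / (points - 1))] for i in range(points)]

def normalise(values):
    """Rescale a segment so it starts at 0.0 and ends at 1.0."""
    start, span = values[0], values[-1] - values[0]
    if span == 0:
        return [0.0 for _ in values]
    return [(v - start) / span for v in values]

def closest(segment, shapes, points=8):
    """Return (index, score) of the shape with the smallest squared error."""
    target = resample(normalise(segment), points)
    best_index, best_score = 0, float("inf")
    for index, shape in enumerate(shapes):
        candidate = [shape(i / (points - 1)) for i in range(points)]
        score = sum((a - b) ** 2 for a, b in zip(target, candidate))
        if score < best_score:
            best_index, best_score = index, score
    return best_index, best_score

# three stand-in shapes: a straight line, a slow start, a fast start
shapes = [lambda t: t, lambda t: t ** 2, lambda t: t ** 0.5]
segment = [0.1 + 0.4 * (i / 19) ** 2 for i in range(20)]
print(closest(segment, shapes)[0])   # 1, the slow-start shape
```

Comparing only a handful of evenly spaced points keeps the cost per segment fixed regardless of segment length, which is the trade-off described above.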

                lastdirection=direction
                current=[currentval]
                last=currentval

Before the next iteration replace lastdirection with direction, replace current with [currentval] and replace last with currentval.

            if n==1024:
                OutFile.flush()
                n=0

Every 1024 written records, flush OutFile (everything we sent to OutFile.write() will then actually be visible in the file on disk, if you want a peek before it’s done).

    OutFile.close()
    WaveFile.close()
    return 1

Once the loop is broken by the end of file, close both files, and return 1 to indicate a success.
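To try the function on a known input, a short test tone can be generated with the standard wave module. make_test_wave is a hypothetical helper for this walkthrough, not part of the library; it writes mono PCM16, which matches the "<h" path above.

```python
import math
import struct
import wave

def make_test_wave(target, frequency_hz=440, seconds=0.1, sample_rate=44100):
    """Write a mono PCM16 sine tone to `target` (a filename or binary file object)."""
    frame_count = int(seconds * sample_rate)
    frames = bytearray()
    for i in range(frame_count):
        value = int(32767 * math.sin(2 * math.pi * frequency_hz * i / sample_rate))
        frames += struct.pack("<h", value)
    with wave.open(target, "wb") as out:
        out.setnchannels(1)
        out.setsampwidth(2)
        out.setframerate(sample_rate)
        out.writeframes(bytes(frames))
    return frame_count
```

Calling make_test_wave("tone.wav") and then passing "tone.wav" as inputFile to RealtimeCompress (with any output filename you like) should give a small file to inspect.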

Read on for part two, decompression.

The entire Python library is available here, and a compressed song I wrote is here (try to decompress it yourself).
Decompression is fast enough for realtime use, but compression is too slow, so I’ll rewrite the compression in C++ or Pascal.
