We are going to put everything together, with the Bitstream class we developed in the last part and will be making some changes to the Huffman class. I changed the HuffEntry since the BitStream has a write integer with fixed bitlength, so I made it generate an appropriate rather than using a list, and the constructor for the Huffman now takes a file and a mode (the same as the BitStream class). The header for the compressed file will take advantage of the fact that the frequencies are sorted, so it will be <code> <freq> …. but the length for the frequency will be 30 bits for the first one (1GB of the same value, unlikely), and each subsequent value with the log2 of the previous value (so if there were 200 of the first value, we know we only need at most 8 bits for the next value, and so on, until we reach one with 0 frequency).
import sys, argparse, bitstream
HEADER = 0x48756666 # Or Huff
class HuffEntry():
def __init__(self, parent : “HuffEntry”):
“””
Huffman Entries are recursive and have left and right trees and parent
:param parent: The root of the node (if None we are at the root)
“””
self.parent = parent
self.left = self.right = None
self.code = -1
self.length = 0
self.value = 0
def generate(self):
curr = self
n = self.value = 0
while curr.parent:
self.length += 1
n <<= 1
if curr.parent.right == curr:
n |= 1
curr = curr.parent
for _ in range(self.length):
self.value <<= 1
self.value |= n & 1
n >>= 1
class Huffman():
def __init__(self, filename : str, mode : str = “r”):
self.filename = filename
self.mode = mode
if mode == “r”:
pass
elif mode == “w”:
pass
else:
raise ValueError(“Invalid mode”)
def encode(self, source: str):
“””
Huffman encode file in source and save destination
:param source: Source file
:return: None
“””
counts = [0] * 256
with open(source, “rb”) as src:
while src:
block = src.read(4096)
if not block:
break
for val in block:
counts[val] += 1
freq = sorted([(count, val) for val, count in enumerate(counts)], reverse=True)
self.build(freq)
with bitstream.Bitstream(self.filename, self.mode) as f:
f.write(HEADER, 32) # Write “Huff”
size = 30 # Maximum of 2^30 (or 1GB of same byte)
for count, val in freq:
f.write(val, 8) # Write the character
f.write(count, size) # Write the frequency
size = count.bit_length() # Next value must be smaller
if not count:
break # Exit when we get a zero frequency
with open(source, “rb”) as src: # Now encode the contents of the file
while src:
block = src.read(4096)
if not block:
break
for val in block:
huff_entry = self.codes[val]
f.write(huff_entry.value, huff_entry.length)
def build(self, frequencies : list((int, int))):
“””
Create Huffman tree based on set of sorted frequencies
:param frequencies: List of frequency, value tuples
“””
self.cumulative = [] # To save multiple recalculations of the size
self.frequencies = frequencies
self.codes = {} # To lookup entry by code
self.total = 0
for count, entry in frequencies:
if count:
self.total += count
self.cumulative.append((self.total, entry))
self.root = HuffEntry(None)
# Recursively build Huffman tree
self.segment(self.root, 0, len(self.cumulative)-1)
for code in self.codes:
huff_entry = self.codes[code]
huff_entry.generate()
def segment(self, huff_entry : HuffEntry, start : int, end : int):
“””
Split the subset into roughly equal halves based on frequency
:param huff_entry: Huffman root (possibly of subtree)
:param start: Consider frequency[start:end]
:param end:
:return: None
“””
if start == end: # We are at leaf
code = self.cumulative[start][1] # Read the code
huff_entry.code = code
self.codes[code] = huff_entry # Store the code in the leaf
return
remaining = self.cumulative[end][0] # How much does the segment represent
initial = 0
if start:
initial = self.cumulative[start-1][0] # We need to calculate the initial start
remaining -= initial
mid = remaining >> 1 # The middle of the range
for split in range(start, end): # Find the split point
if self.cumulative[split][0] – initial > mid:
break
pass
#If 2 or 3 items in the list, always split to the left to be the smallest, since most frequent is first
if start + 2 >= end:
split = start
#print(“Split(%d, %d) = %d” % (start, end, split))
# Create left and right subtrees
huff_entry.left = HuffEntry(huff_entry)
huff_entry.right = HuffEntry(huff_entry)
# Create subtrees for left and right
self.segment(huff_entry.left, start, split)
self.segment(huff_entry.right, split + 1, end)
def __enter__(self):
f = self.bitstream = bitstream.Bitstream(self.filename, self.mode)
f.__enter__() # Open the file
header = self.bitstream.read(32)
if header != HEADER:
raise IOError(“Invalid Huffman file”)
size = 30
freq = []
# Load the packed Huffman header for the frequencies
while True:
value = f.read(8)
count = f.read(size)
if not count:
break
freq.append((count, value))
size = count.bit_length()
self.build(freq)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.bitstream.__exit__(exc_type, exc_val, exc_tb)
def save(self, filename : str):
with open(filename, “wb”) as f:
for byte in self:
f.write(byte)
def __iter__(self):
bytes = bytearray(1)
for _ in range(self.total):
curr = self.root
while curr.left: # Always left and right, so only need to check 1
if self.bitstream.readbit():
curr = curr.right
else:
curr = curr.left
bytes[0] = curr.code
yield bytes
def start():
“””
Huffman compress or decompress file based on command line arguments
:return:
“””
parser = argparse.ArgumentParser(description=‘Encode or decode file using Huffman compression.’)
source = parser.add_mutually_exclusive_group(required=True)
source.add_argument(‘-e’, ‘–encode’, dest=‘encode’, help=‘file to encode’)
source.add_argument(‘-d’, ‘–decode’, dest=‘decode’, help=‘file to decode’)
parser.add_argument(‘-o’, ‘–output’, dest=‘output’, help=‘file to save’, required=True)
args = parser.parse_args()
if args.encode:
huff = Huffman(args.output, ‘w’)
huff.encode(args.encode)
if args.decode:
with Huffman(args.decode) as f:
f.save(args.output)
def main(args=“”):
sys.argv = [sys.argv[0]] + args.split()
start()
if __name__ == “__main__”:
start()
#main(“-e assignment1_data.txt -o data.huf”)
#main(“-d data.huf -o output.txt”)
#main(“-h”)
At the bottom you can see how to invoke it with some sample data files, it compressed the test file from 11K to 5K. There are further changes that could be made but it works, to make it into a full compression program, it should store the filename, date modified, and possibly run length encoding as well.
We can write complex assignments for you that involve multiple files and complex behaviors, in a variety of programming languages.
You can rely on Programming Homework Assignment to provide any Python programming homework help you may have.