# Generate compressed error-string data.
# Reads the collected MP_ERROR_TEXT strings and prints C macros mapping each
# string to its compressed form, plus a shared data table for the decompressor.
from __future__ import print_function

import collections
import re
import sys

import gzip
import zlib


# Marker byte prefixed to compressed strings so the decompressor can
# distinguish them from uncompressed ones.
_COMPRESSED_MARKER = 0xFF


def check_non_ascii(msg):
    for c in msg:
        if ord(c) >= 0x80:
            print(
                'Unable to generate compressed data: message "{}" contains a non-ascii character "{}".'.format(
                    msg, c
                ),
                file=sys.stderr,
            )
            sys.exit(1)


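# Note on the output format: every scheme below writes bytes >= 0x80 as C
# octal escapes so the result can be pasted into a C string literal, e.g.
#
#     "\\{:03o}".format(0xFF)  # -> the four source characters \377
#
# which the C compiler folds back into the single byte 0xFF.

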
# Replace <char><space> with <char | 0x80>.
# Trivial scheme to demo/test.
def space_compression(error_strings):
    for line in error_strings:
        check_non_ascii(line)
        result = ""
        for i in range(len(line)):
            if i > 0 and line[i] == " ":
                # Fold the previous char and this space into one marked byte.
                result = result[:-1]
                result += "\\{:03o}".format(ord(line[i - 1]) | 0x80)
            else:
                result += line[i]
        error_strings[line] = result
    return None


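# Worked example for space_compression (illustrative): "hello world" becomes
# "hell\357world", because the pair "o " collapses to ord("o") | 0x80 == 0xEF,
# printed as the octal escape \357.

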
# Replace common words with <0x80 | index>.
# The index is into a table of words stored as aaaaa<0x80|a>bbb<0x80|b>...
# Replaced words are assumed to have a space either side, so the spaces don't
# need to be stored in the compressed strings.
def word_compression(error_strings):
    topn = collections.Counter()

    for line in error_strings.keys():
        check_non_ascii(line)
        for word in line.split(" "):
            topn[word] += 1

    # Order not just by frequency but by expected saving, i.e. prefer a longer
    # string that is used less frequently. Use the word itself to break ties so
    # that compression is deterministic.
    def bytes_saved(item):
        w, n = item
        return -((len(w) + 1) * (n - 1)), w

    top128 = sorted(topn.items(), key=bytes_saved)[:128]

    index = [w for w, _ in top128]
    index_lookup = {w: i for i, w in enumerate(index)}

    for line in error_strings.keys():
        result = ""
        need_space = False
        for word in line.split(" "):
            if word in index_lookup:
                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
                need_space = False
            else:
                if need_space:
                    result += " "
                need_space = True
                result += word
        error_strings[line] = result.strip()

    # The data table: each word with the high bit set on its final character.
    return "".join(w[:-1] + "\\{:03o}".format(0b10000000 | ord(w[-1])) for w in index)


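# Worked example for word_compression (illustrative): if index[0] == "the",
# the table stores it as "th\345" (0x80 | ord("e") == 0xE5) and every
# occurrence of the word "the" in a message compresses to the single byte
# \200 (0x80 | 0).

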
# Replace chars in text with a variable-length bit sequence.
# For comparison only (the table is not emitted).
def huffman_compression(error_strings):
    # https://github.com/tannewt/huffman
    import huffman

    all_strings = "".join(error_strings)
    cb = huffman.codebook(collections.Counter(all_strings).items())

    for line in error_strings:
        b = "1"
        for c in line:
            b += cb[c]
        # Pad the bit string to a whole number of bytes.
        n = len(b)
        if n % 8 != 0:
            n += 8 - (n % 8)
        b = b.ljust(n, "0")
        result = ""
        for i in range(0, n, 8):
            result += "\\{:03o}".format(int(b[i : i + 8], 2))
        # Keep the original if the encoded form is no shorter (each escape is
        # four source characters per output byte).
        if len(result) > len(line) * 4:
            result = line
        error_strings[line] = result

    # TODO: This would be the prefix lengths and the table ordering.
    return "_" * (10 + len(cb))


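# Sketch of the codebook the huffman package produces (illustrative values):
#
#     huffman.codebook([("a", 5), ("b", 2), ("c", 1)])
#     # -> e.g. {"a": "0", "b": "10", "c": "11"}
#
# Symbols map to prefix-free bit strings, with more frequent symbols getting
# shorter codes; that is what makes the per-line bit packing above shrink.

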
# Replace common N-letter sequences with <0x80 | index>, where the common
# sequences are stored in a separate table.
# This isn't very useful as-is; it needs a smarter way to find the top ngrams.
def ngram_compression(error_strings):
    topn = collections.Counter()
    N = 2

    for line in error_strings.keys():
        check_non_ascii(line)
        if len(line) < N:
            continue
        # Count every complete ngram (same stride as the replacement loop below).
        for i in range(0, len(line) - N + 1, N):
            topn[line[i : i + N]] += 1

    def bytes_saved(item):
        w, n = item
        return -(len(w) * (n - 1))

    top128 = sorted(topn.items(), key=bytes_saved)[:128]

    index = [w for w, _ in top128]
    index_lookup = {w: i for i, w in enumerate(index)}

    for line in error_strings.keys():
        result = ""
        for i in range(0, len(line) - N + 1, N):
            word = line[i : i + N]
            if word in index_lookup:
                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
            else:
                result += word
        # Append any trailing chars that don't fill a whole ngram.
        if len(line) % N != 0:
            result += line[len(line) - len(line) % N :]
        error_strings[line] = result.strip()

    return "".join(index)


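# Worked example for ngram_compression (illustrative): with N == 2 and
# index[3] == "in", the message "index" compresses to "\203dex" -- the leading
# bigram is replaced by 0x80 | 3 and the rest is copied through.

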
def main(collected_path, fn):
    error_strings = collections.OrderedDict()
    max_uncompressed_len = 0
    num_uses = 0

    # Read in all MP_ERROR_TEXT strings.
    with open(collected_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            num_uses += 1
            error_strings[line] = None
            max_uncompressed_len = max(max_uncompressed_len, len(line))

    # So that objexcept.c can figure out how big the buffer needs to be.
    print("#define MP_MAX_UNCOMPRESSED_TEXT_LEN ({})".format(max_uncompressed_len))

    # Run the compression.
    compressed_data = fn(error_strings)

    # Print the data table.
    print('MP_COMPRESSED_DATA("{}")'.format(compressed_data))

    # Print the replacements, prefixing compressed strings with the marker byte.
    for uncomp, comp in error_strings.items():
        if uncomp == comp:
            prefix = ""
        else:
            prefix = "\\{:03o}".format(_COMPRESSED_MARKER)
        print('MP_MATCH_COMPRESSED("{}", "{}{}")'.format(uncomp, prefix, comp))

    # Used to calculate the "true" length of the (escaped) compressed strings:
    # each \ooo escape is one byte in the final binary.
    def unescape(s):
        return re.sub(r"\\\d\d\d", "!", s)

    # Stats. Note this doesn't include the cost of the decompressor code.
    uncomp_len = sum(len(s) + 1 for s in error_strings.keys())
    comp_len = sum(1 + len(unescape(s)) + 1 for s in error_strings.values())
    data_len = len(unescape(compressed_data)) + 1 if compressed_data else 0
    print("// Total input length:      {}".format(uncomp_len))
    print("// Total compressed length: {}".format(comp_len))
    print("// Total data length:       {}".format(data_len))
    print("// Predicted saving:        {}".format(uncomp_len - comp_len - data_len))

    # Somewhat meaningless comparison to zlib/gzip.
    all_input_bytes = "\0".join(error_strings.keys()).encode()
    print()
    if hasattr(gzip, "compress"):
        gzip_len = len(gzip.compress(all_input_bytes)) + num_uses * 4
        print("// gzip length:             {}".format(gzip_len))
        print("// Percentage of gzip:      {:.1f}%".format(100 * (comp_len + data_len) / gzip_len))
    if hasattr(zlib, "compress"):
        zlib_len = len(zlib.compress(all_input_bytes)) + num_uses * 4
        print("// zlib length:             {}".format(zlib_len))
        print("// Percentage of zlib:      {:.1f}%".format(100 * (comp_len + data_len) / zlib_len))


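# Illustrative output shape (macro names are from this file; the values are
# invented):
#
#     #define MP_MAX_UNCOMPRESSED_TEXT_LEN (42)
#     MP_COMPRESSED_DATA("...")
#     MP_MATCH_COMPRESSED("index out of range", "\377...")
#
# Strings the scheme left unchanged appear without the leading \377 marker.

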
if __name__ == "__main__":
    main(sys.argv[1], word_compression)