BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

ibizara
Posts: 3
Joined: Mon Jun 22, 2026 12:33 pm

BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by ibizara »

I'm comparing BU40N 1.00 and 1.03 firmware internals while investigating OmniDrive behaviour and noticed a large packed-looking region that differs almost completely between firmware versions despite having a very similar header structure.

Firmware images examined:

Code: Select all

BU40N_1.00_stock.bin
MD5: edb28fcd7a239281ace26a468d382a9c

BU40N_1.03_MK.bin
MD5: 74ebaf627d2aac5f899191d6caceb54c
The 1.00 image is the original LG firmware. The 1.03_MK image is the MakeMKV/LibreDrive patched firmware based on LG 1.03.

Looking at the raw firmware images, there appears to be a module beginning at offset 0x158000.

BU40N 1.00

Code: Select all

offset:       0x158000
field 1:      0x00072464 (468,068)
field 2:      0x0004E98F (321,935)
region end:   0x1A698F
BU40N 1.03_MK

Code: Select all

offset:       0x158000
field 1:      0x00071C60 (466,016)
field 2:      0x0004E3B1 (320,433)
region end:   0x1A63B1
The start of the region looks like:

Code: Select all

0x158000:
00072464
0004E98F
07060605
07070706
08070706
08080808
...
and the corresponding structure in 1.03_MK is almost identical.

After the first 8 bytes there is approximately 0x140 bytes of highly structured low-valued data, followed by a high-entropy stream that differs almost completely between firmware versions.

The interesting part is that the 0x140-byte structure does not appear random. Treating it as code-length data produced the following result:

First 288 entries → Kraft sum = 1.0
Last 32 entries → Kraft sum = 1.0
Entire 320-byte table → Kraft sum = 2.0

This suggests the region contains two complete prefix-code (Huffman-like) tables, arranged as:

Code: Select all

0x158000  size field
0x158004  size field
0x158008  288-byte code-length table
0x158128  32-byte code-length table
0x158148  compressed bitstream
The table structure is nearly identical between 1.00 and 1.03_MK, while the compressed stream contents are almost entirely different.
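
For anyone who wants to repeat the Kraft-sum check, here is a minimal sketch. The helper names `kraft_sum` and `check_tables` are made up for illustration, and the offsets are only the layout inferred above, not anything documented. Exact fractions are used so that a complete prefix code gives exactly 1 with no floating-point rounding.

```python
from __future__ import annotations

from fractions import Fraction


def kraft_sum(lengths: bytes) -> Fraction:
    """Sum of 2**-length over all non-zero code lengths (0 = unused symbol)."""
    return sum((Fraction(1, 1 << n) for n in lengths if n), Fraction(0))


def check_tables(firmware: bytes, offset: int = 0x158000) -> tuple[Fraction, Fraction]:
    # Assumed layout: 8-byte header, 288-byte table, then 32-byte table.
    lit = firmware[offset + 8 : offset + 8 + 288]
    dist = firmware[offset + 8 + 288 : offset + 8 + 288 + 32]
    return kraft_sum(lit), kraft_sum(dist)
```

A result of exactly 1 for each slice means the lengths describe a complete prefix code. A sum above 1 cannot be a prefix code at all, and a sum below 1 would leave some bit patterns unused.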

I extracted the stream and tested the obvious formats:

Code: Select all

zlib
raw deflate
gzip
bzip2
lzma/xz
All failed.
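
The test itself was nothing clever. A sketch of that kind of check, using only the Python standard library, is below. `try_standard_codecs` is a hypothetical name, and `stream` is assumed to be the bytes starting at 0x158148.

```python
from __future__ import annotations

import bz2
import lzma
import zlib


def try_standard_codecs(stream: bytes) -> dict[str, str]:
    """Try each common container on the same bytes and report what happened."""
    attempts = {
        "zlib": lambda d: zlib.decompress(d),
        "raw deflate": lambda d: zlib.decompress(d, wbits=-15),  # no header
        "gzip": lambda d: zlib.decompress(d, wbits=31),          # gzip header
        "bzip2": bz2.decompress,
        "lzma/xz": lzma.decompress,                              # auto-detects
    }
    results = {}
    for name, decode in attempts.items():
        try:
            results[name] = f"ok, {len(decode(stream))} bytes"
        except Exception as exc:  # each library raises its own error type
            results[name] = f"failed: {type(exc).__name__}"
    return results
```

A failure here only rules out the stock containers at that exact offset. It says nothing about whether the underlying scheme is DEFLATE-like.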

I also looked at MediaTek's documented ALICE firmware compression (used in some MTK products). While there are some conceptual similarities (table-driven compressed instruction streams), this BU40N format does not appear to be a standard ALICE container.

Questions:
  1. Has anyone identified the compression/packing method used for this MT1959 firmware block?
  2. Does the decompressor reside inside the firmware itself, or in MT1959 boot ROM / mask ROM?
  3. Does this region contain executable ARM code, servo/DSP microcode, or some other firmware component?
  4. Has this area ever been reverse engineered by the LibreDrive / MakeMKV developers or anyone working on MTK optical drive firmware?
I'm mainly trying to understand the firmware format and whether this packed region has ever been decoded or modified successfully.
RibShark
Posts: 19
Joined: Mon Apr 29, 2019 6:27 pm

Re: BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by RibShark »

ibizara wrote: Mon Jun 22, 2026 12:40 pm Questions:
  1. Has anyone identified the compression/packing method used for this MT1959 firmware block?
  2. Does the decompressor reside inside the firmware itself, or in MT1959 boot ROM / mask ROM?
  3. Does this region contain executable ARM code, servo/DSP microcode, or some other firmware component?
  4. Has this area ever been reverse engineered by the LibreDrive / MakeMKV developers or anyone working on MTK optical drive firmware?
  1. I failed to work out what it was when I tried.
  2. The decompression code is in the firmware it seems, as ARM code.
  3. It contains Thumb code; various areas in the firmware jump to this code via a thunk. It's always decompressed to the same place so the addresses are static.
  4. Not sure about the MakeMKV dev but I haven't tried much to reverse the compression algorithm yet.
Lemme know if you are able to work this out, would be super helpful (I'm the OmniDrive dev). Right now I'm relying on RAM dumps from the drive which have this part decompressed, but being able to do this and recompress back into the firmware could open up some doors.
ibizara
Posts: 3
Joined: Mon Jun 22, 2026 12:33 pm

Re: BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by ibizara »

Small update / WIP.

I made some progress on the 0x158000 packed block in BU40N 1.00.

The original table split still looks correct:

Code: Select all

0x158008  288-byte literal/length code-length table
0x158128   32-byte distance code-length table
0x158148  compressed bitstream
I now have an experimental Python decoder for the BU40N 1.00 block. It currently reads the two header fields as:

Code: Select all

compressed:   0x72464
decompressed: 0x4e98f
The format appears to be a custom canonical-Huffman + LZ77-style scheme, but not standard DEFLATE.

Current working assumptions:

Code: Select all

bitstream:       MSB-first
Huffman:         canonical codes, bit-reversed for lookup
symbol 256:      literal zero, not EOF
symbols 257-287: length symbols
distance:        raw distance symbol + 1
This produces an output file of the advertised decompressed size. As a sanity check, the decoded output contains:
0x27b76: CAETDVD_59110933
So it is definitely producing structured data from the packed block.

Important caveat: I do not think this is 100% solved yet. The output contains plausible Thumb-looking code and strings, but it does not currently decompile cleanly as one linear ARM/Thumb image. There may still be a small semantic difference in the decoder, a relocation/fixup step, a second transform, or simply mixed code/data/microcode in the decompressed payload.

Here is the current Python script:

Code: Select all

#!/usr/bin/env python3
import argparse
import struct
from pathlib import Path

LBASE = [
    3, 4, 5, 6, 7, 8, 9, 10,
    11, 13, 15, 17, 19, 23, 27, 31,
    35, 43, 51, 59, 67, 83, 99, 115,
    131, 163, 195, 227, 258, 258, 258,
]

LEXT = [
    0, 0, 0, 0, 0, 0, 0, 0,
    1, 1, 1, 1, 2, 2, 2, 2,
    3, 3, 3, 3, 4, 4, 4, 4,
    5, 5, 5, 5, 0, 0, 0,
]


class BitReader:
    def __init__(self, data: bytes):
        self.data = data
        self.bitpos = 0

    def read(self, n: int) -> int:
        value = 0

        for i in range(n):
            if self.bitpos >= len(self.data) * 8:
                raise EOFError("ran out of compressed input")

            byte = self.data[self.bitpos >> 3]
            bit = (byte >> (7 - (self.bitpos & 7))) & 1
            value |= bit << i
            self.bitpos += 1

        return value


def reverse_bits(value: int, width: int) -> int:
    out = 0
    for _ in range(width):
        out = (out << 1) | (value & 1)
        value >>= 1
    return out


def build_canonical_table(lengths: bytes) -> dict[tuple[int, int], int]:
    counts: dict[int, int] = {}

    for length in lengths:
        if length:
            counts[length] = counts.get(length, 0) + 1

    code = 0
    next_code: dict[int, int] = {}

    for bits in range(1, max(counts.keys(), default=0) + 1):
        code = (code + counts.get(bits - 1, 0)) << 1
        next_code[bits] = code

    table: dict[tuple[int, int], int] = {}

    for symbol, length in enumerate(lengths):
        if not length:
            continue

        canonical = next_code[length]
        next_code[length] += 1

        # Required for this stream.
        stored_code = reverse_bits(canonical, length)
        table[(stored_code, length)] = symbol

    return table


def decode_symbol(br: BitReader, table: dict[tuple[int, int], int]) -> int:
    code = 0

    for length in range(1, 32):
        code |= br.read(1) << (length - 1)

        symbol = table.get((code, length))
        if symbol is not None:
            return symbol

    raise ValueError(f"bad Huffman code at bit {br.bitpos}")


def decompress_partition(firmware: bytes, offset: int = 0x158000) -> tuple[bytes, int, int, int]:
    compressed_size, output_size = struct.unpack_from("<II", firmware, offset)

    lit_table_off = offset + 8
    dist_table_off = lit_table_off + 288
    stream_off = dist_table_off + 32

    lit_lengths = firmware[lit_table_off:lit_table_off + 288]
    dist_lengths = firmware[dist_table_off:dist_table_off + 32]
    stream = firmware[stream_off:stream_off + compressed_size]

    lit_tree = build_canonical_table(lit_lengths)
    dist_tree = build_canonical_table(dist_lengths)

    br = BitReader(stream)
    out = bytearray()

    while len(out) < output_size:
        symbol = decode_symbol(br, lit_tree)

        if symbol < 256:
            out.append(symbol)
            continue

        # In this format, symbol 256 behaves as literal zero.
        if symbol == 256:
            out.append(0)
            continue

        length_index = symbol - 257

        if length_index < 0 or length_index >= len(LBASE):
            raise ValueError(
                f"bad length symbol {symbol} at output={len(out):#x}, bit={br.bitpos}"
            )

        length = LBASE[length_index]
        extra_bits = LEXT[length_index]

        if extra_bits:
            length += br.read(extra_bits)

        distance_symbol = decode_symbol(br, dist_tree)

        # Unlike DEFLATE, this currently appears to use raw distance symbols.
        distance = distance_symbol + 1

        if distance <= 0 or distance > len(out):
            raise ValueError(
                f"invalid distance {distance} at output={len(out):#x}, bit={br.bitpos}"
            )

        for _ in range(length):
            out.append(out[-distance])

            if len(out) >= output_size:
                break

    return bytes(out), compressed_size, output_size, br.bitpos


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Experimental BU40N 1.00 0x158000 partition decoder"
    )
    parser.add_argument("firmware", help="input BU40N firmware .bin")
    parser.add_argument("-o", "--output", default="decoded_158000.bin")
    parser.add_argument("--offset", default="0x158000")

    args = parser.parse_args()

    firmware = Path(args.firmware).read_bytes()
    offset = int(args.offset, 0)

    decoded, compressed_size, output_size, bits_used = decompress_partition(
        firmware, offset
    )

    Path(args.output).write_bytes(decoded)

    print(f"partition offset:   {offset:#x}")
    print(f"compressed size:    {compressed_size:#x}")
    print(f"decompressed size:  {len(decoded):#x}/{output_size:#x}")
    print(f"bits consumed:      {bits_used}")
    print(f"wrote:              {args.output}")


if __name__ == "__main__":
    main()
Run with:

Code: Select all

python3 decode_158000.py BU40N_1.00_stock.bin

partition offset:   0x158000
compressed size:    0x72464
decompressed size:  0x4e98f/0x4e98f
bits consumed:      1632522
wrote:              decoded_158000.bin

strings -a -tx decoded_158000.bin | grep CAETDVD
Expected string:

Code: Select all

27b76 CAETDVD_59110933
If anyone has a RAM dump of this region after the drive has decompressed it, comparing that against this output would probably show exactly what is still missing.
Last edited by ibizara on Sun Jun 28, 2026 2:35 pm, edited 1 time in total.
RibShark
Posts: 19
Joined: Mon Apr 29, 2019 6:27 pm

Re: BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by RibShark »

ibizara wrote: Sun Jun 28, 2026 2:28 pm If anyone has a RAM dump of this region after the drive has decompressed it, comparing that against this output would probably show exactly what is still missing.
RAM dump here: https://workupload.com/file/daUYdQasjXj

Please check DMs, I'm interested in working with you on this, but these forums aren't the best place due to downtime. Do you have anywhere else I can contact you?
ibizara
Posts: 3
Joined: Mon Jun 22, 2026 12:33 pm

Re: BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by ibizara »

Thanks, that RAM dump was exactly what was needed.

I agree with your correction: my original decoder had the two header fields interpreted the wrong way round, and it also had at least two semantic mistakes in the LZ/Huffman layer.

The original interpretation I was using was:

Code: Select all

field0 = compressed size
field1 = decompressed size
but after comparing against the RAM dump, the better interpretation appears to be:

Code: Select all

0x158000 field0 = nominal decompressed/output size
0x158004 field1 = packed partition span from 0x158000
For BU40N 1.00 stock:

Code: Select all

field0 = 0x72464
field1 = 0x4e98f
So the layout is now more likely:

Code: Select all

0x158000  output size / nominal decompressed size
0x158004  packed partition span
0x158008  288-byte literal/length code-length table
0x158128   32-byte distance code-length table
0x158148  compressed bitstream
The bitstream therefore runs up to:

Code: Select all

0x158000 + 0x4e98f = 0x1a698f
The RAM dump you provided is:

Code: Select all

size = 0x71c84
which is close to, but not exactly, the `0x72464` output-size field. I make the difference:

Code: Select all

0x72464 - 0x71c84 = 0x7e0
So either the dump is missing/padded/truncated slightly, or `field0` is a nominal workspace/output bound rather than a strict byte-for-byte RAM dump length.

The really useful part is that the RAM dump immediately proved where my old decoder went wrong. The previous output matched only until offset `0x1a`, then diverged.

The old decoder produced this around the first mismatch:

Code: Select all

f1 b5 04 00 c0 20 84 b0 6f f0 c6 ef 61 09 03 90
01 20 04 91 6f f0 c4 ef 04 20 20 20 20 c4 ef 04
but the RAM dump has:

Code: Select all

f1 b5 04 00 c0 20 84 b0 6f f0 c6 ef 61 09 03 90
01 20 04 91 6f f0 c4 ef 04 20 6f f0 c6 ef 04 20
That first bad copy led to two important fixes.

First, symbol `256` is not a literal zero and not EOF. It appears to be the first LZ length symbol.

So instead of:

Code: Select all

0..255   literal bytes
256      literal zero
257..287 length symbols
the corrected interpretation is:

Code: Select all

0..255   literal bytes
256..287 length symbols
with:

Code: Select all

256 -> length 3
257 -> length 4
258 -> length 5
259 -> length 6
...
Second, the distance is not simply:

Code: Select all

distance = distance_symbol + 1
The early RAM comparison suggests the distance coding is:

Code: Select all

distance_prefix = Huffman-coded distance symbol
distance_low7   = next 7 raw bits, read MSB-first
distance        = (distance_prefix << 7) | distance_low7
With that change, the first bad area is fixed.

For example, at output offset `0x1a`, the corrected decoder sees:

Code: Select all

length symbol 257 -> length 4
distance prefix 0
raw7 = 0x12
distance = 0x12
which copies:

Code: Select all

6f f0 c6 ef
from the earlier bytes, matching the RAM dump.

Then the next copy gives:

Code: Select all

04 20 6f f0 c6 ef
which also matches.
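
The first copy can be replayed by hand from the bytes quoted above. This is only a worked example of that single step, not the decoder. `lz_copy` is an illustrative helper, and the prefix is the first 0x1a bytes of the RAM dump excerpt.

```python
def lz_copy(out: bytearray, length: int, distance: int) -> None:
    """Append `length` bytes taken from `distance` bytes back in the output.

    Copying one byte at a time lets overlapping copies (distance < length)
    repeat bytes that were only just written.
    """
    if distance <= 0 or distance > len(out):
        raise ValueError(f"invalid distance {distance} at output={len(out):#x}")
    for _ in range(length):
        out.append(out[-distance])


# First 0x1a output bytes, where the old decoder and the RAM dump still agree.
prefix = bytes.fromhex(
    "f1 b5 04 00 c0 20 84 b0 6f f0 c6 ef 61 09 03 90 "
    "01 20 04 91 6f f0 c4 ef 04 20"
)

out = bytearray(prefix)
distance = (0 << 7) | 0x12   # distance prefix 0, raw7 = 0x12
lz_copy(out, length=4, distance=distance)
print(out[0x1A:].hex(" "))   # 6f f0 c6 ef
```

Distance 0x12 from output offset 0x1a points back to offset 0x08, which is where the earlier `6f f0 c6 ef` sits.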

With these changes, my current decoder now matches the RAM dump until offset:

Code: Select all

0x28a
The first remaining mismatch I see is:

Code: Select all

offset 0x28a

decoded:
68 ff 04 00 01 00 5a 48 ...

RAM:
22 fe 04 00 01 00 5a 48 ...
That difference looks branch/immediate-like rather than random corruption, so I do not yet know whether this is still a decompression error, a firmware-version mismatch, a relocation/runtime patch, or a dump-window issue.

The current decoder output statistics are:

Code: Select all

decoded size:           0x72464
bits consumed:          2357865
bytes consumed rounded: 0x47f4e
unused stream bytes:    0x68f9
The unused stream bytes are also suspicious, so I would not call this solved yet. It is just much closer than the previous version.

Here is the revised experimental decoder:

Code: Select all

#!/usr/bin/env python3
"""
Experimental BU40N / MT1959 decoder for the packed block at 0x158000.

Current working assumptions:

  * header[0] is the nominal decompressed/output size
  * header[1] is the packed partition span, including header + tables
  * literal/length table is 288 bytes at offset + 0x08
  * distance table is 32 bytes at offset + 0x128
  * bitstream starts at offset + 0x148
  * bitstream is read physically MSB-first
  * canonical Huffman codes are bit-reversed for lookup
  * symbols 0..255 are literal bytes
  * symbol 256 is not EOF and not literal zero
  * symbols 256..287 are LZ length symbols
  * length symbol 256 maps to length 3
  * distance is encoded as:
        Huffman-coded prefix symbol, then 7 raw MSB-first bits
        distance = (distance_prefix << 7) | raw7

This is still experimental. It fixes the previous divergence at 0x1a
against RibShark's RAM dump, but still diverges later.
"""

from __future__ import annotations

import argparse
import hashlib
import struct
from pathlib import Path


# Length bases, shifted so symbol 256 maps to length 3.
#
# This is deliberately not using DEFLATE's symbol numbering directly:
#
#   DEFLATE: symbol 257 -> length 3
#   here:    symbol 256 -> length 3
#
# At least in the verified early stream, no DEFLATE-style length extra bits
# are consumed. Consuming extra bits misaligns the following distance coding.
LBASE = [
    3, 4, 5, 6, 7, 8, 9, 10,
    11, 13, 15, 17, 19, 23, 27, 31,
    35, 43, 51, 59, 67, 83, 99, 115,
    131, 163, 195, 227, 258, 258, 258, 258,
]


class BitReader:
    def __init__(self, data: bytes):
        self.data = data
        self.bitpos = 0

    def _read_physical_bit(self) -> int:
        if self.bitpos >= len(self.data) * 8:
            raise EOFError("ran out of compressed input")

        byte = self.data[self.bitpos >> 3]
        bit = (byte >> (7 - (self.bitpos & 7))) & 1
        self.bitpos += 1
        return bit

    def read_huffman_bits(self, n: int) -> int:
        """
        Read n physical MSB-first bits, accumulating into bit 0 upwards.

        This matches the reversed canonical-code lookup used by the previous
        script and still appears to be correct.
        """
        value = 0

        for i in range(n):
            value |= self._read_physical_bit() << i

        return value

    def read_raw_msb(self, n: int) -> int:
        """
        Read n raw payload bits as a normal MSB-first integer.
        """
        value = 0

        for _ in range(n):
            value = (value << 1) | self._read_physical_bit()

        return value


def reverse_bits(value: int, width: int) -> int:
    out = 0

    for _ in range(width):
        out = (out << 1) | (value & 1)
        value >>= 1

    return out


def build_canonical_table(lengths: bytes) -> dict[tuple[int, int], int]:
    counts: dict[int, int] = {}

    for length in lengths:
        if length:
            counts[length] = counts.get(length, 0) + 1

    code = 0
    next_code: dict[int, int] = {}

    for bits in range(1, max(counts.keys(), default=0) + 1):
        code = (code + counts.get(bits - 1, 0)) << 1
        next_code[bits] = code

    table: dict[tuple[int, int], int] = {}

    for symbol, length in enumerate(lengths):
        if not length:
            continue

        canonical = next_code[length]
        next_code[length] += 1

        stored_code = reverse_bits(canonical, length)
        table[(stored_code, length)] = symbol

    return table


def decode_symbol(br: BitReader, table: dict[tuple[int, int], int]) -> int:
    code = 0

    for length in range(1, 32):
        code |= br.read_huffman_bits(1) << (length - 1)

        symbol = table.get((code, length))
        if symbol is not None:
            return symbol

    raise ValueError(f"bad Huffman code at bit {br.bitpos}")


def first_mismatch(a: bytes, b: bytes) -> int | None:
    for i, (x, y) in enumerate(zip(a, b)):
        if x != y:
            return i

    if len(a) != len(b):
        return min(len(a), len(b))

    return None


def decompress_partition(
    firmware: bytes,
    offset: int = 0x158000,
    output_limit: int | None = None,
) -> tuple[bytes, dict[str, int]]:
    output_size, packed_span = struct.unpack_from("<II", firmware, offset)

    lit_table_off = offset + 8
    dist_table_off = lit_table_off + 288
    stream_off = dist_table_off + 32
    packed_end = offset + packed_span

    if packed_end > len(firmware):
        raise ValueError(
            f"packed span ends beyond file: end={packed_end:#x}, file={len(firmware):#x}"
        )

    lit_lengths = firmware[lit_table_off:lit_table_off + 288]
    dist_lengths = firmware[dist_table_off:dist_table_off + 32]
    stream = firmware[stream_off:packed_end]

    lit_tree = build_canonical_table(lit_lengths)
    dist_tree = build_canonical_table(dist_lengths)

    if output_limit is None:
        output_limit = output_size

    br = BitReader(stream)
    out = bytearray()

    while len(out) < output_limit:
        symbol = decode_symbol(br, lit_tree)

        if symbol < 256:
            out.append(symbol)
            continue

        length_index = symbol - 256

        if length_index < 0 or length_index >= len(LBASE):
            raise ValueError(
                f"bad length symbol {symbol} at output={len(out):#x}, bit={br.bitpos}"
            )

        length = LBASE[length_index]

        distance_prefix = decode_symbol(br, dist_tree)
        distance_low7 = br.read_raw_msb(7)
        distance = (distance_prefix << 7) | distance_low7

        if distance <= 0 or distance > len(out):
            raise ValueError(
                f"invalid distance {distance} at output={len(out):#x}, "
                f"prefix={distance_prefix}, low7={distance_low7:#x}, bit={br.bitpos}"
            )

        for _ in range(length):
            out.append(out[-distance])

            if len(out) >= output_limit:
                break

    stats = {
        "output_size_header": output_size,
        "packed_span_header": packed_span,
        "stream_size": len(stream),
        "bits_consumed": br.bitpos,
        "bytes_consumed_rounded": (br.bitpos + 7) // 8,
        "unused_stream_bytes": len(stream) - ((br.bitpos + 7) // 8),
    }

    return bytes(out), stats


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Experimental BU40N/MT1959 0x158000 packed-partition decoder"
    )
    parser.add_argument("firmware", help="input firmware .bin")
    parser.add_argument("-o", "--output", default="decoded_158000_v2.bin")
    parser.add_argument("--offset", default="0x158000")
    parser.add_argument(
        "--compare",
        help="optional RAM dump / oracle to compare against",
    )
    parser.add_argument(
        "--output-limit",
        default=None,
        help="override output limit, e.g. 0x71c84. Defaults to header[0].",
    )

    args = parser.parse_args()

    firmware = Path(args.firmware).read_bytes()
    offset = int(args.offset, 0)
    output_limit = int(args.output_limit, 0) if args.output_limit else None

    decoded, stats = decompress_partition(firmware, offset, output_limit)
    Path(args.output).write_bytes(decoded)

    print(f"partition offset:       {offset:#x}")
    print(f"output size header:     {stats['output_size_header']:#x}")
    print(f"packed span header:     {stats['packed_span_header']:#x}")
    print(f"stream size:            {stats['stream_size']:#x}")
    print(f"decoded size:           {len(decoded):#x}")
    print(f"bits consumed:          {stats['bits_consumed']}")
    print(f"bytes consumed rounded: {stats['bytes_consumed_rounded']:#x}")
    print(f"unused stream bytes:    {stats['unused_stream_bytes']:#x}")
    print(f"md5(decoded):           {hashlib.md5(decoded).hexdigest()}")
    print(f"wrote:                  {args.output}")

    if args.compare:
        oracle = Path(args.compare).read_bytes()
        mismatch = first_mismatch(decoded, oracle)

        print()
        print(f"compare file:           {args.compare}")
        print(f"compare size:           {len(oracle):#x}")
        print(f"md5(compare):           {hashlib.md5(oracle).hexdigest()}")

        if mismatch is None:
            print("compare result:         exact match")
        else:
            print(f"first mismatch:         {mismatch:#x}")
            print(
                "decoded bytes:          "
                + decoded[mismatch:mismatch + 16].hex(" ")
            )
            print(
                "compare bytes:          "
                + oracle[mismatch:mismatch + 16].hex(" ")
            )


if __name__ == "__main__":
    main()
Example usage:

Code: Select all

python3 decode_158000_v2.py BU40N_1.00_stock.bin \
  -o decoded_158000_v2.bin \
  --compare "BU40N 1.00 Decompressed Code.bin"
  
partition offset:       0x158000
output size header:     0x72464
packed span header:     0x4e98f
stream size:            0x4e847
decoded size:           0x72464
bits consumed:          2357865
bytes consumed rounded: 0x47f4e
unused stream bytes:    0x68f9
md5(decoded):           f6540a416982958434e499a94a89564f
wrote:                  decoded_158000_v2.bin

compare file:           BU40N 1.00 Decompressed Code.bin
compare size:           0x71c84
md5(compare):           8d3e2aa8df4beaf815a602afb1ee44b4
first mismatch:         0x28a
decoded bytes:          68 ff 04 00 01 00 5a 48 6f f0 aa ee 61 09 01 20
compare bytes:          22 fe 04 00 01 00 5a 48 6f f0 aa ee 61 09 01 20
Current conclusion:
  • The table split and canonical-Huffman layer still look correct.
  • The header fields were reversed in my first script.
  • Symbol 256 is a length symbol, not literal zero.
  • Distance coding appears to be prefix-Huffman + 7 raw bits.
  • The decoder now gets past the old 0x1a failure and matches to 0x28a.
  • It is still not fully solved.
ibizara
Posts: 3
Joined: Mon Jun 22, 2026 12:33 pm

Re: BU40N MT1959 firmware format: packed region at 0x158000 (compression method known?)

Post by ibizara »

Thanks again. I spent more time comparing the stock 1.00 firmware decode against the RAM dump, and there is a much better update now.

The short version is that the decompression side now looks effectively solved for the bytes covered by the RAM dump. The remaining raw differences are not random decompression errors; they appear to be a Thumb branch relocation/fixup pass applied after decompression.

Earlier I said the corrected decoder only matched until 0x28a. That was true for the v2 decoder, but it turned out v2 still had the length mapping wrong.

The corrected v3 findings are:
  • The header fields are still interpreted as:

    Code: Select all

    0x158000  nominal decompressed/output size / upper bound = 0x72464
    0x158004  packed partition span from 0x158000       = 0x4e98f
  • The table layout still looks correct:

    Code: Select all

    0x158008  288-byte literal/length code-length table
    0x158128   32-byte distance code-length table
    0x158148  compressed bitstream
  • The bitstream size is therefore:

    Code: Select all

    0x4e98f - 0x148 = 0x4e847
  • The Huffman layer is still canonical-Huffman with bit-reversed lookup, and the physical bitstream is MSB-first.
  • Symbols 0..255 are literals.
  • Symbols 256..287 are LZ copy lengths.
  • The big v3 correction is that the length mapping is linear, not DEFLATE-style.
So the length mapping is:

Code: Select all

symbol 256 -> length 3
symbol 257 -> length 4
symbol 258 -> length 5
...
symbol 287 -> length 34
or simply:

Code: Select all

length = symbol - 253
The distance mapping still appears to be:

Code: Select all

distance_prefix = Huffman-coded distance symbol
distance_low7   = next 7 raw bits, read MSB-first
distance        = (distance_prefix << 7) | distance_low7
With that v3 length fix, the raw decompressed output is no longer going off-track. Instead, the raw decompressed stream and the RAM dump differ at branch-looking Thumb-2 instructions, and those differences are explained by a relocation/fixup pass.
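
To see where the v2 and v3 length mappings part ways, here is a small comparison sketch. `V2_LBASE` is copied from the v2 script, while `v3_length` and `first_divergent_symbol` are illustrative names only.

```python
from __future__ import annotations

# v2 table: DEFLATE length bases, shifted so symbol 256 maps to length 3.
V2_LBASE = [
    3, 4, 5, 6, 7, 8, 9, 10,
    11, 13, 15, 17, 19, 23, 27, 31,
    35, 43, 51, 59, 67, 83, 99, 115,
    131, 163, 195, 227, 258, 258, 258, 258,
]


def v3_length(symbol: int) -> int:
    """Linear v3 mapping: 256 -> 3, 257 -> 4, ..., 287 -> 34."""
    if not 256 <= symbol <= 287:
        raise ValueError(f"not a length symbol: {symbol}")
    return symbol - 253


def first_divergent_symbol() -> int | None:
    """Lowest length symbol where the v2 table and the v3 rule disagree."""
    for symbol in range(256, 288):
        if V2_LBASE[symbol - 256] != v3_length(symbol):
            return symbol
    return None


print(first_divergent_symbol())  # 265
```

The two mappings agree for symbols 256..264 (lengths 3..11), so a decoder using the v2 table only goes off-track once a copy of length 12 or more turns up.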

Current v3 run against BU40N 1.00 stock:

Code: Select all

partition offset:          0x158000
nominal output size:       0x72464
packed span:               0x4e98f
stream size:               0x4e847
decoded size:              0x72279
bits consumed:             2572856
bytes consumed rounded:    0x4e847
unused stream bytes:       0x0
status:                    EOF at output=0x72279, bit=2572856
md5(decoded raw):          8d371b8acfe9ae094a445af10fd6d160
Comparison against the RAM dump:

Code: Select all

compare size:              0x71c84
compare length used:       0x71c84
raw mismatching bytes:     10706
branch fixup sites:        2995
post-fixup mismatches:     0
post-fixup result:         exact match over compare length
decoded extends past RAM:  0x5f5
So after accounting for the branch fixup pattern, the decoded output matches the RAM dump exactly over the full RAM dump length.

This is not just isolated matching elsewhere. The result is continuous over:

Code: Select all

0x00000 .. 0x71c84
The raw decoded stream is:

Code: Select all

0x72279 bytes
The RAM dump is:

Code: Select all

0x71c84 bytes
So the decoder emits another:

Code: Select all

0x72279 - 0x71c84 = 0x5f5 bytes
after the end of the RAM dump.

That extra tail is not just zero padding. It contains useful-looking strings/tables such as:

Code: Select all

BDDMRUTIL_OBJVER_000D
BDPPCMD_OBJVER_0004
BDUTIL_OBJVER_0004
20160808
DVDPPCMD_OBJVER_0005
.m2ts
STREAM
VIDEO_TS
AUDIO_TS
So I think the RAM dump probably stops slightly before the true end of the decoded partition.

There is still a mismatch between the nominal header output size and the actual decoded stream size:

Code: Select all

header nominal size:  0x72464
actual decoded size:  0x72279
difference:           0x1eb
That now looks like the first field is a nominal output/workspace bound rather than the exact number of bytes emitted by the compressed stream.
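
The size bookkeeping above is easy to get wrong by hand, so here it is as a quick arithmetic check. All constants are the values reported in this post for BU40N 1.00, and the variable names are just labels.

```python
HEADER_OUTPUT_SIZE = 0x72464   # field at 0x158000
PACKED_SPAN = 0x4E98F          # field at 0x158004
HEADER_AND_TABLES = 0x148      # 8-byte header + 288 + 32 table bytes
DECODED_SIZE = 0x72279         # bytes emitted by the v3 decoder
RAM_DUMP_SIZE = 0x71C84
BITS_CONSUMED = 2572856

stream_size = PACKED_SPAN - HEADER_AND_TABLES
bytes_consumed = (BITS_CONSUMED + 7) // 8    # round up to whole bytes
unused_bytes = stream_size - bytes_consumed
tail_past_ram = DECODED_SIZE - RAM_DUMP_SIZE
header_shortfall = HEADER_OUTPUT_SIZE - DECODED_SIZE

print(f"stream size:        {stream_size:#x}")       # 0x4e847
print(f"bytes consumed:     {bytes_consumed:#x}")    # 0x4e847
print(f"unused bytes:       {unused_bytes:#x}")      # 0x0
print(f"tail past RAM dump: {tail_past_ram:#x}")     # 0x5f5
print(f"header shortfall:   {header_shortfall:#x}")  # 0x1eb
```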

The branch relocation/fixup is the interesting remaining bit.

Example at offset 0x2dc:

Code: Select all

raw decoded:  00 f0 00 f8
RAM dump:     ff f7 90 fe
The raw decoded instruction decodes as a BL-style immediate of 0. If I apply:

Code: Select all

new_imm = raw_imm - (output_offset + 4)
and re-encode the Thumb-2 branch immediate, it produces the RAM bytes exactly.
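
A minimal sketch of that single-site fixup, assuming the standard Thumb-2 BL immediate layout (S, imm10, J1, J2, imm11). The function names are made up for illustration. `ram_to_raw` is simply the algebraic inverse and assumes the fixup is exactly this subtraction and nothing more, which has not been confirmed from the real decompressor.

```python
import struct


def is_bl(word: bytes) -> bool:
    """True if the 4 bytes have the bit pattern of a Thumb-2 BL."""
    hw1, hw2 = struct.unpack("<HH", word)
    return (hw1 & 0xF800) == 0xF000 and (hw2 & 0xD000) == 0xD000


def decode_bl(word: bytes) -> int:
    """Return the signed 25-bit byte offset encoded in a BL instruction."""
    hw1, hw2 = struct.unpack("<HH", word)
    s = (hw1 >> 10) & 1
    i1 = 1 - (((hw2 >> 13) & 1) ^ s)   # I1 = NOT(J1 XOR S)
    i2 = 1 - (((hw2 >> 11) & 1) ^ s)   # I2 = NOT(J2 XOR S)
    imm = (s << 24) | (i1 << 23) | (i2 << 22) | ((hw1 & 0x3FF) << 12) | ((hw2 & 0x7FF) << 1)
    return imm - (1 << 25) if s else imm


def encode_bl(imm: int) -> bytes:
    """Encode a signed byte offset as BL; values outside 25 bits wrap silently."""
    imm &= (1 << 25) - 1
    s, i1, i2 = (imm >> 24) & 1, (imm >> 23) & 1, (imm >> 22) & 1
    j1, j2 = (1 - i1) ^ s, (1 - i2) ^ s
    hw1 = 0xF000 | (s << 10) | ((imm >> 12) & 0x3FF)
    hw2 = 0xD000 | (j1 << 13) | (j2 << 11) | ((imm >> 1) & 0x7FF)
    return struct.pack("<HH", hw1, hw2)


def raw_to_ram(word: bytes, offset: int) -> bytes:
    # new_imm = raw_imm - (output_offset + 4), as observed against the RAM dump.
    return encode_bl(decode_bl(word) - (offset + 4))


def ram_to_raw(word: bytes, offset: int) -> bytes:
    # Assumed inverse: turn a PC-relative branch back into the stored form.
    return encode_bl(decode_bl(word) + (offset + 4))


raw = bytes.fromhex("00 f0 00 f8")
print(raw_to_ram(raw, 0x2DC).hex(" "))  # ff f7 90 fe
```

`is_bl` only checks the bit pattern, so on its own it would also match data words that happen to look like BL. That is the same reason the validation script does not patch blindly.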

The v3 validation script does not blindly patch every BL-looking word, because data tables can contain values that look like Thumb instructions. It only counts/applies branch fixups that are confirmed by the RAM dump oracle. So the branch-fixup logic in this script is a validation tool, not yet a standalone loader.

Current conclusion:
  • The packed-region decompression for BU40N 1.00 now looks effectively solved for the RAM-covered range.
  • The old divergence at 0x1a was caused by wrong LZ semantics.
  • The later 0x28a-style differences were branch relocation/fixup differences, not general decompression failure.
  • The v3 decoder consumes the whole compressed bitstream: unused stream bytes = 0.
  • The raw decoded stream extends 0x5f5 bytes beyond the provided RAM dump.
  • The remaining unknown is how the real firmware/decompressor identifies which branch sites to relocate without using a RAM oracle.
For OmniDrive / 1.03MK, the practical warning is that the packed firmware probably stores the pre-relocation/raw branch form, not the RAM-style PC-relative branch form. So if we eventually patch decompressed code and recompress it, branch targets likely need to be represented in the raw/pre-fixup form, not copied directly from RAM.

Next things to try:
  • Run this v3 decoder against the 1.03MK block.
  • Compare that output against a matching 1.03MK RAM dump if available.
  • Reverse the real branch-fixup metadata/rule from the decompressor.
  • Only then start thinking seriously about recompression and safe patching.
Here is the current v3 decoder:

Code: Select all

#!/usr/bin/env python3
"""
decode_158000_v3.py

Experimental BU40N / MT1959 decoder for the packed block at 0x158000.

Status as of v3:

  * header[0] appears to be a nominal/decompressed upper size, not the packed size
  * header[1] appears to be the packed partition span from 0x158000
  * literal/length table: 288 one-byte canonical Huffman code lengths
  * distance table:       32 one-byte canonical Huffman code lengths
  * bitstream starts at:  offset + 0x148
  * bitstream physical bit order is MSB-first
  * Huffman lookup uses bit-reversed canonical codes
  * symbols 0..255 are literal bytes
  * symbols 256..287 are LZ copy lengths
  * length mapping is linear:
        length = symbol - 253
        therefore 256 -> 3, 257 -> 4, ..., 287 -> 34
  * distance mapping is:
        prefix = Huffman-coded distance symbol
        low7   = next 7 raw MSB-first bits
        distance = (prefix << 7) | low7

Important: the RAM dump appears to contain an extra runtime relocation/fixup pass
for Thumb-2 BL-like instructions. The raw decoded stream stores the branch
immediate as an absolute target/address-like value. The RAM image stores the
normal PC-relative branch encoding.

This script does NOT blindly patch every BL-looking word, because data tables
can contain values that look like BL instructions. Instead, when --compare is
provided, it counts and optionally applies only those branch fixups that are
confirmed by the RAM/oracle file.
"""

from __future__ import annotations

import argparse
import hashlib
import struct
from pathlib import Path


class BitReader:
    def __init__(self, data: bytes):
        self.data = data
        self.bitpos = 0

    def _read_physical_bit(self) -> int:
        if self.bitpos >= len(self.data) * 8:
            raise EOFError("ran out of compressed input")
        byte = self.data[self.bitpos >> 3]
        bit = (byte >> (7 - (self.bitpos & 7))) & 1
        self.bitpos += 1
        return bit

    def read_huffman_bits(self, n: int) -> int:
        # Physical bits are read MSB-first, but accumulated into bit 0 upwards
        # for the reversed canonical-code lookup.
        value = 0
        for i in range(n):
            value |= self._read_physical_bit() << i
        return value

    def read_raw_msb(self, n: int) -> int:
        value = 0
        for _ in range(n):
            value = (value << 1) | self._read_physical_bit()
        return value


def reverse_bits(value: int, width: int) -> int:
    out = 0
    for _ in range(width):
        out = (out << 1) | (value & 1)
        value >>= 1
    return out


def build_canonical_table(lengths: bytes) -> dict[tuple[int, int], int]:
    counts: dict[int, int] = {}

    for length in lengths:
        if length:
            counts[length] = counts.get(length, 0) + 1

    code = 0
    next_code: dict[int, int] = {}

    for bits in range(1, max(counts.keys(), default=0) + 1):
        code = (code + counts.get(bits - 1, 0)) << 1
        next_code[bits] = code

    table: dict[tuple[int, int], int] = {}

    for symbol, length in enumerate(lengths):
        if not length:
            continue

        canonical = next_code[length]
        next_code[length] += 1
        stored_code = reverse_bits(canonical, length)
        table[(stored_code, length)] = symbol

    return table


def decode_symbol(br: BitReader, table: dict[tuple[int, int], int]) -> int:
    code = 0

    for length in range(1, 32):
        code |= br.read_huffman_bits(1) << (length - 1)

        symbol = table.get((code, length))
        if symbol is not None:
            return symbol

    raise ValueError(f"bad Huffman code at bit {br.bitpos}")


def decode_partition(
    firmware: bytes,
    offset: int = 0x158000,
    output_limit: int | None = None,
) -> tuple[bytes, dict[str, int | str]]:
    nominal_output_size, packed_span = struct.unpack_from("<II", firmware, offset)

    lit_table_off = offset + 8
    dist_table_off = lit_table_off + 288
    stream_off = dist_table_off + 32
    packed_end = offset + packed_span

    if packed_end > len(firmware):
        raise ValueError(
            f"packed span extends beyond input file: end={packed_end:#x}, "
            f"file={len(firmware):#x}"
        )

    lit_lengths = firmware[lit_table_off:lit_table_off + 288]
    dist_lengths = firmware[dist_table_off:dist_table_off + 32]
    stream = firmware[stream_off:packed_end]

    lit_tree = build_canonical_table(lit_lengths)
    dist_tree = build_canonical_table(dist_lengths)

    if output_limit is None:
        output_limit = nominal_output_size

    br = BitReader(stream)
    out = bytearray()
    status = "ok"

    try:
        while len(out) < output_limit:
            symbol = decode_symbol(br, lit_tree)

            if symbol < 256:
                out.append(symbol)
                continue

            # v3 correction: linear length mapping, not DEFLATE-style bases.
            length = symbol - 253

            if length < 3:
                raise ValueError(
                    f"bad length symbol {symbol} at output={len(out):#x}, "
                    f"bit={br.bitpos}"
                )

            distance_prefix = decode_symbol(br, dist_tree)
            distance_low7 = br.read_raw_msb(7)
            distance = (distance_prefix << 7) | distance_low7

            if distance <= 0 or distance > len(out):
                raise ValueError(
                    f"invalid distance {distance:#x} at output={len(out):#x}, "
                    f"prefix={distance_prefix:#x}, low7={distance_low7:#x}, "
                    f"bit={br.bitpos}"
                )

            for _ in range(length):
                out.append(out[-distance])
                if len(out) >= output_limit:
                    break

    except EOFError:
        status = f"EOF at output={len(out):#x}, bit={br.bitpos}"

    stats: dict[str, int | str] = {
        "nominal_output_size": nominal_output_size,
        "packed_span": packed_span,
        "stream_size": len(stream),
        "bits_consumed": br.bitpos,
        "bytes_consumed_rounded": (br.bitpos + 7) // 8,
        "unused_stream_bytes": len(stream) - ((br.bitpos + 7) // 8),
        "status": status,
    }

    return bytes(out), stats


def thumb_bl_imm(h1: int, h2: int) -> int:
    # Thumb-2 BL-style immediate decode.
    s = (h1 >> 10) & 1
    imm10 = h1 & 0x3ff
    j1 = (h2 >> 13) & 1
    j2 = (h2 >> 11) & 1
    imm11 = h2 & 0x7ff

    i1 = (~(j1 ^ s)) & 1
    i2 = (~(j2 ^ s)) & 1

    imm = (
        (s << 24)
        | (i1 << 23)
        | (i2 << 22)
        | (imm10 << 12)
        | (imm11 << 1)
    )

    if s:
        imm -= 1 << 25

    return imm


def encode_thumb_bl_imm(imm: int, h1_orig: int, h2_orig: int) -> tuple[int, int]:
    if imm & 1:
        raise ValueError(f"odd Thumb BL immediate: {imm:#x}")

    val = imm & ((1 << 25) - 1)

    s = (val >> 24) & 1
    i1 = (val >> 23) & 1
    i2 = (val >> 22) & 1
    imm10 = (val >> 12) & 0x3ff
    imm11 = (val >> 1) & 0x7ff

    j1 = (~i1 ^ s) & 1
    j2 = (~i2 ^ s) & 1

    h1 = (h1_orig & 0xf800) | (s << 10) | imm10
    h2 = (h2_orig & 0xd000) | (j1 << 13) | (j2 << 11) | imm11

    return h1, h2


def branch_relocation_candidate(decoded: bytes, oracle: bytes, off: int) -> bool:
    if off + 4 > len(decoded) or off + 4 > len(oracle):
        return False

    h1, h2 = struct.unpack_from("<HH", decoded, off)

    # Thumb-2 BL/B.W-looking instruction. This can occur in data, so this
    # function is only safe because it checks the resulting bytes against oracle.
    if (h1 & 0xf800) != 0xf000 or (h2 & 0xd000) != 0xd000:
        return False

    imm = thumb_bl_imm(h1, h2)
    pc = off + 4
    new_imm = imm - pc

    if not (-(1 << 24) <= new_imm < (1 << 24)):
        return False

    new_h1, new_h2 = encode_thumb_bl_imm(new_imm, h1, h2)
    return struct.pack("<HH", new_h1, new_h2) == oracle[off:off + 4]


def compare_accounting_for_branch_fixups(
    decoded: bytes,
    oracle: bytes,
) -> tuple[bytes, dict[str, int]]:
    patched = bytearray(decoded)
    compare_len = min(len(decoded), len(oracle))
    branch_sites = 0

    for off in range(0, compare_len - 3, 2):
        if decoded[off:off + 4] == oracle[off:off + 4]:
            continue

        if branch_relocation_candidate(decoded, oracle, off):
            h1, h2 = struct.unpack_from("<HH", decoded, off)
            imm = thumb_bl_imm(h1, h2)
            new_h1, new_h2 = encode_thumb_bl_imm(imm - (off + 4), h1, h2)
            struct.pack_into("<HH", patched, off, new_h1, new_h2)
            branch_sites += 1

    mismatches = 0
    first_mismatch = -1

    for i in range(compare_len):
        if patched[i] != oracle[i]:
            mismatches += 1
            if first_mismatch < 0:
                first_mismatch = i

    raw_mismatches = sum(
        1 for i in range(compare_len) if decoded[i] != oracle[i]
    )

    stats = {
        "compare_len": compare_len,
        "raw_mismatching_bytes": raw_mismatches,
        "branch_fixup_sites": branch_sites,
        "post_fixup_mismatching_bytes": mismatches,
        "first_post_fixup_mismatch": first_mismatch,
    }

    return bytes(patched), stats


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Experimental BU40N/MT1959 packed block decoder"
    )
    parser.add_argument("firmware", help="input firmware .bin")
    parser.add_argument("-o", "--output", default="decoded_158000_v3_raw.bin")
    parser.add_argument("--offset", default="0x158000")
    parser.add_argument("--output-limit", default=None)
    parser.add_argument("--compare", help="optional RAM dump/oracle")
    parser.add_argument(
        "--write-oracle-patched",
        help=(
            "optional output path for a RAM-style image patched only at "
            "branch sites confirmed by --compare"
        ),
    )

    args = parser.parse_args()

    firmware = Path(args.firmware).read_bytes()
    offset = int(args.offset, 0)
    output_limit = int(args.output_limit, 0) if args.output_limit else None

    decoded, stats = decode_partition(firmware, offset, output_limit)
    Path(args.output).write_bytes(decoded)

    print(f"partition offset:          {offset:#x}")
    print(f"nominal output size:       {stats['nominal_output_size']:#x}")
    print(f"packed span:               {stats['packed_span']:#x}")
    print(f"stream size:               {stats['stream_size']:#x}")
    print(f"decoded size:              {len(decoded):#x}")
    print(f"bits consumed:             {stats['bits_consumed']}")
    print(f"bytes consumed rounded:    {stats['bytes_consumed_rounded']:#x}")
    print(f"unused stream bytes:       {stats['unused_stream_bytes']:#x}")
    print(f"status:                    {stats['status']}")
    print(f"md5(decoded raw):          {hashlib.md5(decoded).hexdigest()}")
    print(f"wrote raw:                 {args.output}")

    if args.compare:
        oracle = Path(args.compare).read_bytes()
        patched, cstats = compare_accounting_for_branch_fixups(decoded, oracle)

        print()
        print(f"compare file:              {args.compare}")
        print(f"compare size:              {len(oracle):#x}")
        print(f"compare length used:       {cstats['compare_len']:#x}")
        print(f"raw mismatching bytes:     {cstats['raw_mismatching_bytes']}")
        print(f"branch fixup sites:        {cstats['branch_fixup_sites']}")
        print(f"post-fixup mismatches:     {cstats['post_fixup_mismatching_bytes']}")

        if cstats["first_post_fixup_mismatch"] >= 0:
            off = cstats["first_post_fixup_mismatch"]
            print(f"first post-fixup mismatch: {off:#x}")
            print(f"decoded bytes:             {patched[off:off + 16].hex(' ')}")
            print(f"oracle bytes:              {oracle[off:off + 16].hex(' ')}")
        else:
            print("post-fixup result:         exact match over compare length")

        if len(decoded) > len(oracle):
            print(f"decoded extends past RAM:  {len(decoded) - len(oracle):#x}")
        elif len(oracle) > len(decoded):
            print(f"RAM extends past decoded:  {len(oracle) - len(decoded):#x}")

        if args.write_oracle_patched:
            Path(args.write_oracle_patched).write_bytes(patched)
            print(f"wrote oracle-patched:      {args.write_oracle_patched}")
            print(f"md5(oracle-patched):       {hashlib.md5(patched).hexdigest()}")


if __name__ == "__main__":
    main()
Example usage:

Code: Select all

python3 decode_158000_v3.py BU40N_1.00_stock.bin \
  -o decoded_158000_v3_raw.bin \
  --compare "BU40N 1.00 Decompressed Code.bin" \
  --write-oracle-patched decoded_158000_v3_ramstyle.bin

partition offset:          0x158000
nominal output size:       0x72464
packed span:               0x4e98f
stream size:               0x4e847
decoded size:              0x72279
bits consumed:             2572856
bytes consumed rounded:    0x4e847
unused stream bytes:       0x0
status:                    EOF at output=0x72279, bit=2572856
md5(decoded raw):          8d371b8acfe9ae094a445af10fd6d160
wrote raw:                 decoded_158000_v3_raw.bin

compare file:              BU40N 1.00 Decompressed Code.bin
compare size:              0x71c84
compare length used:       0x71c84
raw mismatching bytes:     10706
branch fixup sites:        2995
post-fixup mismatches:     0
post-fixup result:         exact match over compare length
decoded extends past RAM:  0x5f5
wrote oracle-patched:      decoded_158000_v3_ramstyle.bin
md5(oracle-patched):       ea5b760e057c05188a77e36f6b41592e
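The bit-order conventions listed in the script's docstring can be illustrated with a toy table. This is only a sketch: the four-entry length table, the sample stream and the function names are made up for illustration. It shows that a bit-reversed lookup with bits ORed in from bit 0 upwards decodes the same symbols as reading ordinary canonical codes MSB-first.

```python
def canonical_codes(lengths):
    """Map symbol -> (code, length) using the usual canonical assignment."""
    counts = {}
    for n in lengths:
        if n:
            counts[n] = counts.get(n, 0) + 1
    code, next_code = 0, {}
    for bits in range(1, max(counts, default=0) + 1):
        code = (code + counts.get(bits - 1, 0)) << 1
        next_code[bits] = code
    out = {}
    for sym, n in enumerate(lengths):
        if n:
            out[sym] = (next_code[n], n)
            next_code[n] += 1
    return out


def reverse_bits(value, width):
    out = 0
    for _ in range(width):
        out = (out << 1) | (value & 1)
        value >>= 1
    return out


def physical_bits(data):
    """Yield bits MSB-first within each byte, as the v3 BitReader does."""
    for byte in data:
        for shift in range(7, -1, -1):
            yield (byte >> shift) & 1


def decode_bits(data, lengths, count):
    """Decode by matching canonical codes in plain MSB-first order."""
    lookup = {v: k for k, v in canonical_codes(lengths).items()}
    bits, symbols = physical_bits(data), []
    for _ in range(count):
        code = length = 0
        while (code, length) not in lookup:
            code = (code << 1) | next(bits)
            length += 1
        symbols.append(lookup[(code, length)])
    return symbols


def decode_reversed(data, lengths, count):
    """Decode the v3 way: reversed codes, each new bit ORed in one position higher."""
    lookup = {(reverse_bits(c, n), n): s
              for s, (c, n) in canonical_codes(lengths).items()}
    bits, symbols = physical_bits(data), []
    for _ in range(count):
        code = length = 0
        while (code, length) not in lookup:
            code |= next(bits) << length
            length += 1
        symbols.append(lookup[(code, length)])
    return symbols


toy_lengths = [2, 1, 3, 3]              # hypothetical table, Kraft sum = 1.0
codes = canonical_codes(toy_lengths)    # 1 -> 0, 0 -> 10, 2 -> 110, 3 -> 111
stream = bytes([0b01011011, 0b10000000])
print(decode_bits(stream, toy_lengths, 4))
print(decode_reversed(stream, toy_lengths, 4))
```

Both calls print `[1, 0, 2, 3]`, so the reversal in the lookup table and the LSB-up accumulation cancel out: the stream simply carries canonical codes most significant bit first.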
Also, I currently cannot reply to private messages on here, but I am happy to compare notes. If anyone wants to contact me directly, please feel free to send a message using the ce.uk web form.
Last edited by ibizara on Tue Jun 30, 2026 5:23 pm, edited 1 time in total.

Post by RibShark »

ibizara wrote: Tue Jun 30, 2026 5:21 pm The v3 validation script does not blindly patch every BL-looking word, because data tables can contain values that look like Thumb instructions.
Believe it or not, the drive does exactly this. Here is some pseudocode as generated by IDA:

Code: Select all

i = 0;
if ( DecompressedDataSize >= 4 )
{
  do
  {
    instructions = &decompressedData[i];
    instruction1Byte1 = decompressedData[i + 1];
    isLongBranch = (instruction1Byte1 & 0xF8) == 0xF0;
    if ( isLongBranch )
    {
      instruction2Byte1 = instructions[3];
      isLongBranch = (~instruction2Byte1 & 0xF8) == 0;// (instruction2Byte1 & 0xF8) == 0xF8; 
    }
    if ( isLongBranch )
    {
      // Fixup Relocation
      v6 = (instructions[2] | (instruction1Byte1 << 19) | (decompressedData[i] << 11) | ((instruction2Byte1 & 7) << 8))
         - (i >> 1)
         - 2;
      instructions[1] = (v6 << 10 >> 29) | 0xF0;
      decompressedData[i] = v6 >> 11;
      instructions[3] = (v6 << 21 >> 29) | 0xF8;
      i += 2;
      instructions[2] = v6;
    }
    i += 2;
  }
  while ( i + 4 <= DecompressedDataSize );
}
The function for this is at 0x13DFE0 in the BU40N 1.00 firmware if you want to take a look yourself.

Also note that the MT1959 is ARMv5, so it's using THUMB rather than Thumb-2.

Post by ibizara »

Small update.

Thanks again to RibShark — the RAM dump and the relocation pseudocode were the missing pieces.

The decompression side now looks much more solid. The packed block at 0x158000 appears to decode as:

firmware packed block
-> canonical Huffman + LZ-style decode
-> raw pre-fixup decompressed image
-> drive THUMB branch fixup pass
-> RAM-style image

The important correction from RibShark’s pseudocode is that the drive really does appear to scan the decompressed data blindly for THUMB long-branch-looking sequences and apply the fixup pass to each one. That explains the differences I was previously seeing between the raw decoded output and the RAM dump.

I wrapped the current work into a Python tool:

Code: Select all

bu40n_mtk_packed_tool.py
It currently has three main commands:

Code: Select all

python3 bu40n_mtk_packed_tool.py extract firmware.bin \
  --raw-out decoded_raw.bin \
  --ramstyle-out decoded_ramstyle.bin \
  --report extract_report

Code: Select all

python3 bu40n_mtk_packed_tool.py roundtrip firmware.bin \
  --firmware-out roundtrip.bin \
  --report roundtrip_report

Code: Select all

python3 bu40n_mtk_packed_tool.py repack firmware.bin patched_decompressed.bin \
  --input-kind raw \
  --firmware-out patched_firmware.bin \
  --report repack_report
extract writes both forms:

raw = decompressed data before the drive branch-fixup pass
ramstyle = decompressed data after applying the drive-style branch fixups

repack can take either raw or ramstyle input. If --input-kind ramstyle is used, it first undoes the branch fixups before recompressing, because the packed firmware appears to store the pre-fixup/raw form.
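The relationship between the two forms can be sketched as one blind scan with a sign. This is a minimal illustration that follows the byte arithmetic in RibShark's pseudocode; the function name `relocate_branches`, its `direction` parameter and the 8-byte sample buffer are hypothetical. The drive works in 32-bit arithmetic, but only the low 22 bits are stored back, so the sketch masks to 22 bits.

```python
def relocate_branches(data, direction):
    """Scan for THUMB long-branch byte patterns and shift their 22-bit value.

    direction=-1 mimics the drive pass (raw -> RAM-style);
    direction=+1 reverses it (RAM-style -> raw).
    Returns (new_bytes, number_of_sites_touched).
    """
    buf = bytearray(data)
    sites = 0
    i = 0
    while i + 4 <= len(buf):
        if (buf[i + 1] & 0xF8) == 0xF0 and (buf[i + 3] & 0xF8) == 0xF8:
            value = (
                ((buf[i + 1] & 7) << 19)
                | (buf[i] << 11)
                | ((buf[i + 3] & 7) << 8)
                | buf[i + 2]
            )
            # Offsets are counted in halfwords: (i >> 1) + 2 == (i + 4) / 2.
            value = (value + direction * ((i >> 1) + 2)) & 0x3FFFFF
            buf[i + 1] = 0xF0 | (value >> 19)
            buf[i] = (value >> 11) & 0xFF
            buf[i + 3] = 0xF8 | ((value >> 8) & 7)
            buf[i + 2] = value & 0xFF
            sites += 1
            i += 2          # skip the second halfword of the pair
        i += 2
    return bytes(buf), sites


# Synthetic sample: two non-branch halfwords, then a raw-form branch pair.
raw = bytes.fromhex("00207047" "00f000f8")
ram, sites = relocate_branches(raw, -1)
back, _ = relocate_branches(ram, +1)
print(ram.hex(" "), sites)
print(back == raw)
```

The scan only inspects the top five bits of the odd-numbered bytes, and the rewrite leaves those bits unchanged, so the forward and reverse passes visit exactly the same sites. That is why undoing the fixups with the same blind scan restores the raw form, including any data words that merely look like branches.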

I tested it on both BU40N 1.00 stock and BU40N 1.03MK.

BU40N 1.00 stock:

Code: Select all

offset:                   0x158000
nominal output size:      0x72464
packed span:              0x4e98f
stream size:              0x4e847

status:                   EOF at output=0x72279, bit=2572856
raw size:                 0x72279
nominal - raw size:       0x1eb
unused stream bytes:      0x0
branch fixup sites:       2995

raw md5:                  8d371b8acfe9ae094a445af10fd6d160
ramstyle md5:             ea5b760e057c05188a77e36f6b41592e
BU40N 1.03MK:

Code: Select all

offset:                   0x158000
nominal output size:      0x71c60
packed span:              0x4e3b1
stream size:              0x4e269

status:                   EOF at output=0x71a75, bit=2560840
raw size:                 0x71a75
nominal - raw size:       0x1eb
unused stream bytes:      0x0
branch fixup sites:       2959

raw md5:                  d55adc97845348d062c8c40bbed61840
ramstyle md5:             70ee8eb12c457779480fd24edcb139e7
The EOF at output=... status is expected here. The decoder consumes the whole compressed bitstream exactly, with unused stream bytes = 0. The first header field seems to be a nominal output/workspace bound rather than the exact emitted size, which would explain the consistent 0x1eb difference on both firmwares.
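The reported figures are internally consistent, which can be checked with a few lines of arithmetic. The values below are copied from the two reports above; the dictionary layout is just for illustration.

```python
STREAM_REL = 0x148  # 8-byte header + 288 + 32 code-length bytes

reports = {
    "BU40N 1.00 stock": dict(nominal=0x72464, span=0x4E98F, raw=0x72279, bits=2572856),
    "BU40N 1.03MK": dict(nominal=0x71C60, span=0x4E3B1, raw=0x71A75, bits=2560840),
}

for name, r in reports.items():
    stream_size = r["span"] - STREAM_REL        # bytes after header and tables
    gap = r["nominal"] - r["raw"]               # header[0] minus emitted bytes
    whole_bytes, spare_bits = divmod(r["bits"], 8)
    print(
        f"{name}: stream={stream_size:#x} gap={gap:#x} "
        f"bytes_used={whole_bytes:#x} spare_bits={spare_bits}"
    )
```

One thing worth keeping in mind: the decoder reports EOF only when the bit position has reached the end of the stream, so a zero unused-byte count is guaranteed whenever decoding stops that way. The stronger evidence that the stream is being decoded correctly is the exact RAM-dump match on 1.00 and the identical 0x1eb gap on both images.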

The recompressor does not reproduce the original packed bytes exactly, but round-tripping does work: decode -> recompress -> decode gives the same raw decompressed output, and applying the branch-fixup pass gives the same RAM-style output.
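Why a different packed byte stream can still be correct is easiest to see at the token level. The sketch below is a toy greedy tokenizer and decoder using the same length (3..34) and distance (1..4095) limits. It leaves out the Huffman layer entirely and is not the tool's actual match finder; the names `tokenize` and `detokenize` are hypothetical.

```python
MIN_MATCH, MAX_MATCH, MAX_DISTANCE = 3, 34, 4095


def tokenize(data: bytes):
    """Greedy LZ tokenizer: ints are literals, (length, distance) are copies."""
    tokens, pos = [], 0
    while pos < len(data):
        best_len, best_dist = 0, 0
        for prev in range(max(0, pos - MAX_DISTANCE), pos):
            length = 0
            while (
                length < MAX_MATCH
                and pos + length < len(data)
                and data[prev + length] == data[pos + length]
            ):
                length += 1
            if length > best_len:
                best_len, best_dist = length, pos - prev
        if best_len >= MIN_MATCH:
            tokens.append((best_len, best_dist))
            pos += best_len
        else:
            tokens.append(data[pos])
            pos += 1
    return tokens


def detokenize(tokens) -> bytes:
    """Rebuild the data; copies go byte by byte so overlapping matches work."""
    out = bytearray()
    for tok in tokens:
        if isinstance(tok, tuple):
            length, distance = tok
            for _ in range(length):
                out.append(out[-distance])
        else:
            out.append(tok)
    return bytes(out)


sample = b"abcabcabcabc" + bytes(40)
greedy = tokenize(sample)        # uses copies where it can
literal_only = list(sample)      # a valid but wasteful parse: no copies at all
assert detokenize(greedy) == detokenize(literal_only) == sample
print(len(greedy), "tokens vs", len(literal_only), "literals")
```

Two very different token sequences decode to the same bytes, so a recompressor only has to produce some valid parse that fits in the available space, not the one the original packer chose.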

One caveat: this only handles the packed block itself. I am not yet claiming anything about wider firmware checksums, signatures, or whether a modified full image is safe to flash. For now this is mainly useful for examining, decoding, recompressing, and testing the packed region.

Code: Select all

#!/usr/bin/env python3
"""
bu40n_mtk_packed_tool.py

All-purpose experimental tool for the BU40N / MT1959 packed firmware block
at offset 0x158000.

It can:
  * extract/decompress the packed block from a firmware image
  * write raw pre-fixup and/or RAM-style post-fixup output
  * write a detailed text report and JSON metadata
  * recompress a raw or RAM-style decompressed block
  * insert the recompressed block back into a firmware image
  * verify by immediately decoding the newly written block

Current format assumptions, based on BU40N 1.00 stock and BU40N 1.03MK:
  * header[0] = nominal decompressed/output size / workspace upper bound
  * header[1] = packed partition span from partition offset
  * offset + 0x008: 288-byte literal/length canonical-Huffman table
  * offset + 0x128:  32-byte distance canonical-Huffman table
  * offset + 0x148: compressed bitstream
  * physical bitstream order is MSB-first
  * Huffman lookup uses bit-reversed canonical codes
  * literal symbols 0..255 emit a byte
  * symbols 256..287 are LZ copy lengths, length = symbol - 253
  * distance = (Huffman-coded prefix << 7) | next 7 raw MSB-first bits
  * distance range is 1..4095
  * copy length range is 3..34
  * after decompression, the drive applies a blind THUMB long-branch fixup pass

The compressor intentionally does NOT try to reproduce the original byte stream.
It emits a valid stream using the existing code-length tables from the target
firmware, then verifies that decoding it reproduces the requested raw image.
"""

from __future__ import annotations

import argparse
import binascii
import dataclasses
import hashlib
import json
import struct
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

DEFAULT_OFFSET = 0x158000
LITLEN_COUNT = 288
DIST_COUNT = 32
HEADER_SIZE = 8
LIT_TABLE_REL = 0x008
DIST_TABLE_REL = 0x128
STREAM_REL = 0x148
MIN_MATCH = 3
MAX_MATCH = 34
MAX_DISTANCE = (DIST_COUNT << 7) - 1  # 4095
NOMINAL_GAP_OBSERVED = 0x1EB


@dataclasses.dataclass(frozen=True)
class Hashes:
    length: int
    crc32: str
    md5: str
    sha1: str


@dataclasses.dataclass(frozen=True)
class PartitionInfo:
    offset: int
    nominal_output_size: int
    packed_span: int
    packed_end: int
    lit_table_offset: int
    dist_table_offset: int
    stream_offset: int
    stream_size: int
    lit_kraft_sum: float
    dist_kraft_sum: float
    firmware_size: int


@dataclasses.dataclass(frozen=True)
class DecodeResult:
    raw: bytes
    ramstyle: bytes
    info: PartitionInfo
    bits_consumed: int
    bytes_consumed_rounded: int
    unused_stream_bytes: int
    status: str
    branch_fixup_sites: int


@dataclasses.dataclass(frozen=True)
class CompressResult:
    stream: bytes
    packed_block: bytes
    nominal_output_size: int
    packed_span: int
    raw_size: int
    bits_written: int
    branch_unfix_sites: int
    branch_fixup_sites: int
    matches: int
    literals: int
    compressed_stream_size: int


class BitReader:
    def __init__(self, data: bytes):
        self.data = data
        self.bitpos = 0

    def _read_physical_bit(self) -> int:
        if self.bitpos >= len(self.data) * 8:
            raise EOFError("ran out of compressed input")
        byte = self.data[self.bitpos >> 3]
        bit = (byte >> (7 - (self.bitpos & 7))) & 1
        self.bitpos += 1
        return bit

    def read_huffman_bits(self, n: int) -> int:
        value = 0
        for i in range(n):
            value |= self._read_physical_bit() << i
        return value

    def read_raw_msb(self, n: int) -> int:
        value = 0
        for _ in range(n):
            value = (value << 1) | self._read_physical_bit()
        return value


class BitWriter:
    def __init__(self):
        self.buf = bytearray()
        self.bitpos = 0

    def _write_physical_bit(self, bit: int) -> None:
        if self.bitpos % 8 == 0:
            self.buf.append(0)
        if bit & 1:
            self.buf[-1] |= 1 << (7 - (self.bitpos & 7))
        self.bitpos += 1

    def write_huffman_code(self, reversed_code: int, length: int) -> None:
        # Decoder accumulates physical bits into bit 0 upwards, so write the
        # reversed-code integer from LSB to MSB.
        for i in range(length):
            self._write_physical_bit((reversed_code >> i) & 1)

    def write_raw_msb(self, value: int, n: int) -> None:
        for i in range(n - 1, -1, -1):
            self._write_physical_bit((value >> i) & 1)

    def finish(self) -> bytes:
        return bytes(self.buf)


def file_hashes(data: bytes) -> Hashes:
    return Hashes(
        length=len(data),
        crc32=f"{binascii.crc32(data) & 0xffffffff:08x}",
        md5=hashlib.md5(data).hexdigest(),
        sha1=hashlib.sha1(data).hexdigest(),
    )


def reverse_bits(value: int, width: int) -> int:
    out = 0
    for _ in range(width):
        out = (out << 1) | (value & 1)
        value >>= 1
    return out


def kraft_sum(lengths: bytes) -> float:
    return sum(2.0 ** -length for length in lengths if length)


def build_decode_table(lengths: bytes) -> Dict[Tuple[int, int], int]:
    encode = build_encode_table(lengths)
    return {(code, length): symbol for symbol, (code, length) in encode.items()}


def build_encode_table(lengths: bytes) -> Dict[int, Tuple[int, int]]:
    counts: Dict[int, int] = {}
    for length in lengths:
        if length:
            counts[length] = counts.get(length, 0) + 1

    code = 0
    next_code: Dict[int, int] = {}
    for bits in range(1, max(counts.keys(), default=0) + 1):
        code = (code + counts.get(bits - 1, 0)) << 1
        next_code[bits] = code

    table: Dict[int, Tuple[int, int]] = {}
    for symbol, length in enumerate(lengths):
        if not length:
            continue
        canonical = next_code[length]
        next_code[length] += 1
        table[symbol] = (reverse_bits(canonical, length), length)
    return table


def decode_symbol(br: BitReader, table: Dict[Tuple[int, int], int]) -> int:
    code = 0
    for length in range(1, 32):
        code |= br.read_huffman_bits(1) << (length - 1)
        symbol = table.get((code, length))
        if symbol is not None:
            return symbol
    raise ValueError(f"bad Huffman code at bit {br.bitpos}")


def read_partition_info(firmware: bytes, offset: int = DEFAULT_OFFSET) -> PartitionInfo:
    if offset < 0 or offset + STREAM_REL > len(firmware):
        raise ValueError(f"offset {offset:#x} is outside firmware size {len(firmware):#x}")

    nominal_output_size, packed_span = struct.unpack_from("<II", firmware, offset)
    packed_end = offset + packed_span
    lit_table_offset = offset + LIT_TABLE_REL
    dist_table_offset = offset + DIST_TABLE_REL
    stream_offset = offset + STREAM_REL

    if packed_span < STREAM_REL:
        raise ValueError(f"packed span {packed_span:#x} is smaller than header/tables {STREAM_REL:#x}")
    if packed_end > len(firmware):
        raise ValueError(
            f"packed span ends beyond firmware: end={packed_end:#x}, firmware={len(firmware):#x}"
        )

    lit_lengths = firmware[lit_table_offset:lit_table_offset + LITLEN_COUNT]
    dist_lengths = firmware[dist_table_offset:dist_table_offset + DIST_COUNT]

    return PartitionInfo(
        offset=offset,
        nominal_output_size=nominal_output_size,
        packed_span=packed_span,
        packed_end=packed_end,
        lit_table_offset=lit_table_offset,
        dist_table_offset=dist_table_offset,
        stream_offset=stream_offset,
        stream_size=packed_span - STREAM_REL,
        lit_kraft_sum=kraft_sum(lit_lengths),
        dist_kraft_sum=kraft_sum(dist_lengths),
        firmware_size=len(firmware),
    )


def _looks_like_thumb_long_branch(data: bytearray | bytes, i: int) -> bool:
    return i + 4 <= len(data) and (data[i + 1] & 0xF8) == 0xF0 and (data[i + 3] & 0xF8) == 0xF8


def _extract_drive_branch_value(data: bytearray | bytes, i: int) -> int:
    # This is the exact value construction from the ARM fixup routine, expressed
    # in byte terms.
    return (
        data[i + 2]
        | (data[i + 1] << 19)
        | (data[i] << 11)
        | ((data[i + 3] & 7) << 8)
    )


def _store_drive_branch_value(data: bytearray, i: int, value: int) -> None:
    # Re-encode using the exact byte placement used by the drive routine.
    value &= 0xFFFFFFFF
    data[i + 1] = 0xF0 | ((value >> 19) & 7)
    data[i] = (value >> 11) & 0xFF
    data[i + 3] = 0xF8 | ((value >> 8) & 7)
    data[i + 2] = value & 0xFF


def apply_drive_branch_fixups(raw: bytes) -> Tuple[bytes, int]:
    """Apply the drive's blind THUMB long-branch relocation pass."""
    data = bytearray(raw)
    i = 0
    sites = 0
    n = len(data)
    while i + 4 <= n:
        if _looks_like_thumb_long_branch(data, i):
            value = _extract_drive_branch_value(data, i)
            value = (value - (i >> 1) - 2) & 0xFFFFFFFF
            _store_drive_branch_value(data, i, value)
            sites += 1
            i += 2
        i += 2
    return bytes(data), sites


def undo_drive_branch_fixups(ramstyle: bytes) -> Tuple[bytes, int]:
    """Inverse of apply_drive_branch_fixups for preparing RAM-style code for firmware storage."""
    data = bytearray(ramstyle)
    i = 0
    sites = 0
    n = len(data)
    while i + 4 <= n:
        if _looks_like_thumb_long_branch(data, i):
            value = _extract_drive_branch_value(data, i)
            value = (value + (i >> 1) + 2) & 0xFFFFFFFF
            _store_drive_branch_value(data, i, value)
            sites += 1
            i += 2
        i += 2
    return bytes(data), sites


def decode_packed_block(
    firmware: bytes,
    offset: int = DEFAULT_OFFSET,
    output_limit: Optional[int] = None,
) -> DecodeResult:
    """Decode the packed partition and also return the drive/RAM-style fixed image."""
    info = read_partition_info(firmware, offset)
    lit_lengths = firmware[info.lit_table_offset:info.lit_table_offset + LITLEN_COUNT]
    dist_lengths = firmware[info.dist_table_offset:info.dist_table_offset + DIST_COUNT]
    stream = firmware[info.stream_offset:info.packed_end]

    lit_tree = build_decode_table(lit_lengths)
    dist_tree = build_decode_table(dist_lengths)

    if output_limit is None:
        output_limit = info.nominal_output_size

    br = BitReader(stream)
    out = bytearray()
    status = "ok"

    try:
        while len(out) < output_limit:
            symbol = decode_symbol(br, lit_tree)
            if symbol < 256:
                out.append(symbol)
                continue

            length = symbol - 253  # 256 -> 3, ..., 287 -> 34
            if length < MIN_MATCH or length > MAX_MATCH:
                raise ValueError(f"bad length symbol {symbol} at output={len(out):#x}, bit={br.bitpos}")

            distance_prefix = decode_symbol(br, dist_tree)
            distance_low7 = br.read_raw_msb(7)
            distance = (distance_prefix << 7) | distance_low7
            if distance <= 0 or distance > len(out):
                raise ValueError(
                    f"invalid distance {distance:#x} at output={len(out):#x}, "
                    f"prefix={distance_prefix:#x}, low7={distance_low7:#x}, bit={br.bitpos}"
                )

            for _ in range(length):
                out.append(out[-distance])
                if len(out) >= output_limit:
                    break
    except EOFError:
        status = f"EOF at output={len(out):#x}, bit={br.bitpos}"

    raw = bytes(out)
    ramstyle, sites = apply_drive_branch_fixups(raw)
    bytes_used = (br.bitpos + 7) // 8

    return DecodeResult(
        raw=raw,
        ramstyle=ramstyle,
        info=info,
        bits_consumed=br.bitpos,
        bytes_consumed_rounded=bytes_used,
        unused_stream_bytes=len(stream) - bytes_used,
        status=status,
        branch_fixup_sites=sites,
    )


def _write_symbol(bw: BitWriter, enc: Dict[int, Tuple[int, int]], symbol: int) -> None:
    try:
        code, length = enc[symbol]
    except KeyError:
        raise ValueError(f"symbol {symbol} has no Huffman code in target table") from None
    bw.write_huffman_code(code, length)


def _literal_cost(lit_lengths: Sequence[int], data: bytes, pos: int, length: int) -> int:
    return sum(lit_lengths[data[pos + j]] for j in range(length))


def _build_match_candidates(data: bytes, pos: int, chains: Dict[bytes, List[int]], max_chain: int) -> List[int]:
    if pos + MIN_MATCH > len(data):
        return []
    key = data[pos:pos + MIN_MATCH]
    prevs = chains.get(key)
    if not prevs:
        return []
    min_prev = max(0, pos - MAX_DISTANCE)
    candidates: List[int] = []
    # Walk the chain newest-first so the nearest candidates (smallest
    # distances) are tried before max_chain cuts the search off.
    for p in reversed(prevs):
        if p < min_prev:
            break
        candidates.append(p)
        if len(candidates) >= max_chain:
            break
    return candidates


def _best_match_at(
    data: bytes,
    pos: int,
    chains: Dict[bytes, List[int]],
    lit_lengths: Sequence[int],
    dist_lengths: Sequence[int],
    max_chain: int,
) -> Optional[Tuple[int, int, int]]:
    """Return (length, distance, saving_bits) for the best cost-saving match at pos."""
    n = len(data)
    if pos + MIN_MATCH > n:
        return None

    best: Optional[Tuple[int, int, int, int]] = None  # saving, length, distance, bit_cost
    for prev in _build_match_candidates(data, pos, chains, max_chain):
        distance = pos - prev
        if distance <= 0 or distance > MAX_DISTANCE:
            continue
        dist_prefix = distance >> 7
        if dist_prefix >= len(dist_lengths) or dist_lengths[dist_prefix] == 0:
            continue

        max_len = min(MAX_MATCH, n - pos)
        length = 0
        # Overlapping copies (distance < length) need no special handling here:
        # `data` is the complete target buffer, so data[prev + length] is always
        # in range and already holds the bytes the decoder will reproduce.
        while length < max_len and data[prev + length] == data[pos + length]:
            length += 1
        if length < MIN_MATCH:
            continue

        # Pick the best length for this distance by actual Huffman cost. There
        # are no length extra bits in this format. The literal cost is
        # accumulated incrementally instead of being re-summed for every length.
        lit_cost = _literal_cost(lit_lengths, data, pos, MIN_MATCH - 1)
        for match_len in range(MIN_MATCH, length + 1):
            lit_cost += lit_lengths[data[pos + match_len - 1]]
            length_symbol = match_len + 253
            match_cost = lit_lengths[length_symbol] + dist_lengths[dist_prefix] + 7
            saving = lit_cost - match_cost
            if saving <= 0:
                continue
            if best is None or saving > best[0] or (saving == best[0] and match_len > best[1]):
                best = (saving, match_len, distance, match_cost)

    if best is None:
        return None
    saving, length, distance, _cost = best
    return length, distance, saving


def _add_position_to_chains(data: bytes, pos: int, chains: Dict[bytes, List[int]]) -> None:
    if pos + MIN_MATCH <= len(data):
        chains[data[pos:pos + MIN_MATCH]].append(pos)


def encode_lz_stream(
    raw: bytes,
    lit_lengths: bytes,
    dist_lengths: bytes,
    *,
    max_chain: int = 256,
    lazy: bool = True,
) -> Tuple[bytes, Dict[str, int]]:
    """Compress raw bytes into a valid BU40N/MT1959 bitstream using target tables."""
    lit_enc = build_encode_table(lit_lengths)
    dist_enc = build_encode_table(dist_lengths)

    # Symbols 0..255 are literals; 256..287 are the match-length symbols.
    required_symbols = range(288)
    missing = [s for s in required_symbols if s not in lit_enc]
    if missing:
        raise ValueError(f"target literal/length table cannot encode required symbols: {missing[:10]}")
    missing_dist = [s for s in range(DIST_COUNT) if s not in dist_enc]
    if missing_dist:
        raise ValueError(f"target distance table cannot encode prefixes: {missing_dist[:10]}")

    lit_costs = list(lit_lengths)
    dist_costs = list(dist_lengths)
    bw = BitWriter()
    chains: Dict[bytes, List[int]] = defaultdict(list)
    pos = 0
    literals = 0
    matches = 0
    n = len(raw)

    while pos < n:
        best = _best_match_at(raw, pos, chains, lit_costs, dist_costs, max_chain)

        # One-symbol lazy parsing: use a literal now if the next position has a
        # materially better match. This is simple but usually helps size.
        if lazy and best is not None and pos + 1 < n:
            _add_position_to_chains(raw, pos, chains)
            next_best = _best_match_at(raw, pos + 1, chains, lit_costs, dist_costs, max_chain)
            # Undo the temporary chain addition by popping it back off.
            if pos + MIN_MATCH <= n:
                chains[raw[pos:pos + MIN_MATCH]].pop()
            if next_best is not None and next_best[2] > best[2] + lit_costs[raw[pos]]:
                best = None

        if best is None:
            _write_symbol(bw, lit_enc, raw[pos])
            _add_position_to_chains(raw, pos, chains)
            pos += 1
            literals += 1
            continue

        length, distance, _saving = best
        length_symbol = length + 253
        dist_prefix = distance >> 7
        dist_low7 = distance & 0x7F

        _write_symbol(bw, lit_enc, length_symbol)
        _write_symbol(bw, dist_enc, dist_prefix)
        bw.write_raw_msb(dist_low7, 7)

        for p in range(pos, pos + length):
            _add_position_to_chains(raw, p, chains)
        pos += length
        matches += 1

    stream = bw.finish()
    stats = {
        "bits_written": bw.bitpos,
        "bytes_written": len(stream),
        "literals": literals,
        "matches": matches,
    }
    return stream, stats


def make_packed_block(
    firmware: bytes,
    decompressed: bytes,
    offset: int = DEFAULT_OFFSET,
    *,
    input_kind: str = "raw",
    nominal_output_size: Optional[int] = None,
    max_chain: int = 256,
    lazy: bool = True,
) -> CompressResult:
    """
    Create a replacement packed block using tables from firmware.

    input_kind:
      * raw      = decompressed bytes are already pre-fixup firmware form
      * ramstyle = decompressed bytes are runtime/RAM-style and need unfixing
    """
    info = read_partition_info(firmware, offset)
    lit_lengths = firmware[info.lit_table_offset:info.lit_table_offset + LITLEN_COUNT]
    dist_lengths = firmware[info.dist_table_offset:info.dist_table_offset + DIST_COUNT]

    if input_kind == "raw":
        raw = decompressed
        unfix_sites = 0
    elif input_kind == "ramstyle":
        raw, unfix_sites = undo_drive_branch_fixups(decompressed)
    else:
        raise ValueError("input_kind must be 'raw' or 'ramstyle'")

    if nominal_output_size is None:
        # Keep the target firmware's original nominal/workspace bound if it is
        # still large enough. If the edited raw image grows, use the observed
        # firmware convention of actual_size + 0x1eb.
        nominal_output_size = max(info.nominal_output_size, len(raw) + NOMINAL_GAP_OBSERVED)

    stream, estats = encode_lz_stream(
        raw,
        lit_lengths,
        dist_lengths,
        max_chain=max_chain,
        lazy=lazy,
    )
    packed_span = STREAM_REL + len(stream)
    packed_block = (
        struct.pack("<II", nominal_output_size, packed_span)
        + lit_lengths
        + dist_lengths
        + stream
    )

    ramstyle, fix_sites = apply_drive_branch_fixups(raw)

    return CompressResult(
        stream=stream,
        packed_block=packed_block,
        nominal_output_size=nominal_output_size,
        packed_span=packed_span,
        raw_size=len(raw),
        bits_written=estats["bits_written"],
        branch_unfix_sites=unfix_sites,
        branch_fixup_sites=fix_sites,
        matches=estats["matches"],
        literals=estats["literals"],
        compressed_stream_size=len(stream),
    )


def patch_firmware_image(
    firmware: bytes,
    packed_block: bytes,
    offset: int = DEFAULT_OFFSET,
    *,
    allow_grow: bool = False,
) -> bytes:
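    """Overwrite the packed block at offset in place; the image length never changes."""
    old_info = read_partition_info(firmware, offset)
    old_span = old_info.packed_span
    new_span = len(packed_block)
    if new_span > old_span and not allow_grow:
        raise ValueError(
            f"new packed block is larger than old block: new={new_span:#x}, old={old_span:#x}; "
            "use --allow-grow only if you know the surrounding firmware layout/checksums permit it"
        )

    end = offset + new_span
    if end > len(firmware):
        raise ValueError(
            f"packed block would run past end of image: end={end:#x}, image={len(firmware):#x}"
        )

    out = bytearray(firmware)
    # Assign to a slice of exactly new_span bytes so nothing after the block is
    # shifted. If the block shrank, the old block's trailing bytes are left
    # untouched; the updated packed-span header makes the decoder ignore that
    # tail. If it grew (allow_grow), the bytes following the old block are
    # overwritten rather than pushed further into the image.
    out[offset:end] = packed_block
    return bytes(out)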


def _as_dict(obj) -> dict:
    if dataclasses.is_dataclass(obj):
        return dataclasses.asdict(obj)
    raise TypeError(type(obj).__name__)


def build_report(
    *,
    command: str,
    firmware_path: Path,
    firmware: bytes,
    info: PartitionInfo,
    decoded: Optional[DecodeResult] = None,
    compressed: Optional[CompressResult] = None,
    input_path: Optional[Path] = None,
    input_data: Optional[bytes] = None,
    output_firmware: Optional[bytes] = None,
    notes: Optional[List[str]] = None,
) -> Tuple[str, dict]:
    report: dict = {
        "command": command,
        "firmware_path": str(firmware_path),
        "firmware_hashes": _as_dict(file_hashes(firmware)),
        "partition": _as_dict(info),
        "notes": notes or [],
    }

    if decoded is not None:
        report["decode"] = {
            "status": decoded.status,
            "bits_consumed": decoded.bits_consumed,
            "bytes_consumed_rounded": decoded.bytes_consumed_rounded,
            "unused_stream_bytes": decoded.unused_stream_bytes,
            "nominal_minus_raw_size": info.nominal_output_size - len(decoded.raw),
            "branch_fixup_sites": decoded.branch_fixup_sites,
            "raw_hashes": _as_dict(file_hashes(decoded.raw)),
            "ramstyle_hashes": _as_dict(file_hashes(decoded.ramstyle)),
        }

    if input_path is not None and input_data is not None:
        report["input_decompressed_path"] = str(input_path)
        report["input_decompressed_hashes"] = _as_dict(file_hashes(input_data))

    if compressed is not None:
        report["compress"] = {
            "raw_size": compressed.raw_size,
            "nominal_output_size": compressed.nominal_output_size,
            "packed_span": compressed.packed_span,
            "stream_size": compressed.compressed_stream_size,
            "bits_written": compressed.bits_written,
            "literals": compressed.literals,
            "matches": compressed.matches,
            "branch_unfix_sites": compressed.branch_unfix_sites,
            "branch_fixup_sites": compressed.branch_fixup_sites,
            "fits_original_span": compressed.packed_span <= info.packed_span,
            "old_packed_span": info.packed_span,
            "span_delta_new_minus_old": compressed.packed_span - info.packed_span,
            "stream_hashes": _as_dict(file_hashes(compressed.stream)),
            "packed_block_hashes": _as_dict(file_hashes(compressed.packed_block)),
        }

    if output_firmware is not None:
        report["output_firmware_hashes"] = _as_dict(file_hashes(output_firmware))

    text_lines: List[str] = []
    text_lines.append("BU40N / MT1959 packed block report")
    text_lines.append("=" * 40)
    text_lines.append(f"command:                  {command}")
    text_lines.append(f"firmware:                 {firmware_path}")
    fh = file_hashes(firmware)
    text_lines.append(f"firmware length:          {fh.length:#x}")
    text_lines.append(f"firmware crc32:           {fh.crc32}")
    text_lines.append(f"firmware md5:             {fh.md5}")
    text_lines.append(f"firmware sha1:            {fh.sha1}")
    text_lines.append("")
    text_lines.append("Partition")
    text_lines.append("---------")
    text_lines.append(f"offset:                   {info.offset:#x}")
    text_lines.append(f"nominal output size:      {info.nominal_output_size:#x}")
    text_lines.append(f"packed span:              {info.packed_span:#x}")
    text_lines.append(f"packed end:               {info.packed_end:#x}")
    text_lines.append(f"literal table offset:     {info.lit_table_offset:#x}")
    text_lines.append(f"distance table offset:    {info.dist_table_offset:#x}")
    text_lines.append(f"stream offset:            {info.stream_offset:#x}")
    text_lines.append(f"stream size:              {info.stream_size:#x}")
    text_lines.append(f"literal Kraft sum:        {info.lit_kraft_sum:.12g}")
    text_lines.append(f"distance Kraft sum:       {info.dist_kraft_sum:.12g}")

    if decoded is not None:
        rh = file_hashes(decoded.raw)
        mh = file_hashes(decoded.ramstyle)
        text_lines.append("")
        text_lines.append("Decode")
        text_lines.append("------")
        text_lines.append(f"status:                   {decoded.status}")
        text_lines.append(f"raw size:                 {rh.length:#x}")
        text_lines.append(f"ramstyle size:            {mh.length:#x}")
        text_lines.append(f"nominal - raw size:       {info.nominal_output_size - rh.length:#x}")
        text_lines.append(f"bits consumed:            {decoded.bits_consumed}")
        text_lines.append(f"bytes consumed rounded:   {decoded.bytes_consumed_rounded:#x}")
        text_lines.append(f"unused stream bytes:      {decoded.unused_stream_bytes:#x}")
        text_lines.append(f"branch fixup sites:       {decoded.branch_fixup_sites}")
        text_lines.append(f"raw crc32:                {rh.crc32}")
        text_lines.append(f"raw md5:                  {rh.md5}")
        text_lines.append(f"raw sha1:                 {rh.sha1}")
        text_lines.append(f"ramstyle crc32:           {mh.crc32}")
        text_lines.append(f"ramstyle md5:             {mh.md5}")
        text_lines.append(f"ramstyle sha1:            {mh.sha1}")

    if input_path is not None and input_data is not None:
        ih = file_hashes(input_data)
        text_lines.append("")
        text_lines.append("Input decompressed image")
        text_lines.append("------------------------")
        text_lines.append(f"path:                     {input_path}")
        text_lines.append(f"length:                   {ih.length:#x}")
        text_lines.append(f"crc32:                    {ih.crc32}")
        text_lines.append(f"md5:                      {ih.md5}")
        text_lines.append(f"sha1:                     {ih.sha1}")

    if compressed is not None:
        sh = file_hashes(compressed.stream)
        bh = file_hashes(compressed.packed_block)
        text_lines.append("")
        text_lines.append("Compress")
        text_lines.append("--------")
        text_lines.append(f"raw size:                 {compressed.raw_size:#x}")
        text_lines.append(f"nominal output size:      {compressed.nominal_output_size:#x}")
        text_lines.append(f"packed span:              {compressed.packed_span:#x}")
        text_lines.append(f"stream size:              {compressed.compressed_stream_size:#x}")
        text_lines.append(f"bits written:             {compressed.bits_written}")
        text_lines.append(f"literals:                 {compressed.literals}")
        text_lines.append(f"matches:                  {compressed.matches}")
        text_lines.append(f"branch unfix sites:       {compressed.branch_unfix_sites}")
        text_lines.append(f"branch fixup sites:       {compressed.branch_fixup_sites}")
        text_lines.append(f"old packed span:          {info.packed_span:#x}")
        text_lines.append(f"fits original span:       {compressed.packed_span <= info.packed_span}")
        text_lines.append(f"span delta new-old:       {compressed.packed_span - info.packed_span:+#x}")
        text_lines.append(f"stream crc32:             {sh.crc32}")
        text_lines.append(f"stream md5:               {sh.md5}")
        text_lines.append(f"stream sha1:              {sh.sha1}")
        text_lines.append(f"packed block crc32:       {bh.crc32}")
        text_lines.append(f"packed block md5:         {bh.md5}")
        text_lines.append(f"packed block sha1:        {bh.sha1}")

    if output_firmware is not None:
        oh = file_hashes(output_firmware)
        text_lines.append("")
        text_lines.append("Output firmware")
        text_lines.append("---------------")
        text_lines.append(f"length:                   {oh.length:#x}")
        text_lines.append(f"crc32:                    {oh.crc32}")
        text_lines.append(f"md5:                      {oh.md5}")
        text_lines.append(f"sha1:                     {oh.sha1}")

    if notes:
        text_lines.append("")
        text_lines.append("Notes")
        text_lines.append("-----")
        for note in notes:
            text_lines.append(f"- {note}")

    return "\n".join(text_lines) + "\n", report


def write_reports(base: Optional[Path], text: str, data: dict) -> None:
    if base is None:
        return
    base.parent.mkdir(parents=True, exist_ok=True)
    # Append the report extension to whatever suffix the basename already has
    # (an empty suffix simply yields ".txt" / ".json").
    base.with_suffix(base.suffix + ".txt").write_text(text, encoding="utf-8")
    base.with_suffix(base.suffix + ".json").write_text(
        json.dumps(data, indent=2, sort_keys=True), encoding="utf-8"
    )


def cmd_extract(args: argparse.Namespace) -> int:
    firmware_path = Path(args.firmware)
    firmware = firmware_path.read_bytes()
    decoded = decode_packed_block(firmware, int(args.offset, 0), args.output_limit)

    if args.raw_out:
        Path(args.raw_out).write_bytes(decoded.raw)
    if args.ramstyle_out:
        Path(args.ramstyle_out).write_bytes(decoded.ramstyle)

    notes = []
    if decoded.unused_stream_bytes != 0:
        notes.append("compressed stream was not consumed exactly; investigate before patching")
    if decoded.info.nominal_output_size - len(decoded.raw) != NOMINAL_GAP_OBSERVED:
        notes.append("nominal-output-size gap differs from the 0x1eb pattern observed on BU40N 1.00/1.03MK")

    text, report = build_report(
        command="extract",
        firmware_path=firmware_path,
        firmware=firmware,
        info=decoded.info,
        decoded=decoded,
        notes=notes,
    )
    if args.report:
        write_reports(Path(args.report), text, report)
    print(text, end="")
    return 0


def cmd_repack(args: argparse.Namespace) -> int:
    firmware_path = Path(args.firmware)
    input_path = Path(args.input)
    firmware = firmware_path.read_bytes()
    input_data = input_path.read_bytes()
    offset = int(args.offset, 0)
    info = read_partition_info(firmware, offset)

    nominal = int(args.nominal_output_size, 0) if args.nominal_output_size else None
    compressed = make_packed_block(
        firmware,
        input_data,
        offset,
        input_kind=args.input_kind,
        nominal_output_size=nominal,
        max_chain=args.max_chain,
        lazy=not args.no_lazy,
    )

    notes = []
    if compressed.packed_span > info.packed_span:
        notes.append("new packed block is larger than the original span; not safe for normal in-place patching")

    output_firmware = None
    if args.firmware_out:
        output_firmware = patch_firmware_image(
            firmware,
            compressed.packed_block,
            offset,
            allow_grow=args.allow_grow,
        )
        Path(args.firmware_out).write_bytes(output_firmware)

        # Verify by decoding the output firmware image and comparing raw bytes.
        verify = decode_packed_block(output_firmware, offset)
        expected_raw = input_data if args.input_kind == "raw" else undo_drive_branch_fixups(input_data)[0]
        if verify.raw != expected_raw:
            notes.append("VERIFY FAILED: output firmware decodes to different raw bytes")
        else:
            notes.append("verify: output firmware decodes back to the requested raw bytes")

    if args.packed_block_out:
        Path(args.packed_block_out).write_bytes(compressed.packed_block)
    if args.stream_out:
        Path(args.stream_out).write_bytes(compressed.stream)

    text, report = build_report(
        command="repack",
        firmware_path=firmware_path,
        firmware=firmware,
        info=info,
        compressed=compressed,
        input_path=input_path,
        input_data=input_data,
        output_firmware=output_firmware,
        notes=notes,
    )
    if args.report:
        write_reports(Path(args.report), text, report)
    print(text, end="")
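    # Mirror cmd_roundtrip: report failure to the shell if verification failed.
    return 1 if any(note.startswith("VERIFY FAILED") for note in notes) else 0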


def cmd_roundtrip(args: argparse.Namespace) -> int:
    firmware_path = Path(args.firmware)
    firmware = firmware_path.read_bytes()
    offset = int(args.offset, 0)
    decoded = decode_packed_block(firmware, offset)
    compressed = make_packed_block(
        firmware,
        decoded.raw,
        offset,
        input_kind="raw",
        nominal_output_size=decoded.info.nominal_output_size,
        max_chain=args.max_chain,
        lazy=not args.no_lazy,
    )
    patched = patch_firmware_image(firmware, compressed.packed_block, offset, allow_grow=args.allow_grow)
    verify = decode_packed_block(patched, offset)

    notes = []
    notes.append("roundtrip: recompressed stream is not expected to match original bytes")
    if verify.raw == decoded.raw:
        notes.append("verify: recompressed firmware decodes to original raw bytes")
    else:
        notes.append("VERIFY FAILED: recompressed firmware does not decode to original raw bytes")
    if verify.ramstyle == decoded.ramstyle:
        notes.append("verify: recompressed firmware produces original RAM-style bytes after fixup")
    else:
        notes.append("VERIFY FAILED: recompressed RAM-style bytes differ")

    if args.firmware_out:
        Path(args.firmware_out).write_bytes(patched)

    text, report = build_report(
        command="roundtrip",
        firmware_path=firmware_path,
        firmware=firmware,
        info=decoded.info,
        decoded=decoded,
        compressed=compressed,
        output_firmware=patched if args.firmware_out else None,
        notes=notes,
    )
    if args.report:
        write_reports(Path(args.report), text, report)
    print(text, end="")
    return 0 if verify.raw == decoded.raw else 1


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="BU40N/MT1959 0x158000 packed block extractor/repacker",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    sub = parser.add_subparsers(dest="command", required=True)

    p = sub.add_parser("extract", help="decompress the packed block and write reports")
    p.add_argument("firmware")
    p.add_argument("--offset", default=hex(DEFAULT_OFFSET))
    p.add_argument("--output-limit", type=lambda s: int(s, 0), default=None)
    p.add_argument("--raw-out", help="write pre-branch-fixup raw decoded bytes")
    p.add_argument("--ramstyle-out", help="write post-branch-fixup RAM-style bytes")
    p.add_argument("--report", help="report basename; writes .txt and .json")
    p.set_defaults(func=cmd_extract)

    p = sub.add_parser("repack", help="compress a decompressed image and optionally patch it into firmware")
    p.add_argument("firmware", help="target firmware providing tables and insertion point")
    p.add_argument("input", help="decompressed raw or RAM-style image to compress")
    p.add_argument("--offset", default=hex(DEFAULT_OFFSET))
    p.add_argument("--input-kind", choices=["raw", "ramstyle"], default="raw")
    p.add_argument("--nominal-output-size", help="override header[0], e.g. 0x72464")
    p.add_argument("--firmware-out", help="write patched firmware image")
    p.add_argument("--packed-block-out", help="write replacement packed block only")
    p.add_argument("--stream-out", help="write compressed stream only")
    p.add_argument("--allow-grow", action="store_true", help="allow replacement block to exceed original packed span")
    p.add_argument("--max-chain", type=int, default=256, help="LZ search depth; higher is slower but may compress better")
    p.add_argument("--no-lazy", action="store_true", help="disable one-symbol lazy matching")
    p.add_argument("--report", help="report basename; writes .txt and .json")
    p.set_defaults(func=cmd_repack)

    p = sub.add_parser("roundtrip", help="extract, recompress, patch in memory, and verify exact decode")
    p.add_argument("firmware")
    p.add_argument("--offset", default=hex(DEFAULT_OFFSET))
    p.add_argument("--firmware-out", help="optionally write the round-tripped firmware image")
    p.add_argument("--allow-grow", action="store_true")
    p.add_argument("--max-chain", type=int, default=256)
    p.add_argument("--no-lazy", action="store_true")
    p.add_argument("--report", help="report basename; writes .txt and .json")
    p.set_defaults(func=cmd_roundtrip)

    return parser


def main(argv: Optional[Sequence[str]] = None) -> int:
    parser = build_arg_parser()
    args = parser.parse_args(argv)
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())
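
For anyone who wants to check the match token layout without running the whole tool, here is a minimal standalone sketch of how the encoder above splits one match into its three fields (length symbol, distance prefix, 7 raw low bits) and how the decoder side would put them back together. The helper names and the constant values are my own assumptions for illustration: MIN_MATCH = 3 and MAX_MATCH = 34 are inferred from the +253 bias and the 288-entry literal/length table, and MAX_DISTANCE = 4095 from the 32-entry distance table plus 7 raw bits. The script's real constants are defined earlier and may differ.

```python
from typing import Sequence, Tuple

# Assumed values for illustration only (see the note above).
LENGTH_SYMBOL_BIAS = 253               # length_symbol = length + 253, as in the encoder
MIN_MATCH = 3                          # assumption: shortest match maps to symbol 256
MAX_MATCH = 34                         # assumption: longest match maps to symbol 287
DIST_COUNT = 32                        # entries in the distance-prefix table
MAX_DISTANCE = DIST_COUNT * 128 - 1    # assumption: prefix 0..31 plus 7 raw bits


def split_match(length: int, distance: int) -> Tuple[int, int, int]:
    """Return (length_symbol, dist_prefix, dist_low7) for one match token."""
    if not MIN_MATCH <= length <= MAX_MATCH:
        raise ValueError(f"length {length} out of range")
    if not 1 <= distance <= MAX_DISTANCE:
        raise ValueError(f"distance {distance} out of range")
    return length + LENGTH_SYMBOL_BIAS, distance >> 7, distance & 0x7F


def join_match(length_symbol: int, dist_prefix: int, dist_low7: int) -> Tuple[int, int]:
    """Inverse of split_match: recover (length, distance) from the three fields."""
    return length_symbol - LENGTH_SYMBOL_BIAS, (dist_prefix << 7) | dist_low7


def match_cost_bits(
    lit_lengths: Sequence[int],
    dist_lengths: Sequence[int],
    length: int,
    distance: int,
) -> int:
    """Bits for one match: length code + distance-prefix code + 7 raw bits."""
    length_symbol, dist_prefix, _low7 = split_match(length, distance)
    return lit_lengths[length_symbol] + dist_lengths[dist_prefix] + 7


if __name__ == "__main__":
    fields = split_match(10, 300)
    print(fields, join_match(*fields))
```

The cost function is the same expression the match finder uses when deciding whether a match beats emitting the bytes as literals, so it is handy for estimating by hand whether a small edit is likely to fit in the original packed span.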