Files
Tajna-tretej-stolicy/stego-china-owner/solve/decode_timing.py
2026-04-22 10:58:32 +03:00

124 lines
3.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
from collections import defaultdict
from datetime import datetime
from pathlib import Path
# Feed file to decode: "public/hormuz_feed.nmea" under the parent of the
# directory that contains this script (resolved to an absolute path).
INPUT_PATH = Path(__file__).resolve().parents[1] / "public" / "hormuz_feed.nmea"
# Subtracted from each gap (in whole seconds) between consecutive reports in
# recover_flag() before the result is converted to a character with chr().
TIMING_OFFSET = 280
# 64-entry lookup table used by decode_text(): the character at index i is the
# text for six-bit value i.
AIS_TEXT_TABLE = '@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_ !"#$%&\'()*+,-./0123456789:;<=>?'
def armor_to_sixbit(char: str) -> int:
    """Return the six-bit value encoded by one armored payload character.

    The code point is lowered by 48; if the result exceeds 40 it is lowered
    by a further 8. No range validation is performed on *char*.
    """
    shifted = ord(char) - 48
    return shifted - 8 if shifted > 40 else shifted
def payload_to_bits(payload: str, fill_bits: int) -> str:
    """Expand an armored payload into a string of '0'/'1' characters.

    Each payload character contributes six bits (via ``armor_to_sixbit``).
    When *fill_bits* is non-zero, that many trailing bits are sliced off the
    end of the result.
    """
    sextets = [format(armor_to_sixbit(symbol), "06b") for symbol in payload]
    bitstring = "".join(sextets)
    if not fill_bits:
        return bitstring
    return bitstring[:-fill_bits]
def bits_to_int(bits: str) -> int:
    """Parse *bits* (a string of '0'/'1' characters) as an unsigned integer.

    Raises ``ValueError`` when *bits* is empty or not valid base-2 text.
    """
    return int(bits, base=2)
def bits_to_signed(bits: str) -> int:
    """Parse *bits* as a two's-complement signed integer.

    The width of the number is ``len(bits)``; a leading '1' marks a negative
    value. Raises ``ValueError`` when *bits* is empty or not base-2 text.
    """
    magnitude = int(bits, 2)
    if not bits.startswith("1"):
        return magnitude
    # Leading bit set: wrap around by 2**width to get the negative value.
    return magnitude - (1 << len(bits))
def decode_text(bits: str) -> str:
    """Decode a bit string into text, six bits per character.

    Each consecutive six-bit group is used as an index into
    ``AIS_TEXT_TABLE``. Trailing '@' and space characters are removed, then
    any remaining surrounding whitespace is stripped.
    """
    sextets = (bits[start:start + 6] for start in range(0, len(bits), 6))
    text = "".join(AIS_TEXT_TABLE[int(sextet, 2)] for sextet in sextets)
    return text.rstrip("@ ").strip()
def parse_messages(path: Path) -> list[tuple[datetime, str]]:
    """Read a timestamped sentence log and return reassembled payload bits.

    Every non-empty line must look like ``<ISO timestamp> <sentence>``, where
    the sentence has a one-character prefix, comma-separated fields and a
    ``*``-separated trailer. The fields used are: [1] fragment count,
    [2] fragment index (1-based), [3] sequence id, [4] channel, [5] armored
    payload, [6] fill-bit count.

    Single-fragment sentences are decoded immediately. Fragments of a
    multi-part message are grouped by (timestamp text, channel, sequence id)
    and decoded once all parts are present, in any arrival order.

    Args:
        path: ASCII text file to read.

    Returns:
        ``(timestamp, bit string)`` pairs sorted by timestamp.

    Raises:
        ValueError: a line lacks the space or ``*`` separator, or a numeric
            field / timestamp cannot be parsed.
        IndexError: a sentence has fewer than seven fields.
        KeyError: a fragment group reaches its expected size without
            containing every index from 1 to the fragment count.
    """
    # key -> {fragment index -> (payload, fill bits declared on that fragment)}
    pending: dict[tuple[str, str, str], dict[int, tuple[str, int]]] = {}
    completed: list[tuple[datetime, str]] = []

    for raw_line in path.read_text(encoding="ascii").splitlines():
        if not raw_line:
            continue

        timestamp_text, sentence = raw_line.split(" ", 1)
        # Rewrite a trailing "Z" as an explicit offset, a form that
        # datetime.fromisoformat accepts on every Python version.
        timestamp = datetime.fromisoformat(timestamp_text.replace("Z", "+00:00"))

        # Drop the leading sentence marker; the trailer after "*" is not checked.
        body, _checksum = sentence[1:].split("*", 1)
        fields = body.split(",")
        total = int(fields[1])
        index = int(fields[2])
        seq_id = fields[3] or "-"
        channel = fields[4]
        payload = fields[5]
        fill = int(fields[6])

        if total == 1:
            completed.append((timestamp, payload_to_bits(payload, fill)))
            continue

        key = (timestamp_text, channel, seq_id)
        parts = pending.setdefault(key, {})
        parts[index] = (payload, fill)
        if len(parts) != total:
            continue

        merged = "".join(parts[part][0] for part in range(1, total + 1))
        # Padding sits at the very end of the merged payload, so only the fill
        # count carried by the final fragment applies -- not that of whichever
        # fragment happened to arrive last.
        final_fill = parts[total][1]
        completed.append((timestamp, payload_to_bits(merged, final_fill)))
        del pending[key]

    return sorted(completed, key=lambda item: item[0])
def decode_type5(bits: str) -> tuple[int, str] | None:
    """Extract ``(mmsi, destination)`` from a message whose type field is 5.

    The message type is read from bits 0-5. For type 5, the MMSI is taken
    from bits 8-37 and the destination text from bits 302-421. Any other
    type yields ``None``.
    """
    message_type = bits_to_int(bits[0:6])
    if message_type == 5:
        return bits_to_int(bits[8:38]), decode_text(bits[302:422])
    return None
def recover_flag(path: Path) -> str:
    """Recover the hidden string from the timing of one vessel's reports.

    Type-5 messages are grouped by MMSI. The single MMSI with at least four
    destinations containing "CHINA OWNER" is selected; each gap between its
    consecutive reports (truncated to whole seconds) minus ``TIMING_OFFSET``
    is converted to a character with ``chr``.

    Raises:
        RuntimeError: zero or more than one MMSI matches the criterion.
    """
    sightings: dict[int, list[tuple[datetime, str]]] = defaultdict(list)
    for seen_at, bitstring in parse_messages(path):
        report = decode_type5(bitstring)
        if report is not None:
            vessel, dest = report
            sightings[vessel].append((seen_at, dest))

    suspects: dict[int, list[tuple[datetime, str]]] = {}
    for vessel, reports in sightings.items():
        hits = sum(1 for _, dest in reports if "CHINA OWNER" in dest)
        if hits >= 4:
            suspects[vessel] = reports

    if len(suspects) != 1:
        raise RuntimeError(f"Expected exactly one suspect MMSI, got {list(suspects)}")

    (timeline,) = suspects.values()
    ordered = sorted(timeline, key=lambda row: row[0])

    # Each inter-report gap encodes one character.
    letters = [
        chr(int((later[0] - earlier[0]).total_seconds()) - TIMING_OFFSET)
        for earlier, later in zip(ordered, ordered[1:])
    ]
    return "".join(letters)
def main() -> None:
    """Decode the feed at ``INPUT_PATH`` and print the recovered string."""
    print(recover_flag(INPUT_PATH))


# Run the decoder only when executed as a script, not when imported.
if __name__ == "__main__":
    main()