#!/usr/bin/env python3 from __future__ import annotations from collections import defaultdict from datetime import datetime from pathlib import Path INPUT_PATH = Path(__file__).resolve().parents[1] / "public" / "hormuz_feed.nmea" TIMING_OFFSET = 280 AIS_TEXT_TABLE = '@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_ !"#$%&\'()*+,-./0123456789:;<=>?' def armor_to_sixbit(char: str) -> int: value = ord(char) - 48 if value > 40: value -= 8 return value def payload_to_bits(payload: str, fill_bits: int) -> str: bits = "".join(f"{armor_to_sixbit(char):06b}" for char in payload) return bits[:-fill_bits] if fill_bits else bits def bits_to_int(bits: str) -> int: return int(bits, 2) def bits_to_signed(bits: str) -> int: value = int(bits, 2) if bits and bits[0] == "1": value -= 1 << len(bits) return value def decode_text(bits: str) -> str: chars = [] for index in range(0, len(bits), 6): chars.append(AIS_TEXT_TABLE[int(bits[index:index + 6], 2)]) return "".join(chars).rstrip("@ ").strip() def parse_messages(path: Path) -> list[tuple[datetime, str]]: fragments: dict[tuple[str, str, str], dict[int, str]] = {} fragment_totals: dict[tuple[str, str, str], int] = {} fill_bits: dict[tuple[str, str, str], int] = {} completed: list[tuple[datetime, str]] = [] for raw_line in path.read_text(encoding="ascii").splitlines(): if not raw_line: continue timestamp_text, sentence = raw_line.split(" ", 1) timestamp = datetime.fromisoformat(timestamp_text.replace("Z", "+00:00")) body, checksum = sentence[1:].split("*", 1) fields = body.split(",") total = int(fields[1]) index = int(fields[2]) seq_id = fields[3] or "-" channel = fields[4] payload = fields[5] fill = int(fields[6]) key = (timestamp_text, channel, seq_id) if total == 1: completed.append((timestamp, payload_to_bits(payload, fill))) continue fragments.setdefault(key, {})[index] = payload fragment_totals[key] = total fill_bits[key] = fill if len(fragments[key]) == total: merged = "".join(fragments[key][part] for part in range(1, total + 1)) completed.append((timestamp, payload_to_bits(merged, fill_bits[key]))) del fragments[key] del fragment_totals[key] del fill_bits[key] return sorted(completed, key=lambda item: item[0]) def decode_type5(bits: str) -> tuple[int, str] | None: if bits_to_int(bits[0:6]) != 5: return None mmsi = bits_to_int(bits[8:38]) destination = decode_text(bits[302:422]) return mmsi, destination def recover_flag(path: Path) -> str: per_mmsi: dict[int, list[tuple[datetime, str]]] = defaultdict(list) for timestamp, bits in parse_messages(path): decoded = decode_type5(bits) if decoded is None: continue mmsi, destination = decoded per_mmsi[mmsi].append((timestamp, destination)) suspects = { mmsi: rows for mmsi, rows in per_mmsi.items() if sum("CHINA OWNER" in destination for _, destination in rows) >= 4 } if len(suspects) != 1: raise RuntimeError(f"Expected exactly one suspect MMSI, got {list(suspects)}") target_rows = next(iter(suspects.values())) target_rows.sort(key=lambda item: item[0]) decoded_chars = [] for previous, current in zip(target_rows, target_rows[1:]): delta = int((current[0] - previous[0]).total_seconds()) decoded_chars.append(chr(delta - TIMING_OFFSET)) return "".join(decoded_chars) def main() -> None: flag = recover_flag(INPUT_PATH) print(flag) if __name__ == "__main__": main()