124 lines
3.7 KiB
Python
124 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
INPUT_PATH = Path(__file__).resolve().parents[1] / "public" / "hormuz_feed.nmea"
|
|
TIMING_OFFSET = 280
|
|
AIS_TEXT_TABLE = '@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_ !"#$%&\'()*+,-./0123456789:;<=>?'
|
|
|
|
|
|
def armor_to_sixbit(char: str) -> int:
|
|
value = ord(char) - 48
|
|
if value > 40:
|
|
value -= 8
|
|
return value
|
|
|
|
|
|
def payload_to_bits(payload: str, fill_bits: int) -> str:
|
|
bits = "".join(f"{armor_to_sixbit(char):06b}" for char in payload)
|
|
return bits[:-fill_bits] if fill_bits else bits
|
|
|
|
|
|
def bits_to_int(bits: str) -> int:
|
|
return int(bits, 2)
|
|
|
|
|
|
def bits_to_signed(bits: str) -> int:
|
|
value = int(bits, 2)
|
|
if bits and bits[0] == "1":
|
|
value -= 1 << len(bits)
|
|
return value
|
|
|
|
|
|
def decode_text(bits: str) -> str:
|
|
chars = []
|
|
for index in range(0, len(bits), 6):
|
|
chars.append(AIS_TEXT_TABLE[int(bits[index:index + 6], 2)])
|
|
return "".join(chars).rstrip("@ ").strip()
|
|
|
|
|
|
def parse_messages(path: Path) -> list[tuple[datetime, str]]:
|
|
fragments: dict[tuple[str, str, str], dict[int, str]] = {}
|
|
fragment_totals: dict[tuple[str, str, str], int] = {}
|
|
fill_bits: dict[tuple[str, str, str], int] = {}
|
|
completed: list[tuple[datetime, str]] = []
|
|
|
|
for raw_line in path.read_text(encoding="ascii").splitlines():
|
|
if not raw_line:
|
|
continue
|
|
timestamp_text, sentence = raw_line.split(" ", 1)
|
|
timestamp = datetime.fromisoformat(timestamp_text.replace("Z", "+00:00"))
|
|
body, checksum = sentence[1:].split("*", 1)
|
|
fields = body.split(",")
|
|
total = int(fields[1])
|
|
index = int(fields[2])
|
|
seq_id = fields[3] or "-"
|
|
channel = fields[4]
|
|
payload = fields[5]
|
|
fill = int(fields[6])
|
|
key = (timestamp_text, channel, seq_id)
|
|
|
|
if total == 1:
|
|
completed.append((timestamp, payload_to_bits(payload, fill)))
|
|
continue
|
|
|
|
fragments.setdefault(key, {})[index] = payload
|
|
fragment_totals[key] = total
|
|
fill_bits[key] = fill
|
|
|
|
if len(fragments[key]) == total:
|
|
merged = "".join(fragments[key][part] for part in range(1, total + 1))
|
|
completed.append((timestamp, payload_to_bits(merged, fill_bits[key])))
|
|
del fragments[key]
|
|
del fragment_totals[key]
|
|
del fill_bits[key]
|
|
|
|
return sorted(completed, key=lambda item: item[0])
|
|
|
|
|
|
def decode_type5(bits: str) -> tuple[int, str] | None:
|
|
if bits_to_int(bits[0:6]) != 5:
|
|
return None
|
|
mmsi = bits_to_int(bits[8:38])
|
|
destination = decode_text(bits[302:422])
|
|
return mmsi, destination
|
|
|
|
|
|
def recover_flag(path: Path) -> str:
|
|
per_mmsi: dict[int, list[tuple[datetime, str]]] = defaultdict(list)
|
|
for timestamp, bits in parse_messages(path):
|
|
decoded = decode_type5(bits)
|
|
if decoded is None:
|
|
continue
|
|
mmsi, destination = decoded
|
|
per_mmsi[mmsi].append((timestamp, destination))
|
|
|
|
suspects = {
|
|
mmsi: rows
|
|
for mmsi, rows in per_mmsi.items()
|
|
if sum("CHINA OWNER" in destination for _, destination in rows) >= 4
|
|
}
|
|
if len(suspects) != 1:
|
|
raise RuntimeError(f"Expected exactly one suspect MMSI, got {list(suspects)}")
|
|
|
|
target_rows = next(iter(suspects.values()))
|
|
target_rows.sort(key=lambda item: item[0])
|
|
decoded_chars = []
|
|
for previous, current in zip(target_rows, target_rows[1:]):
|
|
delta = int((current[0] - previous[0]).total_seconds())
|
|
decoded_chars.append(chr(delta - TIMING_OFFSET))
|
|
return "".join(decoded_chars)
|
|
|
|
|
|
def main() -> None:
|
|
flag = recover_flag(INPUT_PATH)
|
|
print(flag)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|