Files
Geroi-Kodeksa/Gossips-Misc-Hard-main/solver_blackbox.py
2026-03-02 21:44:22 +03:00

668 lines
20 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import math
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple
import requests
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
# ---------------- Protocol constants ----------------
# Values of the "kind" field carried by feed messages.
KIND_ANSWER = "ANSWER"
# Text this client posts as a QUESTION when it is the leader in CHAT mode.
FLAG_QUESTION_TEXT = "Почему слоник зеленый?"
# Algorithm identifiers that /status must report; any other value makes the
# polling loop raise RuntimeError for that iteration.
ANSWER_ALGO = "rsa_vdf_v1"
QUESTION_ALGO = "xs64_v1"
KIND_QUESTION = "QUESTION"
KIND_ELECTION_Q = "ELECTION_QUESTION"
KIND_ELECTION_A = "ELECTION_ANSWER"
# Number of consecutive book tokens that make up one answer.
NGRAM_LEN = 12 # fixed by task
# ---------------- Small helpers ----------------
def canonical_json(obj: Dict[str, Any]) -> bytes:
    """Serialize *obj* as compact, key-sorted JSON and return it UTF-8 encoded.

    Non-ASCII characters are written literally rather than escaped, and no
    whitespace is emitted around the ``,`` and ``:`` separators.
    """
    text = json.dumps(
        obj,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False,
    )
    return text.encode("utf-8")
def canonical_msg_without_sig(msg: Dict[str, Any]) -> bytes:
    """Return the canonical JSON bytes of *msg* with its ``"sig"`` entry omitted.

    This is the byte string that gets signed when a message is posted.
    """
    unsigned = {key: value for key, value in msg.items() if key != "sig"}
    return canonical_json(unsigned)
def b64e(b: bytes) -> str:
    """Encode *b* with standard Base64 and return the result as a ``str``."""
    encoded = base64.b64encode(b)
    return str(encoded, "ascii")
def b64d(s: str) -> bytes:
    """Decode the Base64 string *s* in strict mode.

    ``validate=True`` makes characters outside the Base64 alphabet an error
    instead of silently discarding them.
    """
    ascii_bytes = s.encode("ascii")
    return base64.b64decode(ascii_bytes, validate=True)
def kid_from_pk_sha256_8_hex(pk_bytes: bytes) -> str:
    """Derive a key id: the first 8 bytes of SHA-256(*pk_bytes*) as 16 hex digits."""
    full_hex = hashlib.sha256(pk_bytes).hexdigest()
    return full_hex[:16]
def kid16_from_kid(kid_hex: str) -> int:
    """Return the 16-bit value encoded by the last four hex digits of *kid_hex*.

    Surrounding whitespace, letter case and an optional ``0x`` prefix are
    ignored. Ids with fewer than four digits map to 0.
    """
    digits = kid_hex.strip().lower()
    if digits.startswith("0x"):
        digits = digits[2:]
    if len(digits) >= 4:
        return int(digits[-4:], 16) & 0xFFFF
    return 0
def canon_q(question_text: str) -> str:
    """Canonicalize a question: trim it and collapse whitespace runs to one space."""
    words = str(question_text).strip().split()
    return " ".join(words)
def u16be(x: int) -> bytes:
    """Return the low 16 bits of *x* as two bytes, most significant first."""
    low16 = int(x) & 0xFFFF
    return bytes((low16 >> 8, low16 & 0xFF))
def sha256(data: bytes) -> bytes:
    """Return the raw 32-byte SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.digest()
def seed_to_x(question_text: str, epoch: int, kid_hex: str, N: int) -> int:
    """Derive the VDF input for a question.

    The value is SHA-256 over ``b"vdf|"``, the 16-bit big-endian epoch, the
    16-bit big-endian key-id suffix and the canonicalized question text,
    reduced modulo *N*. It is then incremented (mod *N*) until it is coprime
    to *N*.
    """
    kid_low = kid16_from_kid(kid_hex)
    question_bytes = canon_q(question_text).encode("utf-8")
    preimage = b"".join((b"vdf|", u16be(epoch), u16be(kid_low), question_bytes))
    candidate = int.from_bytes(sha256(preimage), "big") % N
    # Step forward until the candidate shares no factor with the modulus.
    while math.gcd(candidate, N) != 1:
        candidate = (candidate + 1) % N
    return candidate
def crt(y_p: int, y_q: int, p: int, q: int) -> int:
    """Combine residues modulo *p* and *q* into a single value.

    Returns ``y_p + p * t`` where ``t`` is chosen in ``[0, q)`` so that the
    result is congruent to *y_q* modulo *q* (and, by construction, to *y_p*
    modulo *p*). Requires *p* to be invertible modulo *q*.
    """
    p_inverse_mod_q = pow(p % q, -1, q)
    gap = (y_q - y_p) % q
    lift = (gap * p_inverse_mod_q) % q
    return y_p + p * lift
def vdf_fast(x: int, p: int, q: int, T: int) -> int:
    """Evaluate ``x ** (2 ** T)`` modulo ``p * q`` using the factors *p* and *q*.

    For each factor the exponent ``2 ** T`` is first reduced modulo
    ``factor - 1``, the power is taken modulo that factor, and the two
    residues are recombined with :func:`crt`. The exponent reduction is only
    valid when *p* and *q* are primes and *x* is coprime to both.
    """
    residues = []
    for prime in (p, q):
        reduced_exponent = pow(2, T, prime - 1)
        residues.append(pow(x % prime, reduced_exponent, prime))
    combined = crt(residues[0], residues[1], p, q)
    return combined % (p * q)
def y_to_idx(y: int, N: int, dict_size: int) -> int:
    """Map a VDF output to a dictionary index in ``[0, dict_size)``.

    *y* is serialized big-endian using as many bytes as *N* needs, hashed with
    SHA-256, and the first four digest bytes (read little-endian) are masked
    down to ``dict_size``.

    Raises:
        ValueError: if *dict_size* is not a positive power of two.
    """
    is_power_of_two = dict_size > 0 and (dict_size & (dict_size - 1)) == 0
    if not is_power_of_two:
        raise ValueError("dict_size must be power of two")
    width = (N.bit_length() + 7) // 8
    digest = sha256(int(y).to_bytes(width, "big"))
    low_word = int.from_bytes(digest[:4], "little")
    index_mask = dict_size - 1
    return low_word & index_mask
# ---------------- Book 12-gram source ----------------
@dataclass
class BookNGram:
    """Tokenized book used as the dictionary of NGRAM_LEN-token answers."""

    # Whitespace-separated tokens of the decoded book text, in file order.
    tokens: List[str]
    # Number of addressable n-grams: the largest power of two that does not
    # exceed the number of NGRAM_LEN-token windows in ``tokens``.
    dict_size: int

    @staticmethod
    def load(book_path: str, *, encoding: str = "koi8_r") -> "BookNGram":
        """Read and tokenize the book stored at *book_path*.

        The file is decoded strictly with *encoding*, the control characters
        ``\\x14`` and ``\\x15`` are removed, and the text is split on
        whitespace.

        Raises:
            OSError: if the file cannot be read.
            UnicodeDecodeError: if the bytes are not valid in *encoding*.
            RuntimeError: if the book has fewer than NGRAM_LEN tokens.
        """
        # Use a context manager so the handle is closed deterministically
        # instead of being left for the garbage collector.
        with open(book_path, "rb") as fh:
            raw = fh.read()
        text = raw.decode(encoding, errors="strict")
        text = text.replace("\x14", "").replace("\x15", "")
        tokens = text.split()
        if len(tokens) < NGRAM_LEN:
            raise RuntimeError("book too short")
        ngram_count = len(tokens) - NGRAM_LEN + 1
        # Round down to a power of two so an index can be taken with a bit mask.
        dict_size = 1 << (ngram_count.bit_length() - 1)
        return BookNGram(tokens=tokens, dict_size=dict_size)

    def lookup(self, idx: int) -> str:
        """Return the NGRAM_LEN tokens starting at *idx*, joined by single spaces."""
        return " ".join(self.tokens[idx: idx + NGRAM_LEN])
def expected_answer_fast(
    question_text: str,
    epoch: int,
    kid_hex: str,
    book: BookNGram,
    p: int,
    q: int,
    T: int,
) -> str:
    """Compute the answer for a question using the factors of the VDF modulus.

    Chains the helpers: question -> VDF input -> VDF output -> dictionary
    index -> n-gram text from *book*.
    """
    modulus = p * q
    vdf_input = seed_to_x(question_text, epoch, kid_hex, modulus)
    vdf_output = vdf_fast(vdf_input, p, q, T)
    position = y_to_idx(vdf_output, modulus, book.dict_size)
    return book.lookup(position)
# ---------------- xorshift32 + prime gen (same as service) ----------------
class XorShift32:
    """32-bit xorshift generator using the shift sequence 13 / 17 / 5.

    The state is kept in ``state``; a zero seed is replaced by 1 because zero
    is a fixed point of the update.
    """

    __slots__ = ("state",)

    def __init__(self, state: int):
        masked = int(state) & 0xFFFFFFFF
        self.state = masked if masked != 0 else 1

    def next_u32(self) -> int:
        """Advance the generator once and return the new 32-bit state."""
        mask = 0xFFFFFFFF
        value = self.state
        value ^= (value << 13) & mask
        value ^= (value >> 17) & mask
        value ^= (value << 5) & mask
        value &= mask
        self.state = value
        return value

    def next_u64(self) -> int:
        """Return 64 bits built from two successive outputs (first one is the high word)."""
        high = self.next_u32()
        low = self.next_u32()
        combined = (high << 32) | low
        return combined & 0xFFFFFFFFFFFFFFFF
# Odd primes below 100, used for cheap trial division before Miller-Rabin.
_SMALL_PRIMES = (
    3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59,
    61, 67, 71, 73, 79, 83, 89, 97,
)


def is_probable_prime(n: int, rounds: int = 32) -> bool:
    """Miller-Rabin primality test with the fixed bases 2, 3, 4, ...

    After trial division by the small primes, round ``i`` uses base ``2 + i``
    (folded back into range when it would reach ``n - 1``). The bases are
    deterministic, so the result is reproducible for a given *n* and *rounds*.
    """
    n = int(n)
    if n < 2:
        return False
    if n == 2 or n == 3:
        return True
    if not n & 1:
        return False
    for small in _SMALL_PRIMES:
        if n % small == 0:
            # Divisible by a small prime: prime only if it *is* that prime.
            return n == small

    # Write n - 1 as odd_part * 2**twos.
    odd_part = n - 1
    twos = 0
    while odd_part % 2 == 0:
        odd_part //= 2
        twos += 1

    for i in range(int(rounds)):
        base = 2 + i
        if base >= n - 1:
            base = 2 + (base % (n - 3))
        value = pow(base, odd_part, n)
        if value in (1, n - 1):
            continue
        composite = True
        for _ in range(twos - 1):
            value = (value * value) % n
            if value == n - 1:
                composite = False
                break
        if composite:
            return False
    return True
def cand_from_rng_1024(rng) -> int:
    """Draw a 1024-bit odd candidate from *rng*.

    Sixteen ``next_u64()`` outputs are concatenated big-endian (first draw is
    the most significant), then the top bit and the lowest bit are forced to 1.
    """
    words = [rng.next_u64() for _ in range(16)]
    buffer = bytearray()
    for word in words:
        buffer += int(word).to_bytes(8, "big")
    candidate = int.from_bytes(buffer, "big")
    return candidate | (1 << 1023) | 1
def generate_prime_1024(rng) -> int:
    """Return the first probable prime at or above a candidate drawn from *rng*.

    Only odd numbers are examined: the candidate is odd and is advanced by 2.
    """
    candidate = cand_from_rng_1024(rng)
    while True:
        if is_probable_prime(candidate):
            return candidate
        candidate += 2
class XorShift64:
    """64-bit xorshift generator using the shift sequence 13 / 7 / 17.

    The state is kept in ``state``; a zero seed is replaced by 1 because zero
    is a fixed point of the update.
    """

    __slots__ = ("state",)

    def __init__(self, state: int):
        masked = int(state) & 0xFFFFFFFFFFFFFFFF
        self.state = masked if masked != 0 else 1

    def next_u64(self) -> int:
        """Advance the generator once and return the new 64-bit state."""
        mask = 0xFFFFFFFFFFFFFFFF
        value = self.state
        value ^= (value << 13) & mask
        value ^= (value >> 7) & mask
        value ^= (value << 17) & mask
        value &= mask
        self.state = value
        return value
def master_seed32_from_secret(secret: str) -> int:
    """Derive a non-zero 32-bit seed from *secret*.

    The seed is the first four bytes of SHA-256(UTF-8 *secret*), read
    little-endian; a zero result is replaced by 1.
    """
    digest = hashlib.sha256(secret.encode("utf-8")).digest()
    seed = int.from_bytes(digest[:4], "little")
    return seed or 1
def master_seed64_from_secret(secret: str) -> int:
    """Derive a non-zero 64-bit seed from *secret*.

    The seed is the first eight bytes of SHA-256(UTF-8 *secret*), read
    little-endian; a zero result is replaced by 1.
    """
    digest = hashlib.sha256(secret.encode("utf-8")).digest()
    seed = int.from_bytes(digest[:8], "little")
    return seed or 1
def generate_vdf_params(master_seed64: int, T: int) -> Tuple[int, int, int]:
    """Regenerate the VDF primes from the master seed.

    A single XorShift64 stream seeded with *master_seed64* yields *p* first
    and then *q* (redrawn while it equals *p*). Returns ``(p, q, p * q)``.
    *T* is accepted but not used by the generation itself.
    """
    rng = XorShift64(master_seed64)
    p = generate_prime_1024(rng)
    while True:
        q = generate_prime_1024(rng)
        if q != p:
            break
    return p, q, p * q
# ---------------- Question parsing + linear recovery (64-bit xorshift) ----------------
@dataclass
class Vocab:
    """Word lists for the four positions of a generated question."""

    # One list per question position, in the order the words appear.
    W: List[str]
    N: List[str]
    A: List[str]
    E: List[str]

    @staticmethod
    def empty() -> "Vocab":
        """Return a vocabulary whose four word lists are all empty."""
        return Vocab([], [], [], [])
def parse_question_words(q: str) -> Optional[Tuple[str, str, str, str]]:
    """Split a generated question into its four lower-cased words.

    The question must consist of exactly four whitespace-separated words with
    the last one ending in ``?``; the question mark is dropped. Returns
    ``None`` for any other shape.
    """
    words = q.strip().split()
    if len(words) != 4 or not words[3].endswith("?"):
        return None
    words[3] = words[3][:-1]
    w, n, a, e = (word.lower() for word in words)
    return w, n, a, e
def build_vocab_from_questions(questions: Sequence[str]) -> Vocab:
    """Collect the distinct words seen at each of the four question positions.

    Questions that do not parse are skipped. Each word list is returned
    sorted. The caller treats the vocabulary as complete once every list
    holds 16 words.
    """
    buckets = (set(), set(), set(), set())
    for text in questions:
        words = parse_question_words(text)
        if not words:
            continue
        for bucket, word in zip(buckets, words):
            bucket.add(word)
    w_list, n_list, a_list, e_list = (sorted(bucket) for bucket in buckets)
    return Vocab(W=w_list, N=n_list, A=a_list, E=e_list)
def question_to_obs4(q: str, vocab: Vocab) -> Optional[Tuple[int, int, int, int]]:
    """Translate a question into the vocabulary indices of its four words.

    Returns ``None`` when the question does not parse or when any word is
    missing from the corresponding list of *vocab*.
    """
    words = parse_question_words(q)
    if not words:
        return None
    columns = (vocab.W, vocab.N, vocab.A, vocab.E)
    try:
        w_idx, n_idx, a_idx, e_idx = (
            column.index(word) for column, word in zip(columns, words)
        )
    except ValueError:
        return None
    return (w_idx, n_idx, a_idx, e_idx)
def xs64_next(state: int) -> int:
    """Return the successor of a 64-bit xorshift state (shifts 13 / 7 / 17).

    Stateless counterpart of ``XorShift64.next_u64``; *state* is masked to
    64 bits first.
    """
    mask = 0xFFFFFFFFFFFFFFFF
    value = state & mask
    value ^= (value << 13) & mask
    value ^= (value >> 7) & mask
    value ^= (value << 17) & mask
    return value & mask
def build_linear_system_from_obs(obs: List[int], steps: int) -> Tuple[List[int], List[int]]:
    """Build GF(2) equations linking the initial xorshift64 state to observations.

    ``obs[t]`` holds the low four bits of the state after ``t + 1`` updates.
    For every step and every one of those four bits one equation is produced:
    ``rows[i]`` is a 64-bit mask of the initial-state bits whose XOR equals
    ``rhs[i]``.
    """
    width = 64
    # deps[j] is the mask of initial-state bits whose XOR gives state bit j.
    deps = [1 << j for j in range(width)]
    rows: List[int] = []
    rhs: List[int] = []
    for t in range(steps):
        # x ^= x << 13: bit j picks up bit j - 13.
        deps = [deps[j] ^ deps[j - 13] if j >= 13 else deps[j] for j in range(width)]
        # x ^= x >> 7: bit j picks up bit j + 7.
        deps = [deps[j] ^ deps[j + 7] if j + 7 < width else deps[j] for j in range(width)]
        # x ^= x << 17: bit j picks up bit j - 17.
        deps = [deps[j] ^ deps[j - 17] if j >= 17 else deps[j] for j in range(width)]
        nibble = obs[t] & 0xF
        for bit in range(4):
            rows.append(deps[bit])
            rhs.append((nibble >> bit) & 1)
    return rows, rhs
def gf2_gauss_elim(rows: List[int], rhs: List[int]) -> Optional[int]:
    """
    Solve A x = b over GF(2) for 64 unknowns.

    Each entry of *rows* is a 64-bit mask of the unknowns in one equation and
    the matching entry of *rhs* is its right-hand side. Returns one solution
    packed into a 64-bit integer (unknowns without a pivot are left at 0), or
    ``None`` when the system is inconsistent. The inputs are not modified.
    """
    unknowns = 64
    eqs = list(rows)
    consts = list(rhs)
    total = len(eqs)
    pivot_of = [-1] * unknowns
    rank = 0
    for col in range(unknowns):
        bit = 1 << col
        found = next((i for i in range(rank, total) if eqs[i] & bit), None)
        if found is None:
            continue
        eqs[rank], eqs[found] = eqs[found], eqs[rank]
        consts[rank], consts[found] = consts[found], consts[rank]
        pivot_of[col] = rank
        pivot_eq = eqs[rank]
        pivot_const = consts[rank]
        # Clear this column from every other equation (Gauss-Jordan style).
        for i in range(total):
            if i != rank and eqs[i] & bit:
                eqs[i] ^= pivot_eq
                consts[i] ^= pivot_const
        rank += 1
        if rank == total:
            break

    # An all-zero equation with right-hand side 1 cannot be satisfied.
    if any(eqs[i] == 0 and consts[i] == 1 for i in range(total)):
        return None

    solution = 0
    for col in reversed(range(unknowns)):
        where = pivot_of[col]
        if where == -1:
            continue
        # Fold in the already-decided higher unknowns that this equation uses.
        higher = (eqs[where] & solution) >> (col + 1)
        parity = consts[where] ^ (bin(higher).count("1") & 1)
        if parity & 1:
            solution |= bit_for(col) if False else (1 << col)
    return solution & 0xFFFFFFFFFFFFFFFF
def recover_epoch_seed64_from_obs4(obs4: List[Tuple[int, int, int, int]]) -> Optional[int]:
    """
    Recover the xorshift64 state that preceded the observed questions.

    Every question contributes four consecutive generator outputs, each
    observed through its low 4 bits. At most 200 such nibbles (50 questions)
    are used. Returns ``None`` when fewer than 64 nibbles are available or
    when the resulting linear system is inconsistent.
    """
    nibbles: List[int] = [quad[i] for quad in obs4 for i in range(4)]
    nibbles = nibbles[:200]  # 50 questions * 4
    if len(nibbles) < 64:
        return None
    equations, constants = build_linear_system_from_obs(nibbles, steps=len(nibbles))
    return gf2_gauss_elim(equations, constants)
# ---------------- Feed client / play loop ----------------
class Player:
    """Feed client that learns the service's secrets and optionally plays.

    Each polling iteration reads ``/status`` and ``/feed``. From the leader's
    questions it learns the four 16-word vocabularies, solves for the
    question generator's seed, and regenerates the VDF primes from that seed
    (checked against the modulus published in ``/status``). With ``play``
    enabled it answers election questions and, when it is the leader in CHAT
    mode, posts the flag question.
    """

    def __init__(self, feed_url: str, book: BookNGram, play: bool):
        self.feed_url = feed_url.rstrip("/")
        self.book = book
        self.play = play
        self.session = requests.Session()
        # Generate an Ed25519 keypair for signing messages
        self.sk = Ed25519PrivateKey.generate()
        self.pk = self.sk.public_key().public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )
        self.kid = kid_from_pk_sha256_8_hex(self.pk)
        self.pk_b64 = b64e(self.pk)
        # Learned data
        self.vocab: Optional[Vocab] = None
        self.master_seed64: Optional[int] = None
        self.master_seed32: Optional[int] = None
        self.vdf_p: Optional[int] = None
        self.vdf_q: Optional[int] = None
        self.vdf_N: Optional[int] = None
        self.vdf_T: Optional[int] = None
        self.answered_election_qids: set[int] = set()
        self._last_epoch_seen: Optional[int] = None
        self._flag_sent_epoch: Optional[int] = None
        self._flag_qid: Optional[int] = None
        # Distinct leader questions seen so far, accumulated across epochs
        # for vocabulary discovery.
        self._all_observed_questions: List[str] = []

    def get_status(self) -> Dict[str, Any]:
        """GET ``/status`` and return the decoded JSON (raises on HTTP errors)."""
        r = self.session.get(self.feed_url + "/status", timeout=5)
        r.raise_for_status()
        return r.json()

    def get_feed(self) -> List[Dict[str, Any]]:
        """GET ``/feed`` and return the decoded JSON (raises on HTTP errors)."""
        r = self.session.get(self.feed_url + "/feed", timeout=10)
        r.raise_for_status()
        return r.json()

    def post(self, *, kind: str, epoch: int, qid: int, text: str) -> None:
        """Sign a message with this player's key and POST it to ``/feed``.

        The signature covers the canonical JSON of every field except
        ``sig``. Raises on HTTP errors.
        """
        msg = {
            "ts": int(time.time()),
            "kind": kind,
            "kid": self.kid,
            "epoch": int(epoch),
            "qid": int(qid),
            "text": text,
            "pk": self.pk_b64,
            "sig": "",
        }
        sig = self.sk.sign(canonical_msg_without_sig(msg))
        msg["sig"] = b64e(sig)
        r = self.session.post(self.feed_url + "/feed", json=msg, timeout=10)
        r.raise_for_status()

    @staticmethod
    def _msg_in_epoch(m: Dict[str, Any], epoch: int) -> bool:
        """Return False only for messages that carry a different (or unparsable) epoch."""
        try:
            return int(m.get("epoch", epoch)) == int(epoch)
        except (TypeError, ValueError):
            return False

    def try_learn_and_break(self, epoch: int, leader_kid: str, feed: List[Dict[str, Any]]) -> None:
        """Advance the attack as far as the current feed allows.

        Stages, each skipped once completed: learn the vocabularies, recover
        the master seed from the leader's questions of *epoch*, regenerate
        the primes and check their product against ``self.vdf_N``. Nothing
        happens until the leader has at least 20 questions in the feed.
        """
        leader_msgs = [
            m for m in feed
            if m.get("kind") == KIND_QUESTION and m.get("kid") == leader_kid
        ]
        qs = [str(m.get("text", "")) for m in leader_msgs]
        if len(qs) < 20:
            return
        if self.vocab is None:
            # Accumulate questions across epochs for vocab discovery. Only
            # unseen texts are stored: the same feed is fetched on every
            # poll, so a blind extend() would grow the list without bound.
            known = set(self._all_observed_questions)
            for q in qs:
                if q not in known:
                    known.add(q)
                    self._all_observed_questions.append(q)
            v = build_vocab_from_questions(self._all_observed_questions)
            if len(v.W) == 16 and len(v.N) == 16 and len(v.A) == 16 and len(v.E) == 16:
                self.vocab = v
                print("[solver] learned vocab sets (16 each)")
        if self.vocab is None:
            return
        if self.master_seed64 is None:
            # The recovered state is un-mixed with `epoch` below, so only
            # questions belonging to this epoch may enter the linear system.
            obs4 = []
            for m in leader_msgs:
                if not self._msg_in_epoch(m, epoch):
                    continue
                o = question_to_obs4(str(m.get("text", "")), self.vocab)
                if o is not None:
                    obs4.append(o)
            seed_epoch64 = recover_epoch_seed64_from_obs4(obs4)
            if seed_epoch64 is None:
                return
            self.master_seed64 = seed_epoch64 ^ (epoch & 0xFFFFFFFFFFFFFFFF)
            self.master_seed32 = self.master_seed64 & 0xFFFFFFFF
            print(f"[solver] recovered master_seed64 = {self.master_seed64} (0x{self.master_seed64:016x})")
            print(f"[solver] derived master_seed32 = {self.master_seed32} (0x{self.master_seed32:08x})")
        if self.master_seed64 is not None and self.vdf_T is not None and self.vdf_N is not None and self.vdf_p is None:
            p, q, N = generate_vdf_params(self.master_seed64, self.vdf_T)
            if N != self.vdf_N:
                print("[solver] generated N does not match /status (wrong seed?)")
                # Forget the rejected seed so a later poll, with more
                # observations, can recover a different one.
                self.master_seed64 = None
                self.master_seed32 = None
                return
            self.vdf_p = p
            self.vdf_q = q
            print("[solver] factored vdf_N via regeneration: OK")

    def maybe_ask_flag_question(self, epoch: int, mode: str, leader: str, feed: List[Dict[str, Any]]) -> None:
        """Post the flag question once per epoch while leading in CHAT mode.

        Does nothing unless ``play`` is set, *mode* is ``"CHAT"`` and this
        player is the leader.
        """
        if not self.play:
            return
        if mode != "CHAT":
            return
        if leader != self.kid:
            return
        if self._flag_sent_epoch == epoch:
            return
        # Pick the next free qid among my QUESTION messages in this epoch
        my_qids = []
        for m in feed:
            try:
                if int(m.get("epoch", -1)) != int(epoch):
                    continue
                if str(m.get("kind")) != KIND_QUESTION:
                    continue
                if str(m.get("kid")) != self.kid:
                    continue
                my_qids.append(int(m.get("qid", -1)))
            except Exception:
                # Best effort: malformed feed entries are ignored.
                continue
        qid = (max(my_qids) + 1) if my_qids else 0
        self.post(kind=KIND_QUESTION, epoch=epoch, qid=qid, text=FLAG_QUESTION_TEXT)
        self._flag_sent_epoch = epoch
        self._flag_qid = qid
        print(f"[solver] asked flag question qid={qid}")

    def maybe_print_flag_answer(self, epoch: int, feed: List[Dict[str, Any]]) -> None:
        """Print the first ANSWER to our flag question that looks like a flag."""
        if self._flag_sent_epoch != epoch or self._flag_qid is None:
            return
        for m in feed:
            try:
                if int(m.get("epoch", -1)) != int(epoch):
                    continue
                if str(m.get("kind")) != KIND_ANSWER:
                    continue
                if int(m.get("qid", -1)) != int(self._flag_qid):
                    continue
                txt = str(m.get("text", ""))
            except Exception:
                # Best effort: malformed feed entries are ignored.
                continue
            if txt.startswith("caplag{"):
                print(f"[solver] FLAG: {txt}")
                return

    def play_showtime(self, epoch: int, mode: str, feed: List[Dict[str, Any]]) -> None:
        """Answer every not-yet-answered election question of *epoch*.

        Requires ``play``, ELECTION mode and recovered VDF primes.
        """
        if not self.play or mode != "ELECTION":
            return
        if self.vdf_p is None or self.vdf_q is None or self.vdf_T is None:
            return
        # Collect all election questions of the current epoch
        qs = []
        for m in feed:
            if m.get("kind") == KIND_ELECTION_Q and int(m.get("epoch", -1)) == int(epoch):
                qid = int(m.get("qid", -1))
                if qid >= 0:
                    qs.append((qid, str(m.get("text", ""))))
        for qid, q_text in qs:
            if qid in self.answered_election_qids:
                continue
            ans = expected_answer_fast(q_text, epoch, self.kid, self.book, self.vdf_p, self.vdf_q, self.vdf_T)
            self.post(kind=KIND_ELECTION_A, epoch=epoch, qid=qid, text=ans)
            # Recorded only after a successful post, so failures are retried.
            self.answered_election_qids.add(qid)

    def run_forever(self) -> None:
        """Poll the service in an endless loop, reporting (not raising) errors."""
        print(f"[solver] kid={self.kid} book.dict_size={self.book.dict_size}")
        while True:
            # Reset before polling: if /status fails, `mode` would otherwise be
            # unbound (first iteration) or stale when the sleep is chosen.
            mode = ""
            try:
                st = self.get_status()
                epoch = int(st["epoch"])
                mode = str(st["mode"])
                leader = str(st["leader_kid"]).lower()
                if self._last_epoch_seen is None or epoch != self._last_epoch_seen:
                    self.answered_election_qids.clear()
                    self._last_epoch_seen = epoch
                if str(st.get("answer_algo")) != ANSWER_ALGO:
                    raise RuntimeError(f"unexpected answer_algo: {st.get('answer_algo')}")
                if str(st.get("question_algo")) != QUESTION_ALGO:
                    raise RuntimeError(f"unexpected question_algo: {st.get('question_algo')}")
                self.vdf_N = int(st["vdf_N_hex"], 16)
                self.vdf_T = int(st["vdf_T"])
                # Ensure our local book-derived answer space matches the service
                if "ngram_len" in st and int(st["ngram_len"]) != NGRAM_LEN:
                    raise RuntimeError(f"unexpected ngram_len: {st.get('ngram_len')}")
                if "answer_space" in st:
                    svc_space = int(st["answer_space"])
                    if svc_space != self.book.dict_size:
                        max_space = len(self.book.tokens) - NGRAM_LEN + 1
                        if svc_space <= max_space and (svc_space & (svc_space - 1)) == 0:
                            print(f"[solver] overriding local dict_size {self.book.dict_size} -> {svc_space} from /status")
                            self.book.dict_size = svc_space
                        else:
                            raise RuntimeError("service answer_space is incompatible with local book")
                feed = self.get_feed()
                self.maybe_ask_flag_question(epoch, mode, leader, feed)
                self.maybe_print_flag_answer(epoch, feed)
                self.try_learn_and_break(epoch, leader, feed)
                self.play_showtime(epoch, mode, feed)
                if leader == self.kid:
                    print("[solver] I am the leader now!")
            except Exception as e:
                # Top-level boundary: report and keep polling.
                print("[solver] error:", repr(e))
            sleep = 0.02 if mode == "ELECTION" else 0.25  # (optional, but slightly faster)
            time.sleep(sleep)
def main() -> None:
    """Parse the command line, load the book and run the player loop."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--feed", required=True, help="Feed URL, e.g. http://127.0.0.1:9000")
    parser.add_argument("--book", required=True, help="Path to KOI8-R book file")
    parser.add_argument("--book-encoding", default="koi8_r")
    parser.add_argument("--play", action="store_true", help="Actively answer election questions")
    options = parser.parse_args()
    loaded_book = BookNGram.load(options.book, encoding=options.book_encoding)
    player = Player(options.feed, loaded_book, play=bool(options.play))
    player.run_forever()


if __name__ == "__main__":
    main()