#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ forge_repo_staging.py — генерирует публичный репозиторий с 500+ коммитами. Строка mesh: "OH-21B4" и base64-флаг встречаются РОВНО в одном коммите ветки `staging/router-mesh`. В main их нет (ветка смёржена через --squash). Можно сразу запушить: python3 forge_repo_staging.py --push Или задать автора: --author-name "Имя" --author-email you@example.com """ import os, sys, json, random, subprocess, shutil, argparse, base64, re from datetime import datetime, timedelta, timezone # -------- параметры по умолчанию -------- REPO_NAME = "iot-suite-repo-mega-fixed" BRANCH_SECRET = "staging/router-mesh" FEATURE_BRANCHES = [ ("feature/ui-polish", 6), ("hotfix/diag-timeout", 3), ("refactor/router-cfg", 5), ] MAIN_COMMITS = 520 # шумовых коммитов в main TIME_START = datetime(2023, 3, 1, 10, 0, tzinfo=timezone(timedelta(hours=1))) STEP_MAIN = timedelta(hours=6) STEP_BRANCH = timedelta(hours=2) AUTHORS_DEFAULT = [ ("Natalie Orlov", "natalie@orbital.home"), ("Anton Lebedev", "anton@orbital.home"), ("Nina Petrova", "nina@orbital.home"), ("Mark Isaev", "mark@orbital.home"), ("QA Bot", "qa-bot@orbital.home"), ] NOISE_MESSAGES = [ "docs: update changelog", "chore: reformat", "ci: bump node action", "router: refactor logger", "kettle: tweak diag", "nas: session cleanup", "printer: queue cosmetics", "deps: bump ejs", "deps: bump express", "test: deflake", "build: docker cache", "perf: micro-opt", "style: trailing space", "docs: badges", ] FLAG_PLAINTEXT = "caplag{Orb1ta1-h0me-vendor-found-needed-branch-version}" FLAG_B64 = base64.b64encode(FLAG_PLAINTEXT.encode()).decode() def sh(args, cwd=None, env=None): return subprocess.run(args, cwd=cwd, env=env, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True).stdout.strip() def write(path, content, binary=False): os.makedirs(os.path.dirname(path), exist_ok=True) mode = "wb" if (binary or isinstance(content, (bytes, bytearray))) else "w" with open(path, mode, encoding=None if mode=="wb" else "utf-8") as f: f.write(content) BASE_README = """# Orbital Home — IoT Suite (public) Публичная версия кодовой базы для демонстраций: Router, Printer, Kettle, NAS. Документация и закрытые части вырезаны. """ # ---------------- router (публичный минимальный) ---------------- ROUTER_SERVER = """import express from 'express'; import fs from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const app = express(); const PORT = process.env.PORT || 8085; const cfgPath = path.join(__dirname, 'config', 'router.json'); let cfg = JSON.parse(fs.readFileSync(cfgPath, 'utf-8')); app.get('/info_connection', (req,res)=>{ res.json({ devices: [ { name:'Printer', model:'PRN-3130', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://printer.local' }, { name:'Kettle', model:'KTL-74C1', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://kettle.local' }, { name:'NAS', model:'NAS-220', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://nas.local' } ]}); }); app.get('/', (req,res)=> res.type('text/plain').send('Router UI (public build)')); app.listen(PORT, ()=> console.log(`[router-public] listening on ${PORT}`)); """ ROUTER_PKG = { "name":"router-public","version":"0.3.0","type":"module", "scripts":{"start":"node server.js"}, "dependencies":{"express":"^4.19.2"} } # ---------------- printer (замаскированный) ---------------- PRINTER_SERVER_PUBLIC = """import express from 'express'; import fs from 'fs'; import path from 'path'; import { fileURLToPath } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const app = express(); const PORT = process.env.PORT || 8080; const QUEUE_DIR = path.join(__dirname, 'data', 'queue'); // : private build may include additional endpoints and auth app.get('/', (req,res)=> res.type('text/plain').send('Printer UI (public build)')); app.get('/jobs/preview', (req,res)=>{ const name = String(req.query.file||'').replace(/\\\\/g,'/'); if (!name || name.includes('..') || path.isAbsolute(name)) return res.status(400).send('bad path'); const abs = path.join(QUEUE_DIR, path.normalize(name)); if (!abs.startsWith(QUEUE_DIR)) return res.status(400).send('bad path'); if (!fs.existsSync(abs)) return res.status(404).send('not found'); fs.createReadStream(abs).pipe(res); }); app.get('/go', (req,res)=>{ const u = String(req.query.u||''); if (/^https?:\\/\\//i.test(u)) return res.redirect(302,u); res.redirect('/'); }); app.get('/robots.txt', (req,res)=> res.type('text/plain').send('User-agent: *\\nDisallow:\\n')); app.listen(PORT, ()=> console.log(`[printer-public] listening on ${PORT}`)); """ PRINTER_PKG_PUBLIC = { "name":"printer-public","version":"0.3.0","type":"module", "scripts":{"start":"node server.js"}, "dependencies":{"express":"^4.19.2"} } # ---------------- kettle (замаскированный) ---------------- KETTLE_SERVER_PUBLIC = """import express from 'express'; import path from 'path'; import { fileURLToPath } from 'url'; // import axios from 'axios'; // // import crypto from 'crypto'; // // const PRINTER_PROOF = process.env.PRINTER_PROOF; // // const KETTLE_NONCE = process.env.KETTLE_NONCE; // // const KETTLE_SHARED_KEY = process.env.KETTLE_SHARED_KEY; // const __dirname = path.dirname(fileURLToPath(import.meta.url)); const app = express(); const PORT = process.env.PORT || 8081; app.get('/', (req,res)=> res.redirect('/diagnostics')); app.get('/diagnostics', (req,res)=>{ // In private build this page exposes limited device info and triggers telemetry. res.type('text/plain').send('Kettle diagnostics (public build)'); }); app.post('/diagnostics/fetch', (req,res)=>{ // : route-signature-based fetch via internal gateway // key = (PRINTER_PROOF + ":" + KETTLE_NONCE).encode() //mac = hmac.new(key, (url + '\n' + ts).encode(), hashlib.sha256).hexdigest() res.status(501).type('text/plain').send('disabled in public build'); }); app.get('/assets/badge.svg', (req,res)=>{ res.type('image/svg+xml').send(''); }); app.listen(PORT, ()=> console.log(`[kettle-public] listening on ${PORT}`)); """ KETTLE_PKG_PUBLIC = { "name":"kettle-public","version":"0.3.0","type":"module", "scripts":{"start":"node server.js"}, "dependencies":{"express":"^4.19.2"} } # ---------------- nas (замаскированный) ---------------- NAS_SERVER_PUBLIC = """import express from 'express'; import path from 'path'; import { fileURLToPath } from 'url'; // import jwt from 'jsonwebtoken'; // // import crypto from 'crypto'; // // const JWT_SECRET = process.env.JWT_SECRET; // // const KETTLE_SHARED_KEY = process.env.KETTLE_SHARED_KEY; // const __dirname = path.dirname(fileURLToPath(import.meta.url)); const app = express(); const PORT = process.env.PORT || 5000; app.use(express.urlencoded({ extended:false })); app.use(express.json()); app.get('/', (req,res)=> res.redirect('/login')); app.get('/login', (req,res)=> res.type('text/plain').send('NAS login (public build)')); app.post('/login', (req,res)=> res.status(501).send('disabled in public build')); // admin_pass = hashlib.sha1((KETTLE_PROOF + ":" + PRINTER_PROOF).encode()).hexdigest()[:12] app.get('/internal/hint', (req,res)=>{ // internal gateway-only endpoint (requires shared key + route sign) res.status(403).type('text/plain').send('forbidden (public build)'); }); app.get('/admin/flag', (req,res)=> res.status(404).send('not found')); const auth = req.headers.authorization || ''; const token = auth.replace(/^Bearer\s+/,''); try{ const payload = jwt.verify(token, JWT_SECRET, { algorithms:['HS256'] }); if (payload.role !== 'admin') return res.status(403).send('need admin role'); if (payload.kpf !== KETTLE_PROOF) return res.status(403).send('incorrect kpf'); if (payload.aud !== 'nas-admin') return res.status(403).send('bad aud'); const now = Math.floor(Date.now()/1000); if (!payload.iat || Math.abs(now - payload.iat) > 120) return res.status(403).send('iat too old'); app.listen(PORT, ()=> console.log(`[nas-public] listening on ${PORT}`)); """ NAS_PKG_PUBLIC = { "name":"nas-public","version":"0.3.0","type":"module", "scripts":{"start":"node server.js"}, "dependencies":{"express":"^4.19.2"} } def commit(root, message, when, author): sh(["git","add","-A"], cwd=root) env = os.environ.copy() env.update({ "GIT_AUTHOR_NAME": author[0], "GIT_AUTHOR_EMAIL": author[1], "GIT_COMMITTER_NAME": author[0], "GIT_COMMITTER_EMAIL": author[1], "GIT_AUTHOR_DATE": when.isoformat(), "GIT_COMMITTER_DATE": when.isoformat(), }) sh(["git","commit","-m", message], cwd=root, env=env) def set_mesh(root, value): cfgp = os.path.join(root,"router/config/router.json") with open(cfgp,"r+",encoding="utf-8") as f: data = json.load(f) data["mesh"] = value f.seek(0); json.dump(data, f, indent=2); f.truncate() def add_vendor_note(root, b64text): p = os.path.join(root,"router/server.js") with open(p,"a",encoding="utf-8") as f: f.write(f"\n// vendor-note: {b64text}\n") def remove_vendor_note(root): p = os.path.join(root,"router/server.js") with open(p,"r",encoding="utf-8") as f: s = f.read() s = re.sub(r"\n?//\s*vendor-note:[^\n]*\n?", "\n", s, flags=re.IGNORECASE) with open(p,"w",encoding="utf-8") as f: f.write(s) def add_trailing_space_lines(path): with open(path, "r", encoding="utf-8") as f: lines = f.read().splitlines() with open(path, "w", encoding="utf-8") as f: for line in lines: f.write(line.rstrip(" ") + " \n") def strip_trailing_space_lines(path): with open(path, "r", encoding="utf-8") as f: lines = f.read().splitlines() with open(path, "w", encoding="utf-8") as f: for line in lines: f.write(line.rstrip(" ") + "\n") def snapshot_public_sources(root): for rel in ("kettle/server.js", "nas/server.js"): add_trailing_space_lines(os.path.join(root, rel)) def restore_public_sources(root): for rel in ("kettle/server.js", "nas/server.js"): strip_trailing_space_lines(os.path.join(root, rel)) def mutate_noise(root, i): choice = random.choice(["docs","router","printer","kettle","nas"]) if choice == "docs": p = os.path.join(root,"docs/CHANGELOG.md") with open(p,"a",encoding="utf-8") as f: f.write(f"- maintenance {i}\n") elif choice in ("router","printer","kettle","nas"): p = os.path.join(root, f"{choice}/server.js") if choice=="router" else os.path.join(root,f"{choice}/server.js") # лёгкая косметика-комментарий with open(p,"a",encoding="utf-8") as f: f.write(f"\n// note: minor tweak {i}\n") def init_repo(root, primary_author, authors): if os.path.exists(root): shutil.rmtree(root) os.makedirs(root, exist_ok=True) sh(["git","init","--initial-branch=main"], cwd=root) write(os.path.join(root,"README.md"), BASE_README) write(os.path.join(root,"docs/CHANGELOG.md"), "## Changelog (public)\n") # router write(os.path.join(root,"router/server.js"), ROUTER_SERVER) write(os.path.join(root,"router/package.json"), json.dumps(ROUTER_PKG, indent=2, ensure_ascii=False)) write(os.path.join(root,"router/config/router.json"), json.dumps({"vendor":"Orbital Home","mesh":"OH-2194"}, indent=2)) # printer (публичный, безопасный) write(os.path.join(root,"printer/server.js"), PRINTER_SERVER_PUBLIC) write(os.path.join(root,"printer/package.json"), json.dumps(PRINTER_PKG_PUBLIC, indent=2, ensure_ascii=False)) write(os.path.join(root,"printer/data/queue/readme.txt"), "public job\n") # kettle (публичный, урезанный) write(os.path.join(root,"kettle/server.js"), KETTLE_SERVER_PUBLIC) write(os.path.join(root,"kettle/package.json"), json.dumps(KETTLE_PKG_PUBLIC, indent=2, ensure_ascii=False)) # nas (публичный, урезанный) write(os.path.join(root,"nas/server.js"), NAS_SERVER_PUBLIC) write(os.path.join(root,"nas/package.json"), json.dumps(NAS_PKG_PUBLIC, indent=2, ensure_ascii=False)) commit(root, "init(public): scaffold modules", TIME_START, primary_author) def build_main(root, authors): t = TIME_START for i in range(1, MAIN_COMMITS+1): t = t + STEP_MAIN + timedelta(minutes=random.randint(0,59)) mutate_noise(root, i) commit(root, random.choice(NOISE_MESSAGES), t, random.choice(authors)) return t def merge_noff(root, branch_name, message, when, author): sh(["git","checkout","main"], cwd=root) sh(["git","merge","--no-ff", branch_name, "-m", message], cwd=root) def build_feature_branch(root, name, commits, base_time, authors): sh(["git","checkout","-b", name], cwd=root) t = base_time for i in range(commits): t = t + STEP_BRANCH + timedelta(minutes=random.randint(0,40)) mutate_noise(root, 5000+i) commit(root, f"{name}: routine update {i+1}", t, random.choice(authors)) merge_noff(root, name, f"merge: {name}", t + timedelta(minutes=5), random.choice(authors)) sh(["git","branch","-d", name], cwd=root) def build_secret_branch(root, base_time, authors): # ответвиться от текущего HEAD main sh(["git","checkout","-b", BRANCH_SECRET], cwd=root) t = base_time # немного шума до секрета for i in range(2): t = t + STEP_BRANCH + timedelta(minutes=random.randint(0,30)) mutate_noise(root, 8000+i) commit(root, random.choice(NOISE_MESSAGES), t, random.choice(authors)) # СЕКРЕТНЫЙ КОММИТ: OH-21B4 + base64-флаг как комментарий в router/server.js t = t + STEP_BRANCH set_mesh(root, "OH-21B4") add_vendor_note(root, FLAG_B64) snapshot_public_sources(root) commit(root, "router: set mesh id in config (staging bench)", t, random.choice(authors)) # ещё шум for i in range(2): t = t + STEP_BRANCH mutate_noise(root, 8100+i) commit(root, random.choice(NOISE_MESSAGES), t, random.choice(authors)) # вернуть прод-значение и удалить комментарий t = t + STEP_BRANCH set_mesh(root, "OH-2194") remove_vendor_note(root) restore_public_sources(root) commit(root, "router: revert mesh id to production", t, random.choice(authors)) # финальный шум на ветке t = t + STEP_BRANCH mutate_noise(root, 8200) commit(root, "chore: staging cleanup", t, random.choice(authors)) # squash-merge в main (в main ни OH-21B4, ни vendor-note не попадут) sh(["git","checkout","main"], cwd=root) sh(["git","merge","--squash", BRANCH_SECRET], cwd=root) t = t + STEP_BRANCH commit(root, f"merge: {BRANCH_SECRET}", t, random.choice(authors)) # Ветку НЕ удаляем — пусть живёт def tag_releases(root): sh(["git","tag","-a","v0.2.0","-m","public release 0.2.0"], cwd=root) sh(["git","tag","-a","v0.3.0","-m","public release 0.3.0"], cwd=root) def main(): parser = argparse.ArgumentParser() parser.add_argument("--author-name", default=None, help="Primary author name") parser.add_argument("--author-email", default=None, help="Primary author email") parser.add_argument("--push", default=None, help="Git remote URL to push (SSH or HTTPS)") args = parser.parse_args() primary = (args.author_name or "Your Name", args.author_email or "you@example.com") authors = [primary] + [a for a in AUTHORS_DEFAULT if a[1] != primary[1]] random.seed(42) root = os.path.abspath(REPO_NAME) init_repo(root, primary, authors) t_end_main = build_main(root, authors) # пара симпатичных merge-веток for name, count in FEATURE_BRANCHES: build_feature_branch(root, name, count, t_end_main, authors) t_end_main = t_end_main + timedelta(hours=1) # секретная ветка build_secret_branch(root, t_end_main, authors) tag_releases(root) print(f"[ok] repo created: {root}") print("\nПроверка локально:") print(" cd", REPO_NAME) print(" git log -S 'OH-21B4' --all --oneline") print(" git grep -n 'OH-21B4' $(git rev-list --all)") print(" git show $(git log -S 'OH-21B4' --all -n 1 --pretty=%H) | grep -i 'vendor-note'") print("\nПуш в GitHub вручную:") print(" git remote add origin ") print(" git push -u origin --all --tags") if args.push: sh(["git","remote","add","origin", args.push], cwd=root) sh(["git","push","-u","origin","--all"], cwd=root) sh(["git","push","origin","--tags"], cwd=root) print("[ok] pushed to:", args.push) print("Проверь ветки/теги на GitHub UI.") if __name__ == "__main__": main()