Files
Kubok-Regionov/GAME/osint/gen.py
2025-12-22 05:19:38 +03:00

434 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
forge_repo_staging.py — генерирует публичный репозиторий с 500+ коммитами.
Строка mesh: "OH-21B4" и base64-флаг встречаются РОВНО в одном коммите ветки
`staging/router-mesh`. В main их нет (ветка смёржена через --squash).
Можно сразу запушить: python3 forge_repo_staging.py --push <git-url>
Или задать автора: --author-name "Имя" --author-email you@example.com
"""
import os, sys, json, random, subprocess, shutil, argparse, base64, re
from datetime import datetime, timedelta, timezone
# -------- default parameters --------

# Directory name of the generated repository (resolved against the CWD).
REPO_NAME = "iot-suite-repo-mega-fixed"

# Branch that carries the secret commit.
BRANCH_SECRET = "staging/router-mesh"

# (branch name, number of commits) for the decorative merged branches.
FEATURE_BRANCHES = [
    ("feature/ui-polish", 6),
    ("hotfix/diag-timeout", 3),
    ("refactor/router-cfg", 5),
]

# Number of noise commits created on main.
MAIN_COMMITS = 520

# Timestamp of the initial commit (UTC+01:00) and the base step between
# consecutive commits on main / on side branches.
TIME_START = datetime(
    year=2023, month=3, day=1, hour=10, minute=0,
    tzinfo=timezone(timedelta(hours=1)),
)
STEP_MAIN = timedelta(hours=6)
STEP_BRANCH = timedelta(hours=2)

# (name, email) identities used for the fabricated commits.
AUTHORS_DEFAULT = [
    ("Natalie Orlov", "natalie@orbital.home"),
    ("Anton Lebedev", "anton@orbital.home"),
    ("Nina Petrova", "nina@orbital.home"),
    ("Mark Isaev", "mark@orbital.home"),
    ("QA Bot", "qa-bot@orbital.home"),
]

# Commit subjects picked at random for noise commits.
NOISE_MESSAGES = [
    "docs: update changelog",
    "chore: reformat",
    "ci: bump node action",
    "router: refactor logger",
    "kettle: tweak diag",
    "nas: session cleanup",
    "printer: queue cosmetics",
    "deps: bump ejs",
    "deps: bump express",
    "test: deflake",
    "build: docker cache",
    "perf: micro-opt",
    "style: trailing space",
    "docs: badges",
]

# The flag and its base64 form (the base64 text is what gets committed).
FLAG_PLAINTEXT = "caplag{Orb1ta1-h0me-vendor-found-needed-branch-version}"
FLAG_B64 = base64.b64encode(FLAG_PLAINTEXT.encode("utf-8")).decode("ascii")
def sh(args, cwd=None, env=None):
    """Run a command and return its standard output with surrounding whitespace removed.

    Args:
        args: Command and arguments as a list (no shell is involved).
        cwd: Working directory for the child process, or None for the current one.
        env: Environment mapping for the child process, or None to inherit.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    completed = subprocess.run(
        args,
        cwd=cwd,
        env=env,
        check=True,
        capture_output=True,  # same as piping both stdout and stderr
        text=True,
    )
    return completed.stdout.strip()
def write(path, content, binary=False):
    """Write *content* to *path*, creating missing parent directories.

    Args:
        path: Destination file path. A bare filename (no directory part) is
            written into the current working directory.
        content: Text (str) or binary data (bytes / bytearray).
        binary: Force binary mode. Bytes-like content selects binary mode
            automatically; text is always encoded as UTF-8.
    """
    parent = os.path.dirname(path)
    # os.makedirs("") raises FileNotFoundError, so skip it for bare filenames.
    if parent:
        os.makedirs(parent, exist_ok=True)
    if binary or isinstance(content, (bytes, bytearray)):
        with open(path, "wb") as f:
            f.write(content)
    else:
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
# Text of the README.md placed at the root of the generated repository
# (written by init_repo). The content is emitted verbatim.
BASE_README = """# Orbital Home — IoT Suite (public)
Публичная версия кодовой базы для демонстраций: Router, Printer, Kettle, NAS.
Документация и закрытые части вырезаны.
"""
# ---------------- router (public, minimal) ----------------
# JavaScript source written verbatim to router/server.js by init_repo().
# The /info_connection handler reports `vendor` and `mesh` read from
# config/router.json, which is the file set_mesh() rewrites.
ROUTER_SERVER = """import express from 'express';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const app = express();
const PORT = process.env.PORT || 8085;
const cfgPath = path.join(__dirname, 'config', 'router.json');
let cfg = JSON.parse(fs.readFileSync(cfgPath, 'utf-8'));
app.get('/info_connection', (req,res)=>{
res.json({ devices: [
{ name:'Printer', model:'PRN-3130', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://printer.local' },
{ name:'Kettle', model:'KTL-74C1', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://kettle.local' },
{ name:'NAS', model:'NAS-220', vendor: cfg.vendor, mesh: cfg.mesh, url: 'http://nas.local' }
]});
});
app.get('/', (req,res)=> res.type('text/plain').send('Router UI (public build)'));
app.listen(PORT, ()=> console.log(`[router-public] listening on ${PORT}`));
"""
# Manifest serialized as JSON into router/package.json by init_repo().
ROUTER_PKG = {
    "name":"router-public","version":"0.3.0","type":"module",
    "scripts":{"start":"node server.js"},
    "dependencies":{"express":"^4.19.2"}
}
# ---------------- printer (masked) ----------------
# JavaScript source written verbatim to printer/server.js by init_repo().
# This is a regular (non-raw) Python string, so every backslash that must
# reach the JS file is doubled: `\\\\` becomes `\\` in the file and `\\n`
# becomes the two characters `\n`.
PRINTER_SERVER_PUBLIC = """import express from 'express';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const app = express();
const PORT = process.env.PORT || 8080;
const QUEUE_DIR = path.join(__dirname, 'data', 'queue');
// <redacted>: private build may include additional endpoints and auth
app.get('/', (req,res)=> res.type('text/plain').send('Printer UI (public build)'));
app.get('/jobs/preview', (req,res)=>{
const name = String(req.query.file||'').replace(/\\\\/g,'/');
if (!name || name.includes('..') || path.isAbsolute(name)) return res.status(400).send('bad path');
const abs = path.join(QUEUE_DIR, path.normalize(name));
if (!abs.startsWith(QUEUE_DIR)) return res.status(400).send('bad path');
if (!fs.existsSync(abs)) return res.status(404).send('not found');
fs.createReadStream(abs).pipe(res);
});
app.get('/go', (req,res)=>{
const u = String(req.query.u||'');
if (/^https?:\\/\\//i.test(u)) return res.redirect(302,u);
res.redirect('/');
});
app.get('/robots.txt', (req,res)=> res.type('text/plain').send('User-agent: *\\nDisallow:\\n'));
app.listen(PORT, ()=> console.log(`[printer-public] listening on ${PORT}`));
"""
# Manifest serialized as JSON into printer/package.json by init_repo().
PRINTER_PKG_PUBLIC = {
    "name":"printer-public","version":"0.3.0","type":"module",
    "scripts":{"start":"node server.js"},
    "dependencies":{"express":"^4.19.2"}
}
# ---------------- kettle (masked) ----------------
# JavaScript source written verbatim to kettle/server.js by init_repo().
# This is a regular (non-raw) Python string: a backslash that must appear in
# the generated file has to be doubled. The `'\\n'` inside the commented-out
# HMAC hint is escaped for that reason; an unescaped `\n` would become a real
# line break, push the tail of the comment onto its own line and leave an
# unterminated string literal in the JS file.
KETTLE_SERVER_PUBLIC = """import express from 'express';
import path from 'path';
import { fileURLToPath } from 'url';
// import axios from 'axios'; // <redacted>
// import crypto from 'crypto'; // <redacted>
// const PRINTER_PROOF = process.env.PRINTER_PROOF; // <redacted>
// const KETTLE_NONCE = process.env.KETTLE_NONCE; // <redacted>
// const KETTLE_SHARED_KEY = process.env.KETTLE_SHARED_KEY; // <redacted>
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const app = express();
const PORT = process.env.PORT || 8081;
app.get('/', (req,res)=> res.redirect('/diagnostics'));
app.get('/diagnostics', (req,res)=>{
// In private build this page exposes limited device info and triggers telemetry.
res.type('text/plain').send('Kettle diagnostics (public build)');
});
app.post('/diagnostics/fetch', (req,res)=>{
// <redacted>: route-signature-based fetch via internal gateway
// key = (PRINTER_PROOF + ":" + KETTLE_NONCE).encode()
//mac = hmac.new(key, (url + '\\n' + ts).encode(), hashlib.sha256).hexdigest()
res.status(501).type('text/plain').send('disabled in public build');
});
app.get('/assets/badge.svg', (req,res)=>{
res.type('image/svg+xml').send('<svg xmlns="http://www.w3.org/2000/svg" width="20" height="20"><circle cx="10" cy="10" r="7" fill="#5c7cfa"/></svg>');
});
app.listen(PORT, ()=> console.log(`[kettle-public] listening on ${PORT}`));
"""
# Manifest serialized as JSON into kettle/package.json by init_repo().
KETTLE_PKG_PUBLIC = {
    "name": "kettle-public", "version": "0.3.0", "type": "module",
    "scripts": {"start": "node server.js"},
    "dependencies": {"express": "^4.19.2"},
}
# ---------------- nas (masked) ----------------
# JavaScript source written verbatim to nas/server.js by init_repo().
# This is a regular (non-raw) Python string, so the regex backslash in
# `Bearer\\s+` is doubled: a lone `\s` is an invalid Python escape sequence
# (it warns on current interpreters) even though it yields the same text.
# NOTE(review): the statements between the '/admin/flag' route and
# app.listen() reference `req`/`res` outside any handler and open a `try{`
# that is never closed, so the generated file is not valid JavaScript.
# Presumably this is a deliberately "sloppy redaction" left as a hint for
# players — confirm before cleaning it up.
NAS_SERVER_PUBLIC = """import express from 'express';
import path from 'path';
import { fileURLToPath } from 'url';
// import jwt from 'jsonwebtoken'; // <redacted>
// import crypto from 'crypto'; // <redacted>
// const JWT_SECRET = process.env.JWT_SECRET; // <redacted>
// const KETTLE_SHARED_KEY = process.env.KETTLE_SHARED_KEY; // <redacted>
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const app = express();
const PORT = process.env.PORT || 5000;
app.use(express.urlencoded({ extended:false }));
app.use(express.json());
app.get('/', (req,res)=> res.redirect('/login'));
app.get('/login', (req,res)=> res.type('text/plain').send('NAS login (public build)'));
app.post('/login', (req,res)=> res.status(501).send('disabled in public build'));
// admin_pass = hashlib.sha1((KETTLE_PROOF + ":" + PRINTER_PROOF).encode()).hexdigest()[:12]
app.get('/internal/hint', (req,res)=>{
// <redacted> internal gateway-only endpoint (requires shared key + route sign)
res.status(403).type('text/plain').send('forbidden (public build)');
});
app.get('/admin/flag', (req,res)=> res.status(404).send('not found'));
const auth = req.headers.authorization || '';
const token = auth.replace(/^Bearer\\s+/,'');
try{
const payload = jwt.verify(token, JWT_SECRET, { algorithms:['HS256'] });
if (payload.role !== 'admin') return res.status(403).send('need admin role');
if (payload.kpf !== KETTLE_PROOF) return res.status(403).send('incorrect kpf');
if (payload.aud !== 'nas-admin') return res.status(403).send('bad aud');
const now = Math.floor(Date.now()/1000);
if (!payload.iat || Math.abs(now - payload.iat) > 120) return res.status(403).send('iat too old');
app.listen(PORT, ()=> console.log(`[nas-public] listening on ${PORT}`));
"""
# Manifest serialized as JSON into nas/package.json by init_repo().
NAS_PKG_PUBLIC = {
    "name": "nas-public", "version": "0.3.0", "type": "module",
    "scripts": {"start": "node server.js"},
    "dependencies": {"express": "^4.19.2"},
}
def commit(root, message, when, author):
    """Stage every change under *root* and commit it with a chosen identity and date.

    Args:
        root: Path of the git working tree.
        message: Commit message.
        when: datetime used for both the author and the committer date.
        author: (name, email) pair used for both author and committer.
    """
    stamp = when.isoformat()
    overrides = {
        "GIT_AUTHOR_NAME": author[0],
        "GIT_AUTHOR_EMAIL": author[1],
        "GIT_COMMITTER_NAME": author[0],
        "GIT_COMMITTER_EMAIL": author[1],
        "GIT_AUTHOR_DATE": stamp,
        "GIT_COMMITTER_DATE": stamp,
    }
    sh(["git", "add", "-A"], cwd=root)
    # Inherit the caller's environment and layer the identity/date on top.
    child_env = {**os.environ, **overrides}
    sh(["git", "commit", "-m", message], cwd=root, env=child_env)
def set_mesh(root, value):
    """Set the "mesh" key of router/config/router.json under *root* to *value*.

    The file is rewritten in place as JSON with two-space indentation.
    """
    config_path = os.path.join(root, "router/config/router.json")
    with open(config_path, "r+", encoding="utf-8") as handle:
        settings = json.load(handle)
        settings["mesh"] = value
        # Rewind, overwrite from the start, then drop any leftover tail in
        # case the new document is shorter than the old one.
        handle.seek(0)
        json.dump(settings, handle, indent=2)
        handle.truncate()
def add_vendor_note(root, b64text):
    """Append a `// vendor-note: <b64text>` comment line to router/server.js under *root*."""
    target = os.path.join(root, "router/server.js")
    note = f"\n// vendor-note: {b64text}\n"
    with open(target, "a", encoding="utf-8") as out:
        out.write(note)
def remove_vendor_note(root):
    """Remove every `// vendor-note: ...` comment from router/server.js under *root*.

    Each match (including one adjacent newline on either side, when present)
    is replaced by a single newline; matching is case-insensitive.
    """
    target = os.path.join(root, "router/server.js")
    with open(target, "r", encoding="utf-8") as src:
        original = src.read()
    note_pattern = re.compile(r"\n?//\s*vendor-note:[^\n]*\n?", re.IGNORECASE)
    cleaned = note_pattern.sub("\n", original)
    with open(target, "w", encoding="utf-8") as dst:
        dst.write(cleaned)
def add_trailing_space_lines(path):
    """Rewrite the file at *path* so that every line ends with exactly one space.

    Existing trailing spaces are collapsed to one, and every line (including
    the last) is terminated with a newline.
    """
    with open(path, "r", encoding="utf-8") as src:
        rows = src.read().splitlines()
    padded = [row.rstrip(" ") + " \n" for row in rows]
    with open(path, "w", encoding="utf-8") as dst:
        dst.writelines(padded)
def strip_trailing_space_lines(path):
    """Rewrite the file at *path* with trailing spaces removed from every line.

    Every line (including the last) is terminated with a newline.
    """
    with open(path, "r", encoding="utf-8") as src:
        rows = src.read().splitlines()
    trimmed = [row.rstrip(" ") + "\n" for row in rows]
    with open(path, "w", encoding="utf-8") as dst:
        dst.writelines(trimmed)
def snapshot_public_sources(root):
    """Add a trailing space to every line of the kettle and nas server sources under *root*."""
    targets = [
        os.path.join(root, "kettle/server.js"),
        os.path.join(root, "nas/server.js"),
    ]
    for target in targets:
        add_trailing_space_lines(target)
def restore_public_sources(root):
    """Strip trailing spaces from every line of the kettle and nas server sources under *root*."""
    targets = [
        os.path.join(root, "kettle/server.js"),
        os.path.join(root, "nas/server.js"),
    ]
    for target in targets:
        strip_trailing_space_lines(target)
def mutate_noise(root, i):
    """Append one harmless line to a randomly chosen file under *root*.

    One of five targets is picked with `random.choice` (a single draw from
    the module-level RNG): the changelog gets a `- maintenance <i>` entry,
    while a module's server.js gets a `// note: minor tweak <i>` comment.

    Args:
        root: Path of the repository working tree.
        i: Number embedded in the appended line.
    """
    choice = random.choice(["docs", "router", "printer", "kettle", "nas"])
    if choice == "docs":
        target = os.path.join(root, "docs/CHANGELOG.md")
        line = f"- maintenance {i}\n"
    else:
        # Every non-docs choice is a module directory with its own server.js;
        # the change is a purely cosmetic comment.
        target = os.path.join(root, f"{choice}/server.js")
        line = f"\n// note: minor tweak {i}\n"
    with open(target, "a", encoding="utf-8") as f:
        f.write(line)
def init_repo(root, primary_author, authors):
    """Create a fresh git repository at *root* and make the initial scaffold commit.

    Any existing path at *root* is removed first. The initial commit is dated
    TIME_START and attributed to *primary_author*.

    Args:
        root: Directory to (re)create.
        primary_author: (name, email) pair for the initial commit.
        authors: Accepted for signature symmetry; not used here.
    """
    if os.path.exists(root):
        shutil.rmtree(root)
    os.makedirs(root, exist_ok=True)
    sh(["git", "init", "--initial-branch=main"], cwd=root)

    def manifest(pkg):
        # package.json text for one module
        return json.dumps(pkg, indent=2, ensure_ascii=False)

    scaffold = [
        ("README.md", BASE_README),
        ("docs/CHANGELOG.md", "## Changelog (public)\n"),
        # router
        ("router/server.js", ROUTER_SERVER),
        ("router/package.json", manifest(ROUTER_PKG)),
        ("router/config/router.json",
         json.dumps({"vendor": "Orbital Home", "mesh": "OH-2194"}, indent=2)),
        # printer (public, safe)
        ("printer/server.js", PRINTER_SERVER_PUBLIC),
        ("printer/package.json", manifest(PRINTER_PKG_PUBLIC)),
        ("printer/data/queue/readme.txt", "public job\n"),
        # kettle (public, trimmed down)
        ("kettle/server.js", KETTLE_SERVER_PUBLIC),
        ("kettle/package.json", manifest(KETTLE_PKG_PUBLIC)),
        # nas (public, trimmed down)
        ("nas/server.js", NAS_SERVER_PUBLIC),
        ("nas/package.json", manifest(NAS_PKG_PUBLIC)),
    ]
    for rel_path, text in scaffold:
        write(os.path.join(root, rel_path), text)

    commit(root, "init(public): scaffold modules", TIME_START, primary_author)
def build_main(root, authors):
    """Create MAIN_COMMITS noise commits on the current branch of *root*.

    Each commit is STEP_MAIN plus 0-59 random minutes after the previous one,
    starting from TIME_START, with a random subject and a random author.

    Returns:
        The timestamp of the last commit created.
    """
    current = TIME_START
    for index in range(1, MAIN_COMMITS + 1):
        jitter = timedelta(minutes=random.randint(0, 59))
        current = current + STEP_MAIN + jitter
        mutate_noise(root, index)
        # Draw order (subject, then author) is kept so seeded runs reproduce.
        subject = random.choice(NOISE_MESSAGES)
        who = random.choice(authors)
        commit(root, subject, current, who)
    return current
def merge_noff(root, branch_name, message, when, author):
    """Merge *branch_name* into main with an explicit merge commit (--no-ff).

    The merge commit is stamped with *when* and attributed to *author*, the
    same way regular commits are. Without this the merge commit would carry
    the wall-clock time and the git identity configured on the machine
    running the generator.

    Args:
        root: Path of the git working tree.
        branch_name: Branch to merge into main.
        message: Merge commit message.
        when: datetime used for both the author and the committer date.
        author: (name, email) pair used for both author and committer.
    """
    stamp = when.isoformat()
    env = os.environ.copy()
    env.update({
        "GIT_AUTHOR_NAME": author[0], "GIT_AUTHOR_EMAIL": author[1],
        "GIT_COMMITTER_NAME": author[0], "GIT_COMMITTER_EMAIL": author[1],
        "GIT_AUTHOR_DATE": stamp,
        "GIT_COMMITTER_DATE": stamp,
    })
    sh(["git", "checkout", "main"], cwd=root)
    sh(["git", "merge", "--no-ff", branch_name, "-m", message], cwd=root, env=env)
def build_feature_branch(root, name, commits, base_time, authors):
    """Create branch *name*, add noise commits to it, merge it into main and delete it.

    Args:
        root: Path of the git working tree.
        name: Branch name; also used as the prefix of each commit subject.
        commits: Number of noise commits to create on the branch.
        base_time: Timestamp the branch timeline starts from.
        authors: Sequence of (name, email) pairs to pick authors from.
    """
    sh(["git", "checkout", "-b", name], cwd=root)
    clock = base_time
    for offset in range(commits):
        # STEP_BRANCH plus 0-40 random minutes after the previous commit
        clock = clock + STEP_BRANCH + timedelta(minutes=random.randint(0, 40))
        mutate_noise(root, 5000 + offset)
        subject = f"{name}: routine update {offset + 1}"
        commit(root, subject, clock, random.choice(authors))
    merge_time = clock + timedelta(minutes=5)
    merge_noff(root, name, f"merge: {name}", merge_time, random.choice(authors))
    sh(["git", "branch", "-d", name], cwd=root)
def build_secret_branch(root, base_time, authors):
    """Build the BRANCH_SECRET branch and squash-merge it into main.

    Sequence of commits on the branch: two noise commits, the commit that
    sets mesh to "OH-21B4" and appends the base64 flag as a vendor-note
    comment, two more noise commits, a commit that reverts both changes, and
    one cleanup commit. The branch is then squash-merged into main and kept.

    Args:
        root: Path of the git working tree.
        base_time: Timestamp the branch timeline starts from.
        authors: Sequence of (name, email) pairs to pick authors from.
    """
    # branch off the current HEAD of main
    sh(["git","checkout","-b", BRANCH_SECRET], cwd=root)
    t = base_time
    # a little noise before the secret
    for i in range(2):
        t = t + STEP_BRANCH + timedelta(minutes=random.randint(0,30))
        mutate_noise(root, 8000+i)
        commit(root, random.choice(NOISE_MESSAGES), t, random.choice(authors))
    # SECRET COMMIT: OH-21B4 + base64 flag as a comment in router/server.js
    t = t + STEP_BRANCH
    set_mesh(root, "OH-21B4")
    add_vendor_note(root, FLAG_B64)
    # the same commit also adds trailing spaces to the kettle/nas sources
    snapshot_public_sources(root)
    commit(root, "router: set mesh id in config (staging bench)", t, random.choice(authors))
    # more noise
    for i in range(2):
        t = t + STEP_BRANCH
        mutate_noise(root, 8100+i)
        commit(root, random.choice(NOISE_MESSAGES), t, random.choice(authors))
    # restore the production value and remove the comment
    t = t + STEP_BRANCH
    set_mesh(root, "OH-2194")
    remove_vendor_note(root)
    restore_public_sources(root)
    commit(root, "router: revert mesh id to production", t, random.choice(authors))
    # final noise commit on the branch
    t = t + STEP_BRANCH
    mutate_noise(root, 8200)
    commit(root, "chore: staging cleanup", t, random.choice(authors))
    # squash-merge into main (neither OH-21B4 nor the vendor-note reaches main)
    sh(["git","checkout","main"], cwd=root)
    sh(["git","merge","--squash", BRANCH_SECRET], cwd=root)
    t = t + STEP_BRANCH
    # --squash only stages the result; commit() records it on main
    commit(root, f"merge: {BRANCH_SECRET}", t, random.choice(authors))
    # The branch is NOT deleted — it is left in place
def tag_releases(root):
    """Create the annotated tags v0.2.0 and v0.3.0 on the current HEAD of *root*."""
    releases = (
        ("v0.2.0", "public release 0.2.0"),
        ("v0.3.0", "public release 0.3.0"),
    )
    for tag_name, tag_message in releases:
        sh(["git", "tag", "-a", tag_name, "-m", tag_message], cwd=root)
def main():
    """Parse the command line, generate the repository and optionally push it.

    The RNG is seeded with 42 before generation. The repository is created in
    a directory named REPO_NAME under the current working directory.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--author-name", default=None, help="Primary author name")
    cli.add_argument("--author-email", default=None, help="Primary author email")
    cli.add_argument("--push", default=None, help="Git remote URL to push (SSH or HTTPS)")
    opts = cli.parse_args()

    primary = (opts.author_name or "Your Name", opts.author_email or "you@example.com")
    # The primary author goes first; defaults sharing its email are dropped.
    authors = [primary]
    for candidate in AUTHORS_DEFAULT:
        if candidate[1] != primary[1]:
            authors.append(candidate)

    random.seed(42)
    root = os.path.abspath(REPO_NAME)
    init_repo(root, primary, authors)
    branch_base = build_main(root, authors)

    # a couple of nice-looking merged branches
    for branch, commit_count in FEATURE_BRANCHES:
        build_feature_branch(root, branch, commit_count, branch_base, authors)
        branch_base = branch_base + timedelta(hours=1)

    # the secret branch
    build_secret_branch(root, branch_base, authors)
    tag_releases(root)

    print(f"[ok] repo created: {root}")
    print("\nПроверка локально:")
    print(" cd", REPO_NAME)
    for hint in (
        " git log -S 'OH-21B4' --all --oneline",
        " git grep -n 'OH-21B4' $(git rev-list --all)",
        " git show $(git log -S 'OH-21B4' --all -n 1 --pretty=%H) | grep -i 'vendor-note'",
        "\nПуш в GitHub вручную:",
        " git remote add origin <YOUR_GITHUB_REPO_URL>",
        " git push -u origin --all --tags",
    ):
        print(hint)

    if not opts.push:
        return
    sh(["git", "remote", "add", "origin", opts.push], cwd=root)
    sh(["git", "push", "-u", "origin", "--all"], cwd=root)
    sh(["git", "push", "origin", "--tags"], cwd=root)
    print("[ok] pushed to:", opts.push)
    print("Проверь ветки/теги на GitHub UI.")
# Run the generator only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()