Files
Geroi-Kodeksa/HumanAI-Forensic-Hard/scripts/extract_vc_fat32.py
2026-03-02 21:44:22 +03:00

347 lines
11 KiB
Python

import argparse
import os
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
SECTOR_SIZE = 512
@dataclass
class Fat32BPB:
bytes_per_sector: int
sectors_per_cluster: int
reserved_sectors: int
num_fats: int
total_sectors: int
fat_size_sectors: int
root_cluster: int
@property
def first_fat_sector(self) -> int:
return self.reserved_sectors
@property
def first_data_sector(self) -> int:
return self.reserved_sectors + self.num_fats * self.fat_size_sectors
class VeraCryptFileVolume:
"""Reads plaintext sectors from a VeraCrypt file container that is already decrypted in RAM (keys known)."""
def __init__(self, container: Path, xts_key: bytes, data_offset: int, data_length: Optional[int] = None):
self.container = container
self.xts_key = xts_key
self.base_file_sector = data_offset // SECTOR_SIZE
if data_offset % SECTOR_SIZE != 0:
raise ValueError("data_offset must be sector-aligned")
st = container.stat()
if data_length is None:
# Assume standard file container layout: header area at start AND backup header at end.
data_length = st.st_size - 2 * data_offset
self.data_length = data_length
if self.data_length < SECTOR_SIZE or self.data_length % SECTOR_SIZE != 0:
raise ValueError("data_length must be a positive multiple of sector size")
self.total_sectors = self.data_length // SECTOR_SIZE
self._fh = open(container, "rb")
def close(self) -> None:
try:
self._fh.close()
except Exception:
pass
def _decrypt_sector(self, file_sector: int, ct: bytes) -> bytes:
tweak = int(file_sector).to_bytes(16, "little", signed=False)
dec = Cipher(algorithms.AES(self.xts_key), modes.XTS(tweak)).decryptor()
return dec.update(ct) + dec.finalize()
def read_sector(self, vol_sector: int) -> bytes:
if not (0 <= vol_sector < self.total_sectors):
raise ValueError("volume sector out of range")
file_sector = self.base_file_sector + vol_sector
self._fh.seek(file_sector * SECTOR_SIZE)
ct = self._fh.read(SECTOR_SIZE)
if len(ct) != SECTOR_SIZE:
raise IOError("short read")
return self._decrypt_sector(file_sector, ct)
def parse_fat32_bpb(boot_sector: bytes) -> Fat32BPB:
oem = boot_sector[3:11]
if oem != b"MSDOS5.0":
raise ValueError(f"Unexpected OEM {oem!r} (expected MSDOS5.0)")
bps = struct.unpack_from("<H", boot_sector, 11)[0]
spc = boot_sector[13]
rsvd = struct.unpack_from("<H", boot_sector, 14)[0]
nfats = boot_sector[16]
root_ent_cnt = struct.unpack_from("<H", boot_sector, 17)[0]
tot16 = struct.unpack_from("<H", boot_sector, 19)[0]
fatsz16 = struct.unpack_from("<H", boot_sector, 22)[0]
tot32 = struct.unpack_from("<I", boot_sector, 32)[0]
fatsz32 = struct.unpack_from("<I", boot_sector, 36)[0]
root_clus = struct.unpack_from("<I", boot_sector, 44)[0]
if bps != SECTOR_SIZE:
raise ValueError(f"Unsupported bytes/sector: {bps}")
if root_ent_cnt != 0:
raise ValueError("Not FAT32 (root entry count != 0)")
if fatsz16 != 0:
raise ValueError("Not FAT32 (FATSz16 != 0)")
tot = tot32 or tot16
if tot == 0:
raise ValueError("Invalid total sectors")
if spc == 0:
raise ValueError("Invalid sectors/cluster")
if nfats < 1:
raise ValueError("Invalid number of FATs")
if fatsz32 == 0:
raise ValueError("Invalid FAT size")
if root_clus < 2:
raise ValueError("Invalid root cluster")
return Fat32BPB(
bytes_per_sector=bps,
sectors_per_cluster=spc,
reserved_sectors=rsvd,
num_fats=nfats,
total_sectors=tot,
fat_size_sectors=fatsz32,
root_cluster=root_clus,
)
def sanitize_name(s: str) -> str:
# Minimal cross-platform path sanitization.
s = s.replace("\\", "_").replace("/", "_").replace(":", "_")
s = s.strip().strip(".")
return s or "_"
def parse_short_name(ent: bytes) -> str:
name = ent[0:8].decode("ascii", errors="ignore").rstrip(" ")
ext = ent[8:11].decode("ascii", errors="ignore").rstrip(" ")
if not ext:
return name
return f"{name}.{ext}"
def parse_lfn_part(ent: bytes) -> str:
# LFN is UTF-16LE in three chunks
raw = ent[1:11] + ent[14:26] + ent[28:32]
out_chars: List[str] = []
for i in range(0, len(raw), 2):
(ch,) = struct.unpack_from("<H", raw, i)
if ch in (0x0000, 0xFFFF):
continue
out_chars.append(chr(ch))
return "".join(out_chars)
@dataclass
class DirEntry:
name: str
is_dir: bool
cluster: int
size: int
class Fat32:
def __init__(self, vol: VeraCryptFileVolume, bpb: Fat32BPB):
self.vol = vol
self.bpb = bpb
self._fat_sector_cache: Dict[int, bytes] = {}
def vol_sector_for_cluster(self, cluster: int) -> int:
return self.bpb.first_data_sector + (cluster - 2) * self.bpb.sectors_per_cluster
def read_fat_sector(self, fat_sector_index: int) -> bytes:
if fat_sector_index not in self._fat_sector_cache:
self._fat_sector_cache[fat_sector_index] = self.vol.read_sector(
self.bpb.first_fat_sector + fat_sector_index
)
return self._fat_sector_cache[fat_sector_index]
def fat_entry(self, cluster: int) -> int:
# FAT32 entry is 4 bytes, lower 28 bits used
off = cluster * 4
sector_index = off // SECTOR_SIZE
sector_off = off % SECTOR_SIZE
sec = self.read_fat_sector(sector_index)
(val,) = struct.unpack_from("<I", sec, sector_off)
return val & 0x0FFFFFFF
def iter_cluster_chain(self, start_cluster: int, *, max_steps: int = 200000) -> Iterator[int]:
cl = start_cluster
steps = 0
while 2 <= cl < 0x0FFFFFF8:
yield cl
nxt = self.fat_entry(cl)
if nxt == cl:
break
cl = nxt
steps += 1
if steps > max_steps:
raise RuntimeError("cluster chain too long (loop?)")
def read_cluster(self, cluster: int) -> bytes:
vsec0 = self.vol_sector_for_cluster(cluster)
buf = bytearray()
for i in range(self.bpb.sectors_per_cluster):
buf += self.vol.read_sector(vsec0 + i)
return bytes(buf)
def read_chain_data(self, start_cluster: int, size: int) -> bytes:
buf = bytearray()
for cl in self.iter_cluster_chain(start_cluster):
buf += self.read_cluster(cl)
if len(buf) >= size:
break
return bytes(buf[:size])
def read_directory(self, start_cluster: int) -> List[DirEntry]:
entries: List[DirEntry] = []
lfn_parts: List[str] = []
for cl in self.iter_cluster_chain(start_cluster):
data = self.read_cluster(cl)
for off in range(0, len(data), 32):
ent = data[off : off + 32]
first = ent[0]
if first == 0x00:
return entries
if first == 0xE5:
lfn_parts.clear()
continue
attr = ent[11]
if attr == 0x0F:
lfn_parts.append(parse_lfn_part(ent))
continue
name = ""
if lfn_parts:
name = "".join(reversed(lfn_parts))
lfn_parts.clear()
else:
name = parse_short_name(ent)
# Skip volume labels
if attr & 0x08:
continue
# Skip "." and ".."
if name in (".", ".."):
continue
is_dir = bool(attr & 0x10)
hi = struct.unpack_from("<H", ent, 20)[0]
lo = struct.unpack_from("<H", ent, 26)[0]
first_cluster = (hi << 16) | lo
size = struct.unpack_from("<I", ent, 28)[0]
entries.append(
DirEntry(
name=name,
is_dir=is_dir,
cluster=first_cluster,
size=size,
)
)
return entries
def walk_and_extract(
fs: Fat32,
start_cluster: int,
out_dir: Path,
rel_path: str = "",
*,
max_files: int = 5000,
) -> List[Path]:
extracted: List[Path] = []
stack: List[Tuple[int, str]] = [(start_cluster, rel_path)]
seen_dirs: set[Tuple[int, str]] = set()
while stack:
cl, rpath = stack.pop()
key = (cl, rpath)
if key in seen_dirs:
continue
seen_dirs.add(key)
for ent in fs.read_directory(cl):
name = sanitize_name(ent.name)
child_rel = os.path.join(rpath, name) if rpath else name
out_path = out_dir / child_rel
if ent.is_dir:
if ent.cluster >= 2:
stack.append((ent.cluster, child_rel))
continue
if ent.cluster < 2:
continue
out_path.parent.mkdir(parents=True, exist_ok=True)
data = fs.read_chain_data(ent.cluster, ent.size)
out_path.write_bytes(data)
extracted.append(out_path)
if len(extracted) >= max_files:
return extracted
return extracted
def main() -> int:
ap = argparse.ArgumentParser(description="Extract FAT32 files from a VeraCrypt container using recovered XTS keys")
ap.add_argument("container", type=Path)
ap.add_argument("--key-a", required=True, help="32-byte hex key A (data key)")
ap.add_argument("--key-b", required=True, help="32-byte hex key B (tweak key)")
ap.add_argument("--data-offset", default="0x20000", help="Start of encrypted volume area (default: 0x20000)")
ap.add_argument(
"--out-dir",
type=Path,
default=Path("out") / "vc_extracted",
help="Output directory (default: out/vc_extracted)",
)
args = ap.parse_args()
key_a = bytes.fromhex(args.key_a)
key_b = bytes.fromhex(args.key_b)
if len(key_a) != 32 or len(key_b) != 32:
raise SystemExit("Keys must be 32 bytes each")
data_offset = int(str(args.data_offset), 0)
xts_key = key_a + key_b
args.out_dir.mkdir(parents=True, exist_ok=True)
vol = VeraCryptFileVolume(args.container, xts_key, data_offset)
try:
boot = vol.read_sector(0) # volume sector 0 maps to file sector base_file_sector
bpb = parse_fat32_bpb(boot)
print(
f"[i] FAT32: total_sectors={bpb.total_sectors} spc={bpb.sectors_per_cluster} "
f"reserved={bpb.reserved_sectors} fats={bpb.num_fats} fatsz={bpb.fat_size_sectors} root={bpb.root_cluster}"
)
fs = Fat32(vol, bpb)
extracted = walk_and_extract(fs, bpb.root_cluster, args.out_dir)
print(f"[i] Extracted {len(extracted)} files into {args.out_dir}")
return 0
finally:
vol.close()
if __name__ == "__main__":
raise SystemExit(main())