Init. commit

This commit is contained in:
Caplag
2026-03-02 21:44:22 +03:00
committed by Ivan Z
commit 9511b38280
38 changed files with 4397 additions and 0 deletions

View File

@@ -0,0 +1,346 @@
import argparse
import os
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Tuple
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
SECTOR_SIZE = 512
@dataclass
class Fat32BPB:
bytes_per_sector: int
sectors_per_cluster: int
reserved_sectors: int
num_fats: int
total_sectors: int
fat_size_sectors: int
root_cluster: int
@property
def first_fat_sector(self) -> int:
return self.reserved_sectors
@property
def first_data_sector(self) -> int:
return self.reserved_sectors + self.num_fats * self.fat_size_sectors
class VeraCryptFileVolume:
"""Reads plaintext sectors from a VeraCrypt file container that is already decrypted in RAM (keys known)."""
def __init__(self, container: Path, xts_key: bytes, data_offset: int, data_length: Optional[int] = None):
self.container = container
self.xts_key = xts_key
self.base_file_sector = data_offset // SECTOR_SIZE
if data_offset % SECTOR_SIZE != 0:
raise ValueError("data_offset must be sector-aligned")
st = container.stat()
if data_length is None:
# Assume standard file container layout: header area at start AND backup header at end.
data_length = st.st_size - 2 * data_offset
self.data_length = data_length
if self.data_length < SECTOR_SIZE or self.data_length % SECTOR_SIZE != 0:
raise ValueError("data_length must be a positive multiple of sector size")
self.total_sectors = self.data_length // SECTOR_SIZE
self._fh = open(container, "rb")
def close(self) -> None:
try:
self._fh.close()
except Exception:
pass
def _decrypt_sector(self, file_sector: int, ct: bytes) -> bytes:
tweak = int(file_sector).to_bytes(16, "little", signed=False)
dec = Cipher(algorithms.AES(self.xts_key), modes.XTS(tweak)).decryptor()
return dec.update(ct) + dec.finalize()
def read_sector(self, vol_sector: int) -> bytes:
if not (0 <= vol_sector < self.total_sectors):
raise ValueError("volume sector out of range")
file_sector = self.base_file_sector + vol_sector
self._fh.seek(file_sector * SECTOR_SIZE)
ct = self._fh.read(SECTOR_SIZE)
if len(ct) != SECTOR_SIZE:
raise IOError("short read")
return self._decrypt_sector(file_sector, ct)
def parse_fat32_bpb(boot_sector: bytes) -> Fat32BPB:
oem = boot_sector[3:11]
if oem != b"MSDOS5.0":
raise ValueError(f"Unexpected OEM {oem!r} (expected MSDOS5.0)")
bps = struct.unpack_from("<H", boot_sector, 11)[0]
spc = boot_sector[13]
rsvd = struct.unpack_from("<H", boot_sector, 14)[0]
nfats = boot_sector[16]
root_ent_cnt = struct.unpack_from("<H", boot_sector, 17)[0]
tot16 = struct.unpack_from("<H", boot_sector, 19)[0]
fatsz16 = struct.unpack_from("<H", boot_sector, 22)[0]
tot32 = struct.unpack_from("<I", boot_sector, 32)[0]
fatsz32 = struct.unpack_from("<I", boot_sector, 36)[0]
root_clus = struct.unpack_from("<I", boot_sector, 44)[0]
if bps != SECTOR_SIZE:
raise ValueError(f"Unsupported bytes/sector: {bps}")
if root_ent_cnt != 0:
raise ValueError("Not FAT32 (root entry count != 0)")
if fatsz16 != 0:
raise ValueError("Not FAT32 (FATSz16 != 0)")
tot = tot32 or tot16
if tot == 0:
raise ValueError("Invalid total sectors")
if spc == 0:
raise ValueError("Invalid sectors/cluster")
if nfats < 1:
raise ValueError("Invalid number of FATs")
if fatsz32 == 0:
raise ValueError("Invalid FAT size")
if root_clus < 2:
raise ValueError("Invalid root cluster")
return Fat32BPB(
bytes_per_sector=bps,
sectors_per_cluster=spc,
reserved_sectors=rsvd,
num_fats=nfats,
total_sectors=tot,
fat_size_sectors=fatsz32,
root_cluster=root_clus,
)
def sanitize_name(s: str) -> str:
# Minimal cross-platform path sanitization.
s = s.replace("\\", "_").replace("/", "_").replace(":", "_")
s = s.strip().strip(".")
return s or "_"
def parse_short_name(ent: bytes) -> str:
name = ent[0:8].decode("ascii", errors="ignore").rstrip(" ")
ext = ent[8:11].decode("ascii", errors="ignore").rstrip(" ")
if not ext:
return name
return f"{name}.{ext}"
def parse_lfn_part(ent: bytes) -> str:
# LFN is UTF-16LE in three chunks
raw = ent[1:11] + ent[14:26] + ent[28:32]
out_chars: List[str] = []
for i in range(0, len(raw), 2):
(ch,) = struct.unpack_from("<H", raw, i)
if ch in (0x0000, 0xFFFF):
continue
out_chars.append(chr(ch))
return "".join(out_chars)
@dataclass
class DirEntry:
name: str
is_dir: bool
cluster: int
size: int
class Fat32:
def __init__(self, vol: VeraCryptFileVolume, bpb: Fat32BPB):
self.vol = vol
self.bpb = bpb
self._fat_sector_cache: Dict[int, bytes] = {}
def vol_sector_for_cluster(self, cluster: int) -> int:
return self.bpb.first_data_sector + (cluster - 2) * self.bpb.sectors_per_cluster
def read_fat_sector(self, fat_sector_index: int) -> bytes:
if fat_sector_index not in self._fat_sector_cache:
self._fat_sector_cache[fat_sector_index] = self.vol.read_sector(
self.bpb.first_fat_sector + fat_sector_index
)
return self._fat_sector_cache[fat_sector_index]
def fat_entry(self, cluster: int) -> int:
# FAT32 entry is 4 bytes, lower 28 bits used
off = cluster * 4
sector_index = off // SECTOR_SIZE
sector_off = off % SECTOR_SIZE
sec = self.read_fat_sector(sector_index)
(val,) = struct.unpack_from("<I", sec, sector_off)
return val & 0x0FFFFFFF
def iter_cluster_chain(self, start_cluster: int, *, max_steps: int = 200000) -> Iterator[int]:
cl = start_cluster
steps = 0
while 2 <= cl < 0x0FFFFFF8:
yield cl
nxt = self.fat_entry(cl)
if nxt == cl:
break
cl = nxt
steps += 1
if steps > max_steps:
raise RuntimeError("cluster chain too long (loop?)")
def read_cluster(self, cluster: int) -> bytes:
vsec0 = self.vol_sector_for_cluster(cluster)
buf = bytearray()
for i in range(self.bpb.sectors_per_cluster):
buf += self.vol.read_sector(vsec0 + i)
return bytes(buf)
def read_chain_data(self, start_cluster: int, size: int) -> bytes:
buf = bytearray()
for cl in self.iter_cluster_chain(start_cluster):
buf += self.read_cluster(cl)
if len(buf) >= size:
break
return bytes(buf[:size])
def read_directory(self, start_cluster: int) -> List[DirEntry]:
entries: List[DirEntry] = []
lfn_parts: List[str] = []
for cl in self.iter_cluster_chain(start_cluster):
data = self.read_cluster(cl)
for off in range(0, len(data), 32):
ent = data[off : off + 32]
first = ent[0]
if first == 0x00:
return entries
if first == 0xE5:
lfn_parts.clear()
continue
attr = ent[11]
if attr == 0x0F:
lfn_parts.append(parse_lfn_part(ent))
continue
name = ""
if lfn_parts:
name = "".join(reversed(lfn_parts))
lfn_parts.clear()
else:
name = parse_short_name(ent)
# Skip volume labels
if attr & 0x08:
continue
# Skip "." and ".."
if name in (".", ".."):
continue
is_dir = bool(attr & 0x10)
hi = struct.unpack_from("<H", ent, 20)[0]
lo = struct.unpack_from("<H", ent, 26)[0]
first_cluster = (hi << 16) | lo
size = struct.unpack_from("<I", ent, 28)[0]
entries.append(
DirEntry(
name=name,
is_dir=is_dir,
cluster=first_cluster,
size=size,
)
)
return entries
def walk_and_extract(
fs: Fat32,
start_cluster: int,
out_dir: Path,
rel_path: str = "",
*,
max_files: int = 5000,
) -> List[Path]:
extracted: List[Path] = []
stack: List[Tuple[int, str]] = [(start_cluster, rel_path)]
seen_dirs: set[Tuple[int, str]] = set()
while stack:
cl, rpath = stack.pop()
key = (cl, rpath)
if key in seen_dirs:
continue
seen_dirs.add(key)
for ent in fs.read_directory(cl):
name = sanitize_name(ent.name)
child_rel = os.path.join(rpath, name) if rpath else name
out_path = out_dir / child_rel
if ent.is_dir:
if ent.cluster >= 2:
stack.append((ent.cluster, child_rel))
continue
if ent.cluster < 2:
continue
out_path.parent.mkdir(parents=True, exist_ok=True)
data = fs.read_chain_data(ent.cluster, ent.size)
out_path.write_bytes(data)
extracted.append(out_path)
if len(extracted) >= max_files:
return extracted
return extracted
def main() -> int:
ap = argparse.ArgumentParser(description="Extract FAT32 files from a VeraCrypt container using recovered XTS keys")
ap.add_argument("container", type=Path)
ap.add_argument("--key-a", required=True, help="32-byte hex key A (data key)")
ap.add_argument("--key-b", required=True, help="32-byte hex key B (tweak key)")
ap.add_argument("--data-offset", default="0x20000", help="Start of encrypted volume area (default: 0x20000)")
ap.add_argument(
"--out-dir",
type=Path,
default=Path("out") / "vc_extracted",
help="Output directory (default: out/vc_extracted)",
)
args = ap.parse_args()
key_a = bytes.fromhex(args.key_a)
key_b = bytes.fromhex(args.key_b)
if len(key_a) != 32 or len(key_b) != 32:
raise SystemExit("Keys must be 32 bytes each")
data_offset = int(str(args.data_offset), 0)
xts_key = key_a + key_b
args.out_dir.mkdir(parents=True, exist_ok=True)
vol = VeraCryptFileVolume(args.container, xts_key, data_offset)
try:
boot = vol.read_sector(0) # volume sector 0 maps to file sector base_file_sector
bpb = parse_fat32_bpb(boot)
print(
f"[i] FAT32: total_sectors={bpb.total_sectors} spc={bpb.sectors_per_cluster} "
f"reserved={bpb.reserved_sectors} fats={bpb.num_fats} fatsz={bpb.fat_size_sectors} root={bpb.root_cluster}"
)
fs = Fat32(vol, bpb)
extracted = walk_and_extract(fs, bpb.root_cluster, args.out_dir)
print(f"[i] Extracted {len(extracted)} files into {args.out_dir}")
return 0
finally:
vol.close()
if __name__ == "__main__":
raise SystemExit(main())