#!/usr/bin/env python3
"""
Quick delta clustering to show the two timing buckets that encode 0 and 1.

Requires scapy: pip install scapy
"""
from collections import Counter
import math
import sys
from typing import List

try:
    from scapy.all import rdpcap, IP, ICMP  # type: ignore
except ImportError:
    # Keep the pure helpers (as_deltas, bin_value) importable without scapy;
    # load_icmp_times() reports the missing dependency when it is needed.
    rdpcap = IP = ICMP = None

TARGET_IP = "192.168.13.37"
BIN_WIDTH = 0.05  # Seconds

# Inter-packet gaps (seconds) summarised by main(), with the bit label
# printed for each.
BIT_CENTERS = ((1.2, "0"), (1.6, "1"))
# Half-width (seconds) of the window around each centre.
BIT_TOLERANCE = 0.1

# Nudge applied before flooring so that a value sitting exactly on a bin edge
# is not pushed into the bin below by float division error
# (e.g. 0.15 / 0.05 == 2.9999999999999996).
_BIN_EPSILON = 1e-9


def load_icmp_times(pcap_path: str) -> List[float]:
    """Return sorted capture timestamps of ICMP packets to or from TARGET_IP.

    Args:
        pcap_path: Path to the capture file handed to scapy's ``rdpcap``.

    Returns:
        Packet timestamps as floats, in ascending order.

    Raises:
        ImportError: If scapy is not installed.
    """
    if rdpcap is None:
        raise ImportError("scapy is required: pip install scapy")

    packets = rdpcap(pcap_path)
    return sorted(
        float(pkt.time)
        for pkt in packets
        if IP in pkt
        and ICMP in pkt
        and (pkt[IP].src == TARGET_IP or pkt[IP].dst == TARGET_IP)
    )


def as_deltas(times: List[float]) -> List[float]:
    """Return the differences between consecutive entries of *times*.

    Fewer than two timestamps yield an empty list.
    """
    return [cur - prev for prev, cur in zip(times, times[1:])]


def bin_value(value: float) -> float:
    """Return the lower edge of the BIN_WIDTH-wide bucket containing *value*.

    The result is rounded to two decimals so buckets compare equal as
    dictionary keys.
    """
    index = math.floor(value / BIN_WIDTH + _BIN_EPSILON)
    return round(index * BIN_WIDTH, 2)


def main() -> None:
    """Print a histogram of ICMP inter-packet gaps and per-bit averages.

    The capture path is taken from the first command-line argument and
    defaults to ``../public/task.pcapng``.
    """
    pcap_path = sys.argv[1] if len(sys.argv) > 1 else "../public/task.pcapng"
    deltas = as_deltas(load_icmp_times(pcap_path))
    histogram = Counter(bin_value(dt) for dt in deltas)

    print("Top timing buckets (bin width {:.2f}s):".format(BIN_WIDTH))
    # Most populated buckets first; ties are broken by ascending bucket value.
    ranked = sorted(histogram.items(), key=lambda item: (-item[1], item[0]))
    for bucket, count in ranked[:20]:
        print(f"{bucket:>4.2f} s -> {count}")

    for center, label in BIT_CENTERS:
        window = [dt for dt in deltas if abs(dt - center) <= BIT_TOLERANCE]
        if window:
            avg = sum(window) / len(window)
            print(f"Near {center:.2f}s (bit {label}) count={len(window)} avg={avg:.4f}")


if __name__ == "__main__":
    main()