# -*- coding: utf-8 -*-
"""
DACTRL Scalp Transfer Ablation
================================
Tests three approaches to recover scalp→thalamic transfer.
Motivated by the finding that the original scalp encoder (B: F1=0.748)
underperforms random init (A: F1=0.858) due to cross-domain distribution
mismatch.

OPTION 1 — Thalamic-calibrated scalp training:
    Fit scaler on thalamic data. Apply thalamic scaler to scalp features
    during Stage 1 SupCon training. Encoder learns PGES patterns in the
    thalamic feature distribution.
    At test time: thalamic scaler + scalp-trained encoder.
    Limitation: thalamus-specific — cannot generalize to hippocampus/STN
    without re-running with those region statistics.

OPTION 2 — Scale-invariant features (platform-compatible):
    Replace absolute amplitude features with relative/ratio equivalents:
      - Band powers → relative (% of total spectral power)
      - RMS, Line_Length, Variance → normalized by within-window RMS
      - Entropy/complexity features → unchanged (already scale-invariant)
    Result: 16-dim feature vector with no hardware-specific amplitude
    dependence.
    Universal: valid for any deep brain region without target-specific
    calibration. This is the only option compatible with the full platform
    vision.

OPTION 3 — Domain-adversarial SupCon (DANN-style):
    During scalp SupCon training, add a gradient reversal layer feeding a
    domain discriminator (scalp=0 vs thalamic=1). Encoder is trained to
    simultaneously:
      (a) separate PGES from baseline on scalp (SupCon loss)
      (b) fool the domain discriminator (GRL loss)
    Forces domain-invariant PGES embedding. Requires thalamic data during
    training.
    Limitation: needs labeled thalamic data from each new region — not
    universal.

Baselines (from deployment scenarios):
    A: Random init + K examples → F1=0.858 (K=10)  [target to beat]
    B: Original scalp (raw)     → F1=0.748 (K=10)  [known poor transfer]
"""
import os
os.environ.setdefault('PYTHONIOENCODING', 'utf-8')

import copy
import gc
import math
import random
import warnings
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib
matplotlib.use('Agg')  # headless backend; must be selected before pyplot is imported
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
import torch
import torch.nn as nn
import torch.nn.functional as F

warnings.filterwarnings('ignore')

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"[{'GPU' if torch.cuda.is_available() else 'CPU'}] "
      f"{torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'No GPU'}")

torch.manual_seed(42); np.random.seed(42); random.seed(42)

import mne
mne.set_log_level('ERROR')

SEEG_ROOT = Path(r"G:\PHD Datasets\Data\Thalamus\SEEG Seizure Data")
METADATA = SEEG_ROOT / "metadata_SEEG.xlsx"
_RESULTS = Path(r"D:\Projects\phd\PSEG\pges_toolkit\results")
OUT_ROOT = _RESULTS / "dactrl_scalp_transfer_ablation"
FIG_DIR = OUT_ROOT / "figures"
TAB_DIR = OUT_ROOT / "tables"
for d in [OUT_ROOT, FIG_DIR, TAB_DIR]:
    d.mkdir(parents=True, exist_ok=True)

K_LIST = [2, 5, 10, 20]
N_TRIALS = 10
N_EPOCHS_SUPCON = 100
DANN_LAMBDA = 0.5   # GRL weight for domain adversarial loss
EMB_DIM = 64        # encoder output width fed to the domain discriminator
V3_LOSO_F1 = 0.883  # v3 leave-one-subject-out reference drawn on the figures

# Zero-shot rows are stored under "<scenario> K=0" (see run_scenario).
ZERO_SHOT_SUFFIX = " K=0"


def log(msg):
    """Print *msg* prefixed with the current wall-clock time, flushing immediately."""
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}", flush=True)


NUCLEUS_MAP = {
    'P1': 'CeM', 'P3': 'CeM', 'P5': 'CeM', 'P9': 'CeM',
    'P2': 'CL', 'P7': 'CL', 'P8': 'CL',
    'P4': 'MD', 'P6': 'MD',
    'P10': 'ANT', 'P11': 'ANT', 'P12': 'ANT',
    'P13': 'ANT', 'P14': 'ANT', 'P15': 'ANT',
}
PRIMARY_EXCLUDE = {'P13'}

# Feature indices (16-dim vector)
FEAT_NAMES = ['RMS', 'Line_Length', 'Zero_Crossings', 'Variance',
              'Delta_Power', 'Theta_Power', 'Alpha_Power', 'Beta_Power',
              'Spectral_Ratio', 'Shannon_Entropy', 'Suppression_Ratio',
              'Approx_Entropy', 'Sample_Entropy', 'ETC', 'LZC', 'Perm_Entropy']
FEAT = {n: i for i, n in enumerate(FEAT_NAMES)}

# Amplitude-sensitive feature indices (hardware-dependent)
AMP_FEATS = [FEAT['RMS'], FEAT['Line_Length'], FEAT['Variance']]
BAND_FEATS = [FEAT['Delta_Power'], FEAT['Theta_Power'],
              FEAT['Alpha_Power'], FEAT['Beta_Power']]
# Scale-invariant features (unchanged by amplitude)
RATIO_FEATS = [FEAT['Zero_Crossings'], FEAT['Spectral_Ratio'],
               FEAT['Suppression_Ratio'], FEAT['Approx_Entropy'],
               FEAT['Sample_Entropy'], FEAT['ETC'], FEAT['LZC'],
               FEAT['Perm_Entropy']]

# Scenario names, in reporting order. These must match the names passed to
# run_scenario() in main().
SCENARIO_ORDER = ['A: Random init', 'B: Scalp raw', 'B_TUH: Scalp TUH-only',
                  'Opt1b: TUH+Thal-norm', 'Opt1: Thal-normalized',
                  'Opt2: Scale-invariant', 'Opt3: DANN']
SCENARIO_COLORS = {'A: Random init': '#2c3e50',
                   'B: Scalp raw': '#e74c3c',
                   'B_TUH: Scalp TUH-only': '#c0392b',
                   'Opt1b: TUH+Thal-norm': '#d35400',
                   'Opt1: Thal-normalized': '#e67e22',
                   'Opt2: Scale-invariant': '#27ae60',
                   'Opt3: DANN': '#8e44ad'}
SCENARIO_MARKERS = {'A: Random init': 'o',
                    'B: Scalp raw': 'x',
                    'B_TUH: Scalp TUH-only': '+',
                    'Opt1b: TUH+Thal-norm': 'v',
                    'Opt1: Thal-normalized': 's',
                    'Opt2: Scale-invariant': '^',
                    'Opt3: DANN': 'D'}

# ── Import v3 ──────────────────────────────────────────────────────────────────
# The v3 script is executed into a private namespace so its components can be
# reused without running its __main__ section. The file is decoded as UTF-8
# explicitly: relying on the platform default codec would mis-decode any
# non-ASCII characters in the source on systems whose default is not UTF-8.
_V3 = Path(__file__).parent / "dactrl_v3_episodic_protonet.py"
_v3g = {'__file__': str(_V3)}
with open(_V3, 'r', encoding='utf-8', errors='replace') as _f:
    _src = _f.read().replace("if __name__ == '__main__':",
                             "if __name__ == '__v3_never__':")
exec(compile(_src, str(_V3), 'exec'), _v3g)

load_all_seeg = _v3g['load_all_seeg']
compute_consensus_thresholds = _v3g['compute_consensus_thresholds']
reapply_consensus = _v3g['reapply_consensus']
FullModel = _v3g['FullModel']
stage1_supcon = _v3g['stage1_supcon']
protonet_classify = _v3g['protonet_classify']
diversity_support = _v3g['diversity_support']
_infer_seizure_ids = _v3g['_infer_seizure_ids']
load_scalp_data = _v3g['load_scalp_data']
log("v3 components loaded.")


# ══════════════════════════════════════════════════════════════════════════════
# Feature transformations
# ══════════════════════════════════════════════════════════════════════════════

def make_scale_invariant(X: np.ndarray) -> np.ndarray:
    """
    Option 2: Convert to scale-invariant feature representation.

    - Band powers → relative (each band / total band power)
    - RMS, Line_Length, Variance → normalized by RMS
      (RMS becomes 1.0, others become RMS-relative ratios)
    - Shannon_Entropy → amplitude-dependent; normalize by RMS
    - All complexity/ratio features unchanged

    Universal: valid for any recording modality without target calibration.

    Parameters
    ----------
    X : array of shape (n_windows, len(FEAT_NAMES)), columns ordered as
        FEAT_NAMES.

    Returns
    -------
    A new array of the same shape; the input is not modified.
    """
    X = np.asarray(X)
    if not np.issubdtype(X.dtype, np.floating):
        # Integer input would silently truncate every ratio below.
        X = X.astype(np.float64)
    X_out = X.copy()
    eps = 1e-8  # guards the divisions against all-zero windows

    # Relative band powers (sum to ~1.0 within each window)
    total_band = X[:, BAND_FEATS].sum(axis=1, keepdims=True) + eps
    X_out[:, BAND_FEATS] = X[:, BAND_FEATS] / total_band

    # Normalize amplitude features by RMS
    rms = X[:, FEAT['RMS']] + eps
    X_out[:, FEAT['Line_Length']] = X[:, FEAT['Line_Length']] / rms
    X_out[:, FEAT['Variance']] = X[:, FEAT['Variance']] / (rms ** 2)
    X_out[:, FEAT['RMS']] = 1.0  # RMS/RMS = 1 (constant)
    X_out[:, FEAT['Shannon_Entropy']] = X[:, FEAT['Shannon_Entropy']] / rms
    return X_out


# ══════════════════════════════════════════════════════════════════════════════
# Domain-adversarial SupCon (Option 3)
# ══════════════════════════════════════════════════════════════════════════════

class GradientReversalFn(torch.autograd.Function):
    """Identity in the forward pass; multiplies the gradient by -alpha in backward."""

    @staticmethod
    def forward(ctx, x, alpha):
        # Store a plain Python float so backward never mixes NumPy scalars
        # with tensors.
        ctx.alpha = float(alpha)
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        # One gradient per forward input; alpha itself is not differentiable.
        return -ctx.alpha * grad_output, None


class DomainDiscriminator(nn.Module):
    """Two-layer MLP emitting one domain logit per embedding, behind a GRL."""

    def __init__(self, in_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 32), nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, x, alpha=1.0):
        x_rev = GradientReversalFn.apply(x, alpha)
        return self.net(x_rev).squeeze(-1)


def supcon_loss(features, labels, temperature=0.07):
    """
    Supervised contrastive loss (batch version).

    Parameters
    ----------
    features : (n, d) tensor of embeddings; L2-normalized here.
    labels   : (n,) tensor of class labels.
    temperature : softmax temperature applied to the cosine similarities.

    Returns
    -------
    Scalar tensor: mean loss over anchors that have at least one positive
    (another sample with the same label). If no anchor has a positive, a zero
    that is still attached to the graph is returned instead of NaN, so a
    degenerate batch cannot corrupt the optimizer step.
    """
    features = F.normalize(features, dim=1)
    sim = torch.matmul(features, features.T) / temperature
    n = len(labels)

    mask_self = torch.eye(n, device=features.device)
    mask_pos = (labels.unsqueeze(0) == labels.unsqueeze(1)).float() - mask_self

    # Subtract the row-wise max before exponentiating for numerical stability.
    logits = sim - sim.max(dim=1, keepdim=True).values.detach()
    denom = (torch.exp(logits) * (1 - mask_self)).sum(dim=1, keepdim=True) + 1e-8
    log_prob = logits - torch.log(denom)

    n_pos = mask_pos.sum(dim=1)
    has_pos = n_pos > 0
    if not bool(has_pos.any()):
        return features.sum() * 0.0

    loss_per_anchor = -(mask_pos * log_prob).sum(dim=1) / (n_pos + 1e-8)
    return loss_per_anchor[has_pos].mean()


def stage1_dann(X_scalp_norm, y_scalp, X_thal_norm,
                n_epochs=N_EPOCHS_SUPCON, lam=DANN_LAMBDA, batch_size=128):
    """
    Option 3: SupCon on scalp + gradient reversal domain discriminator.

    Scalp=domain 0, Thalamic=domain 1.
    Encoder is trained to fool the discriminator (domain-invariant embeddings)
    while still separating PGES from baseline on scalp via SupCon.

    Parameters
    ----------
    X_scalp_norm : normalized scalp features, (n_scalp, n_features).
    y_scalp      : scalp labels, (n_scalp,).
    X_thal_norm  : normalized thalamic features (used only for the domain loss).
    n_epochs     : number of optimization steps. NOTE: each "epoch" draws a
                   single random mini-batch per domain, not a full data pass.
    lam          : weight of the domain loss in the total loss.
    batch_size   : mini-batch size drawn from each domain per step.

    Returns
    -------
    The trained FullModel, switched to eval mode.
    """
    log(f" Stage 1 DANN: {len(X_scalp_norm)} scalp + {len(X_thal_norm)} thalamic, "
        f"{n_epochs} epochs, λ={lam}...")

    model = FullModel().to(DEVICE)
    discriminator = DomainDiscriminator(in_dim=EMB_DIM).to(DEVICE)
    opt = torch.optim.Adam(
        list(model.parameters()) + list(discriminator.parameters()), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=n_epochs)

    Xs = torch.tensor(X_scalp_norm, dtype=torch.float32)
    ys = torch.tensor(y_scalp, dtype=torch.long)
    Xt = torch.tensor(X_thal_norm, dtype=torch.float32)

    model.train(); discriminator.train()
    for ep in range(1, n_epochs + 1):
        # Sample scalp batch
        idx_s = torch.randperm(len(Xs))[:batch_size]
        xb_s = Xs[idx_s].to(DEVICE)
        yb_s = ys[idx_s].to(DEVICE)

        # Sample thalamic batch (unlabeled for SupCon, labeled for domain)
        idx_t = torch.randperm(len(Xt))[:batch_size]
        xb_t = Xt[idx_t].to(DEVICE)

        emb_s = model.encoder(xb_s)  # (B, 64)
        emb_t = model.encoder(xb_t)

        # SupCon loss on scalp
        loss_sc = supcon_loss(emb_s, yb_s)

        # GRL strength ramps up smoothly over training.
        alpha = 2.0 / (1.0 + math.exp(-10 * ep / n_epochs)) - 1.0

        # Domain labels: scalp=0, thalamic=1
        all_emb = torch.cat([emb_s, emb_t], dim=0)
        dom_labels = torch.cat([torch.zeros(len(emb_s), device=DEVICE),
                                torch.ones(len(emb_t), device=DEVICE)])
        dom_pred = discriminator(all_emb, alpha=alpha)
        loss_dom = F.binary_cross_entropy_with_logits(dom_pred, dom_labels)

        loss = loss_sc + lam * loss_dom
        opt.zero_grad(); loss.backward(); opt.step(); scheduler.step()

        if ep % 25 == 0:
            log(f" Ep {ep}/{n_epochs}: SupCon={loss_sc.item():.4f} "
                f"Domain={loss_dom.item():.4f} α={alpha:.2f}")

    model.eval()
    log(" Stage 1 DANN complete.")
    return model


# ══════════════════════════════════════════════════════════════════════════════
# Evaluation helpers
# ══════════════════════════════════════════════════════════════════════════════

def eval_patient_k(encoder, scaler, pdata, pid, scenario_name):
    """
    Few-shot test-time ProtoNet (K>0) for one patient.

    For every K in K_LIST, runs N_TRIALS support/query splits and records the
    mean and std of the macro F1 over the trials that were usable.

    Parameters
    ----------
    encoder : embedding network handed to protonet_classify.
    scaler  : fitted scaler applied to pdata['X'].
    pdata   : patient dict providing 'X' (features) and 'y_temporal' (labels).
    pid     : patient identifier, used for logging and the Nucleus lookup.
    scenario_name : value stored in the 'Scenario' column.

    Returns
    -------
    List of row dicts (one per K that produced a result); empty when the
    patient has fewer than two classes or fewer than two positive windows.
    """
    X_norm = scaler.transform(pdata['X'])
    y = pdata['y_temporal']
    sz_ids = _infer_seizure_ids(y)
    if len(np.unique(y)) < 2 or y.sum() < 2:
        return []

    rows = []
    for K in K_LIST:
        # Need at least K windows of each class to draw a support set.
        if y.sum() < K or (len(y) - y.sum()) < K:
            continue
        f1s = []
        for _ in range(N_TRIALS):
            sup_idx, qry_idx = diversity_support(y, K, sz_ids)
            if len(qry_idx) < 3 or len(np.unique(y[sup_idx])) < 2:
                continue
            pred, _ = protonet_classify(encoder, X_norm[sup_idx], y[sup_idx],
                                        X_norm[qry_idx])
            if len(np.unique(y[qry_idx])) < 2:
                continue
            f1s.append(f1_score(y[qry_idx], pred, average='macro',
                                zero_division=0))
        if not f1s:
            continue
        rows.append({'Scenario': scenario_name, 'Patient': pid,
                     'Nucleus': NUCLEUS_MAP.get(pid, '?'),
                     'K': K, 'F1': np.nanmean(f1s), 'F1_std': np.nanstd(f1s)})
        log(f" {pid} K={K:>2}: F1={rows[-1]['F1']:.3f}")
    return rows


def eval_zero_shot(encoder, scalp_scaler, X_scalp_norm, y_scalp,
                   patients, scenario_name, X_transform=None):
    """
    K=0: classify thalamic windows using scalp-derived class prototypes.

    Each thalamic window is assigned to the nearer (Euclidean) of the mean
    PGES / mean baseline scalp embeddings.

    Parameters
    ----------
    encoder      : embedding network (put into eval mode here).
    scalp_scaler : scaler applied to the thalamic features.
    X_scalp_norm : already-normalized scalp features used for the prototypes.
    y_scalp      : scalp labels (1 = PGES, 0 = baseline).
    patients     : mapping pid → patient dict with 'X' and 'y_temporal'.
    scenario_name: value stored in the 'Scenario' column.
    X_transform  : optional function applied to raw thalamic X before
                   scalp_scaler.

    Returns
    -------
    List of row dicts, one per evaluated patient, with K=0. Each row also
    carries 'AUC' (NaN when it cannot be computed).
    """
    encoder.eval()
    with torch.no_grad():
        emb = encoder(torch.tensor(X_scalp_norm, dtype=torch.float32
                                   ).to(DEVICE)).cpu().numpy()
    pges_proto = emb[y_scalp == 1].mean(axis=0)
    base_proto = emb[y_scalp == 0].mean(axis=0)

    rows = []
    for pid in sorted(patients.keys()):
        if pid in PRIMARY_EXCLUDE:
            continue
        pdata = patients[pid]
        y = pdata['y_temporal']
        if len(np.unique(y)) < 2:
            continue

        X_raw = X_transform(pdata['X']) if X_transform else pdata['X']
        X_norm = scalp_scaler.transform(X_raw)
        with torch.no_grad():
            emb_t = encoder(torch.tensor(X_norm, dtype=torch.float32
                                         ).to(DEVICE)).cpu().numpy()

        d_pges = np.linalg.norm(emb_t - pges_proto, axis=1)
        d_base = np.linalg.norm(emb_t - base_proto, axis=1)
        pred = (d_pges < d_base).astype(int)
        f1 = f1_score(y, pred, average='macro', zero_division=0)
        try:
            # Smaller distance to the PGES prototype = higher PGES score.
            auc = roc_auc_score(y, -d_pges)
        except ValueError:
            # e.g. non-finite scores; keep the row but mark AUC as unavailable
            auc = float('nan')

        log(f" {pid} K=0: F1={f1:.3f}")
        rows.append({'Scenario': scenario_name, 'Patient': pid,
                     'Nucleus': NUCLEUS_MAP.get(pid, '?'),
                     'K': 0, 'F1': f1, 'F1_std': 0.0, 'AUC': auc})
    return rows


def run_scenario(name, encoder, scaler, patients,
                 X_transform=None, scalp_scaler=None,
                 X_scalp_norm=None, y_scalp=None):
    """
    Run both K=0 and K>0 evaluations for a scenario.

    The zero-shot pass runs only when both scalp_scaler and X_scalp_norm are
    given; its rows are stored under the scenario name "<name> K=0".
    Patients listed in PRIMARY_EXCLUDE are skipped.

    Returns
    -------
    List of row dicts from eval_zero_shot (if run) followed by eval_patient_k.
    """
    rows = []
    if scalp_scaler is not None and X_scalp_norm is not None:
        log(f" [{name}] K=0 zero-shot...")
        rows += eval_zero_shot(encoder, scalp_scaler, X_scalp_norm, y_scalp,
                               patients, f"{name}{ZERO_SHOT_SUFFIX}",
                               X_transform)

    log(f" [{name}] K>0 few-shot...")
    for pid in sorted(patients.keys()):
        if pid in PRIMARY_EXCLUDE:
            continue
        log(f" {pid}...")
        pdata = patients[pid]
        if X_transform is not None:
            # Shallow copy so the caller's patient dict keeps its raw features.
            pdata_use = dict(pdata)
            pdata_use['X'] = X_transform(pdata['X'])
        else:
            pdata_use = pdata
        rows += eval_patient_k(encoder, scaler, pdata_use, pid, name)
    return rows


# ══════════════════════════════════════════════════════════════════════════════
# Reporting helpers
# ══════════════════════════════════════════════════════════════════════════════

def _scenario_mask(df, scenario):
    """
    Boolean mask selecting the rows of exactly one scenario.

    Matches the few-shot name and its zero-shot variant ("<scenario> K=0").
    Exact matching is required: prefix matching on the text before ':' would
    make 'B' also select 'B_TUH: ...' and 'Opt1' also select 'Opt1b: ...'.
    """
    return df['Scenario'].isin([scenario, f"{scenario}{ZERO_SHOT_SUFFIX}"])


def _log_summary(df):
    """Log the per-scenario F1 table and the K=10 / K=0 headline comparisons."""
    log("\n" + "=" * 70)
    log("SCALP TRANSFER ABLATION SUMMARY")
    log("=" * 70)

    log(f" {'Scenario':<30} K=0 K=2 K=5 K=10 K=20")
    log(" " + "-" * 70)
    for sc in SCENARIO_ORDER:
        vals = []
        for K in [0, 2, 5, 10, 20]:
            sub = df[_scenario_mask(df, sc) & (df['K'] == K)]
            vals.append(" — " if sub.empty else f"{sub['F1'].mean():.3f}")
        log(f" {sc:<30} " + " ".join(vals))

    log("\n KEY: Does any option beat random init (A) at K=10?")
    fa10 = df[(df['Scenario'] == 'A: Random init') & (df['K'] == 10)]['F1'].mean()
    log(f" Random init K=10 reference: {fa10:.3f}")
    for sc in ['Opt1: Thal-normalized', 'Opt2: Scale-invariant', 'Opt3: DANN']:
        sub = df[_scenario_mask(df, sc) & (df['K'] == 10)]
        if not sub.empty:
            f = sub['F1'].mean()
            log(f" {sc:<30} K=10={f:.3f} vs random={f-fa10:+.3f}")

    log("\n KEY: Zero-shot (K=0) — which option beats chance (0.5)?")
    for sc in ['B: Scalp raw', 'Opt1: Thal-normalized',
               'Opt2: Scale-invariant', 'Opt3: DANN']:
        sub = df[_scenario_mask(df, sc) & (df['K'] == 0)]
        if not sub.empty:
            f = sub['F1'].mean()
            log(f" {sc:<30} K=0={f:.3f} vs chance={f-0.5:+.3f}")


def _plot_comparison(df):
    """Draw the K-curve and K=10 bar comparison; return the saved figure path."""
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # K-curve plot
    ax = axes[0]
    for sc in SCENARIO_ORDER:
        sub = df[_scenario_mask(df, sc) & (df['K'] > 0)]
        if sub.empty:
            continue
        grp = sub.groupby('K')['F1'].agg(['mean', 'std']).reset_index()
        lbl = sc.split(':')[1].strip()
        color = SCENARIO_COLORS.get(sc, 'gray')
        ax.plot(grp['K'], grp['mean'], color=color,
                marker=SCENARIO_MARKERS.get(sc, 'o'), label=lbl,
                linewidth=2.5, markersize=8)
        ax.fill_between(grp['K'], grp['mean'] - grp['std'],
                        grp['mean'] + grp['std'], alpha=0.1, color=color)
    ax.axhline(V3_LOSO_F1, color='black', linestyle='--', linewidth=1,
               label='v3 LOSO (0.883)')
    ax.set_xlabel('K (support examples per class)', fontsize=12)
    ax.set_ylabel('Mean Macro F1', fontsize=12)
    ax.set_title('Scalp Transfer Options — K-curve', fontweight='bold')
    ax.set_xticks(K_LIST); ax.set_ylim(0.3, 1.02)
    ax.legend(fontsize=9); ax.grid(True, alpha=0.3)

    # K=10 bar comparison
    ax = axes[1]
    sc_labels, means, errs, cols = [], [], [], []
    for sc in SCENARIO_ORDER:
        sub = df[_scenario_mask(df, sc) & (df['K'] == 10)]
        if sub.empty:
            continue
        sc_labels.append(sc.split(':')[1].strip())
        means.append(sub['F1'].mean())
        errs.append(sub['F1'].std())
        cols.append(SCENARIO_COLORS.get(sc, 'gray'))
    x = list(range(len(means)))
    bars = ax.bar(x, means, color=cols, alpha=0.85, edgecolor='black',
                  linewidth=0.7, width=0.6)
    ax.errorbar(x, means, yerr=errs, fmt='none', color='black',
                capsize=5, linewidth=1.5)
    for bar, val in zip(bars, means):
        ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.008,
                f'{val:.3f}', ha='center', va='bottom', fontsize=9,
                fontweight='bold')
    ax.set_xticks(x); ax.set_xticklabels(sc_labels, rotation=15, ha='right')
    ax.axhline(V3_LOSO_F1, color='black', linestyle='--', linewidth=1,
               label='v3 LOSO ref')
    ax.set_ylim(0.4, 1.05); ax.set_ylabel('Mean Macro F1 (K=10)')
    ax.set_title('K=10 Comparison: Scalp Transfer Options', fontweight='bold')
    ax.legend(fontsize=9); ax.grid(True, alpha=0.3, axis='y')

    fig.suptitle('DACTRL Scalp Transfer Ablation\n'
                 'Can scalp pre-training be recovered with feature engineering or domain adaptation?',
                 fontsize=11, fontweight='bold')
    plt.tight_layout()
    p = FIG_DIR / "scalp_transfer_comparison.png"
    fig.savefig(p, dpi=150, bbox_inches='tight')
    plt.close(fig)
    return p


# ══════════════════════════════════════════════════════════════════════════════
# Main
# ══════════════════════════════════════════════════════════════════════════════

def main():
    """Train and evaluate every scenario, then write tables and figures."""
    log("=" * 70)
    log("DACTRL Scalp Transfer Ablation")
    log("Options 1 (thalamic-normalized), 2 (scale-invariant), 3 (DANN)")
    log("=" * 70)

    # ── Load data ─────────────────────────────────────────────────────────────
    log("\n[1] Loading thalamic patients...")
    meta_df = pd.read_excel(METADATA)
    patients = load_all_seeg(meta_df)
    thresh = compute_consensus_thresholds(patients)
    reapply_consensus(patients, thresh)
    log(f" {len(patients)} patients loaded.")

    # NOTE(review): the thalamic scaler is fit on every loaded patient,
    # including those in PRIMARY_EXCLUDE and the ones later used for testing.
    X_thal = np.vstack([patients[p]['X'] for p in patients])
    thal_scaler = StandardScaler().fit(X_thal)
    log(f" Thalamic scaler: {len(X_thal)} windows.")

    log("\n[2] Loading scalp data (CHB+TUH combined)...")
    X_scalp, y_scalp = load_scalp_data(max_chb=12, max_tuh=40)
    log(f" Combined scalp: {len(X_scalp)} windows (PGES={y_scalp.sum()}, "
        f"non-PGES={(y_scalp==0).sum()})")

    log("\n[2b] Loading TUH-only scalp data...")
    X_scalp_tuh, y_scalp_tuh = load_scalp_data(max_chb=0, max_tuh=40)
    log(f" TUH-only scalp: {len(X_scalp_tuh)} windows (PGES={y_scalp_tuh.sum()}, "
        f"non-PGES={(y_scalp_tuh==0).sum()})")

    # ── Baseline A: random init (reference) ───────────────────────────────────
    log("\n[3] Baseline A — Random init + K examples (reference)")
    rnd_encoder = FullModel().to(DEVICE).encoder
    rnd_encoder.eval()
    rows_a = run_scenario("A: Random init", rnd_encoder, thal_scaler, patients)

    # ── Baseline B: original scalp (raw scaler) ───────────────────────────────
    log("\n[4] Baseline B — Original scalp encoder (raw scalp scaler)")
    scalp_scaler_raw = StandardScaler().fit(X_scalp)
    X_scalp_norm_raw = scalp_scaler_raw.transform(X_scalp)
    model_b = stage1_supcon(X_scalp_norm_raw, y_scalp)
    rows_b = run_scenario("B: Scalp raw", model_b.encoder,
                          scalp_scaler_raw, patients,
                          scalp_scaler=scalp_scaler_raw,
                          X_scalp_norm=X_scalp_norm_raw, y_scalp=y_scalp)

    # ── Baseline B_TUH: TUH-only scalp (raw scaler) ───────────────────────────
    log("\n[4b] Baseline B_TUH — TUH-only scalp encoder (raw scalp scaler)")
    scalp_scaler_tuh = StandardScaler().fit(X_scalp_tuh)
    X_scalp_tuh_norm = scalp_scaler_tuh.transform(X_scalp_tuh)
    model_b_tuh = stage1_supcon(X_scalp_tuh_norm, y_scalp_tuh)
    rows_b_tuh = run_scenario("B_TUH: Scalp TUH-only", model_b_tuh.encoder,
                              scalp_scaler_tuh, patients,
                              scalp_scaler=scalp_scaler_tuh,
                              X_scalp_norm=X_scalp_tuh_norm,
                              y_scalp=y_scalp_tuh)

    # ── Option 1b: TUH-only + thalamic-normalized ─────────────────────────────
    log("\n[4c] Option 1b — TUH-only + thalamic-normalized")
    X_scalp_tuh_thal_norm = thal_scaler.transform(X_scalp_tuh)
    model_1b = stage1_supcon(X_scalp_tuh_thal_norm, y_scalp_tuh)
    rows_1b = run_scenario("Opt1b: TUH+Thal-norm", model_1b.encoder,
                           thal_scaler, patients,
                           scalp_scaler=thal_scaler,
                           X_scalp_norm=X_scalp_tuh_thal_norm,
                           y_scalp=y_scalp_tuh)

    # ── Option 1: thalamic-normalized scalp training ──────────────────────────
    log("\n[5] Option 1 — Thalamic-normalized scalp training (CHB+TUH)")
    log(" Applying thalamic scaler to scalp features...")
    X_scalp_thal_norm = thal_scaler.transform(X_scalp)  # scalp features in thalamic space
    model_1 = stage1_supcon(X_scalp_thal_norm, y_scalp)
    # At test time: thalamic scaler on thalamic data (same scaler used for training)
    rows_1 = run_scenario("Opt1: Thal-normalized", model_1.encoder,
                          thal_scaler, patients,
                          scalp_scaler=thal_scaler,  # same scaler → prototypes in thalamic space
                          X_scalp_norm=X_scalp_thal_norm, y_scalp=y_scalp)

    # ── Option 2: scale-invariant features ────────────────────────────────────
    log("\n[6] Option 2 — Scale-invariant features (platform-compatible)")
    log(" Converting to relative band powers + RMS-normalized amplitude features...")
    X_scalp_si = make_scale_invariant(X_scalp)
    X_thal_si = np.vstack([make_scale_invariant(patients[p]['X'])
                           for p in patients])
    scalp_scaler_si = StandardScaler().fit(X_scalp_si)
    thal_scaler_si = StandardScaler().fit(X_thal_si)
    X_scalp_si_norm = scalp_scaler_si.transform(X_scalp_si)
    model_2 = stage1_supcon(X_scalp_si_norm, y_scalp)
    rows_2 = run_scenario("Opt2: Scale-invariant", model_2.encoder,
                          thal_scaler_si, patients,
                          X_transform=make_scale_invariant,
                          scalp_scaler=scalp_scaler_si,
                          X_scalp_norm=X_scalp_si_norm, y_scalp=y_scalp)

    # ── Option 3: domain-adversarial SupCon (DANN) ────────────────────────────
    log("\n[7] Option 3 — Domain-adversarial SupCon (DANN)")
    log(" Training encoder to be domain-invariant (scalp↔thalamic)...")
    X_thal_norm_full = thal_scaler.transform(X_thal)
    # Use thalamic scaler for both (forces shared feature space via domain alignment)
    X_scalp_dann = thal_scaler.transform(X_scalp)
    model_3 = stage1_dann(X_scalp_dann, y_scalp, X_thal_norm_full)
    rows_3 = run_scenario("Opt3: DANN", model_3.encoder,
                          thal_scaler, patients,
                          scalp_scaler=thal_scaler,
                          X_scalp_norm=X_scalp_dann, y_scalp=y_scalp)

    # ── Save and summarise ────────────────────────────────────────────────────
    all_rows = rows_a + rows_b + rows_b_tuh + rows_1b + rows_1 + rows_2 + rows_3
    df = pd.DataFrame(all_rows)
    if df.empty:
        # Without this guard the groupby below fails on the missing columns.
        log("No evaluation rows were produced; skipping tables and figures.")
        return
    df.to_csv(TAB_DIR / "scalp_transfer_ablation_raw.csv", index=False)

    summary = (df.groupby(['Scenario', 'K'])['F1']
                 .agg(F1_mean='mean', F1_std='std', N='count')
                 .reset_index())
    summary.to_csv(TAB_DIR / "scalp_transfer_ablation_summary.csv", index=False)

    _log_summary(df)

    # ── Figures ───────────────────────────────────────────────────────────────
    log("\n[8] Generating figures...")
    p = _plot_comparison(df)
    log(f" Saved: {p}")

    log(f"\nAll outputs: {OUT_ROOT}")
    log("Done.")


if __name__ == '__main__':
    main()