scripts.step_3_simulate_repression

Python port of the R ‘KD (repression)’ step.

Pipeline:

1) NFC background from fcs_files/NFC → compute mBFP_neg, mmCherry_neg (mean of the first 3 medians).
2) Load time-course medians from fcs_files/time-course_data with boundary + singlet gating, then subtract the NFC background.
3) Parse filename tokens → (plasmid, exp, rep, time); keep KD rows (exp == "KD").
4) Normalize signals (KD):
   • fc.cherry = mCherry / mean_mCherry_at_time0 (per plasmid)
   • norm.bfp = (BFP − mean.init) / (mean.final − mean.init), where mean.init = mean(BFP | t==0) and mean.final = mean(BFP | t>10), per plasmid
5) Load parameters from CSVs:
   • parameters/half_times_upregulation.csv → t_up
   • parameters/Hill_parameters.csv → K, n
   • parameters/alphamcherry.csv → alpha (global or per-plasmid)
6) Simulate ODEs (repression model) for each plasmid (SP430A/427/428/411/430):
   • beta = ln(2) / t_up
   • dR/dt = beta − R·ln(2)/t_up
   • dY/dt = K^n / (K^n + R^n) − alpha·Y
   • Outputs: R(t) (tagBFP proxy) and alpha·Y(t) (mCherry)
7) Delay scan: test delays 0..25 h (step 0.5); choose the delay minimizing the MAE between simulated mCherry and the mean experimental fc.cherry at each time point.
8) Plots:
   • plots/MAE_KD_mcherry.pdf (MAE vs delay; best delay highlighted)
   • plots/KD_ODE_mCherry_<tag>.pdf (mCherry data; base + delayed simulation)
   • plots/KD_ODE_tagBFP_<tag>.pdf (normalized BFP vs simulated R)
9) Outputs:
   • parameters/delays_repression.csv with columns [plasmid, d_rev]
   • intermediate parameters are read from parameters/ as above
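
Taken end to end, a minimal driver sketch of the pipeline (assuming the module is importable as scripts.step_3_simulate_repression and the fcs_files/ and parameters/ directories are in place; the loop mirrors steps 6–9 above):

import pandas as pd
from scripts import step_3_simulate_repression as s3

kd = s3.build_kd_dataset()         # steps 1-4: gate, background-subtract, normalize
params = s3.load_parameters()      # step 5: merged t_up / k / n / alpha table

rows = []
for _, pars in params.iterrows():
    pl_df = kd[kd["plasmid"] == pars["plasmid"]]
    if pl_df.empty:
        continue
    mae_tab, best_d, best_mae, base = s3.delay_scan(pl_df, pars)   # steps 6-7
    rows.append({"plasmid": pars["plasmid"], "d_rev": best_d})

pd.DataFrame(rows).to_csv("parameters/delays_repression.csv", index=False)  # step 9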

add_fc_and_norm

add_fc_and_norm(df)

Add mCherry fold-change (vs t==0 mean per plasmid) and BFP min–max normalization.

fc.cherry = mCh / mean_t0(plasmid)
norm.bfp = (BFP − mean.init) / (mean.final − mean.init), where mean.init = mean(BFP | t==0), mean.final = mean(BFP | t>10)

Parameters

df : pd.DataFrame
    KD time-course with [plasmid, time, BV421-A, PE-A].

Returns

pd.DataFrame
    Input with extra columns: mmcherry, fc.cherry, mean.final, mean.init, norm.bfp.

Source code in scripts/step_3_simulate_repression.py
def add_fc_and_norm(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add mCherry fold-change (vs t==0 mean per plasmid) and BFP min–max normalization.

      fc.cherry = mCh / mean_t0(plasmid)
      norm.bfp  = (BFP - mean.init) / (mean.final - mean.init), where
                  mean.init = mean(BFP | t==0), mean.final = mean(BFP | t>10)

    Parameters
    ----------
    df : pd.DataFrame
        KD time-course with [plasmid, time, BV421-A, PE-A].

    Returns
    -------
    pd.DataFrame
        Input with extra columns: mmcherry, fc.cherry, mean.final, mean.init, norm.bfp.
    """
    t0 = (df.query("time == 0")  # rows at t==0
          .groupby("plasmid")[CH_mCh]  # by plasmid
          .mean().rename("mmcherry").reset_index())  # mean mCherry
    out = df.merge(t0, on="plasmid", how="left")  # attach mmcherry
    out["fc.cherry"] = out[CH_mCh] / out["mmcherry"]  # fold-change
    mean_final = (out.query("time > 10")  # t>10 subset
                  .groupby("plasmid")[CH_BFP]  # by plasmid
                  .mean().rename("mean.final").reset_index())  # final mean BFP
    mean_init = (out.query("time == 0")  # t==0 subset
                 .groupby("plasmid")[CH_BFP]  # by plasmid
                 .mean().rename("mean.init").reset_index())  # initial mean BFP
    out = out.merge(mean_final, on="plasmid", how="left")  # attach mean.final
    out = out.merge(mean_init, on="plasmid", how="left")  # attach mean.init
    denom = (out["mean.final"] - out["mean.init"]).replace(0, np.nan)  # protect /0
    out["norm.bfp"] = (out[CH_BFP] - out["mean.init"]) / denom  # min–max BFP
    print("[norm] preview:\n" +
          out[["plasmid", "time", CH_BFP, CH_mCh, "fc.cherry", "mean.init", "mean.final", "norm.bfp"]]
          .head().to_string(index=False))  # debug preview
    return out  # return enriched table
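
A toy check of the arithmetic, using a hypothetical single-plasmid frame (assumes add_fc_and_norm is in scope and that CH_BFP and CH_mCh resolve to "BV421-A" and "PE-A", as documented above):

import pandas as pd

toy = pd.DataFrame({
    "plasmid": ["SP430"] * 3,
    "time":    [0.0, 24.0, 48.0],
    "BV421-A": [0.0, 100.0, 100.0],   # BFP: 0 at t==0, plateau of 100 for t>10
    "PE-A":    [200.0, 100.0, 50.0],  # mCherry: halves at each sampled time
})
out = add_fc_and_norm(toy)
print(out[["time", "fc.cherry", "norm.bfp"]])
# fc.cherry: 1.0, 0.5, 0.25   norm.bfp: 0.0, 1.0, 1.0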

apply_boundary_gate

apply_boundary_gate(df)

Apply coarse rectangle gate on FSC-A and SSC-A.

Parameters

df : pd.DataFrame
    Raw event table.

Returns

pd.DataFrame
    Events within the boundary gate.

Source code in scripts/step_3_simulate_repression.py
def apply_boundary_gate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply coarse rectangle gate on FSC-A and SSC-A.

    Parameters
    ----------
    df : pd.DataFrame
        Raw event table.

    Returns
    -------
    pd.DataFrame
        Events within the boundary gate.
    """
    m = (  # build boolean mask for the rectangular region
            (df[CH_FSC_A] >= BOUND_MIN[CH_FSC_A]) & (df[CH_FSC_A] <= BOUND_MAX[CH_FSC_A]) &
            (df[CH_SSC_A] >= BOUND_MIN[CH_SSC_A]) & (df[CH_SSC_A] <= BOUND_MAX[CH_SSC_A])
    )
    out = df.loc[m]  # filter events
    print(f"[gate] boundary: kept {len(out):,}/{len(df):,}")  # debug summary
    return out  # return gated df

apply_singlet_gate

apply_singlet_gate(df)

Keep singlets based on FSC-H/FSC-A ratio.

Parameters

df : pd.DataFrame
    Events (boundary-gated recommended).

Returns

pd.DataFrame
    Singlet events only.

Source code in scripts/step_3_simulate_repression.py
def apply_singlet_gate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Keep singlets based on FSC-H/FSC-A ratio.

    Parameters
    ----------
    df : pd.DataFrame
        Events (boundary-gated recommended).

    Returns
    -------
    pd.DataFrame
        Singlet events only.
    """
    ratio = df[CH_FSC_H] / (df[CH_FSC_A].replace(0, np.nan))  # robust ratio
    m = (ratio >= SINGLET_RATIO_LOW) & (ratio <= SINGLET_RATIO_HIGH)  # window
    out = df.loc[m]  # filter
    print(f"[gate] singlet : kept {len(out):,}/{len(df):,}")  # debug summary
    return out  # return singlets
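
A synthetic illustration of the ratio gate. The channel names and the window bounds (SINGLET_RATIO_LOW/HIGH) are module constants; the 0.8–1.2 window below is an assumed placeholder:

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
events = pd.DataFrame({"FSC-A": rng.uniform(1e4, 1e5, 1000)})
# Singlets: FSC-H tracks FSC-A; doublets: area roughly doubles for the same height.
kind = rng.choice([1.0, 0.55], size=1000, p=[0.9, 0.1])
events["FSC-H"] = events["FSC-A"] * kind * rng.normal(1.0, 0.03, 1000)

ratio = events["FSC-H"] / events["FSC-A"].replace(0, np.nan)
singlets = events[(ratio >= 0.8) & (ratio <= 1.2)]   # assumed window
print(f"kept {len(singlets):,}/{len(events):,}")     # ~90% pass as singlets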

build_kd_dataset

build_kd_dataset(nfc_dir='fcs_files/NFC', tc_dir='fcs_files/time-course_data')

Reconstruct the full KD dataset with background subtraction and normalization. Exposed so external scripts (e.g. Step 6) can load the exact same data.

Source code in scripts/step_3_simulate_repression.py
def build_kd_dataset(nfc_dir="fcs_files/NFC",
                     tc_dir="fcs_files/time-course_data") -> pd.DataFrame:
    """
    Reconstruct the full KD dataset with background subtraction and normalization.
    Exposed so external scripts (e.g. Step 6) can load the exact same data.
    """
    # 1. Compute NFC background
    mBFP_neg, mmCherry_neg = compute_nfc_background(nfc_dir)

    # 2. Load KD time-course medians. Note: load_kd currently hardcodes
    #    "fcs_files/time-course_data" internally, so tc_dir is accepted for
    #    API symmetry but is not forwarded until load_kd takes a path argument.
    kd_data = load_kd(mBFP_neg, mmCherry_neg)

    # 3. Add mCherry fold-change and normalized-BFP columns
    return add_fc_and_norm(kd_data)

compute_nfc_background

compute_nfc_background(nfc_dir)

Estimate NFC background medians (mean of first 3 rows).

Parameters

nfc_dir : str
    Directory of NFC .fcs files.

Returns

(float, float)
    (mBFP_neg, mmCherry_neg) background values.

Source code in scripts/step_3_simulate_repression.py
def compute_nfc_background(nfc_dir: str):
    """
    Estimate NFC background medians (mean of first 3 rows).

    Parameters
    ----------
    nfc_dir : str
        Directory of NFC .fcs files.

    Returns
    -------
    (float, float)
        (mBFP_neg, mmCherry_neg) background values.
    """
    df = load_flowset_medians(nfc_dir)  # NFC medians
    print(f"[NFC] files={len(df)}; using first 3 for background")  # debug
    mBFP_neg = float(df.iloc[:3][CH_BFP].mean())  # BFP background
    mmCherry_neg = float(df.iloc[:3][CH_mCh].mean())  # mCherry background
    print(f"[NFC] mBFP_neg={mBFP_neg:.6g}, mmCherry_neg={mmCherry_neg:.6g}")  # debug
    return mBFP_neg, mmCherry_neg  # return tuple

delay_scan

delay_scan(pl_df, pars, t_grid=(0, 150, 0.01), delays=np.arange(0, 25.0 + 1e-09, 0.5))

Scan a range of delays (shift in simulation time) to minimize MAE vs. data.

Parameters

pl_df : pd.DataFrame
    KD dataset for one plasmid with columns ['time', 'fc.cherry', 'norm.bfp'].
pars : pd.Series
    Parameter row with fields (t_up, k/K, n, alpha).
t_grid : (float, float, float), optional
    (t0, t1, dt) for the base simulation, by default (0, 150, 0.01).
delays : array-like, optional
    Delay values (hours) to test, by default 0..25 in steps of 0.5.

Returns

(pd.DataFrame, float, float, (np.ndarray, np.ndarray, np.ndarray))
    (delay→MAE table, best_delay, best_mae, (base_t, base_R, base_Y))

Source code in scripts/step_3_simulate_repression.py
def delay_scan(pl_df: pd.DataFrame, pars: pd.Series,
               t_grid=(0, 150, 0.01),
               delays=np.arange(0, 25.0 + 1e-9, 0.5)):
    """
    Scan a range of delays (shift in simulation time) to minimize MAE vs. data.

    Parameters
    ----------
    pl_df : pd.DataFrame
        KD dataset for one plasmid with columns ['time', 'fc.cherry', 'norm.bfp'].
    pars : pd.Series
        Parameter row with fields (t_up, k/K, n, alpha).
    t_grid : (float, float, float), optional
        (t0, t1, dt) for base simulation, by default (0, 150, 0.01).
    delays : array-like, optional
        Delay values (hours) to test, by default 0..25 step 0.5.

    Returns
    -------
    (pd.DataFrame, float, float, (np.ndarray, np.ndarray, np.ndarray))
        (delay→MAE table, best_delay, best_mae, (base_t, base_R, base_Y))
    """
    t0, t1, dt = t_grid  # unpack time grid

    def pick(series, *keys):  # helper to fetch param with aliases
        for k in keys:
            if k in series and pd.notna(series[k]):
                return float(series[k])
        raise KeyError(f"Missing parameter(s) {keys} in row: {series.to_dict()}")

    # Extract parameters with tolerant keys
    t_up = pick(pars, "t_up", "T_up", "tup")  # upregulation half-time
    K = pick(pars, "K", "k")  # Hill K
    n = pick(pars, "n", "N", "hill_n")  # Hill n
    alpha = pick(pars, "alpha", "Alpha", "alphamcherry")  # decay rate
    beta = math.log(2.0) / t_up  # beta from t_up
    print(f"[delay] params: t_up={t_up:.4g}, K={K:.4g}, n={n:.4g}, alpha={alpha:.4g}, beta={beta:.4g}")  # debug

    # Build experimental mean curve vs time
    exp_mean = (pl_df.groupby("time")["fc.cherry"]  # group by time
                .mean().reset_index().sort_values("time"))  # mean fc per time
    exp_t = exp_mean["time"].to_numpy(float)  # times
    exp_y = exp_mean["fc.cherry"].to_numpy(float)  # responses
    print(f"[delay] exp points: {len(exp_t)} t[0..3]={exp_t[:3]} y[0..3]={exp_y[:3]}")  # debug

    # Simulate base (no delay)
    base_t, base_R, base_Y = simulate(beta, t_up, K, n, alpha, t0, t1, dt)  # base series

    rows = []  # collect MAE rows
    for d in delays:  # iterate candidate delays
        t_del = base_t + d  # shifted time grid
        fY = interp1d(t_del, base_Y, kind="linear",  # interpolator on shifted Y
                      bounds_error=False, fill_value=(base_Y[0], base_Y[-1]))
        y_hat = fY(exp_t)  # predicted Y at exp times
        resid = exp_y - y_hat  # residuals
        N = np.isfinite(resid).sum()  # number of finite residuals
        mae = np.inf if N <= 1 else np.nansum(np.abs(resid)) / (N - 1)  # MAE over finite residuals (N-1 denominator)
        rows.append((float(d), int(N), float(mae)))  # append row

    out = pd.DataFrame(rows, columns=["t", "N", "MAE"])  # delay→MAE table
    best_row = out.loc[out["MAE"].idxmin()]  # locate min MAE
    best_d, best_mae = float(best_row["t"]), float(best_row["MAE"])  # unpack
    print(f"[delay] best delay={best_d:.3g} h, MAE={best_mae:.4g}")  # debug
    return out, best_d, best_mae, (base_t, base_R, base_Y)  # return results
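
The core of the scan is just a time shift plus interpolation. A self-contained sketch with a synthetic curve standing in for the simulation (note the module's MAE divides by N−1, while this sketch uses a plain mean):

import numpy as np
from scipy.interpolate import interp1d

base_t = np.arange(0.0, 150.0, 0.01)
sim_Y = np.exp(-0.05 * base_t)                            # undelayed "simulation"
exp_t = np.array([0.0, 24.0, 48.0, 72.0, 96.0])
exp_y = np.exp(-0.05 * np.clip(exp_t - 10.0, 0.0, None))  # "data" delayed by 10 h

maes = []
for d in np.arange(0.0, 25.0 + 1e-9, 0.5):
    f = interp1d(base_t + d, sim_Y, bounds_error=False,
                 fill_value=(sim_Y[0], sim_Y[-1]))        # clamp outside the grid
    maes.append((d, np.mean(np.abs(exp_y - f(exp_t)))))
best_d, best_mae = min(maes, key=lambda r: r[1])
print(best_d)   # 10.0: the scan recovers the injected delay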

load_flowset_medians

load_flowset_medians(folder)

Load all .fcs files in a folder and compute gated medians.

Parameters

folder : str
    Directory containing .fcs files.

Returns

pd.DataFrame
    One row per file with medians and '__filename'.

Source code in scripts/step_3_simulate_repression.py
def load_flowset_medians(folder: str) -> pd.DataFrame:
    """
    Load all .fcs files in a folder and compute gated medians.

    Parameters
    ----------
    folder : str
        Directory containing .fcs files.

    Returns
    -------
    pd.DataFrame
        One row per file with medians and '__filename'.
    """
    files = sorted(glob.glob(os.path.join(folder, "*.fcs")))  # discover files
    print(f"[scan] {folder}{len(files)} files")  # debug
    if not files:  # guard
        raise FileNotFoundError(f"No FCS in {folder}")
    out = pd.DataFrame([median_channels_for_file(f) for f in files])  # compute all
    print(f"[table] medians shape={out.shape}")  # debug
    return out  # return table

load_kd

load_kd(mBFP_neg, mmCherry_neg)

Load time-course medians, subtract NFC background, keep KD-only rows.

Parameters

mBFP_neg : float
    Background for BFP to subtract.
mmCherry_neg : float
    Background for mCherry to subtract.

Returns

pd.DataFrame
    KD-only dataset with columns [BV421-A, PE-A, plasmid, exp, rep, time].

Source code in scripts/step_3_simulate_repression.py
def load_kd(mBFP_neg: float, mmCherry_neg: float) -> pd.DataFrame:
    """
    Load time-course medians, subtract NFC background, keep KD-only rows.

    Parameters
    ----------
    mBFP_neg : float
        Background for BFP to subtract.
    mmCherry_neg : float
        Background for mCherry to subtract.

    Returns
    -------
    pd.DataFrame
        KD-only dataset with columns [BV421-A, PE-A, plasmid, exp, rep, time].
    """
    med = load_flowset_medians("fcs_files/time-course_data").copy()  # medians table
    print(f"[KD] medians pre-sub shape={med.shape}")  # debug
    med[CH_BFP] = med[CH_BFP] - mBFP_neg  # subtract BFP bg
    med[CH_mCh] = med[CH_mCh] - mmCherry_neg  # subtract mCh bg
    print(f"[KD] bg-sub head:\n{med[[CH_BFP, CH_mCh]].head().to_string(index=False)}")  # preview
    parsed = med["__filename"].apply(parse_timecourse_name).tolist()  # parse tokens
    parsed_df = pd.DataFrame(parsed, columns=["plasmid", "exp", "rep", "time"])  # to frame
    df = pd.concat([med[[CH_BFP, CH_mCh]], parsed_df], axis=1)  # combine data/meta
    df = df[df["exp"] == "KD"].copy()  # keep KD rows
    print(f"[KD] KD-only rows: {len(df)}")  # debug
    df["time"] = pd.to_numeric(df["time"], errors="coerce")  # numeric time
    before = len(df)  # count before drop
    df = df.dropna(subset=["time"])  # drop invalid time
    print(f"[KD] dropped {before - len(df)} rows with non-numeric time")  # debug
    cat = pd.CategoricalDtype(["SP430", "SP428", "SP430ABA", "SP427", "SP411"], ordered=True)  # order
    df["plasmid"] = df["plasmid"].astype(cat)  # set category order
    return df  # return KD table

load_parameters

load_parameters()

Load and merge parameter CSVs required for KD simulation.

Expected files under 'parameters/':

- half_times_downregulation.csv : columns [plasmid, halftime] → t_down (not used here)
- half_times_upregulation.csv : columns [plasmid, halftime] → t_up
- Hill_parameters.csv : columns [plasmid, K, n]
- alphamcherry.csv : either [plasmid, alpha] or a single-row [alpha]

Returns

pd.DataFrame
    Merged table with columns at least: [plasmid, t_up, k, n, alpha].

Raises

KeyError
    If required columns/files are missing or malformed.

Source code in scripts/step_3_simulate_repression.py
def load_parameters() -> pd.DataFrame:
    """
    Load and merge parameter CSVs required for KD simulation.

    Expected files under 'parameters/':
      - half_times_downregulation.csv : columns [plasmid, halftime] → t_down (not used here)
      - half_times_upregulation.csv   : columns [plasmid, halftime] → t_up
      - Hill_parameters.csv           : columns [plasmid, K, n]
      - alphamcherry.csv              : either [plasmid, alpha] or a single-row [alpha]

    Returns
    -------
    pd.DataFrame
        Merged table with columns at least: [plasmid, t_up, k, n, alpha].

    Raises
    ------
    KeyError
        If required columns/files are missing or malformed.
    """
    base = "parameters"  # root directory

    def _read_norm(path: str) -> pd.DataFrame:
        df = pd.read_csv(path)  # read CSV
        df.columns = (df.columns.str.strip().str.lower()
                      .str.replace(r"[\s\-]+", "_", regex=True))  # normalize names
        print(f"[params] loaded {os.path.basename(path)} shape={df.shape}")  # debug
        return df

    # Load per-plasmid tables
    t_down = _read_norm(os.path.join(base, "half_times_downregulation.csv"))
    t_up = _read_norm(os.path.join(base, "half_times_upregulation.csv"))
    hills = _read_norm(os.path.join(base, "Hill_parameters.csv"))

    # Normalize expected columns
    if "halftime" in t_down.columns: t_down = t_down.rename(columns={"halftime": "t_down"})  # rename
    if "halftime" in t_up.columns:   t_up = t_up.rename(columns={"halftime": "t_up"})  # rename
    if "k" not in hills.columns and "K" in hills.columns: hills = hills.rename(columns={"K": "k"})  # fix K

    # Validate presence of 'plasmid'
    for name, df in [("half_times_downregulation.csv", t_down),
                     ("half_times_upregulation.csv", t_up),
                     ("Hill_parameters.csv", hills)]:
        if "plasmid" not in df.columns:
            raise KeyError(f"'{name}' must have a 'plasmid' column. Found: {list(df.columns)}")

    # Merge core parameter tables
    params = (t_down.merge(t_up, on="plasmid", how="outer")
              .merge(hills, on="plasmid", how="outer"))
    print(f"[params] merged core shape={params.shape}")  # debug

    # Load alpha; allow global or per-plasmid
    alpha_path = os.path.join(base, "alphamcherry.csv")  # path
    alpha = _read_norm(alpha_path)  # load alpha

    if "alpha" not in alpha.columns:  # tolerate variants
        for alt in ("alpha_mcherry", "alphamcherry", "a"):
            if alt in alpha.columns:
                alpha = alpha.rename(columns={alt: "alpha"})
                break
    if "alpha" not in alpha.columns:
        raise KeyError(f"'alphamcherry.csv' must contain 'alpha'. Found: {list(alpha.columns)}")

    if "plasmid" in alpha.columns:  # per-plasmid alpha
        params = params.merge(alpha[["plasmid", "alpha"]], on="plasmid", how="left")
    else:  # global alpha
        if len(alpha) != 1:
            raise KeyError("Global 'alphamcherry.csv' must contain exactly 1 row.")
        params["alpha"] = float(alpha["alpha"].iloc[0])  # broadcast

    # Final checks
    required = ["plasmid", "t_up", "k", "n", "alpha"]
    missing = [c for c in required if c not in params.columns]
    if missing:
        raise KeyError(f"Merged parameters missing columns: {missing}. Columns: {list(params.columns)}")

    if params[required].isna().any(axis=1).any():  # warn on NA rows
        print("[WARN] Some parameter rows contain NA values.")
        print(params.loc[params[required].isna().any(axis=1), ["plasmid"] + required[1:]])

    print(f"[params] final shape={params.shape}")  # debug
    return params  # return merged params
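
For reference, a minimal on-disk layout that satisfies this loader: one plasmid, a global single-row alpha, and placeholder values only (assumes load_parameters is in scope):

import os
import pandas as pd

os.makedirs("parameters", exist_ok=True)
pd.DataFrame({"plasmid": ["SP430"], "halftime": [8.0]}) \
    .to_csv("parameters/half_times_downregulation.csv", index=False)
pd.DataFrame({"plasmid": ["SP430"], "halftime": [6.0]}) \
    .to_csv("parameters/half_times_upregulation.csv", index=False)
pd.DataFrame({"plasmid": ["SP430"], "K": [0.3], "n": [2.0]}) \
    .to_csv("parameters/Hill_parameters.csv", index=False)
pd.DataFrame({"alpha": [0.08]}) \
    .to_csv("parameters/alphamcherry.csv", index=False)   # global alpha

params = load_parameters()
print(params[["plasmid", "t_up", "k", "n", "alpha"]])     # one row, no NA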

median_channels_for_file

median_channels_for_file(fpath)

Compute per-file medians after gating.

Parameters

fpath : str
    Path to .fcs file.

Returns

pd.Series
    Medians per channel with '__filename' stem.

Source code in scripts/step_3_simulate_repression.py
def median_channels_for_file(fpath: str) -> pd.Series:
    """
    Compute per-file medians after gating.

    Parameters
    ----------
    fpath : str
        Path to .fcs file.

    Returns
    -------
    pd.Series
        Medians per channel with '__filename' stem.
    """
    print(f"[read] {os.path.basename(fpath)}")  # which file
    data = FCMeasurement(ID=os.path.basename(fpath),  # load FCS
                         datafile=fpath).data
    print(f"[read] events={len(data):,}, cols={len(data.columns)}")  # size
    for ch in (CH_FSC_A, CH_SSC_A, CH_FSC_H, CH_BFP, CH_mCh):  # channel check
        if ch not in data.columns:
            raise ValueError(f"Channel '{ch}' not in {fpath}")
    g = apply_boundary_gate(data)  # boundary gate
    if len(g) == 0:  # fallback if empty
        print("[gate] boundary empty → using raw sample")
        g = data
    s = apply_singlet_gate(g)  # singlet gate
    if len(s) == 0:  # fallback if empty
        print("[gate] singlet empty  → using boundary-gated")
        s = g
    med = s.median(numeric_only=True)  # compute medians
    print(f"[median] BFP~{med.get(CH_BFP, np.nan):.3g} | mCh~{med.get(CH_mCh, np.nan):.3g}")  # preview
    med["__filename"] = os.path.splitext(os.path.basename(fpath))[0]  # store stem
    return med  # return series

ode_rhs

ode_rhs(t, y, beta, t_up, K, n, alpha)

Right-hand side of the repression ODE system.

Parameters

t : float
    Time (hours).
y : list[float]
    State vector [R, Y], where R ≈ repressor proxy and Y ≈ scaled reporter (pre-scaling).
beta : float
    Production rate chosen so that R rises with half-time t_up (beta = ln2/t_up).
t_up : float
    Upregulation half-time for R.
K : float
    Hill half-maximal constant.
n : float
    Hill coefficient.
alpha : float
    Reporter decay rate (per hour).

Returns

list[float]
    Derivatives [dR/dt, dY/dt].

Source code in scripts/step_3_simulate_repression.py
def ode_rhs(t: float, y: list, beta: float, t_up: float, K: float, n: float, alpha: float):
    """
    Right-hand side of the repression ODE system.

    Parameters
    ----------
    t : float
        Time (hours).
    y : list[float]
        State vector [R, Y], where R≈repressor proxy, Y≈scaled reporter (pre-scaling).
    beta : float
        Production rate chosen so that R rises with half-time t_up (beta=ln2/t_up).
    t_up : float
        Upregulation half-time for R.
    K : float
        Hill half-maximal constant.
    n : float
        Hill coefficient.
    alpha : float
        Reporter decay rate (per hour).

    Returns
    -------
    list[float]
        Derivatives [dR/dt, dY/dt].
    """
    R, Y = y  # unpack state
    dR = beta - R * (math.log(2.0) / t_up)  # repressor kinetics
    dY = (K ** n) / (K ** n + max(R, 0.0) ** n) - alpha * Y  # reporter kinetics
    return [dR, dY]  # return derivatives
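
Because beta = ln(2)/t_up, the R equation has the closed form R(t) = 1 − 2^(−t/t_up) for R(0) = 0, so R is half-maximal exactly at t = t_up. A quick numerical check against the RHS (placeholder parameter values; assumes ode_rhs is in scope):

import math

t_up, K, n, alpha = 6.0, 0.3, 2.0, 0.08
beta = math.log(2.0) / t_up

def R_exact(t):
    return 1.0 - 2.0 ** (-t / t_up)   # solves dR/dt = beta - R*ln(2)/t_up, R(0)=0

t = 3.0
numeric = (R_exact(t + 1e-6) - R_exact(t - 1e-6)) / 2e-6                 # central difference
analytic = ode_rhs(t, [R_exact(t), 1.0 / alpha], beta, t_up, K, n, alpha)[0]
print(abs(numeric - analytic) < 1e-6, R_exact(t_up))                     # True 0.5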

parse_timecourse_name

parse_timecourse_name(name)

Extract (plasmid, exp, rep, time) tokens from filename stem.

Parameters

name : str
    Basename of the file (no extension).

Returns

(str, str, str, float)
    (plasmid, exp, rep, time)

Source code in scripts/step_3_simulate_repression.py
def parse_timecourse_name(name: str):
    """
    Extract (plasmid, exp, rep, time) tokens from filename stem.

    Parameters
    ----------
    name : str
        Basename of the file (no extension).

    Returns
    -------
    (str, str, str, float)
        (plasmid, exp, rep, time)
    """
    parts = _SPLIT.split(name)  # tokenize
    plasmid = parts[2] if len(parts) > 2 else ""  # plasmid token
    exp = parts[3] if len(parts) > 3 else ""  # experiment token
    rep = parts[4] if len(parts) > 4 else ""  # replicate token
    time_s = parts[5] if len(parts) > 5 else ""  # time token (string)
    try:
        time = float(time_s)  # try direct parse
    except Exception:
        m = re.search(r"(\d+(?:\.\d+)?)", time_s)  # fallback: first number
        time = float(m.group(1)) if m else np.nan  # NaN if none
    return plasmid, exp, rep, time  # return tuple
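
For illustration, assuming _SPLIT tokenizes on underscores, a hypothetical stem parses as follows (tokens land at indices 2..5 as documented above):

# Hypothetical filename stem, not a real file from the dataset:
print(parse_timecourse_name("export_A1_SP430_KD_1_24"))
# ('SP430', 'KD', '1', 24.0)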

save_fit_plots

save_fit_plots(pl_df, base_t, base_R, base_Y, delay, tag_prefix)

Save diagnostic plots comparing experimental data and simulated curves.

Parameters

pl_df : pd.DataFrame
    KD data for a single plasmid (columns include 'time', 'fc.cherry', 'norm.bfp').
base_t : np.ndarray
    Base simulation time vector.
base_R : np.ndarray
    Base simulated R(t).
base_Y : np.ndarray
    Base simulated alpha*Y(t).
delay : float
    Best delay (hours) to overlay as a dashed line.
tag_prefix : str
    Tag for filenames (e.g., "KRAB_dCas9").

Source code in scripts/step_3_simulate_repression.py
def save_fit_plots(pl_df: pd.DataFrame, base_t: np.ndarray, base_R: np.ndarray,
                   base_Y: np.ndarray, delay: float, tag_prefix: str):
    """
    Save diagnostic plots comparing experimental data and simulated curves.

    Parameters
    ----------
    pl_df : pd.DataFrame
        KD data for a single plasmid (columns include 'time', 'fc.cherry', 'norm.bfp').
    base_t : np.ndarray
        Base simulation time vector.
    base_R : np.ndarray
        Base simulated R(t).
    base_Y : np.ndarray
        Base simulated alpha*Y(t).
    delay : float
        Best delay (hours) to overlay as dashed line.
    tag_prefix : str
        Tag for filenames (e.g., "KRAB_dCas9").
    """
    # mCherry fit
    p1 = (
            ggplot(pl_df, aes("time", "fc.cherry"))
            + geom_point(size=0.8, alpha=0.4, color=COL_CH)
            + coord_cartesian(xlim=(0, 150), ylim=(0, 1.3))
            + labs(x="Time (hours)", y="mCherry")
            + THEME
            + geom_line(pd.DataFrame({"time": base_t, "Y": base_Y}), aes("time", "Y"))
            + geom_line(pd.DataFrame({"time": base_t + delay, "Y": base_Y}), aes("time", "Y"), linetype="dashed")
    )
    out1 = os.path.join(OUT_PATH, f"KD_ODE_mCherry_{tag_prefix}.pdf")  # filename
    p1.save(out1, width=PLOT_W, height=PLOT_H, units="in")  # save
    print(f"[plot] saved: {out1}")  # debug

    # tagBFP fit (R trajectory vs normalized BFP proxy)
    p2 = (
            ggplot(pl_df, aes("time", "norm.bfp"))
            + geom_point(size=0.8, alpha=0.4, color=COL_BFP)
            + coord_cartesian(xlim=(0, 150), ylim=(0, 1.3))
            + labs(x="Time (hours)", y="tagBFP")
            + THEME
            + geom_line(pd.DataFrame({"time": base_t, "R": base_R}), aes("time", "R"))
    )
    out2 = os.path.join(OUT_PATH, f"KD_ODE_tagBFP_{tag_prefix}.pdf")  # filename
    p2.save(out2, width=PLOT_W, height=PLOT_H, units="in")  # save
    print(f"[plot] saved: {out2}")  # debug

save_mae_plot

save_mae_plot(df_mae, out_pdf, y_max)

Save a plot of MAE vs delay and highlight the minimum.

Parameters

df_mae : pd.DataFrame
    Table with columns ['t', 'N', 'MAE'].
out_pdf : str
    Output filename (saved under OUT_PATH).
y_max : float
    Upper y-limit for the MAE axis, for readability.

Source code in scripts/step_3_simulate_repression.py
def save_mae_plot(df_mae: pd.DataFrame, out_pdf: str, y_max: float):
    """
    Save a plot of MAE vs delay and highlight the minimum.

    Parameters
    ----------
    df_mae : pd.DataFrame
        Table with columns ['t', 'N', 'MAE'].
    out_pdf : str
        Output filename (saved under OUT_PATH).
    y_max : float
        Upper y-limit for MAE axis for readability.
    """
    best_row = df_mae.loc[df_mae["MAE"].idxmin()]  # best row for highlight
    p = (  # build plot
            ggplot(df_mae, aes("t", "MAE"))
            + geom_point(size=0.1, alpha=0.4, color="black")
            + geom_point(data=best_row.to_frame().T, mapping=aes("t", "MAE"),
                         size=0.8, alpha=1.0, color=COL_CH)
            + geom_line(data=df_mae.assign(minMAE=df_mae["MAE"].min()),
                        mapping=aes("t", "minMAE"), linetype="dashed")
            + coord_cartesian(xlim=(0, 25), ylim=(0, y_max))
            + labs(y="MAE", x="Delay (hours)")
            + THEME
    )
    out_path = os.path.join(OUT_PATH, out_pdf)  # full path
    p.save(out_path, width=PLOT_W, height=PLOT_H, units="in")  # save PDF
    print(f"[plot] saved: {out_path}")  # debug

simulate

simulate(beta, t_up, K, n, alpha, t_start, t_end, dt=0.01)

Simulate the ODE system with LSODA and return (t, R(t), alpha*Y(t)).

Parameters

beta, t_up, K, n, alpha : float
    Model parameters.
t_start, t_end : float
    Time window (hours).
dt : float, optional
    Output sampling step, by default 0.01 h.

Returns

(np.ndarray, np.ndarray, np.ndarray)
    (time grid, R(t), alpha*Y(t)).

Raises

RuntimeError
    If the IVP solver fails.

Source code in scripts/step_3_simulate_repression.py
def simulate(beta: float, t_up: float, K: float, n: float, alpha: float,
             t_start: float, t_end: float, dt: float = 0.01):
    """
    Simulate the ODE system with LSODA and return (t, R(t), alpha*Y(t)).

    Parameters
    ----------
    beta, t_up, K, n, alpha : float
        Model parameters.
    t_start, t_end : float
        Time window (hours).
    dt : float, optional
        Output sampling step, by default 0.01 h.

    Returns
    -------
    (np.ndarray, np.ndarray, np.ndarray)
        (time grid, R(t), alpha*Y(t)).

    Raises
    ------
    RuntimeError
        If the IVP solver fails.
    """
    t_eval = np.arange(t_start, t_end + dt / 2, dt)  # time grid
    y0 = [0.0, 1.0 / alpha]  # initial state
    print(
        f"[sim] t∈[{t_start},{t_end}] dt={dt}, params: beta={beta:.4g}, t_up={t_up:.4g}, K={K:.4g}, n={n:.4g}, alpha={alpha:.4g}")  # debug
    sol = solve_ivp(  # solve IVP
        ode_rhs, (t_start, t_end), y0,
        t_eval=t_eval,
        method="LSODA",
        # Why max_step=0.5? To prevent LSODA from taking too large steps
        max_step=0.5,
        args=(beta, t_up, K, n, alpha),
        rtol=1e-7,
        atol=1e-9
    )
    if not sol.success:  # propagate failure
        raise RuntimeError(sol.message)
    R = sol.y[0]  # repressor
    Y = sol.y[1] * alpha  # scale reporter by alpha
    print(f"[sim] steps={len(sol.t)} R[0..2]={R[:3]} Y[0..2]={Y[:3]}")  # debug
    return sol.t, R, Y  # return time series
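
The initial state y0 = [0, 1/alpha] deserves a note: R(0) = 0 makes the Hill term equal 1, and Y(0) = 1/alpha is then a steady state (dY/dt = 1 − alpha·(1/alpha) = 0), so the scaled output alpha·Y starts at exactly 1, matching the fold-change normalization of the data. A quick check with placeholder parameters (assumes simulate and ode_rhs are in scope):

import math

t_up, K, n, alpha = 6.0, 0.3, 2.0, 0.08
beta = math.log(2.0) / t_up
t, R, Y = simulate(beta, t_up, K, n, alpha, 0.0, 150.0, 0.01)
print(R[0], Y[0])   # ≈ 0.0 and 1.0 (Y is returned pre-scaled by alpha)
assert abs(ode_rhs(0.0, [0.0, 1.0 / alpha], beta, t_up, K, n, alpha)[1]) < 1e-12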

simulate_ode

simulate_ode(R0, Y0, pars, t0=0.0, tmax=150.0, step=0.05, delay=0.0)

Simulate the repression ODE system for given parameters and return results.

Parameters

R0 : float
    Initial R value (unused; the simulation always starts at R = 0).
Y0 : float
    Initial Y value (unused; the simulation always starts at Y = 1/alpha).
pars : pd.Series
    Parameter row with fields (t_up, k/K, n, alpha).
t0 : float, optional
    Start time (hours), by default 0.0.
tmax : float, optional
    End time (hours), by default 150.0.
step : float, optional
    Time step for output (hours), by default 0.05.
delay : float, optional
    Time delay added to the output time (hours), by default 0.0.

Returns

pd.DataFrame
    DataFrame with columns ['time', 'R', 'Y'].

Source code in scripts/step_3_simulate_repression.py
def simulate_ode(R0: float, Y0: float, pars: pd.Series,
                 t0: float = 0.0, tmax: float = 150.0, step: float = 0.05,
                 delay: float = 0.0) -> pd.DataFrame:
    """
    Simulate the repression ODE system for given parameters and return results.

    Parameters
    ----------
    R0 : float
        Initial R value (not used; always 0).
    Y0 : float
        Initial Y value (not used; always 1/alpha).
    pars : pd.Series
        Parameter row with fields (t_up, k/K, n, alpha).
    t0 : float, optional
        Start time (hours), by default 0.0.
    tmax : float, optional
        End time (hours), by default 150.0.
    step : float, optional
        Time step for output (hours), by default 0.05.
    delay : float, optional
        Time delay to add to output time (hours), by default 0.0.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns ['time', 'R', 'Y'].
    """

    # 1. Helper to fetch a parameter under any of several aliases
    def get_par(p, *keys):
        for k in keys:
            if k in p:
                return float(p[k])
        return None  # missing key; the guard below handles it

    # 2. Extract parameters with tolerant keys
    t_up = get_par(pars, "t_up", "halftime", "tup")
    K = get_par(pars, "K", "k")
    n = get_par(pars, "n", "hill_n", "N")
    alpha = get_par(pars, "alpha", "alphamcherry", "a")

    # 3. Guard against missing or NaN parameters: return an empty frame
    #    rather than raising, so a caller looping over plasmids can continue.
    if any(x is None or math.isnan(x) for x in [t_up, K, n, alpha]):
        return pd.DataFrame({"time": [], "R": [], "Y": []})

    # 4. Calculate beta and run simulation
    beta = math.log(2.0) / t_up

    # Call the existing internal 'simulate' function
    t_arr, R_arr, Y_arr = simulate(beta, t_up, K, n, alpha, t0, tmax, step)

    # 5. Return formatted result
    return pd.DataFrame({"time": t_arr + delay, "R": R_arr, "Y": Y_arr})