scripts.step_1b_fit_downregulation

Reverse (Rev) time-course analysis.

This module loads flow cytometry FCS files, applies boundary and singlet gates, computes per-file medians, subtracts NFC-derived background, constructs a Rev-only time-course dataset, performs per-plasmid min–max normalization on BFP, fits an exponential decay model to estimate half-times (t1/2), generates diagnostic plots, and writes the estimated parameters to CSV.

Outputs
  • plots/REV_SP430ABA_fitting.pdf
  • plots/REV_dCas9_fitting.pdf
  • plots/REV_KRAB-dCas9_fitting.pdf
  • plots/REV_HDAC4-dCas9_fitting.pdf
  • plots/REV_CasRx_fitting.pdf
  • parameters/half_times_downregulation.csv

add_minmax_norm

add_minmax_norm(df)

Add per-plasmid min–max normalization for downregulation:

mean.final = mean(BFP | time > 10) mean.init = mean(BFP | time == 0) norm.bfp = (BFP - mean.final) / (mean.init - mean.final)

Parameters

df : pd.DataFrame Rev dataset with columns [plasmid, time, BV421-A].

Returns

pd.DataFrame Input with added columns mean.final, mean.init, norm.bfp.

Source code in scripts/step_1b_fit_downregulation.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def add_minmax_norm(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add per-plasmid min–max normalization for downregulation:

      mean.final = mean(BFP | time > 10)
      mean.init  = mean(BFP | time == 0)
      norm.bfp   = (BFP - mean.final) / (mean.init - mean.final)

    Parameters
    ----------
    df : pd.DataFrame
        Rev dataset with columns [plasmid, time, BV421-A].

    Returns
    -------
    pd.DataFrame
        Input with added columns mean.final, mean.init, norm.bfp.
    """
    mean_final = (  # per-plasmid final mean
        df.query("time > 10").groupby("plasmid")[CH_BFP]
        .mean().rename("mean.final").reset_index()
    )
    mean_init = (  # per-plasmid init mean
        df.query("time == 0").groupby("plasmid")[CH_BFP]
        .mean().rename("mean.init").reset_index()
    )
    print(f"[norm] mean.final rows={len(mean_final)}, mean.init rows={len(mean_init)}")  # debug
    out = df.merge(mean_final, on="plasmid", how="left")  # attach final mean
    out = out.merge(mean_init, on="plasmid", how="left")  # attach init mean
    denom = (out["mean.init"] - out["mean.final"]).replace(0, np.nan)  # protect /0
    out["norm.bfp"] = (out[CH_BFP] - out["mean.final"]) / denom  # compute normalized BFP
    print("[norm] preview:\n" +  # print small preview
          out[["plasmid", "time", CH_BFP, "mean.init", "mean.final", "norm.bfp"]]
          .head().to_string(index=False))
    return out  # return normalized table

apply_boundary_gate

apply_boundary_gate(df)

Apply coarse boundary gate on FSC-A and SSC-A.

Parameters

df : pd.DataFrame Events table with FSC-A and SSC-A.

Returns

pd.DataFrame Subset passing the boundary gate.

Source code in scripts/step_1b_fit_downregulation.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def apply_boundary_gate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply coarse boundary gate on FSC-A and SSC-A.

    Parameters
    ----------
    df : pd.DataFrame
        Events table with FSC-A and SSC-A.

    Returns
    -------
    pd.DataFrame
        Subset passing the boundary gate.
    """
    m = (  # build mask within bounds
            (df[CH_FSC_A] >= BOUND_MIN[CH_FSC_A]) & (df[CH_FSC_A] <= BOUND_MAX[CH_FSC_A]) &
            (df[CH_SSC_A] >= BOUND_MIN[CH_SSC_A]) & (df[CH_SSC_A] <= BOUND_MAX[CH_SSC_A])
    )
    out = df.loc[m]  # subset rows by mask
    print(f"[gate] boundary: {len(out):,}/{len(df):,} kept")  # debug summary
    return out  # return gated events

apply_singlet_gate

apply_singlet_gate(df)

Keep singlets using FSC-H / FSC-A ratio.

Parameters

df : pd.DataFrame Events table with FSC-H and FSC-A.

Returns

pd.DataFrame Subset passing the singlet gate.

Source code in scripts/step_1b_fit_downregulation.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def apply_singlet_gate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Keep singlets using FSC-H / FSC-A ratio.

    Parameters
    ----------
    df : pd.DataFrame
        Events table with FSC-H and FSC-A.

    Returns
    -------
    pd.DataFrame
        Subset passing the singlet gate.
    """
    ratio = df[CH_FSC_H] / (df[CH_FSC_A].replace(0, np.nan))  # compute ratio, avoid /0
    m = (ratio >= SINGLET_RATIO_LOW) & (ratio <= SINGLET_RATIO_HIGH)  # mask within window
    out = df.loc[m]  # subset
    print(f"[gate] singlet : {len(out):,}/{len(df):,} kept")  # debug summary
    return out  # return singlets

compute_nfc_background

compute_nfc_background(nfc_dir)

Estimate background medians from NFC files (first 3 medians).

Parameters

nfc_dir : str Directory with NFC .fcs files.

Returns

(float, float) (mBFP_neg, mmCherry_neg) background medians.

Source code in scripts/step_1b_fit_downregulation.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def compute_nfc_background(nfc_dir: str):
    """
    Estimate background medians from NFC files (first 3 medians).

    Parameters
    ----------
    nfc_dir : str
        Directory with NFC .fcs files.

    Returns
    -------
    (float, float)
        (mBFP_neg, mmCherry_neg) background medians.
    """
    df = load_flowset_medians(nfc_dir)  # load NFC medians
    print(f"[NFC] files={len(df)}; using first 3 for background")  # debug
    mBFP_neg = float(df.iloc[:3][CH_BFP].mean())  # mean BFP background
    mmCherry_neg = float(df.iloc[:3][CH_mCh].mean())  # mean mCherry background
    print(f"[NFC] mBFP_neg={mBFP_neg:.6g}, mmCherry_neg={mmCherry_neg:.6g}")  # debug
    return mBFP_neg, mmCherry_neg  # return tuple

exp_decay

exp_decay(t, t_half)
Exponential decay model

y(t) = exp(-t * ln(2) / t_half)

Parameters

t : array-like Time points. t_half : float Half-time parameter.

Returns

np.ndarray Model values y(t).

Source code in scripts/step_1b_fit_downregulation.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def exp_decay(t, t_half):
    """
    Exponential decay model:
      y(t) = exp(-t * ln(2) / t_half)

    Parameters
    ----------
    t : array-like
        Time points.
    t_half : float
        Half-time parameter.

    Returns
    -------
    np.ndarray
        Model values y(t).
    """
    return np.exp(-t * (math.log(2.0) / t_half))  # vectorized decay

fit_half_time

fit_half_time(t, y, start=0.1)

Fit exponential decay to (t, y) to estimate half-time.

Parameters

t : array-like Time points. y : array-like Normalized response (ideally in [0, 1]). start : float, optional Initial guess for t_half, by default 0.1.

Returns

(float, float) (t_half estimate, standard error). SE is NaN if covariance missing.

Raises

RuntimeError If fewer than 3 finite points are available.

Source code in scripts/step_1b_fit_downregulation.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def fit_half_time(t, y, start=0.1):
    """
    Fit exponential decay to (t, y) to estimate half-time.

    Parameters
    ----------
    t : array-like
        Time points.
    y : array-like
        Normalized response (ideally in [0, 1]).
    start : float, optional
        Initial guess for t_half, by default 0.1.

    Returns
    -------
    (float, float)
        (t_half estimate, standard error). SE is NaN if covariance missing.

    Raises
    ------
    RuntimeError
        If fewer than 3 finite points are available.
    """
    t = np.asarray(t, float)  # ensure float array
    y = np.asarray(y, float)  # ensure float array
    m = np.isfinite(t) & np.isfinite(y)  # finite mask
    t, y = t[m], y[m]  # filtered pairs
    print(f"[fit] points used: {len(t)}")  # debug
    if len(t) < 3:  # guard against underfit
        raise RuntimeError("Not enough points for decay fit.")
    popt, pcov = curve_fit(exp_decay, t, y, p0=[float(start)], maxfev=20000)  # NLS fit
    se = float(np.sqrt(np.diag(pcov))[0]) if pcov is not None else np.nan  # SE of t_half
    print(f"[fit] t_half={popt[0]:.6g}, SE={se:.3g}")  # debug
    return float(popt[0]), se  # return estimate & SE

load_flowset_medians

load_flowset_medians(folder)

Load all FCS files in a folder and compute gated medians.

Parameters

folder : str Directory with .fcs files.

Returns

pd.DataFrame One row per file with medians and '__filename'.

Source code in scripts/step_1b_fit_downregulation.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def load_flowset_medians(folder: str) -> pd.DataFrame:
    """
    Load all FCS files in a folder and compute gated medians.

    Parameters
    ----------
    folder : str
        Directory with .fcs files.

    Returns
    -------
    pd.DataFrame
        One row per file with medians and '__filename'.
    """
    files = sorted(glob.glob(os.path.join(folder, "*.fcs")))  # discover files
    print(f"[scan] {folder}{len(files)} files")  # debug
    if not files:  # guard: no inputs
        raise FileNotFoundError(f"No FCS files found in: {folder}")
    out = pd.DataFrame([median_channels_for_file(f) for f in files])  # compute all medians
    print(f"[table] medians shape={out.shape}")  # debug
    return out  # return table

load_rev_timecourse

load_rev_timecourse(mBFP_neg, mmCherry_neg)

Build Rev-only dataset with background-subtracted medians.

Parameters

mBFP_neg : float BFP background (NFC mean of first 3 medians). mmCherry_neg : float mCherry background (NFC mean of first 3 medians).

Returns

pd.DataFrame Columns: [BV421-A, PE-A, plasmid, exp, rep, time]

Source code in scripts/step_1b_fit_downregulation.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def load_rev_timecourse(mBFP_neg: float, mmCherry_neg: float) -> pd.DataFrame:
    """
    Build Rev-only dataset with background-subtracted medians.

    Parameters
    ----------
    mBFP_neg : float
        BFP background (NFC mean of first 3 medians).
    mmCherry_neg : float
        mCherry background (NFC mean of first 3 medians).

    Returns
    -------
    pd.DataFrame
        Columns: [BV421-A, PE-A, plasmid, exp, rep, time]
    """
    med = load_flowset_medians("fcs_files/time-course_data").copy()  # load medians
    print(f"[REV] medians (pre-subtraction) shape={med.shape}")  # debug
    med[CH_BFP] = med[CH_BFP] - mBFP_neg  # subtract BFP bg
    med[CH_mCh] = med[CH_mCh] - mmCherry_neg  # subtract mCh bg
    print(f"[REV] bg-sub preview:\n{med[[CH_BFP, CH_mCh]].head().to_string(index=False)}")  # preview
    parsed = med["__filename"].apply(parse_timecourse_name).tolist()  # parse tokens
    parsed_df = pd.DataFrame(parsed, columns=["plasmid", "exp", "rep", "time"])  # to frame
    df = pd.concat([med[[CH_BFP, CH_mCh]], parsed_df], axis=1)  # combine data+meta
    print(f"[REV] combined shape={df.shape}")  # debug
    df = df[df["exp"] == "Rev"].copy()  # keep Rev experiments
    print(f"[REV] after exp=='Rev': n={len(df)}")  # debug
    df["time"] = pd.to_numeric(df["time"], errors="coerce")  # numeric time
    before = len(df)  # count before dropna
    df = df.dropna(subset=["time"])  # drop invalid times
    print(f"[REV] dropped {before - len(df)} rows with non-numeric time")  # debug
    return df  # return Rev dataset

median_channels_for_file

median_channels_for_file(fpath)

Compute per-file channel medians after gating.

Parameters

fpath : str Path to .fcs file.

Returns

pd.Series Median values (numeric-only) with '__filename' field.

Source code in scripts/step_1b_fit_downregulation.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def median_channels_for_file(fpath: str) -> pd.Series:
    """
    Compute per-file channel medians after gating.

    Parameters
    ----------
    fpath : str
        Path to .fcs file.

    Returns
    -------
    pd.Series
        Median values (numeric-only) with '__filename' field.
    """
    print(f"[read] {os.path.basename(fpath)}")  # which file
    sample = FCMeasurement(ID=os.path.basename(fpath),  # load FCS events
                           datafile=fpath).data
    print(f"[read] events={len(sample):,}, cols={len(sample.columns)}")  # size preview
    for ch in (CH_FSC_A, CH_SSC_A, CH_FSC_H, CH_BFP, CH_mCh):  # validate channels
        if ch not in sample.columns:
            raise ValueError(f"Channel '{ch}' not found in: {fpath}")
    gated = apply_boundary_gate(sample)  # boundary gate
    if len(gated) == 0:  # fallback if empty
        print("[gate] boundary empty → using raw sample")
        gated = sample
    singlets = apply_singlet_gate(gated)  # singlet gate
    if len(singlets) == 0:  # fallback if empty
        print("[gate] singlet empty  → using boundary-gated")
        singlets = gated
    s = singlets.median(numeric_only=True)  # per-channel medians
    print(f"[median] BFP~{s.get(CH_BFP, np.nan):.3g}, mCh~{s.get(CH_mCh, np.nan):.3g}")  # preview
    s["__filename"] = os.path.splitext(os.path.basename(fpath))[0]  # store basename
    return s  # return medians series

parse_timecourse_name

parse_timecourse_name(name)

Parse plasmid/exp/rep/time tokens from filename stem.

Parameters

name : str Basename without extension.

Returns

(str, str, str, float) (plasmid, exp, rep, time)

Source code in scripts/step_1b_fit_downregulation.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def parse_timecourse_name(name: str):
    """
    Parse plasmid/exp/rep/time tokens from filename stem.

    Parameters
    ----------
    name : str
        Basename without extension.

    Returns
    -------
    (str, str, str, float)
        (plasmid, exp, rep, time)
    """
    parts = FILENAME_SPLIT_RE.split(name)  # token list
    plasmid = parts[2] if len(parts) > 2 else ""  # plasmid token
    exp = parts[3] if len(parts) > 3 else ""  # experiment token
    rep = parts[4] if len(parts) > 4 else ""  # replicate token
    time_s = parts[5] if len(parts) > 5 else ""  # time token (string)
    try:
        time = float(time_s)  # direct float cast
    except Exception:
        m = re.search(r"(\d+(?:\.\d+)?)", time_s)  # fallback: first number
        time = float(m.group(1)) if m else np.nan  # NaN if not found
    return plasmid, exp, rep, time  # return tuple

save_rev_plot

save_rev_plot(df, t_half, out_pdf)

Save a Rev plot (norm.bfp vs time) with fitted decay curve.

Parameters

df : pd.DataFrame Data for a single plasmid with 'time' and 'norm.bfp'. t_half : float Estimated half-time to draw the curve. out_pdf : str Output PDF filename (saved in OUT_PATH).

Source code in scripts/step_1b_fit_downregulation.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def save_rev_plot(df: pd.DataFrame, t_half: float, out_pdf: str):
    """
    Save a Rev plot (norm.bfp vs time) with fitted decay curve.

    Parameters
    ----------
    df : pd.DataFrame
        Data for a single plasmid with 'time' and 'norm.bfp'.
    t_half : float
        Estimated half-time to draw the curve.
    out_pdf : str
        Output PDF filename (saved in OUT_PATH).
    """
    tmin, tmax = float(df["time"].min()), float(df["time"].max())  # plotting range
    curve = pd.DataFrame({"time": np.linspace(tmin, tmax, 200)})  # dense time grid
    curve["fit"] = exp_decay(curve["time"], t_half)  # model predictions
    p = (  # build plot
            ggplot(df, aes("time", "norm.bfp"))
            + geom_point(size=0.4, alpha=0.7, color=POINT_COLOR)
            + geom_line(data=curve, mapping=aes(x="time", y="fit"), color="black")
            + coord_cartesian(ylim=(-0.15, 1.4))
            + scale_y_continuous(breaks=[0, 0.25, 0.5, 0.75, 1.0])
            + labs(x="Time (hours)", y="tagBFP (% of final)")
            + THEME
    )
    out_path = os.path.join(OUT_PATH, out_pdf)  # full output path
    p.save(out_path, width=PLOT_W, height=PLOT_H, units="in")  # write PDF
    print(f"[plot] saved: {out_path}")  # debug