import numpy as np
import multiprocessing as mp
import traceback
from theatrics.utils.bleach_correction import fit_bleach_trend, depletion_correct
from theatrics.modules import SFCS_module
[docs]
def sfcs_process_main_curvefit(input_file, channel, cpu_n, out_q, cancel_event, bleach_corr,
chunk_lines=500, max_workers=None):
"""
Whole SFCS pipeline in a separate process.
Gaussian fitting uses curve_fit in parallel via multiprocessing.Pool + chunking.
out_q messages: ("progress", pct) / ("done", payload) / ("error", tb)
"""
try:
out_q.put(("progress", 0.0))
if cancel_event.is_set():
out_q.put(("cancelled", None))
return
frame_data, line_time_s, x, n_lines, n_pixels, root = SFCS_module.read_file(str(input_file), int(channel))
out_q.put(("progress", 2.0))
# ---- build chunk tasks ----
chunk_lines = int(chunk_lines)
if chunk_lines < 50:
chunk_lines = 50
tasks = []
for i0 in range(0, n_lines, chunk_lines):
block = frame_data[i0:i0 + chunk_lines] # view
tasks.append((i0, block, n_pixels))
n_tasks = len(tasks)
peaks = np.full(n_lines, 0.0, dtype=float)
sigmas = np.full(n_lines, 5.0, dtype=float)
# workers: don't blindly use 256; curve_fit overhead + OS overhead matters
cpu_n = int(cpu_n)
if max_workers is not None:
cpu_n = min(cpu_n, int(max_workers))
cpu_n = max(1, cpu_n)
# IMPORTANT on Windows/Linux: do NOT make this SFCS worker process daemonic
# (you already fixed that). This pool is created inside this worker process.
out_q.put(("progress", 5.0))
# ---- parallel chunk fitting ----
# chunksize=1 here because "tasks" are already chunky
with mp.Pool(processes=cpu_n) as pool:
completed_lines = 0
for chunk_result in pool.imap(SFCS_module.fit_gaussian_chunk, tasks, chunksize=1):
# chunk_result is list of (i, peak, sigma)
if cancel_event.is_set():
pool.terminate()
pool.join()
out_q.put(("cancelled", None))
return
for (i, peak, sigma) in chunk_result:
peaks[i] = peak
sigmas[i] = sigma
completed_lines += len(chunk_result)
pct = 5.0 + 55.0 * (completed_lines / n_lines) # 5%->60%
out_q.put(("progress", pct))
# ---- Remaining SFCS steps ----
if cancel_event.is_set():
out_q.put(("cancelled", None))
return
out_q.put(("progress", 65.0))
aligned_data = SFCS_module.alignment(frame_data, n_pixels, n_lines, root, peaks)
if cancel_event.is_set():
out_q.put(("cancelled", None))
return
out_q.put(("progress", 75.0))
intensity_traces = SFCS_module.calculate_intensity_trace(aligned_data, n_lines, n_pixels, sigmas, root)
if bleach_corr:
# build time axis: one point per line
t = np.arange(len(intensity_traces)) * line_time_s
# fit f(t) to the trace
f_t, f0, popt, model = fit_bleach_trend(t, intensity_traces, model="exp1") # or "exp2"
out_q.put(("progress", 85.0))
# apply Eq. 4
correct_intensity_traces = depletion_correct(intensity_traces, f_t, f0)
if cancel_event.is_set():
out_q.put(("cancelled", None))
return
out_q.put(("progress", 90.0))
G, G_std = SFCS_module.run_autocorrelation(correct_intensity_traces, line_time_s, root)
out_q.put(("progress", 100.0))
out_q.put(("done", dict(
frame_data=frame_data,
aligned_data=aligned_data,
intensity_traces=intensity_traces,
correct_intensity_traces=correct_intensity_traces,
G=G,
G_std=G_std
)))
else:
out_q.put(("progress", 90.0))
if cancel_event.is_set():
out_q.put(("cancelled", None))
return
G, G_std = SFCS_module.run_autocorrelation(intensity_traces, line_time_s, root)
out_q.put(("progress", 100.0))
out_q.put(("done", dict(
frame_data=frame_data,
aligned_data=aligned_data,
intensity_traces=intensity_traces,
G=G,
G_std=G_std
)))
except Exception:
out_q.put(("error", traceback.format_exc()))