From 94633491b0fdbd199a73f471efca921073e4a059 Mon Sep 17 00:00:00 2001
From: wissotsky
Date: Sun, 16 Nov 2025 00:50:45 +0200
Subject: [PATCH] feat: implement real-time linescan motion detection pipeline

- UDP packet reception on port 5000 with 16MB buffer
- Ring buffer implementation for real-time processing
- EMA-based motion detection with configurable thresholds
- Automatic AVIF image saving for detected motion segments
- Gradient metadata generation for web visualization
- Performance monitoring with latency warnings
- Debug mode for development troubleshooting
---
 main.py | 216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 216 insertions(+)
 create mode 100644 main.py

diff --git a/main.py b/main.py
new file mode 100644
index 0000000..9eed76b
--- /dev/null
+++ b/main.py
@@ -0,0 +1,216 @@
+import numpy as np
+import imageio.v3 as iio
+from tqdm import tqdm
+import concurrent.futures
+import os
+import time
+
+# --- Configuration ---
+DATA_SOURCE = "udp://10.81.2.120:5000"
+OUTPUT_DIR = "output"
+TARGET_FPS = 750
+TARGET_FRAME_TIME_NS = int(1e9 / TARGET_FPS)  # per-column processing budget
+
+RING_BUFFER_SIZE = 32768  # must be a power of two
+RING_BUFFER_MASK = RING_BUFFER_SIZE - 1
+
+COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS = 2456, 1, 3  # Example dimensions, adjust as needed
+
+LIVE_DEBUG = False
+
+
+os.makedirs(OUTPUT_DIR, exist_ok=True)
+
+write_executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
+futures = []
+
+def img_to_gradient(input_image):
+    string = "linear-gradient(in oklab"
+    for i in np.array_split(input_image[:, 0, :], 11):
+        r, g, b = np.mean(i, axis=0).astype(np.uint8)
+        string += f",rgb({r},{g},{b})"
+
+    string += ")"
+    return string
+
+#img_to_gradient(input_image)
+
+def submit_write_job(ring_buffer, index, length, ring_index):
+    try:
+        # Copy the slice out of the ring buffer so later writes can't overwrite it mid-save
+        data_to_write = np.take(ring_buffer, range(ring_index-length, ring_index), axis=0, mode='wrap').copy()
+        presync_time = int(time.time())
+        gradient_string = img_to_gradient(np.rot90(data_to_write))
+        # append to metadata jsonl
+        with open(f"{OUTPUT_DIR}/metadata.jsonl", "a") as f:
+            f.write(f'{{"filename":"live-{presync_time}-{index-length}-{index}-{length}.avif","length":{length},"gradient":"{gradient_string}"}}\n')
+
+    except Exception as e:
+        print(f"Error taking data from ring buffer: {e}")
+        return None
+
+    def _write_task(data):
+        filename = f"{OUTPUT_DIR}/live-{presync_time}-{index-length}-{index}-{length}.avif"
+        iio.imwrite(filename, data)
+
+    future = write_executor.submit(_write_task, np.rot90(data_to_write))
+    return future
+
+#props = iio.improps(DATA_SOURCE) # overwrite manually if live video source
+#props = (None, COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS, np.uint8)
+
+window_size = 12000
+
+ema_mean = 1.2
+ema_var = 0.4
+baseline_alpha = 0.001
+variance_alpha = 0.01
+default_diff = 0.52
+frame_mean = 0.0
+
+stride = 1
+frameskip = 1
+
+postcutoff_variance_threshold = 2.5
+cutoff_variance_threshold = 3.5
+precutoff_variance_threshold = 0.07
+
+is_recording = False
+recording_length = 0
+patience_default = 600
+patience = patience_default
+patience_length = 0
+recorded_images = 0
+frames_since_last_recording = 0
+index = 0
+
+ring_buffer = np.zeros((RING_BUFFER_SIZE, COLUMN_HEIGHT, CHANNELS), dtype=np.uint8)
+sum_ring_buffer = np.zeros((RING_BUFFER_SIZE), dtype=np.uint64)
+ema_var_ring = np.zeros((RING_BUFFER_SIZE), dtype=np.float32)
+sum_buffer = np.zeros_like(sum_ring_buffer[0:window_size])
+stride_indices = np.arange(0, COLUMN_HEIGHT, stride)
+
+
+#for frame in tqdm(iio.imiter(DATA_SOURCE, plugin="pyav"), total=props.n_images):
+#for frame in iio.imiter(DATA_SOURCE, plugin="pyav"):
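+# How the detection loop below works (overview of the code that follows):
+#   - each UDP datagram carries one COLUMN_HEIGHT x 1 BGR column; its
+#     green-channel mean is compared against the running mean of the previous
+#     window_size columns
+#   - that difference drives an EMA baseline (baseline_alpha) and an EMA
+#     variance (variance_alpha); while recording, columns that deviate by more
+#     than 3 sigma are kept out of the update so motion doesn't inflate the baseline
+#   - recording starts once ema_var rises above cutoff_variance_threshold
+#     (after a 1200-column warm-up) and stops after patience_default columns
+#     with ema_var below postcutoff_variance_threshold (the countdown resets
+#     whenever variance spikes again)
+#   - the finished segment is copied out of the ring buffer, written as AVIF,
+#     and a CSS gradient preview line is appended to metadata.jsonl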
plugin="pyav"), total=props.n_images): +#for frame in iio.imiter(DATA_SOURCE, plugin="pyav"): + +import socket + +"udp://10.81.2.120:5000" +#10.81.2.183 +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16777216) # 16MB buffer +sock.bind(("0.0.0.0", 5000)) + + + +while True: + data, addr = sock.recvfrom(65536) # buffer size is 65536 bytes + #print(f"Received {len(data)} bytes from {addr}") + frame = np.frombuffer(data, dtype=np.uint8).reshape((COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS)) + frame = frame.transpose(1, 0, 2)[:, :, ::-1] # Convert BGR to RGB + #print(frame.shape) + loop_start_time = time.perf_counter_ns() + + effective_window_size = min(index, window_size) + ring_index = index & RING_BUFFER_MASK + + tia = np.arange(ring_index - effective_window_size, ring_index) + time_indices = tia & RING_BUFFER_MASK + sum_buffer[:len(time_indices)] = sum_ring_buffer[time_indices[:]] + + # splayed out for perf debugging + s4 = frame[0:1,::stride,1] + frame_mean = np.divide(np.sum(s4),s4.size) + s2 = sum_buffer[:len(time_indices)] + s11 = np.sum(s2) + s1 = np.divide(s11,s2.size) if s2.size > 0 else 0 # Avoid division by zero + s0 = np.abs(frame_mean - s1) + s00 = np.mean(s0) + value = s00 if index > 0 else default_diff + + if ema_mean is None: + ema_mean, ema_var = value, 1.0 + + deviation = value - ema_mean + + ema_mean_temp = (1 - baseline_alpha) * ema_mean + baseline_alpha * value + ema_var_temp = (1 - variance_alpha) * ema_var + variance_alpha * (deviation ** 2) + + ring_buffer[ring_index:ring_index+1, :, :] = frame[0:1,:,:] + sum_ring_buffer[ring_index] = frame_mean + + if is_recording == False or abs(deviation) < 3 * np.sqrt(ema_var): + ema_var = ema_var_temp + ema_mean = ema_mean_temp + if (index > 1200 and is_recording == False and ema_var > cutoff_variance_threshold): + is_recording = True + + quiet_indices = np.argwhere(np.take(ema_var_ring, range(ring_index-RING_BUFFER_SIZE, ring_index), mode='wrap') < precutoff_variance_threshold) + + if quiet_indices.size > 0: + last_quiet_index = quiet_indices[-1].item() + recording_length = min(RING_BUFFER_SIZE - last_quiet_index + patience_default, frames_since_last_recording) + else: + recording_length = frames_since_last_recording + patience = patience_default + patience_length = 0 + + if is_recording: + if (ema_var < postcutoff_variance_threshold): + patience -= 1 + patience_length += 1 + if (ema_var >= postcutoff_variance_threshold): + recording_length += 1 + patience_length = 0 + patience = patience_default + if (patience == 0): + print(f"Dumping image starting at index {index-recording_length} ending at {index}, length {recording_length}") + future = submit_write_job(ring_buffer, index, recording_length, ring_index-patience_length) + if future: + futures.append(future) + + ema_var_ring[ring_index-patience_length] = 0.0 + recorded_images += 1 + frames_since_last_recording = patience_length + patience_length = 0 + is_recording = False + + ema_var_ring[ring_index] = ema_var + index += 1 + frames_since_last_recording += 1 + + loop_end_time = time.perf_counter_ns() + elapsed_ns = loop_end_time - loop_start_time + if elapsed_ns > TARGET_FRAME_TIME_NS: + print(f"Latency warning: processing time exceeded target by {elapsed_ns / TARGET_FRAME_TIME_NS} times") + + # visualize the variables using tqdm + if "stats_bar" not in globals(): + stats_bar = tqdm(total=0, leave=True) + + stats_bar.update(1) + stats_bar.set_description( + 
f"rec={is_recording} idx={index} value={value:.4f} evr_mean = {np.mean(ema_var_ring[len(time_indices)]):.4f} ema_mean={ema_mean:.4f},{ema_mean_temp:.4f} ema_var={ema_var:.5f},{ema_var_temp:.5f} len={recording_length} pat={patience} frame_mean={frame_mean:.2f}" + ) + + if LIVE_DEBUG and index % 1000 == 0: + debug_image = np.take(ring_buffer, range(ring_index - effective_window_size, ring_index), axis=0, mode='wrap') + debug_filename = f"{OUTPUT_DIR}/live-debug-{index}.webp" + iio.imwrite(debug_filename, debug_image) + print(f"Wrote live debug image to {debug_filename}") + +# handle final recording +if is_recording: + print(f"Dumping final image starting at index {index-recording_length} ending at {index}, length {recording_length}") + final_future = submit_write_job(ring_buffer, index, recording_length, ring_index-patience_length) + if final_future: + futures.append(final_future) + recorded_images += 1 + +# wait for image writes to complete +print("Processing complete. Waiting for all background writing tasks to finish...") +concurrent.futures.wait(futures) +write_executor.shutdown() # Cleanly release resources +print(f"All images written. Recorded {recorded_images} objects.") \ No newline at end of file