feat: implement real-time linescan motion detection pipeline
- UDP packet reception on port 5000 with 16MB receive buffer (expected datagram layout sketched below)
- Ring buffer implementation for real-time processing
- EMA-based motion detection with configurable thresholds
- Automatic AVIF image saving for detected motion segments
- Gradient metadata generation for web visualization
- Performance monitoring with latency warnings
- Debug mode for development troubleshooting
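Each datagram is expected to carry one full line-scan column of COLUMN_HEIGHT x COLUMN_WIDTH x CHANNELS = 2456 x 1 x 3 bytes in BGR order, matching the reshape in main.py. A minimal sender sketch under that assumption (host and port follow DATA_SOURCE; the zeroed column is only a placeholder):

    import socket
    import numpy as np

    column = np.zeros((2456, 1, 3), dtype=np.uint8)  # one BGR line-scan column, 7368 bytes
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(column.tobytes(), ("10.81.2.120", 5000))  # one column per datagram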
main.py (new file, 216 lines)
@@ -0,0 +1,216 @@
import numpy as np
import imageio.v3 as iio
from tqdm import tqdm
import concurrent.futures
import os
import time

# --- Configuration ---
DATA_SOURCE = "udp://10.81.2.120:5000"
OUTPUT_DIR = "output"
TARGET_FPS = 750
TARGET_FRAME_TIME_NS = int(1e9 / TARGET_FPS)  # nanoseconds per frame at the target rate

RING_BUFFER_SIZE = 32768  # must be a power of two
RING_BUFFER_MASK = RING_BUFFER_SIZE - 1

COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS = 2456, 1, 3  # example dimensions, adjust as needed

LIVE_DEBUG = False


os.makedirs(OUTPUT_DIR, exist_ok=True)

write_executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
futures = []


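# img_to_gradient: collapse a saved segment into a CSS linear-gradient string (11 averaged color stops) for the web visualization metadata.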
def img_to_gradient(input_image):
    string = "linear-gradient(in oklab"
    for i in np.array_split(input_image[:, 0, :], 11):
        r, g, b = np.mean(i, axis=0).astype(np.uint8)
        string += f",rgb({r},{g},{b})"

    string += ")"
    return string

# img_to_gradient(input_image)


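# submit_write_job: snapshot `length` columns ending at `ring_index` from the ring, append gradient metadata to metadata.jsonl, and queue the AVIF write on the thread pool.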
def submit_write_job(ring_buffer, index, length, ring_index):
    try:
        # copy the data so the ring can't overwrite it while the write is queued
        data_to_write = np.take(ring_buffer, range(ring_index - length, ring_index), axis=0, mode='wrap').copy()
        presync_time = int(time.time())
        gradient_string = img_to_gradient(np.rot90(data_to_write))
        # append to the metadata JSONL consumed by the web visualization
        with open(f"{OUTPUT_DIR}/metadata.jsonl", "a") as f:
            f.write(f'{{"filename":"live-{presync_time}-{index-length}-{index}-{length}.avif","length":{length},"gradient":"{gradient_string}"}}\n')
    except Exception as e:
        print(f"Error taking data from ring buffer: {e}")
        return None

    def _write_task(data):
        filename = f"{OUTPUT_DIR}/live-{presync_time}-{index-length}-{index}-{length}.avif"
        iio.imwrite(filename, data)

    future = write_executor.submit(_write_task, np.rot90(data_to_write))
    return future


# props = iio.improps(DATA_SOURCE)  # overwrite manually if live video source
# props = (None, COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS, np.uint8)

window_size = 12000        # number of recent columns used for the rolling baseline mean

ema_mean = 1.2             # EMA of the per-frame difference signal (baseline)
ema_var = 0.4              # EMA of its squared deviation (variance estimate)
baseline_alpha = 0.001     # smoothing factor for the baseline EMA
variance_alpha = 0.01      # smoothing factor for the variance EMA
default_diff = 0.52        # difference value used for the very first frame
frame_mean = 0.0

stride = 1
frameskip = 1

postcutoff_variance_threshold = 2.5   # while recording: variance below this counts toward patience running out
cutoff_variance_threshold = 3.5       # start recording when the variance EMA exceeds this
precutoff_variance_threshold = 0.07   # history below this marks the last quiet column before the event

is_recording = False
recording_length = 0
patience_default = 600     # quiet frames tolerated before a segment is flushed
patience = patience_default
patience_length = 0
recorded_images = 0
frames_since_last_recording = 0
index = 0

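# Pre-allocated rings, all indexed with `index & RING_BUFFER_MASK` so writes wrap in place:
# raw columns, per-column means, and the variance-EMA history.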
ring_buffer = np.zeros((RING_BUFFER_SIZE, COLUMN_HEIGHT, CHANNELS), dtype=np.uint8)
sum_ring_buffer = np.zeros(RING_BUFFER_SIZE, dtype=np.uint64)
ema_var_ring = np.zeros(RING_BUFFER_SIZE, dtype=np.float32)
sum_buffer = np.zeros_like(sum_ring_buffer[0:window_size])
stride_indices = np.arange(0, COLUMN_HEIGHT, stride)


# for frame in tqdm(iio.imiter(DATA_SOURCE, plugin="pyav"), total=props.n_images):
# for frame in iio.imiter(DATA_SOURCE, plugin="pyav"):

import socket

# "udp://10.81.2.120:5000"
# 10.81.2.183
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16777216)  # 16MB kernel receive buffer
sock.bind(("0.0.0.0", 5000))


# Run until interrupted; Ctrl-C falls through to the final flush and cleanup at the bottom of the file.
try:
    while True:
        data, addr = sock.recvfrom(65536)  # buffer size is 65536 bytes
        # print(f"Received {len(data)} bytes from {addr}")
        frame = np.frombuffer(data, dtype=np.uint8).reshape((COLUMN_HEIGHT, COLUMN_WIDTH, CHANNELS))
        frame = frame.transpose(1, 0, 2)[:, :, ::-1]  # convert BGR to RGB
        # print(frame.shape)
        loop_start_time = time.perf_counter_ns()

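        # Wrap the frame counter into the ring and gather the most recent per-column means for the rolling baseline.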
        effective_window_size = min(index, window_size)
        ring_index = index & RING_BUFFER_MASK

        tia = np.arange(ring_index - effective_window_size, ring_index)
        time_indices = tia & RING_BUFFER_MASK
        sum_buffer[:len(time_indices)] = sum_ring_buffer[time_indices]

        # splayed out for perf debugging
        s4 = frame[0:1, ::stride, 1]
        frame_mean = np.divide(np.sum(s4), s4.size)
        s2 = sum_buffer[:len(time_indices)]
        s11 = np.sum(s2)
        s1 = np.divide(s11, s2.size) if s2.size > 0 else 0  # avoid division by zero
        s0 = np.abs(frame_mean - s1)
        s00 = np.mean(s0)
        value = s00 if index > 0 else default_diff

        if ema_mean is None:
            ema_mean, ema_var = value, 1.0

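        # EMA update: new = (1 - alpha) * old + alpha * observation; the variance EMA tracks the squared deviation from the mean EMA.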
        deviation = value - ema_mean

        ema_mean_temp = (1 - baseline_alpha) * ema_mean + baseline_alpha * value
        ema_var_temp = (1 - variance_alpha) * ema_var + variance_alpha * (deviation ** 2)

        ring_buffer[ring_index:ring_index + 1, :, :] = frame[0:1, :, :]
        sum_ring_buffer[ring_index] = frame_mean

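        # Freeze the baseline while recording and the deviation exceeds 3 sigma; start recording once the variance EMA crosses the cutoff.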
        if is_recording == False or abs(deviation) < 3 * np.sqrt(ema_var):
            ema_var = ema_var_temp
            ema_mean = ema_mean_temp
        if (index > 1200 and is_recording == False and ema_var > cutoff_variance_threshold):
            is_recording = True

            quiet_indices = np.argwhere(np.take(ema_var_ring, range(ring_index - RING_BUFFER_SIZE, ring_index), mode='wrap') < precutoff_variance_threshold)

            if quiet_indices.size > 0:
                last_quiet_index = quiet_indices[-1].item()
                recording_length = min(RING_BUFFER_SIZE - last_quiet_index + patience_default, frames_since_last_recording)
            else:
                recording_length = frames_since_last_recording
            patience = patience_default
            patience_length = 0

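        # While recording: quiet frames spend patience, active frames extend the segment and reset patience; when patience hits zero the segment is flushed.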
        if is_recording:
            if (ema_var < postcutoff_variance_threshold):
                patience -= 1
                patience_length += 1
            if (ema_var >= postcutoff_variance_threshold):
                recording_length += 1
                patience_length = 0
                patience = patience_default
            if (patience == 0):
                print(f"Dumping image starting at index {index-recording_length} ending at {index}, length {recording_length}")
                future = submit_write_job(ring_buffer, index, recording_length, ring_index - patience_length)
                if future:
                    futures.append(future)

                ema_var_ring[ring_index - patience_length] = 0.0
                recorded_images += 1
                frames_since_last_recording = patience_length
                patience_length = 0
                is_recording = False

        ema_var_ring[ring_index] = ema_var
        index += 1
        frames_since_last_recording += 1

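        # Per-frame latency check against the TARGET_FPS time budget.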
        loop_end_time = time.perf_counter_ns()
        elapsed_ns = loop_end_time - loop_start_time
        if elapsed_ns > TARGET_FRAME_TIME_NS:
            print(f"Latency warning: processing took {elapsed_ns / TARGET_FRAME_TIME_NS:.1f}x the per-frame budget")

        # visualize the variables using tqdm
        if "stats_bar" not in globals():
            stats_bar = tqdm(total=0, leave=True)

        stats_bar.update(1)
        evr_mean = np.mean(ema_var_ring[time_indices]) if time_indices.size > 0 else 0.0
        stats_bar.set_description(
            f"rec={is_recording} idx={index} value={value:.4f} evr_mean={evr_mean:.4f} ema_mean={ema_mean:.4f},{ema_mean_temp:.4f} ema_var={ema_var:.5f},{ema_var_temp:.5f} len={recording_length} pat={patience} frame_mean={frame_mean:.2f}"
        )

        if LIVE_DEBUG and index % 1000 == 0:
            debug_image = np.take(ring_buffer, range(ring_index - effective_window_size, ring_index), axis=0, mode='wrap')
            debug_filename = f"{OUTPUT_DIR}/live-debug-{index}.webp"
            iio.imwrite(debug_filename, debug_image)
            print(f"Wrote live debug image to {debug_filename}")
except KeyboardInterrupt:
    print("Interrupted, flushing any open recording...")

# handle final recording
if is_recording:
    print(f"Dumping final image starting at index {index-recording_length} ending at {index}, length {recording_length}")
    final_future = submit_write_job(ring_buffer, index, recording_length, ring_index - patience_length)
    if final_future:
        futures.append(final_future)
    recorded_images += 1

# wait for image writes to complete
print("Processing complete. Waiting for all background writing tasks to finish...")
concurrent.futures.wait(futures)
write_executor.shutdown()  # Cleanly release resources
print(f"All images written. Recorded {recorded_images} objects.")