From 11b279cb0864a14f4c9ddf230378c8ec5a5cdf33 Mon Sep 17 00:00:00 2001 From: yair Date: Sat, 22 Nov 2025 16:59:26 +0200 Subject: [PATCH] scripts: rewrite append_signals.py for linescan scrolling video Complete rewrite to properly handle linescan image sequences: - Stitches linescan images horizontally into wide panorama - Creates scrolling video that pans left-to-right - Configurable scroll speed based on capture rate (750 lines/sec) - Output saved one folder up from image source - Uses Pillow for image stitching, ffmpeg for video creation Features: - --scroll-speed: multiplier for playback speed (1.0 = real-time) - --lines-per-second: linescan capture rate (default: 750) - --max-frames: limit frames for testing - --fps: output video framerate (default: 30) - --width: viewport width (default: 1920) - Automatic cleanup of temporary stitched image Example usage: # Real-time playback uv run scripts\append_signals.py results\20251122\bumpy-filter # 2x speed uv run scripts\append_signals.py results\20251122\bumpy-filter --scroll-speed 2.0 # Test with 10 frames uv run scripts\append_signals.py results\20251122\bumpy-filter --max-frames 10 --- scripts/append_signals.py | 282 ++++++++++++++++++++++++-------------- 1 file changed, 176 insertions(+), 106 deletions(-) diff --git a/scripts/append_signals.py b/scripts/append_signals.py index d05767a..9627d68 100644 --- a/scripts/append_signals.py +++ b/scripts/append_signals.py @@ -1,12 +1,14 @@ #!/usr/bin/env -S uv run # /// script # requires-python = ">=3.10" -# dependencies = [] +# dependencies = [ +# "pillow", +# ] # /// """ -Concatenate a sequence of frames into a video using ffmpeg. -Scales to 1920 width and allows control of video speed via fps. +Stitch linescan frame sequences horizontally and create scrolling video. +Linescan images are concatenated side-by-side and played as a scrolling video. """ import argparse @@ -15,6 +17,7 @@ import sys import time from pathlib import Path from typing import List +from PIL import Image def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]: @@ -27,83 +30,105 @@ def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]: return frames -def create_video_ffmpeg( - input_dir: Path, +def stitch_images_horizontally( + frames: List[Path], output_file: Path, - fps: int = 30, - width: int = 1920, - crf: int = 18, - pattern: str = "*.jpeg", max_frames: int = None -) -> None: +) -> tuple[int, int]: """ - Create video from image sequence using ffmpeg. - - Args: - input_dir: Directory containing the frames - output_file: Output video file path - fps: Frames per second (controls playback speed) - width: Output video width in pixels (height auto-scales) - crf: Constant Rate Factor for quality (lower = better, 18-23 recommended) - pattern: File pattern to match frame files + Stitch images horizontally into a single wide image. + Returns (width, height) of stitched image. """ - frames = find_frames(input_dir, pattern) - - if not frames: - print(f"Error: No frames found in {input_dir} matching pattern {pattern}") - sys.exit(1) - - total_frames = len(frames) - - # Limit frames if max_frames is specified if max_frames is not None and max_frames > 0: frames = frames[:max_frames] - print(f"Found {total_frames} frames in {input_dir}, processing first {len(frames)} frames") - else: - print(f"Found {len(frames)} frames in {input_dir}") - print(f"First frame: {frames[0].name}") - print(f"Last frame: {frames[-1].name}") - # Check if frames are sequential (no gaps) - first_frame = frames[0] - is_sequential = True + print(f"Loading {len(frames)} images for stitching...") - # Extract frame numbers and check for gaps - name_parts = first_frame.stem.rsplit('_', 1) - if len(name_parts) == 2 and name_parts[1].isdigit(): - frame_numbers = [] - for frame in frames: - parts = frame.stem.rsplit('_', 1) - if len(parts) == 2 and parts[1].isdigit(): - frame_numbers.append(int(parts[1])) - else: - is_sequential = False - break - - # Check if there are gaps in the sequence - if is_sequential and frame_numbers: - expected_count = frame_numbers[-1] - frame_numbers[0] + 1 - if len(frame_numbers) != expected_count: - is_sequential = False - print(f"Warning: Detected gaps in frame sequence (have {len(frame_numbers)} frames, expected {expected_count})") - else: - is_sequential = False + # Load first image to get dimensions + first_img = Image.open(frames[0]) + img_width, img_height = first_img.size - # Use concat method for reliability (works with gaps, non-sequential naming, etc.) - # Create concat file - concat_file = input_dir / "concat_list.txt" - with open(concat_file, 'w') as f: - for frame in frames: - # Use absolute paths to avoid issues - f.write(f"file '{frame.absolute()}'\n") + # Calculate total width + total_width = img_width * len(frames) + + print(f"Creating stitched image: {total_width}x{img_height} pixels") + print(f" Individual frame size: {img_width}x{img_height}") + print(f" Total frames: {len(frames)}") + + # Create large canvas + stitched = Image.new('RGB', (total_width, img_height)) + + # Paste images + x_offset = 0 + for i, frame_path in enumerate(frames): + if i % 50 == 0: + print(f" Stitching frame {i+1}/{len(frames)}...") + img = Image.open(frame_path) + stitched.paste(img, (x_offset, 0)) + x_offset += img_width + + print(f"Saving stitched image to {output_file}...") + stitched.save(output_file, quality=95) + + return total_width, img_height + + +def create_scrolling_video( + stitched_image: Path, + output_file: Path, + total_width: int, + height: int, + output_width: int = 1920, + scroll_speed: float = 750.0, + fps: int = 30, + crf: int = 18, + lines_per_second: float = 750.0 +) -> None: + """ + Create scrolling video from stitched image. + + Args: + stitched_image: Path to stitched horizontal image + output_file: Output video file + total_width: Total width of stitched image + height: Height of image + output_width: Width of output video viewport + scroll_speed: Scroll speed multiplier (1.0 = real-time @ 750 lines/sec) + fps: Output video framerate + crf: Quality (lower = better) + lines_per_second: Linescan capture rate (pixels per second) + """ + # Calculate scroll parameters + # At scroll_speed=1.0, we scroll at the actual capture rate + pixels_per_second = lines_per_second * scroll_speed + pixels_per_frame = pixels_per_second / fps + + # Calculate video duration + scrollable_width = total_width - output_width + duration = scrollable_width / pixels_per_second + total_frames = int(duration * fps) + + print(f"\nScrolling video parameters:") + print(f" Output viewport: {output_width}x{height}") + print(f" Scroll speed: {scroll_speed}x real-time ({pixels_per_second:.1f} pixels/sec)") + print(f" Pixels per frame: {pixels_per_frame:.2f}") + print(f" Video duration: {duration:.2f} seconds") + print(f" Total frames: {total_frames}") + print(f" Output FPS: {fps}") + + # FFmpeg crop filter with horizontal scrolling + # crop=w:h:x:y where x moves from 0 to (total_width - output_width) + # The 't' variable represents time in seconds + crop_expr = f"crop={output_width}:{height}:min({pixels_per_second}*t\\,{total_width-output_width}):0" cmd = [ "ffmpeg", "-y", - "-f", "concat", - "-safe", "0", - "-i", str(concat_file), - "-vf", f"scale={width}:-2,fps={fps}", + "-loop", "1", + "-i", str(stitched_image), + "-vf", crop_expr, + "-t", str(duration), + "-r", str(fps), "-c:v", "libx264", "-crf", str(crf), "-pix_fmt", "yuv420p", @@ -111,20 +136,17 @@ def create_video_ffmpeg( str(output_file) ] - print(f"\nCreating video with:") - print(f" FPS: {fps}") - print(f" Width: {width}px") - print(f" CRF (quality): {crf}") - print(f" Output: {output_file}") - print(f"\nRunning: {' '.join(cmd)}\n") + print(f"\nRunning ffmpeg...") + print(f"Command: {' '.join(cmd)}\n") try: - result = subprocess.run(cmd, check=True) - print(f"\n✓ Video created successfully: {output_file}") + subprocess.run(cmd, check=True) + print(f"\n✓ Scrolling video created successfully: {output_file}") # Show file size size_mb = output_file.stat().st_size / (1024 * 1024) print(f" File size: {size_mb:.2f} MB") + print(f" Duration: {duration:.2f} seconds") except subprocess.CalledProcessError as e: print(f"Error: ffmpeg command failed with return code {e.returncode}") @@ -136,54 +158,68 @@ def create_video_ffmpeg( def main(): parser = argparse.ArgumentParser( - description="Concatenate frame sequences into a video using ffmpeg", + description="Stitch linescan images horizontally and create scrolling video", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: - # Basic usage with default settings (30 fps, 1920 width) - python append_signals.py C:\\dev\\gst-plugins-vision\\results\\20251122\\bumpy-filter + # Real-time playback (1.0x speed, 750 lines/second) + python append_signals.py results/20251122/bumpy-filter - # Slow motion (10 fps) - python append_signals.py C:\\dev\\gst-plugins-vision\\results\\20251122\\bumpy-filter --fps 10 + # Slow motion (0.5x speed) + python append_signals.py results/20251122/bumpy-filter --scroll-speed 0.5 - # Fast playback (60 fps) with custom output name - python append_signals.py results/20251122/bumpy-filter --fps 60 -o timelapse.mp4 + # Fast playback (2x speed) + python append_signals.py results/20251122/bumpy-filter --scroll-speed 2.0 - # Higher quality (lower CRF) - python append_signals.py results/20251122/bumpy-filter --crf 15 - - # Custom width - python append_signals.py results/20251122/bumpy-filter --width 3840 - - # Process only first 100 frames for testing + # Process only first 100 frames python append_signals.py results/20251122/bumpy-filter --max-frames 100 + + # Higher FPS for smoother scrolling + python append_signals.py results/20251122/bumpy-filter --fps 60 + + # Custom output width (4K) + python append_signals.py results/20251122/bumpy-filter --width 3840 """ ) parser.add_argument( "input_dir", type=Path, - help="Directory containing the frame sequence" + help="Directory containing the linescan frame sequence" ) parser.add_argument( "-o", "--output", type=Path, - help="Output video file (default: output.mp4 in input directory)" + help="Output video file (default: {folder_name}_{timestamp}.mp4 in input dir)" + ) + + parser.add_argument( + "--scroll-speed", + type=float, + default=1.0, + help="Scroll speed multiplier (1.0 = real-time at 750 lines/sec, default: 1.0)" + ) + + parser.add_argument( + "--lines-per-second", + type=float, + default=750.0, + help="Linescan capture rate in lines/pixels per second (default: 750)" ) parser.add_argument( "--fps", type=int, default=30, - help="Output video framerate/playback speed (default: 30)" + help="Output video framerate (default: 30)" ) parser.add_argument( "--width", type=int, default=1920, - help="Output video width in pixels (default: 1920)" + help="Output video viewport width in pixels (default: 1920)" ) parser.add_argument( @@ -218,13 +254,26 @@ Examples: print(f"Error: Input path is not a directory: {args.input_dir}") sys.exit(1) - # Set output file + # Find frames + frames = find_frames(args.input_dir, args.pattern) + + if not frames: + print(f"Error: No frames found in {args.input_dir} matching pattern {args.pattern}") + sys.exit(1) + + total_frames = len(frames) + if args.max_frames is not None and args.max_frames > 0: + frames = frames[:args.max_frames] + print(f"Found {total_frames} frames, processing first {len(frames)} frames") + else: + print(f"Found {len(frames)} frames in {args.input_dir}") + + # Set output file (one folder up from input directory) if args.output is None: - # Generate filename: input_folder_name_unixtime.mp4 unix_time = int(time.time()) folder_name = args.input_dir.name - output_filename = f"{folder_name}_{unix_time}.mp4" - output_file = args.input_dir / output_filename + output_filename = f"{folder_name}_scroll_{unix_time}.mp4" + output_file = args.input_dir.parent / output_filename else: output_file = args.output @@ -233,15 +282,36 @@ Examples: print("Error: CRF must be between 0 and 51") sys.exit(1) - create_video_ffmpeg( - input_dir=args.input_dir, - output_file=output_file, - fps=args.fps, - width=args.width, - crf=args.crf, - pattern=args.pattern, - max_frames=args.max_frames - ) + # Create temporary stitched image + stitched_filename = f"stitched_temp_{int(time.time())}.jpg" + stitched_path = args.input_dir / stitched_filename + + try: + # Stitch images + total_width, img_height = stitch_images_horizontally( + frames=frames, + output_file=stitched_path, + max_frames=args.max_frames + ) + + # Create scrolling video + create_scrolling_video( + stitched_image=stitched_path, + output_file=output_file, + total_width=total_width, + height=img_height, + output_width=args.width, + scroll_speed=args.scroll_speed, + fps=args.fps, + crf=args.crf, + lines_per_second=args.lines_per_second + ) + + finally: + # Clean up temporary stitched image + if stitched_path.exists(): + print(f"\nCleaning up temporary file: {stitched_filename}") + stitched_path.unlink() if __name__ == "__main__":