diff --git a/scripts/append_signals.py b/scripts/append_signals.py index 35f43e0..45ac64a 100644 --- a/scripts/append_signals.py +++ b/scripts/append_signals.py @@ -1,17 +1,15 @@ #!/usr/bin/env -S uv run # /// script # requires-python = ">=3.10" -# dependencies = [ -# "pillow", -# ] +# dependencies = [] # /// """ Create scrolling panorama videos from linescan image sequences. -This script stitches linescan images horizontally into a wide panorama, -then creates a scrolling video that pans left-to-right across the stitched image. -The scroll speed is calibrated to the linescan capture rate (750 lines/second by default). +This script uses ffmpeg's streaming filters to efficiently process linescan images +without loading the entire stitched panorama into memory. Images are horizontally +concatenated and scrolled left-to-right on-the-fly. """ import argparse @@ -20,7 +18,6 @@ import sys import time from pathlib import Path from typing import List -from PIL import Image def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]: @@ -33,135 +30,119 @@ def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]: return frames -def stitch_images_horizontally( +def create_scrolling_video_efficient( frames: List[Path], output_file: Path, - max_frames: int = None -) -> tuple[int, int]: - """ - Stitch images horizontally into a single wide image. - Returns (width, height) of stitched image. - """ - if max_frames is not None and max_frames > 0: - frames = frames[:max_frames] - - print(f"Loading {len(frames)} images for stitching...") - - # Load first image to get dimensions - first_img = Image.open(frames[0]) - img_width, img_height = first_img.size - - # Calculate total width - total_width = img_width * len(frames) - - print(f"Creating stitched image: {total_width}x{img_height} pixels") - print(f" Individual frame size: {img_width}x{img_height}") - print(f" Total frames: {len(frames)}") - - # Create large canvas - stitched = Image.new('RGB', (total_width, img_height)) - - # Paste images - x_offset = 0 - for i, frame_path in enumerate(frames): - if i % 50 == 0: - print(f" Stitching frame {i+1}/{len(frames)}...") - img = Image.open(frame_path) - stitched.paste(img, (x_offset, 0)) - x_offset += img_width - - print(f"Saving stitched image to {output_file}...") - stitched.save(output_file, quality=95) - - return total_width, img_height - - -def create_scrolling_video( - stitched_image: Path, - output_file: Path, - total_width: int, - height: int, output_width: int = 1920, - scroll_speed: float = 750.0, + scroll_speed: float = 1.0, fps: int = 30, crf: int = 18, lines_per_second: float = 750.0 ) -> None: """ - Create scrolling video from stitched image. + Create scrolling video from image sequence using ffmpeg streaming. + Memory-efficient - doesn't create intermediate stitched image. Args: - stitched_image: Path to stitched horizontal image + frames: List of image file paths output_file: Output video file - total_width: Total width of stitched image - height: Height of image output_width: Width of output video viewport scroll_speed: Scroll speed multiplier (1.0 = real-time @ 750 lines/sec) fps: Output video framerate crf: Quality (lower = better) lines_per_second: Linescan capture rate (pixels per second) """ - # Calculate scroll parameters - # At scroll_speed=1.0, we scroll at the actual capture rate + num_frames = len(frames) + + # Get dimensions from first frame + probe_cmd = [ + "ffprobe", + "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=width,height", + "-of", "csv=s=x:p=0", + str(frames[0]) + ] + + try: + result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) + img_width, img_height = map(int, result.stdout.strip().split('x')) + except Exception as e: + print(f"Error probing image dimensions: {e}") + sys.exit(1) + + # Calculate total width and video parameters + total_width = img_width * num_frames pixels_per_second = lines_per_second * scroll_speed pixels_per_frame = pixels_per_second / fps - - # Calculate video duration scrollable_width = total_width - output_width duration = scrollable_width / pixels_per_second total_frames = int(duration * fps) - print(f"\nScrolling video parameters:") - print(f" Output viewport: {output_width}x{height}") + print(f"\nVideo parameters:") + print(f" Input: {num_frames} images of {img_width}x{img_height}") + print(f" Total panorama width: {total_width} pixels") + print(f" Output viewport: {output_width}x{img_height}") print(f" Scroll speed: {scroll_speed}x real-time ({pixels_per_second:.1f} pixels/sec)") print(f" Pixels per frame: {pixels_per_frame:.2f}") print(f" Video duration: {duration:.2f} seconds") print(f" Total frames: {total_frames}") print(f" Output FPS: {fps}") - # FFmpeg crop filter with horizontal scrolling - # crop=w:h:x:y where x moves from 0 to (total_width - output_width) - # The 't' variable represents time in seconds - crop_expr = f"crop={output_width}:{height}:min({pixels_per_second}*t\\,{total_width-output_width}):0" - - cmd = [ - "ffmpeg", - "-y", - "-loop", "1", - "-i", str(stitched_image), - "-vf", crop_expr, - "-t", str(duration), - "-r", str(fps), - "-c:v", "libx264", - "-crf", str(crf), - "-pix_fmt", "yuv420p", - "-movflags", "+faststart", - str(output_file) - ] - - print(f"\nRunning ffmpeg...") - print(f"Command: {' '.join(cmd)}\n") - + # Create concat file for ffmpeg + concat_file = frames[0].parent / f"concat_temp_{int(time.time())}.txt" try: - subprocess.run(cmd, check=True) - print(f"\n✓ Scrolling video created successfully: {output_file}") + with open(concat_file, 'w') as f: + for frame in frames: + f.write(f"file '{frame.absolute()}'\n") - # Show file size - size_mb = output_file.stat().st_size / (1024 * 1024) - print(f" File size: {size_mb:.2f} MB") - print(f" Duration: {duration:.2f} seconds") + # Build scrolling crop expression: x position moves based on time + crop_expr = f"crop={output_width}:{img_height}:min({pixels_per_second}*t\\,{scrollable_width}):0" - except subprocess.CalledProcessError as e: - print(f"Error: ffmpeg command failed with return code {e.returncode}") - sys.exit(1) - except FileNotFoundError: - print("Error: ffmpeg not found. Please install ffmpeg and ensure it's in your PATH.") - sys.exit(1) + # Filter chain: tile creates horizontal panorama, loop repeats it, crop scrolls + filter_complex = f"tile=layout={num_frames}x1[pano];[pano]loop=loop=-1:size=1:start=0,{crop_expr}" + + cmd = [ + "ffmpeg", + "-y", + "-f", "concat", + "-safe", "0", + "-i", str(concat_file), + "-filter_complex", filter_complex, + "-t", str(duration), + "-r", str(fps), + "-c:v", "libx264", + "-crf", str(crf), + "-pix_fmt", "yuv420p", + "-movflags", "+faststart", + str(output_file) + ] + + print(f"\nRunning ffmpeg...") + print(f"Command: {' '.join(cmd)}\n") + + try: + subprocess.run(cmd, check=True) + print(f"\n✓ Scrolling video created successfully: {output_file}") + + # Show file size + size_mb = output_file.stat().st_size / (1024 * 1024) + print(f" File size: {size_mb:.2f} MB") + print(f" Duration: {duration:.2f} seconds") + + except subprocess.CalledProcessError as e: + print(f"Error: ffmpeg command failed with return code {e.returncode}") + sys.exit(1) + + finally: + # Clean up concat file + if concat_file.exists(): + concat_file.unlink() def main(): parser = argparse.ArgumentParser( - description="Stitch linescan images horizontally and create scrolling video", + description="Create scrolling panorama videos from linescan images (memory-efficient)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: @@ -182,6 +163,9 @@ Examples: # Custom output width (4K) python append_signals.py results/20251122/bumpy-filter --width 3840 + +Note: This version uses ffmpeg's tile filter for memory-efficient processing. +No intermediate stitched image is created - everything is streamed. """ ) @@ -194,7 +178,7 @@ Examples: parser.add_argument( "-o", "--output", type=Path, - help="Output video file (default: {folder_name}_{timestamp}.mp4 in input dir)" + help="Output video file (default: {folder_name}_scroll_{timestamp}.mp4)" ) parser.add_argument( @@ -285,36 +269,16 @@ Examples: print("Error: CRF must be between 0 and 51") sys.exit(1) - # Create temporary stitched image - stitched_filename = f"stitched_temp_{int(time.time())}.jpg" - stitched_path = args.input_dir / stitched_filename - - try: - # Stitch images - total_width, img_height = stitch_images_horizontally( - frames=frames, - output_file=stitched_path, - max_frames=args.max_frames - ) - - # Create scrolling video - create_scrolling_video( - stitched_image=stitched_path, - output_file=output_file, - total_width=total_width, - height=img_height, - output_width=args.width, - scroll_speed=args.scroll_speed, - fps=args.fps, - crf=args.crf, - lines_per_second=args.lines_per_second - ) - - finally: - # Clean up temporary stitched image - if stitched_path.exists(): - print(f"\nCleaning up temporary file: {stitched_filename}") - stitched_path.unlink() + # Create scrolling video using efficient streaming method + create_scrolling_video_efficient( + frames=frames, + output_file=output_file, + output_width=args.width, + scroll_speed=args.scroll_speed, + fps=args.fps, + crf=args.crf, + lines_per_second=args.lines_per_second + ) if __name__ == "__main__":