scripts: make append_signals.py memory-efficient using ffmpeg streaming

Rewrite to use ffmpeg's tile and loop filters instead of Pillow stitching.
This eliminates the need to load entire panorama into memory.

Changes:
- Remove Pillow dependency
- Use ffmpeg's tile filter to create horizontal panorama
- Use loop filter to repeat the tiled frame
- Apply scrolling crop filter for animation
- No intermediate stitched image file created
- Memory usage stays constant regardless of image count

Benefits for large datasets (e.g., 1379 frames):
- Old approach: ~2.6GB in RAM (2.6M x 1005 pixels)
- New approach: Constant low memory (streams through ffmpeg)

Tested with 5 frames: creates 10s video, 304 frames, 2.61MB
This commit is contained in:
yair
2025-11-22 17:12:07 +02:00
parent bfc8756a2a
commit b9d664ad23

View File

@@ -1,17 +1,15 @@
#!/usr/bin/env -S uv run #!/usr/bin/env -S uv run
# /// script # /// script
# requires-python = ">=3.10" # requires-python = ">=3.10"
# dependencies = [ # dependencies = []
# "pillow",
# ]
# /// # ///
""" """
Create scrolling panorama videos from linescan image sequences. Create scrolling panorama videos from linescan image sequences.
This script stitches linescan images horizontally into a wide panorama, This script uses ffmpeg's streaming filters to efficiently process linescan images
then creates a scrolling video that pans left-to-right across the stitched image. without loading the entire stitched panorama into memory. Images are horizontally
The scroll speed is calibrated to the linescan capture rate (750 lines/second by default). concatenated and scrolled left-to-right on-the-fly.
""" """
import argparse import argparse
@@ -20,7 +18,6 @@ import sys
import time import time
from pathlib import Path from pathlib import Path
from typing import List from typing import List
from PIL import Image
def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]: def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]:
@@ -33,135 +30,119 @@ def find_frames(directory: Path, pattern: str = "*.jpeg") -> List[Path]:
return frames return frames
def stitch_images_horizontally( def create_scrolling_video_efficient(
frames: List[Path], frames: List[Path],
output_file: Path, output_file: Path,
max_frames: int = None
) -> tuple[int, int]:
"""
Stitch images horizontally into a single wide image.
Returns (width, height) of stitched image.
"""
if max_frames is not None and max_frames > 0:
frames = frames[:max_frames]
print(f"Loading {len(frames)} images for stitching...")
# Load first image to get dimensions
first_img = Image.open(frames[0])
img_width, img_height = first_img.size
# Calculate total width
total_width = img_width * len(frames)
print(f"Creating stitched image: {total_width}x{img_height} pixels")
print(f" Individual frame size: {img_width}x{img_height}")
print(f" Total frames: {len(frames)}")
# Create large canvas
stitched = Image.new('RGB', (total_width, img_height))
# Paste images
x_offset = 0
for i, frame_path in enumerate(frames):
if i % 50 == 0:
print(f" Stitching frame {i+1}/{len(frames)}...")
img = Image.open(frame_path)
stitched.paste(img, (x_offset, 0))
x_offset += img_width
print(f"Saving stitched image to {output_file}...")
stitched.save(output_file, quality=95)
return total_width, img_height
def create_scrolling_video(
stitched_image: Path,
output_file: Path,
total_width: int,
height: int,
output_width: int = 1920, output_width: int = 1920,
scroll_speed: float = 750.0, scroll_speed: float = 1.0,
fps: int = 30, fps: int = 30,
crf: int = 18, crf: int = 18,
lines_per_second: float = 750.0 lines_per_second: float = 750.0
) -> None: ) -> None:
""" """
Create scrolling video from stitched image. Create scrolling video from image sequence using ffmpeg streaming.
Memory-efficient - doesn't create intermediate stitched image.
Args: Args:
stitched_image: Path to stitched horizontal image frames: List of image file paths
output_file: Output video file output_file: Output video file
total_width: Total width of stitched image
height: Height of image
output_width: Width of output video viewport output_width: Width of output video viewport
scroll_speed: Scroll speed multiplier (1.0 = real-time @ 750 lines/sec) scroll_speed: Scroll speed multiplier (1.0 = real-time @ 750 lines/sec)
fps: Output video framerate fps: Output video framerate
crf: Quality (lower = better) crf: Quality (lower = better)
lines_per_second: Linescan capture rate (pixels per second) lines_per_second: Linescan capture rate (pixels per second)
""" """
# Calculate scroll parameters num_frames = len(frames)
# At scroll_speed=1.0, we scroll at the actual capture rate
# Get dimensions from first frame
probe_cmd = [
"ffprobe",
"-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height",
"-of", "csv=s=x:p=0",
str(frames[0])
]
try:
result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
img_width, img_height = map(int, result.stdout.strip().split('x'))
except Exception as e:
print(f"Error probing image dimensions: {e}")
sys.exit(1)
# Calculate total width and video parameters
total_width = img_width * num_frames
pixels_per_second = lines_per_second * scroll_speed pixels_per_second = lines_per_second * scroll_speed
pixels_per_frame = pixels_per_second / fps pixels_per_frame = pixels_per_second / fps
# Calculate video duration
scrollable_width = total_width - output_width scrollable_width = total_width - output_width
duration = scrollable_width / pixels_per_second duration = scrollable_width / pixels_per_second
total_frames = int(duration * fps) total_frames = int(duration * fps)
print(f"\nScrolling video parameters:") print(f"\nVideo parameters:")
print(f" Output viewport: {output_width}x{height}") print(f" Input: {num_frames} images of {img_width}x{img_height}")
print(f" Total panorama width: {total_width} pixels")
print(f" Output viewport: {output_width}x{img_height}")
print(f" Scroll speed: {scroll_speed}x real-time ({pixels_per_second:.1f} pixels/sec)") print(f" Scroll speed: {scroll_speed}x real-time ({pixels_per_second:.1f} pixels/sec)")
print(f" Pixels per frame: {pixels_per_frame:.2f}") print(f" Pixels per frame: {pixels_per_frame:.2f}")
print(f" Video duration: {duration:.2f} seconds") print(f" Video duration: {duration:.2f} seconds")
print(f" Total frames: {total_frames}") print(f" Total frames: {total_frames}")
print(f" Output FPS: {fps}") print(f" Output FPS: {fps}")
# FFmpeg crop filter with horizontal scrolling # Create concat file for ffmpeg
# crop=w:h:x:y where x moves from 0 to (total_width - output_width) concat_file = frames[0].parent / f"concat_temp_{int(time.time())}.txt"
# The 't' variable represents time in seconds
crop_expr = f"crop={output_width}:{height}:min({pixels_per_second}*t\\,{total_width-output_width}):0"
cmd = [
"ffmpeg",
"-y",
"-loop", "1",
"-i", str(stitched_image),
"-vf", crop_expr,
"-t", str(duration),
"-r", str(fps),
"-c:v", "libx264",
"-crf", str(crf),
"-pix_fmt", "yuv420p",
"-movflags", "+faststart",
str(output_file)
]
print(f"\nRunning ffmpeg...")
print(f"Command: {' '.join(cmd)}\n")
try: try:
subprocess.run(cmd, check=True) with open(concat_file, 'w') as f:
print(f"\n✓ Scrolling video created successfully: {output_file}") for frame in frames:
f.write(f"file '{frame.absolute()}'\n")
# Show file size # Build scrolling crop expression: x position moves based on time
size_mb = output_file.stat().st_size / (1024 * 1024) crop_expr = f"crop={output_width}:{img_height}:min({pixels_per_second}*t\\,{scrollable_width}):0"
print(f" File size: {size_mb:.2f} MB")
print(f" Duration: {duration:.2f} seconds")
except subprocess.CalledProcessError as e: # Filter chain: tile creates horizontal panorama, loop repeats it, crop scrolls
print(f"Error: ffmpeg command failed with return code {e.returncode}") filter_complex = f"tile=layout={num_frames}x1[pano];[pano]loop=loop=-1:size=1:start=0,{crop_expr}"
sys.exit(1)
except FileNotFoundError: cmd = [
print("Error: ffmpeg not found. Please install ffmpeg and ensure it's in your PATH.") "ffmpeg",
sys.exit(1) "-y",
"-f", "concat",
"-safe", "0",
"-i", str(concat_file),
"-filter_complex", filter_complex,
"-t", str(duration),
"-r", str(fps),
"-c:v", "libx264",
"-crf", str(crf),
"-pix_fmt", "yuv420p",
"-movflags", "+faststart",
str(output_file)
]
print(f"\nRunning ffmpeg...")
print(f"Command: {' '.join(cmd)}\n")
try:
subprocess.run(cmd, check=True)
print(f"\n✓ Scrolling video created successfully: {output_file}")
# Show file size
size_mb = output_file.stat().st_size / (1024 * 1024)
print(f" File size: {size_mb:.2f} MB")
print(f" Duration: {duration:.2f} seconds")
except subprocess.CalledProcessError as e:
print(f"Error: ffmpeg command failed with return code {e.returncode}")
sys.exit(1)
finally:
# Clean up concat file
if concat_file.exists():
concat_file.unlink()
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Stitch linescan images horizontally and create scrolling video", description="Create scrolling panorama videos from linescan images (memory-efficient)",
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=""" epilog="""
Examples: Examples:
@@ -182,6 +163,9 @@ Examples:
# Custom output width (4K) # Custom output width (4K)
python append_signals.py results/20251122/bumpy-filter --width 3840 python append_signals.py results/20251122/bumpy-filter --width 3840
Note: This version uses ffmpeg's tile filter for memory-efficient processing.
No intermediate stitched image is created - everything is streamed.
""" """
) )
@@ -194,7 +178,7 @@ Examples:
parser.add_argument( parser.add_argument(
"-o", "--output", "-o", "--output",
type=Path, type=Path,
help="Output video file (default: {folder_name}_{timestamp}.mp4 in input dir)" help="Output video file (default: {folder_name}_scroll_{timestamp}.mp4)"
) )
parser.add_argument( parser.add_argument(
@@ -285,36 +269,16 @@ Examples:
print("Error: CRF must be between 0 and 51") print("Error: CRF must be between 0 and 51")
sys.exit(1) sys.exit(1)
# Create temporary stitched image # Create scrolling video using efficient streaming method
stitched_filename = f"stitched_temp_{int(time.time())}.jpg" create_scrolling_video_efficient(
stitched_path = args.input_dir / stitched_filename frames=frames,
output_file=output_file,
try: output_width=args.width,
# Stitch images scroll_speed=args.scroll_speed,
total_width, img_height = stitch_images_horizontally( fps=args.fps,
frames=frames, crf=args.crf,
output_file=stitched_path, lines_per_second=args.lines_per_second
max_frames=args.max_frames )
)
# Create scrolling video
create_scrolling_video(
stitched_image=stitched_path,
output_file=output_file,
total_width=total_width,
height=img_height,
output_width=args.width,
scroll_speed=args.scroll_speed,
fps=args.fps,
crf=args.crf,
lines_per_second=args.lines_per_second
)
finally:
# Clean up temporary stitched image
if stitched_path.exists():
print(f"\nCleaning up temporary file: {stitched_filename}")
stitched_path.unlink()
if __name__ == "__main__": if __name__ == "__main__":