feat: add timeline overlay and video segment options

- Add --timeline flag to overlay frame numbers as a ruler at the top of the image
- Add --start and --end parameters to process a segment of the video
- Timeline shows evenly spaced frame numbers with tick marks
- Timeline is always horizontal, running left to right along the top edge
- Segment processing keeps the original frame numbers in the timeline
- Update all extraction and analysis functions to support segments (usage sketch below)
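
A minimal usage sketch of the new parameters, calling the updated functions directly rather than through the CLI. The video path, coordinates, output names, and frame range are placeholders, not values from this commit, and it assumes main.py guards its CLI entry point so importing it has no side effects:

    # Sketch only: exercise the new keyword arguments.
    from main import extract_column_strip, extract_row_strip

    # Column mode on frames 100-500, with the frame-number ruler enabled:
    extract_column_strip("clip.mp4", 320, "strip_col.png",
                         change_threshold=0.005, relax=0,
                         timeline=True, start_frame=100, end_frame=500)

    # Row mode over the whole video (end_frame=None means "until the last frame"):
    extract_row_strip("clip.mp4", 240, "strip_row.png",
                      change_threshold=0.01, relax=0,
                      timeline=True, start_frame=0, end_frame=None)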
Author: yair-mv
Date:   2025-11-03 17:27:01 +02:00
Parent: c8acdca86b
Commit: 777f9d14b0

main.py (194 changed lines)

@@ -13,6 +13,7 @@ import numpy as np
 import matplotlib.pyplot as plt
 from pathlib import Path
 import uuid
+import math
 
 
 def calculate_line_difference(line1, line2):
@@ -69,7 +70,7 @@ def generate_change_graph(changes, output_path, threshold=None):
     print(f"Change graph saved to: {output_path}")
 
 
-def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
+def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None, start_frame=0, end_frame=None):
     """
     Analyze changes in video without generating strip image.
     Used for debug mode to generate change threshold graphs.
@@ -79,6 +80,8 @@ def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
         x_column: X-coordinate of column to analyze (if column mode)
         y_row: Y-coordinate of row to analyze (if row mode)
         debug_output: Base path for debug outputs
+        start_frame: First frame to process (0-based)
+        end_frame: Last frame to process (None = until end)
 
     Returns:
         List of change values
@@ -102,7 +105,11 @@ def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
             raise ValueError(f"Row {y_row} is outside video height ({frame_height})")
         print(f"Analyzing row {y_row} from {frame_width}x{frame_height} frames")
 
-    print(f"Processing {total_frames} frames for change analysis...")
+    # Set end frame if not specified
+    if end_frame is None:
+        end_frame = total_frames - 1
+
+    print(f"Processing frames {start_frame} to {end_frame} ({end_frame - start_frame + 1} frames) for change analysis...")
 
     changes = []
     previous_line = None
@@ -112,6 +119,15 @@ def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
         ret, frame = cap.read()
         if not ret:
             break
 
+        # Skip frames before start
+        if frame_idx < start_frame:
+            frame_idx += 1
+            continue
+
+        # Stop after end frame
+        if frame_idx > end_frame:
+            break
+
         # Extract current line (column or row)
         if x_column is not None:
@@ -127,8 +143,8 @@ def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
         previous_line = current_line
         frame_idx += 1
 
-        if frame_idx % 100 == 0:
-            print(f"Analyzed {frame_idx}/{total_frames} frames")
+        if (frame_idx - start_frame) % 100 == 0:
+            print(f"Analyzed {frame_idx - start_frame}/{end_frame - start_frame + 1} frames")
 
     cap.release()
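
The segment window above works by reading and then discarding every frame before start_frame and breaking out of the loop once frame_idx passes end_frame. A possible alternative, not used in this commit, is to seek before the loop with OpenCV's frame-position property; a rough sketch with placeholder values (seek accuracy depends on the codec):

    # Hypothetical alternative to the skip-read loop (not part of this change).
    import cv2

    start_frame, end_frame = 100, 500              # placeholder segment
    cap = cv2.VideoCapture("clip.mp4")             # placeholder path
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)  # jump straight to the segment

    frame_idx = start_frame
    while frame_idx <= end_frame:
        ret, frame = cap.read()
        if not ret:
            break
        # ... extract the column/row and accumulate changes, as in analyze_changes_only ...
        frame_idx += 1
    cap.release()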
@@ -167,7 +183,51 @@ def analyze_changes_only(video_path, x_column=None, y_row=None, debug_output=None):
     return changes
 
 
-def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
+def add_timeline_overlay(image, frame_numbers):
+    """
+    Add frame number overlay as a timeline/ruler at the top of the image.
+    Always horizontal from left to right.
+
+    Args:
+        image: The strip image to add overlay to
+        frame_numbers: List of frame numbers that were included
+
+    Returns:
+        Image with timeline overlay
+    """
+    if not frame_numbers:
+        return image
+
+    overlay = image.copy()
+    font = cv2.FONT_HERSHEY_SIMPLEX
+    font_scale = 0.4
+    font_thickness = 1
+    text_color = (255, 255, 0)  # Cyan for visibility
+
+    # Calculate text size for spacing
+    (text_width, text_height), _ = cv2.getTextSize("00000", font, font_scale, font_thickness)
+
+    # Horizontal timeline at the top from left to right
+    # Calculate spacing to avoid overlap
+    available_width = image.shape[1]
+    num_labels = min(len(frame_numbers), max(10, available_width // (text_width + 10)))
+    step = max(1, len(frame_numbers) // num_labels)
+
+    for i in range(0, len(frame_numbers), step):
+        frame_num = frame_numbers[i]
+        text = str(frame_num)
+        x_pos = int((i / len(frame_numbers)) * available_width)
+
+        # Add small tick mark
+        cv2.line(overlay, (x_pos, 0), (x_pos, 10), text_color, 1)
+        # Add text
+        cv2.putText(overlay, text, (x_pos + 2, text_height + 12),
+                    font, font_scale, text_color, font_thickness, cv2.LINE_AA)
+
+    return overlay
+
+
+def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0, timeline=False, start_frame=0, end_frame=None):
     """
     Extract vertical strip at x_column from each frame of the video.
     Only include frames where the change exceeds the threshold.
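
A small sketch of how add_timeline_overlay is meant to be fed: its second argument is the list of original frame indices for the columns that were kept, so a strip built from a segment is labelled with the real frame numbers rather than 0..N. The array shape and frame range below are invented for illustration, and it again assumes main.py can be imported without side effects:

    import numpy as np
    from main import add_timeline_overlay

    strip = np.zeros((480, 600, 3), dtype=np.uint8)                # 480 px tall, one column per kept frame
    labelled = add_timeline_overlay(strip, list(range(100, 700)))  # columns came from frames 100-699
    print(labelled.shape)                                          # unchanged: (480, 600, 3)

Because the label step is derived from the strip width and the rendered text size, long segments are thinned out rather than drawn on top of each other.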
@@ -178,6 +238,9 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
         output_path: Path for output image
         change_threshold: Minimum change threshold (0-1) to include frame
         relax: Number of extra frames to include before/after threshold frames
+        timeline: If True, overlay frame numbers as timeline
+        start_frame: First frame to process (0-based)
+        end_frame: Last frame to process (None = until end)
     """
 
     cap = cv2.VideoCapture(str(video_path))
@@ -192,7 +255,11 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
     if x_column >= frame_width:
         raise ValueError(f"Column {x_column} is outside video width ({frame_width})")
 
-    print(f"Processing {total_frames} frames...")
+    # Set end frame if not specified
+    if end_frame is None:
+        end_frame = total_frames - 1
+
+    print(f"Processing frames {start_frame} to {end_frame} ({end_frame - start_frame + 1} frames)...")
     print(f"Extracting column {x_column} from {frame_width}x{frame_height} frames")
     print(f"Change threshold: {change_threshold}")
     if relax > 0:
@@ -201,6 +268,7 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
     # First pass: collect all columns and identify significant frames
     all_columns = []
     changes = []
+    frame_numbers = []
     previous_column = None
     frame_idx = 0
@@ -208,10 +276,20 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
         ret, frame = cap.read()
         if not ret:
             break
 
+        # Skip frames before start
+        if frame_idx < start_frame:
+            frame_idx += 1
+            continue
+
+        # Stop after end frame
+        if frame_idx > end_frame:
+            break
+
         # Extract current column
         current_column = frame[:, x_column, :].copy()
         all_columns.append(current_column)
+        frame_numbers.append(frame_idx)
 
         # Calculate change from previous frame
         if previous_column is not None:
@@ -223,8 +301,8 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
         previous_column = current_column
         frame_idx += 1
 
-        if frame_idx % 100 == 0:
-            print(f"Processed {frame_idx}/{total_frames} frames")
+        if (frame_idx - start_frame) % 100 == 0:
+            print(f"Processed {frame_idx - start_frame}/{end_frame - start_frame + 1} frames")
 
     cap.release()
@@ -239,8 +317,14 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
             for j in range(start, end):
                 include_mask[j] = True
 
-    # Collect significant columns
-    significant_columns = [col for i, col in enumerate(all_columns) if include_mask[i]]
+    # Collect significant columns with actual frame numbers
+    significant_columns = []
+    significant_frame_numbers = []
+    for i, col in enumerate(all_columns):
+        if include_mask[i]:
+            significant_columns.append(col)
+            significant_frame_numbers.append(frame_numbers[i])
 
     included_frames = sum(include_mask)
     skipped_frames = len(all_columns) - included_frames
@@ -250,7 +334,11 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
     # Convert list to numpy array
     strip_image = np.stack(significant_columns, axis=1)
 
-    print(f"Original frames: {total_frames}")
+    # Add timeline overlay if requested
+    if timeline:
+        strip_image = add_timeline_overlay(strip_image, significant_frame_numbers)
+
+    print(f"Original frames in segment: {len(all_columns)}")
     print(f"Included frames: {included_frames}")
     print(f"Skipped frames: {skipped_frames}")
     print(f"Compression ratio: {skipped_frames/total_frames:.1%}")
@@ -261,7 +349,7 @@ def extract_column_strip(video_path, x_column, output_path, change_threshold=0.005, relax=0):
     cv2.imwrite(str(output_path), strip_image)
 
 
-def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
+def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0, timeline=False, start_frame=0, end_frame=None):
     """
     Extract horizontal strip at y_row from each frame of the video.
     Only include frames where the change exceeds the threshold.
@@ -272,6 +360,9 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
         output_path: Path for output image
         change_threshold: Minimum change threshold (0-1) to include frame
         relax: Number of extra frames to include before/after threshold frames
+        timeline: If True, overlay frame numbers as timeline
+        start_frame: First frame to process (0-based)
+        end_frame: Last frame to process (None = until end)
     """
 
     cap = cv2.VideoCapture(str(video_path))
@@ -286,7 +377,11 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
     if y_row >= frame_height:
         raise ValueError(f"Row {y_row} is outside video height ({frame_height})")
 
-    print(f"Processing {total_frames} frames...")
+    # Set end frame if not specified
+    if end_frame is None:
+        end_frame = total_frames - 1
+
+    print(f"Processing frames {start_frame} to {end_frame} ({end_frame - start_frame + 1} frames)...")
     print(f"Extracting row {y_row} from {frame_width}x{frame_height} frames")
     print(f"Change threshold: {change_threshold}")
     if relax > 0:
@@ -295,6 +390,7 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
     # First pass: collect all rows and identify significant frames
     all_rows = []
     changes = []
+    frame_numbers = []
     previous_row = None
     frame_idx = 0
@@ -302,10 +398,20 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
         ret, frame = cap.read()
         if not ret:
             break
 
+        # Skip frames before start
+        if frame_idx < start_frame:
+            frame_idx += 1
+            continue
+
+        # Stop after end frame
+        if frame_idx > end_frame:
+            break
+
         # Extract current row
         current_row = frame[y_row, :, :].copy()
         all_rows.append(current_row)
+        frame_numbers.append(frame_idx)
 
         # Calculate change from previous frame
         if previous_row is not None:
@@ -317,8 +423,8 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
         previous_row = current_row
         frame_idx += 1
 
-        if frame_idx % 100 == 0:
-            print(f"Processed {frame_idx}/{total_frames} frames")
+        if (frame_idx - start_frame) % 100 == 0:
+            print(f"Processed {frame_idx - start_frame}/{end_frame - start_frame + 1} frames")
 
     cap.release()
@@ -333,8 +439,14 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
             for j in range(start, end):
                 include_mask[j] = True
 
-    # Collect significant rows
-    significant_rows = [row for i, row in enumerate(all_rows) if include_mask[i]]
+    # Collect significant rows with actual frame numbers
+    significant_rows = []
+    significant_frame_numbers = []
+    for i, row in enumerate(all_rows):
+        if include_mask[i]:
+            significant_rows.append(row)
+            significant_frame_numbers.append(frame_numbers[i])
 
     included_frames = sum(include_mask)
     skipped_frames = len(all_rows) - included_frames
@@ -345,9 +457,13 @@ def extract_row_strip(video_path, y_row, output_path, change_threshold=0.01, relax=0):
     strip_image = np.stack(significant_rows, axis=0)
 
     # Rotate clockwise 90 degrees for row mode
-    strip_image = cv2.rotate(strip_image, cv2.ROTATE_90_CLOCKWISE)
+    strip_image = cv2.rotate(strip_image, cv2.ROTATE_90_COUNTERCLOCKWISE)
 
-    print(f"Original frames: {total_frames}")
+    # Add timeline overlay if requested (after rotation)
+    if timeline:
+        strip_image = add_timeline_overlay(strip_image, significant_frame_numbers)
+
+    print(f"Original frames in segment: {len(all_rows)}")
     print(f"Included frames: {included_frames}")
     print(f"Skipped frames: {skipped_frames}")
     print(f"Compression ratio: {skipped_frames/total_frames:.1%}")
@@ -402,6 +518,25 @@ def main():
         help="Include N extra frames before/after frames exceeding threshold (default: 0, or 100 if flag used without value)"
     )
 
+    parser.add_argument(
+        "--start",
+        type=int,
+        default=0,
+        help="Start frame number (0-based, default: 0)"
+    )
+
+    parser.add_argument(
+        "--end",
+        type=int,
+        help="End frame number (0-based, default: last frame)"
+    )
+
+    parser.add_argument(
+        "--timeline",
+        action="store_true",
+        help="Overlay frame numbers as timeline/ruler on output image"
+    )
+
     parser.add_argument(
         "--debug",
         action="store_true",
@@ -440,6 +575,15 @@ def main():
         print("Error: --threshold must be between 0 and 1")
         sys.exit(1)
 
+    # Validate frame range
+    if args.start < 0:
+        print("Error: --start must be non-negative")
+        sys.exit(1)
+
+    if args.end is not None and args.end < args.start:
+        print("Error: --end must be greater than or equal to --start")
+        sys.exit(1)
+
     # Generate output path
     if args.output:
         output_path = Path(args.output)
@@ -469,20 +613,24 @@ def main():
         if args.xcolumn is not None:
             print(f"Column mode: Analyzing vertical line at x={args.xcolumn}")
-            analyze_changes_only(video_path, x_column=args.xcolumn, debug_output=output_path)
+            analyze_changes_only(video_path, x_column=args.xcolumn, debug_output=output_path,
+                                 start_frame=args.start, end_frame=args.end)
         else:
             print(f"Row mode: Analyzing horizontal line at y={args.yrow}")
-            analyze_changes_only(video_path, y_row=args.yrow, debug_output=output_path)
+            analyze_changes_only(video_path, y_row=args.yrow, debug_output=output_path,
+                                 start_frame=args.start, end_frame=args.end)
 
         print("Change analysis completed successfully!")
     else:
         # Normal mode: extract strip photography
         if args.xcolumn is not None:
             print(f"Column mode: Extracting vertical line at x={args.xcolumn}")
-            extract_column_strip(video_path, args.xcolumn, output_path, args.threshold, args.relax)
+            extract_column_strip(video_path, args.xcolumn, output_path, args.threshold, args.relax, args.timeline,
+                                 args.start, args.end)
         else:
             print(f"Row mode: Extracting horizontal line at y={args.yrow}")
-            extract_row_strip(video_path, args.yrow, output_path, args.threshold, args.relax)
+            extract_row_strip(video_path, args.yrow, output_path, args.threshold, args.relax, args.timeline,
+                              args.start, args.end)
 
         print("Strip photography extraction completed successfully!")