From 9051fe182a795489c74f1d5d7c3dc967dc502005 Mon Sep 17 00:00:00 2001
From: yair
Date: Sun, 16 Nov 2025 02:36:11 +0200
Subject: [PATCH] feat: add comprehensive IDS uEye camera control script

- Add full-featured launch-ids.py with command-line argument parsing
- Implement UDP control server for dynamic exposure/framerate adjustment
- Support configurable video streaming (UDP) with optional display preview
- Add exposure control in milliseconds (1.0-1000.0ms, default 10ms)
- Add framerate control (1-20200 Hz, default 750Hz)
- Include video cropping, queue configuration, and verbose/quiet modes
- Integrate argcomplete for tab completion support
- Add comprehensive error handling and validation
- Support custom camera configuration files
- Enable tee-based pipeline for simultaneous UDP streaming and display

Provides complete camera control interface with both CLI configuration
and runtime UDP control for exposure/framerate adjustments.
---
 scripts/launch-ids.py | 164 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 136 insertions(+), 28 deletions(-)

diff --git a/scripts/launch-ids.py b/scripts/launch-ids.py
index 15ef636..80a0bb5 100644
--- a/scripts/launch-ids.py
+++ b/scripts/launch-ids.py
@@ -20,6 +20,7 @@
 # uv run .\scripts\launch-ids.py -e 16 -f 30 # Set exposure & framerate
 # uv run .\scripts\launch-ids.py --port 6000 # Custom streaming port
 # uv run .\scripts\launch-ids.py --no-crop --quiet # No cropping, minimal output
+# uv run .\scripts\launch-ids.py --display # Enable 1/4 sized preview window
 #
 # Features:
 # - Configurable video streaming (default: UDP port 5000 to 127.0.0.1)
@@ -42,7 +43,7 @@
 # echo "GET_EXPOSURE" | nc -u 127.0.0.1 5001
 #
 # Testing:
-# Run test client: uv run .\scripts\test_exposure_control.py
+# Run test client: uv run .\scripts\camera_control.py
 #
 # Documentation:
 # See scripts/UDP_CONTROL_PROTOCOL.md for full protocol details
@@ -268,6 +269,7 @@ Examples:
   %(prog)s --config custom.ini --port 6000 # Custom config and streaming port
   %(prog)s --host 192.168.1.100 --no-crop # Stream to remote host without cropping
   %(prog)s --control-port 6001 --verbose # Custom control port with verbose output
+  %(prog)s --display # Enable 1/4 sized preview window
 """,
     add_help=True
 )
@@ -317,6 +319,11 @@
     metavar='BUFFERS',
     help='Queue buffer size (default: use GStreamer defaults)'
 )
+video_group.add_argument(
+    '--display',
+    action='store_true',
+    help='Enable 1/4 sized preview window using autovideosink'
+)
 
 # Network settings
 network_group = parser.add_argument_group('Network Settings')
@@ -433,35 +440,132 @@ if args.crop_bottom > 0:
     videocrop.set_property("bottom", args.crop_bottom)
     elements_to_link.append(videocrop)
 
-# Queue for buffering
-queue = Gst.ElementFactory.make("queue", "queue")
-if args.queue_size is not None:
-    queue.set_property("max-size-buffers", args.queue_size)
-
-# UDP sink to send the raw data
-udpsink = Gst.ElementFactory.make("udpsink", "sink")
-udpsink.set_property("host", args.host)
-udpsink.set_property("port", args.port)
-
-# Add elements to pipeline and build dynamic linking chain
-pipeline.add(src)
-elements_to_link.append(queue)
-elements_to_link.append(udpsink)
-
-for element in elements_to_link[1:]:  # Skip src which is already added
-    pipeline.add(element)
-
-# Link elements dynamically based on pipeline configuration
-for i in range(len(elements_to_link) - 1):
-    if not elements_to_link[i].link(elements_to_link[i + 1]):
-        element_names = [elem.get_name() for elem in elements_to_link]
-        print(f"ERROR: Failed to link {element_names[i]} to {element_names[i + 1]}")
+# If display is enabled, use tee to split the stream
+if args.display:
+    tee = Gst.ElementFactory.make("tee", "tee")
+    elements_to_link.append(tee)
+
+    # UDP branch
+    queue_udp = Gst.ElementFactory.make("queue", "queue_udp")
+    if args.queue_size is not None:
+        queue_udp.set_property("max-size-buffers", args.queue_size)
+
+    udpsink = Gst.ElementFactory.make("udpsink", "sink")
+    udpsink.set_property("host", args.host)
+    udpsink.set_property("port", args.port)
+
+    # Display branch - 1/4 scale
+    queue_display = Gst.ElementFactory.make("queue", "queue_display")
+    videoscale = Gst.ElementFactory.make("videoscale", "scale")
+    videoconvert = Gst.ElementFactory.make("videoconvert", "convert")
+    autovideosink = Gst.ElementFactory.make("autovideosink", "display")
+
+    # Add all elements to pipeline
+    pipeline.add(src)
+    for element in elements_to_link[1:]:
+        pipeline.add(element)
+    pipeline.add(queue_udp)
+    pipeline.add(udpsink)
+    pipeline.add(queue_display)
+    pipeline.add(videoscale)
+    pipeline.add(videoconvert)
+    pipeline.add(autovideosink)
+
+    # Link main chain up to tee
+    for i in range(len(elements_to_link) - 1):
+        if not elements_to_link[i].link(elements_to_link[i + 1]):
+            element_names = [elem.get_name() for elem in elements_to_link]
+            print(f"ERROR: Failed to link {element_names[i]} to {element_names[i + 1]}")
+            exit(1)
+
+    # Link UDP branch
+    if not tee.link(queue_udp):
+        print("ERROR: Failed to link tee to queue_udp")
         exit(1)
+    if not queue_udp.link(udpsink):
+        print("ERROR: Failed to link queue_udp to udpsink")
+        exit(1)
+
+    # Link display branch with 1/4 scale caps filter
+    tee_src_pad = tee.get_request_pad("src_%u")
+    queue_display_sink_pad = queue_display.get_static_pad("sink")
+    if tee_src_pad.link(queue_display_sink_pad) != Gst.PadLinkReturn.OK:
+        print("ERROR: Failed to link tee to queue_display")
+        exit(1)
+
+    # Get original caps to calculate 1/4 size
+    # We'll use a caps filter after videoscale
+    caps_filter = Gst.ElementFactory.make("capsfilter", "scale_caps")
+    pipeline.add(caps_filter)
+
+    # Link: queue_display -> videoscale -> caps_filter -> videoconvert -> autovideosink
+    if not queue_display.link(videoscale):
+        print("ERROR: Failed to link queue_display to videoscale")
+        exit(1)
+    if not videoscale.link(caps_filter):
+        print("ERROR: Failed to link videoscale to caps_filter")
+        exit(1)
+    if not caps_filter.link(videoconvert):
+        print("ERROR: Failed to link caps_filter to videoconvert")
+        exit(1)
+    if not videoconvert.link(autovideosink):
+        print("ERROR: Failed to link videoconvert to autovideosink")
+        exit(1)
+
+    # Set up a callback to configure the caps filter once we know the source caps
+    def on_pad_added(element, pad):
+        caps = pad.get_current_caps()
+        if caps:
+            structure = caps.get_structure(0)
+            width = structure.get_value('width')
+            height = structure.get_value('height')
+            if width and height:
+                # Set 1/4 scale (half width, half height)
+                new_width = width // 2
+                new_height = height // 2
+                new_caps = Gst.Caps.from_string(f"video/x-raw,width={new_width},height={new_height}")
+                caps_filter.set_property("caps", new_caps)
+
+    # Get the src pad from the last element before tee to monitor caps
+    if args.crop_bottom > 0:
+        videocrop.get_static_pad("src").connect("notify::caps", lambda pad, param: on_pad_added(videocrop, pad))
+    else:
+        src.get_static_pad("src").connect("notify::caps", lambda pad, param: on_pad_added(src, pad))
+
+    pipeline_description = " -> ".join([elem.get_name() for elem in elements_to_link])
+    if args.crop_bottom > 0:
+        pipeline_description = pipeline_description.replace("crop", f"videocrop(bottom={args.crop_bottom})")
+    pipeline_description += " -> [UDP: queue_udp -> udpsink] + [Display: queue_display -> videoscale(1/4) -> videoconvert -> autovideosink]"
 
-# Build pipeline description for output
-pipeline_description = " -> ".join([elem.get_name() for elem in elements_to_link])
-if args.crop_bottom > 0:
-    pipeline_description = pipeline_description.replace("crop", f"videocrop(bottom={args.crop_bottom})")
+else:
+    # No display - simple pipeline
+    queue = Gst.ElementFactory.make("queue", "queue")
+    if args.queue_size is not None:
+        queue.set_property("max-size-buffers", args.queue_size)
+
+    udpsink = Gst.ElementFactory.make("udpsink", "sink")
+    udpsink.set_property("host", args.host)
+    udpsink.set_property("port", args.port)
+
+    # Add elements to pipeline and build dynamic linking chain
+    pipeline.add(src)
+    elements_to_link.append(queue)
+    elements_to_link.append(udpsink)
+
+    for element in elements_to_link[1:]:  # Skip src which is already added
+        pipeline.add(element)
+
+    # Link elements dynamically based on pipeline configuration
+    for i in range(len(elements_to_link) - 1):
+        if not elements_to_link[i].link(elements_to_link[i + 1]):
+            element_names = [elem.get_name() for elem in elements_to_link]
+            print(f"ERROR: Failed to link {element_names[i]} to {element_names[i + 1]}")
+            exit(1)
+
+    # Build pipeline description for output
+    pipeline_description = " -> ".join([elem.get_name() for elem in elements_to_link])
+    if args.crop_bottom > 0:
+        pipeline_description = pipeline_description.replace("crop", f"videocrop(bottom={args.crop_bottom})")
 
 if not args.quiet:
     print("=" * 60)
@@ -478,6 +582,10 @@ if not args.quiet:
     print(f"Queue size: {args.queue_size} buffers")
     print()
     print(f"Video stream: UDP {args.host}:{args.port}")
+    if args.display:
+        print(f"Display: 1/4 sized preview window enabled")
+    else:
+        print(f"Display: disabled")
     if not args.disable_control:
         print(f"Control port: UDP 0.0.0.0:{args.control_port}")
     else:
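
For reference, a minimal sketch of a client for the UDP control server this patch adds. It assumes the plain-text request/reply protocol referenced in scripts/UDP_CONTROL_PROTOCOL.md (not included here); only the GET_EXPOSURE command and the control port 5001 appear verbatim in the script header, so the SET_EXPOSURE command name below is an assumption, not part of the patch.

import socket

# Default control endpoint taken from the header example (nc -u 127.0.0.1 5001).
CONTROL_ADDR = ("127.0.0.1", 5001)

def send_command(command: str, timeout: float = 2.0) -> str:
    """Send one text command over UDP and return the server's reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(command.encode("ascii"), CONTROL_ADDR)
        reply, _ = sock.recvfrom(1024)
        return reply.decode("ascii", errors="replace")

if __name__ == "__main__":
    # Query current exposure (command confirmed by the script header comment).
    print(send_command("GET_EXPOSURE"))
    # Assumed command name: request a runtime exposure change to 20 ms.
    print(send_command("SET_EXPOSURE 20.0"))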