diff --git a/include/mynteye/api/api.h b/include/mynteye/api/api.h index ea7a27f..e9b3cc1 100644 --- a/include/mynteye/api/api.h +++ b/include/mynteye/api/api.h @@ -91,6 +91,8 @@ class MYNTEYE_API API { using stream_callback_t = std::function; /** The api::MotionData callback. */ using motion_callback_t = std::function; + /** The enable/disable switch callback. */ + using stream_switch_callback_t = std::function; explicit API(std::shared_ptr device, CalibrationModel calib_model); virtual ~API(); @@ -280,11 +282,31 @@ class MYNTEYE_API API { * still support this stream. */ void EnableStreamData(const Stream &stream); + + /** + * Enable the data of stream. + * callback function will call before the father processor enable. + * when try_tag is true, the function will do nothing except callback. + */ + void EnableStreamData( + const Stream &stream, + stream_switch_callback_t callback, + bool try_tag = false); /** * Disable the data of stream. */ void DisableStreamData(const Stream &stream); + /** + * Disable the data of stream. + * callback function will call before the children processor disable. + * when try_tag is true, the function will do nothing except callback. + */ + void DisableStreamData( + const Stream &stream, + stream_switch_callback_t callback, + bool try_tag = false); + /** * Get the latest data of stream. */ diff --git a/src/mynteye/api/api.cc b/src/mynteye/api/api.cc index 18e80cd..0f6801e 100644 --- a/src/mynteye/api/api.cc +++ b/src/mynteye/api/api.cc @@ -446,6 +446,17 @@ void API::DisableStreamData(const Stream &stream) { synthetic_->DisableStreamData(stream); } +void API::EnableStreamData( + const Stream &stream, stream_switch_callback_t callback, + bool try_tag) { + synthetic_->EnableStreamData(stream, callback, try_tag); +} +void API::DisableStreamData( + const Stream &stream, stream_switch_callback_t callback, + bool try_tag) { + synthetic_->DisableStreamData(stream, callback, try_tag); +} + api::StreamData API::GetStreamData(const Stream &stream) { return synthetic_->GetStreamData(stream); } diff --git a/src/mynteye/api/synthetic.cc b/src/mynteye/api/synthetic.cc index 1830e67..fc7771e 100644 --- a/src/mynteye/api/synthetic.cc +++ b/src/mynteye/api/synthetic.cc @@ -195,27 +195,6 @@ bool Synthetic::checkControlDateWithStream(const Stream& stream) const { return false; } -void Synthetic::EnableStreamData(const Stream &stream) { - // Activate processors of synthetic stream - auto processor = getProcessorWithStream(stream); - iterate_processor_CtoP_before(processor, - [](std::shared_ptr proce){ - auto streams = proce->getTargetStreams(); - int act_tag = 0; - for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) { - if (proce->target_streams_[i].enabled_mode_ == MODE_LAST) { - act_tag++; - proce->target_streams_[i].enabled_mode_ = MODE_SYNTHETIC; - } - } - if (act_tag > 0 && !proce->IsActivated()) { - // std::cout << proce->Name() << " Active now" << std::endl; - proce->Activate(); - } - }); -} - - bool Synthetic::Supports(const Stream &stream) const { return checkControlDateWithStream(stream); } @@ -228,16 +207,45 @@ Synthetic::mode_t Synthetic::SupportsMode(const Stream &stream) const { return MODE_LAST; } -void Synthetic::DisableStreamData(const Stream &stream) { +void Synthetic::EnableStreamData( + const Stream &stream, stream_switch_callback_t callback, + bool try_tag) { + // Activate processors of synthetic stream + auto processor = getProcessorWithStream(stream); + iterate_processor_CtoP_before(processor, + [callback, try_tag](std::shared_ptr proce){ + auto streams = proce->getTargetStreams(); + int act_tag = 0; + for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) { + if (proce->target_streams_[i].enabled_mode_ == MODE_LAST) { + callback(proce->target_streams_[i].stream); + if (!try_tag) { + act_tag++; + proce->target_streams_[i].enabled_mode_ = MODE_SYNTHETIC; + } + } + } + if (act_tag > 0 && !proce->IsActivated()) { + // std::cout << proce->Name() << " Active now" << std::endl; + proce->Activate(); + } + }); +} +void Synthetic::DisableStreamData( + const Stream &stream, stream_switch_callback_t callback, + bool try_tag) { auto processor = getProcessorWithStream(stream); iterate_processor_PtoC_before(processor, - [](std::shared_ptr proce){ + [callback, try_tag](std::shared_ptr proce){ auto streams = proce->getTargetStreams(); int act_tag = 0; for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) { if (proce->target_streams_[i].enabled_mode_ == MODE_SYNTHETIC) { - act_tag++; - proce->target_streams_[i].enabled_mode_ = MODE_LAST; + callback(proce->target_streams_[i].stream); + if (!try_tag) { + act_tag++; + proce->target_streams_[i].enabled_mode_ = MODE_LAST; + } } } if (act_tag > 0 && proce->IsActivated()) { @@ -247,6 +255,20 @@ void Synthetic::DisableStreamData(const Stream &stream) { }); } +void Synthetic::EnableStreamData(const Stream &stream) { + EnableStreamData(stream, [](const Stream &stream){ + // std::cout << stream << "enabled in callback" << std::endl; + MYNTEYE_UNUSED(stream); + }, false); +} + +void Synthetic::DisableStreamData(const Stream &stream) { + DisableStreamData(stream, [](const Stream &stream){ + // std::cout << stream << "disabled in callback" << std::endl; + MYNTEYE_UNUSED(stream); + }, false); +} + bool Synthetic::IsStreamDataEnabled(const Stream &stream) const { if (checkControlDateWithStream(stream)) { auto data = getControlDateWithStream(stream); diff --git a/src/mynteye/api/synthetic.h b/src/mynteye/api/synthetic.h index 29a9548..fbd80ac 100644 --- a/src/mynteye/api/synthetic.h +++ b/src/mynteye/api/synthetic.h @@ -35,6 +35,7 @@ struct Object; class Synthetic { public: using stream_callback_t = API::stream_callback_t; + using stream_switch_callback_t = API::stream_switch_callback_t; typedef enum Mode { MODE_NATIVE, // Native stream @@ -59,6 +60,11 @@ class Synthetic { void EnableStreamData(const Stream &stream); void DisableStreamData(const Stream &stream); + + void EnableStreamData( + const Stream &stream, stream_switch_callback_t callback, bool try_tag); + void DisableStreamData( + const Stream &stream, stream_switch_callback_t callback, bool try_tag); bool IsStreamDataEnabled(const Stream &stream) const; void SetStreamCallback(const Stream &stream, stream_callback_t callback); diff --git a/wrappers/ros/src/mynt_eye_ros_wrapper/launch/mynteye.launch b/wrappers/ros/src/mynt_eye_ros_wrapper/launch/mynteye.launch index 8a571db..d6ada67 100644 --- a/wrappers/ros/src/mynt_eye_ros_wrapper/launch/mynteye.launch +++ b/wrappers/ros/src/mynt_eye_ros_wrapper/launch/mynteye.launch @@ -347,6 +347,10 @@ - 'image_transport/compressedDepth' - + + + - 'image_transport/compressedDepth' + + diff --git a/wrappers/ros/src/mynt_eye_ros_wrapper/src/wrapper_nodelet.cc b/wrappers/ros/src/mynt_eye_ros_wrapper/src/wrapper_nodelet.cc index d9048da..72b5541 100644 --- a/wrappers/ros/src/mynt_eye_ros_wrapper/src/wrapper_nodelet.cc +++ b/wrappers/ros/src/mynt_eye_ros_wrapper/src/wrapper_nodelet.cc @@ -554,75 +554,41 @@ class ROSWrapperNodelet : public nodelet::Nodelet { return true; } - void SetIsPublished(const Stream &stream) { - is_published_[stream] = false; - switch (stream) { - case Stream::LEFT_RECTIFIED: { - if (is_published_[Stream::RIGHT_RECTIFIED]) { - SetIsPublished(Stream::RIGHT_RECTIFIED); - } - if (is_published_[Stream::DISPARITY]) { - SetIsPublished(Stream::DISPARITY); - } - } break; - case Stream::RIGHT_RECTIFIED: { - if (is_published_[Stream::LEFT_RECTIFIED]) { - SetIsPublished(Stream::LEFT_RECTIFIED); - } - if (is_published_[Stream::DISPARITY]) { - SetIsPublished(Stream::DISPARITY); - } - } break; - case Stream::DISPARITY: { - if (is_published_[Stream::DISPARITY_NORMALIZED]) { - SetIsPublished(Stream::DISPARITY_NORMALIZED); - } - if (is_published_[Stream::POINTS]) { - SetIsPublished(Stream::POINTS); - } - } break; - case Stream::DISPARITY_NORMALIZED: { - } break; - case Stream::POINTS: { - if (is_published_[Stream::DEPTH]) { - SetIsPublished(Stream::DEPTH); - } - } break; - case Stream::DEPTH: { - } break; - default: - return; + void publishData( + const Stream &stream, const api::StreamData &data, std::uint32_t seq, + ros::Time stamp) { + if (stream == Stream::LEFT || stream == Stream::RIGHT) { + return; + } else if (stream == Stream::POINTS) { + publishPoints(data, seq, stamp); + } else { + publishCamera(stream, data, seq, stamp); } } - void publishPoint(const Stream &stream) { - auto &&points_num = points_publisher_.getNumSubscribers(); - if (points_num == 0 && is_published_[stream]) { - SetIsPublished(stream); - api_->DisableStreamData(stream); - } else if (points_num > 0 && !is_published_[Stream::POINTS]) { - api_->EnableStreamData(Stream::POINTS); - api_->SetStreamCallback( - Stream::POINTS, [this](const api::StreamData &data) { - // ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); - ros::Time stamp = checkUpTimeStamp( - data.img->timestamp, Stream::POINTS); - static std::size_t count = 0; - ++count; - publishPoints(data, count, stamp); - }); - is_published_[Stream::POINTS] = true; + int getStreamSubscribers(const Stream &stream) { + if (stream == Stream::POINTS) { + return points_publisher_.getNumSubscribers(); } + auto pub = camera_publishers_[stream]; + if (pub) + return pub.getNumSubscribers(); + return -1; } void publishOthers(const Stream &stream) { - auto stream_num = camera_publishers_[stream].getNumSubscribers(); - if (stream_num == 0 && is_published_[stream]) { - // Stop computing when was not subcribed - SetIsPublished(stream); - api_->DisableStreamData(stream); - } else if (stream_num > 0 && !is_published_[stream]) { - // Start computing and publishing when was subcribed + // std::cout << stream << "===============================" << std::endl; + // int enable_tag = 0; + // api_->EnableStreamData(stream, [&](const Stream &stream) { + // enable_tag += getStreamSubscribers(stream); + // std::cout << "EnableStreamData callback test" + // << stream << "|| Sum:" + // << getStreamSubscribers(stream) << std::endl; + // }, true); + if (getStreamSubscribers(stream) > 0 && !is_published_[stream]) { + // std::cout << stream + // <<" enableStreamData tag = 0 return" << std::endl; + // std::cout << "enable " << stream << std::endl; api_->EnableStreamData(stream); api_->SetStreamCallback( stream, [this, stream](const api::StreamData &data) { @@ -631,81 +597,121 @@ class ROSWrapperNodelet : public nodelet::Nodelet { data.img->timestamp, stream); static std::size_t count = 0; ++count; - publishCamera(stream, data, count, stamp); + publishData(stream, data, count, stamp); }); is_published_[stream] = true; + return; + } + + int disable_tag = 0; + api_->DisableStreamData(stream, [&](const Stream &stream) { + disable_tag += getStreamSubscribers(stream); + // std::cout << "DisableStreamData callback test: " + // << stream << "|| Sum:"<< getStreamSubscribers(stream) << std::endl; + }, true); + if (disable_tag == 0 && is_published_[stream]) { + api_->DisableStreamData(stream, [&](const Stream &stream) { + // std::cout << "disable " << stream << std::endl; + api_->SetStreamCallback(stream, nullptr); + is_published_[stream] = false; + }); + return; } } void publishTopics() { - if ((camera_publishers_[Stream::LEFT].getNumSubscribers() > 0 || - mono_publishers_[Stream::LEFT].getNumSubscribers() > 0) && - !is_published_[Stream::LEFT]) { - api_->SetStreamCallback( - Stream::LEFT, [&](const api::StreamData &data) { - ++left_count_; - if (left_count_ > 10) { - // ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); - ros::Time stamp = checkUpTimeStamp( - data.img->timestamp, Stream::LEFT); + std::vector all_streams{ + Stream::RIGHT, + Stream::LEFT, + Stream::LEFT_RECTIFIED, + Stream::RIGHT_RECTIFIED, + Stream::DISPARITY, + Stream::DISPARITY_NORMALIZED, + Stream::POINTS, + Stream::DEPTH + }; - // static double img_time_prev = -1; - // NODELET_INFO_STREAM("ros_time_beg: " << FULL_PRECISION << - // ros_time_beg - // << ", img_time_elapsed: " << FULL_PRECISION - // << ((data.img->timestamp - img_time_beg) * 0.00001f) - // << ", img_time_diff: " << FULL_PRECISION - // << ((img_time_prev < 0) ? 0 - // : (data.img->timestamp - img_time_prev) * 0.01f) << " - // ms"); - // img_time_prev = data.img->timestamp; - publishCamera(Stream::LEFT, data, left_count_, stamp); - publishMono(Stream::LEFT, data, left_count_, stamp); - NODELET_DEBUG_STREAM( - Stream::LEFT << ", count: " << left_count_ - << ", frame_id: " << data.img->frame_id - << ", timestamp: " << data.img->timestamp - << ", exposure_time: " << data.img->exposure_time); - } - }); - left_time_beg_ = ros::Time::now().toSec(); - is_published_[Stream::LEFT] = true; + static int sum = 0; + int sum_c = 0; + for (auto &&stream : all_streams) { + sum_c += getStreamSubscribers(stream); } - if ((camera_publishers_[Stream::RIGHT].getNumSubscribers() > 0 || - mono_publishers_[Stream::RIGHT].getNumSubscribers() > 0) && - !is_published_[Stream::RIGHT]) { - api_->SetStreamCallback( - Stream::RIGHT, [&](const api::StreamData &data) { - ++right_count_; - if (right_count_ > 10) { - // ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); - ros::Time stamp = checkUpTimeStamp( - data.img->timestamp, Stream::RIGHT); - publishCamera(Stream::RIGHT, data, right_count_, stamp); - publishMono(Stream::RIGHT, data, right_count_, stamp); - NODELET_DEBUG_STREAM( - Stream::RIGHT << ", count: " << right_count_ - << ", frame_id: " << data.img->frame_id - << ", timestamp: " << data.img->timestamp - << ", exposure_time: " << data.img->exposure_time); - } - }); - right_time_beg_ = ros::Time::now().toSec(); - is_published_[Stream::RIGHT] = true; - } - - std::vector other_streams{ - Stream::LEFT_RECTIFIED, Stream::RIGHT_RECTIFIED, - Stream::DISPARITY, Stream::DISPARITY_NORMALIZED, - Stream::POINTS, Stream::DEPTH}; - - for (auto &&stream : other_streams) { - if (stream != Stream::POINTS) { - publishOthers(stream); + if (sum_c != sum) { + if (sum_c == 0) { + api_->Stop(Source::VIDEO_STREAMING); + for (auto &&stream : all_streams) { + is_published_[stream] = false; + } + api_->Start(Source::VIDEO_STREAMING); } else { - publishPoint(stream); + if ((camera_publishers_[Stream::LEFT].getNumSubscribers() > 0 || + mono_publishers_[Stream::LEFT].getNumSubscribers() > 0) && + !is_published_[Stream::LEFT]) { + api_->SetStreamCallback( + Stream::LEFT, [&](const api::StreamData &data) { + ++left_count_; + if (left_count_ > 10) { + // ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); + ros::Time stamp = checkUpTimeStamp( + data.img->timestamp, Stream::LEFT); + + // static double img_time_prev = -1; + // NODELET_INFO_STREAM("ros_time_beg: " << FULL_PRECISION << + // ros_time_beg + // << ", img_time_elapsed: " << FULL_PRECISION + // << ((data.img->timestamp - img_time_beg) * 0.00001f) + // << ", img_time_diff: " << FULL_PRECISION + // << ((img_time_prev < 0) ? 0 + // : (data.img->timestamp - img_time_prev) * 0.01f) << " + // ms"); + // img_time_prev = data.img->timestamp; + publishCamera(Stream::LEFT, data, left_count_, stamp); + publishMono(Stream::LEFT, data, left_count_, stamp); + NODELET_DEBUG_STREAM( + Stream::LEFT << ", count: " << left_count_ + << ", frame_id: " << data.img->frame_id + << ", timestamp: " << data.img->timestamp + << ", exposure_time: " << data.img->exposure_time); + } + }); + left_time_beg_ = ros::Time::now().toSec(); + is_published_[Stream::LEFT] = true; + } + + if ((camera_publishers_[Stream::RIGHT].getNumSubscribers() > 0 || + mono_publishers_[Stream::RIGHT].getNumSubscribers() > 0) && + !is_published_[Stream::RIGHT]) { + api_->SetStreamCallback( + Stream::RIGHT, [&](const api::StreamData &data) { + ++right_count_; + if (right_count_ > 10) { + // ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); + ros::Time stamp = checkUpTimeStamp( + data.img->timestamp, Stream::RIGHT); + publishCamera(Stream::RIGHT, data, right_count_, stamp); + publishMono(Stream::RIGHT, data, right_count_, stamp); + NODELET_DEBUG_STREAM( + Stream::RIGHT << ", count: " << right_count_ + << ", frame_id: " << data.img->frame_id + << ", timestamp: " << data.img->timestamp + << ", exposure_time: " << data.img->exposure_time); + } + }); + right_time_beg_ = ros::Time::now().toSec(); + is_published_[Stream::RIGHT] = true; + } + + std::vector other_streams{ + Stream::LEFT_RECTIFIED, Stream::RIGHT_RECTIFIED, + Stream::DISPARITY, Stream::DISPARITY_NORMALIZED, + Stream::POINTS, Stream::DEPTH + }; + for (auto &&stream : other_streams) { + publishOthers(stream); + } } + sum = sum_c; } if (!is_motion_published_) {