Merge branch into develop

This commit is contained in:
John Zhao 2019-02-22 14:08:47 +08:00
commit 11795c54e5
6 changed files with 223 additions and 152 deletions

View File

@ -92,6 +92,8 @@ class MYNTEYE_API API {
using stream_callback_t = std::function<void(const api::StreamData &data)>; using stream_callback_t = std::function<void(const api::StreamData &data)>;
/** The api::MotionData callback. */ /** The api::MotionData callback. */
using motion_callback_t = std::function<void(const api::MotionData &data)>; using motion_callback_t = std::function<void(const api::MotionData &data)>;
/** The enable/disable switch callback. */
using stream_switch_callback_t = std::function<void(const Stream &stream)>;
explicit API(std::shared_ptr<Device> device, CalibrationModel calib_model); explicit API(std::shared_ptr<Device> device, CalibrationModel calib_model);
virtual ~API(); virtual ~API();
@ -281,11 +283,31 @@ class MYNTEYE_API API {
* still support this stream. * still support this stream.
*/ */
void EnableStreamData(const Stream &stream); void EnableStreamData(const Stream &stream);
/**
* Enable the data of stream.
* callback function will call before the father processor enable.
* when try_tag is true, the function will do nothing except callback.
*/
void EnableStreamData(
const Stream &stream,
stream_switch_callback_t callback,
bool try_tag = false);
/** /**
* Disable the data of stream. * Disable the data of stream.
*/ */
void DisableStreamData(const Stream &stream); void DisableStreamData(const Stream &stream);
/**
* Disable the data of stream.
* callback function will call before the children processor disable.
* when try_tag is true, the function will do nothing except callback.
*/
void DisableStreamData(
const Stream &stream,
stream_switch_callback_t callback,
bool try_tag = false);
/** /**
* Get the latest data of stream. * Get the latest data of stream.
*/ */

View File

@ -456,6 +456,17 @@ void API::DisableStreamData(const Stream &stream) {
synthetic_->DisableStreamData(stream); synthetic_->DisableStreamData(stream);
} }
void API::EnableStreamData(
const Stream &stream, stream_switch_callback_t callback,
bool try_tag) {
synthetic_->EnableStreamData(stream, callback, try_tag);
}
void API::DisableStreamData(
const Stream &stream, stream_switch_callback_t callback,
bool try_tag) {
synthetic_->DisableStreamData(stream, callback, try_tag);
}
api::StreamData API::GetStreamData(const Stream &stream) { api::StreamData API::GetStreamData(const Stream &stream) {
if (correspondence_ && correspondence_->Watch(stream)) { if (correspondence_ && correspondence_->Watch(stream)) {
return correspondence_->GetStreamData(stream); return correspondence_->GetStreamData(stream);

View File

@ -238,27 +238,6 @@ bool Synthetic::checkControlDateWithStream(const Stream& stream) const {
return false; return false;
} }
void Synthetic::EnableStreamData(const Stream &stream) {
// Activate processors of synthetic stream
auto processor = getProcessorWithStream(stream);
iterate_processor_CtoP_before(processor,
[](std::shared_ptr<Processor> proce){
auto streams = proce->getTargetStreams();
int act_tag = 0;
for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) {
if (proce->target_streams_[i].enabled_mode_ == MODE_LAST) {
act_tag++;
proce->target_streams_[i].enabled_mode_ = MODE_SYNTHETIC;
}
}
if (act_tag > 0 && !proce->IsActivated()) {
// std::cout << proce->Name() << " Active now" << std::endl;
proce->Activate();
}
});
}
bool Synthetic::Supports(const Stream &stream) const { bool Synthetic::Supports(const Stream &stream) const {
return checkControlDateWithStream(stream); return checkControlDateWithStream(stream);
} }
@ -271,16 +250,45 @@ Synthetic::mode_t Synthetic::SupportsMode(const Stream &stream) const {
return MODE_LAST; return MODE_LAST;
} }
void Synthetic::DisableStreamData(const Stream &stream) { void Synthetic::EnableStreamData(
const Stream &stream, stream_switch_callback_t callback,
bool try_tag) {
// Activate processors of synthetic stream
auto processor = getProcessorWithStream(stream);
iterate_processor_CtoP_before(processor,
[callback, try_tag](std::shared_ptr<Processor> proce){
auto streams = proce->getTargetStreams();
int act_tag = 0;
for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) {
if (proce->target_streams_[i].enabled_mode_ == MODE_LAST) {
callback(proce->target_streams_[i].stream);
if (!try_tag) {
act_tag++;
proce->target_streams_[i].enabled_mode_ = MODE_SYNTHETIC;
}
}
}
if (act_tag > 0 && !proce->IsActivated()) {
// std::cout << proce->Name() << " Active now" << std::endl;
proce->Activate();
}
});
}
void Synthetic::DisableStreamData(
const Stream &stream, stream_switch_callback_t callback,
bool try_tag) {
auto processor = getProcessorWithStream(stream); auto processor = getProcessorWithStream(stream);
iterate_processor_PtoC_before(processor, iterate_processor_PtoC_before(processor,
[](std::shared_ptr<Processor> proce){ [callback, try_tag](std::shared_ptr<Processor> proce){
auto streams = proce->getTargetStreams(); auto streams = proce->getTargetStreams();
int act_tag = 0; int act_tag = 0;
for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) { for (unsigned int i = 0; i < proce->getStreamsSum() ; i++) {
if (proce->target_streams_[i].enabled_mode_ == MODE_SYNTHETIC) { if (proce->target_streams_[i].enabled_mode_ == MODE_SYNTHETIC) {
act_tag++; callback(proce->target_streams_[i].stream);
proce->target_streams_[i].enabled_mode_ = MODE_LAST; if (!try_tag) {
act_tag++;
proce->target_streams_[i].enabled_mode_ = MODE_LAST;
}
} }
} }
if (act_tag > 0 && proce->IsActivated()) { if (act_tag > 0 && proce->IsActivated()) {
@ -290,6 +298,20 @@ void Synthetic::DisableStreamData(const Stream &stream) {
}); });
} }
void Synthetic::EnableStreamData(const Stream &stream) {
EnableStreamData(stream, [](const Stream &stream){
// std::cout << stream << "enabled in callback" << std::endl;
MYNTEYE_UNUSED(stream);
}, false);
}
void Synthetic::DisableStreamData(const Stream &stream) {
DisableStreamData(stream, [](const Stream &stream){
// std::cout << stream << "disabled in callback" << std::endl;
MYNTEYE_UNUSED(stream);
}, false);
}
bool Synthetic::IsStreamDataEnabled(const Stream &stream) const { bool Synthetic::IsStreamDataEnabled(const Stream &stream) const {
if (checkControlDateWithStream(stream)) { if (checkControlDateWithStream(stream)) {
auto data = getControlDateWithStream(stream); auto data = getControlDateWithStream(stream);

View File

@ -37,6 +37,7 @@ class Synthetic {
using stream_callback_t = API::stream_callback_t; using stream_callback_t = API::stream_callback_t;
using stream_data_listener_t = using stream_data_listener_t =
std::function<void(const Stream &stream, const api::StreamData &data)>; std::function<void(const Stream &stream, const api::StreamData &data)>;
using stream_switch_callback_t = API::stream_switch_callback_t;
typedef enum Mode { typedef enum Mode {
MODE_NATIVE, // Native stream MODE_NATIVE, // Native stream
@ -63,6 +64,11 @@ class Synthetic {
void EnableStreamData(const Stream &stream); void EnableStreamData(const Stream &stream);
void DisableStreamData(const Stream &stream); void DisableStreamData(const Stream &stream);
void EnableStreamData(
const Stream &stream, stream_switch_callback_t callback, bool try_tag);
void DisableStreamData(
const Stream &stream, stream_switch_callback_t callback, bool try_tag);
bool IsStreamDataEnabled(const Stream &stream) const; bool IsStreamDataEnabled(const Stream &stream) const;
void SetStreamCallback(const Stream &stream, stream_callback_t callback); void SetStreamCallback(const Stream &stream, stream_callback_t callback);

View File

@ -347,6 +347,10 @@
- 'image_transport/compressedDepth' - 'image_transport/compressedDepth'
</rosparam> </rosparam>
</group> </group>
<group ns="$(arg depth_topic)">
<rosparam param="disable_pub_plugins">
- 'image_transport/compressedDepth'
</rosparam>
</group>
</group> <!-- mynteye --> </group> <!-- mynteye -->
</launch> </launch>

View File

@ -554,75 +554,41 @@ class ROSWrapperNodelet : public nodelet::Nodelet {
return true; return true;
} }
void SetIsPublished(const Stream &stream) { void publishData(
is_published_[stream] = false; const Stream &stream, const api::StreamData &data, std::uint32_t seq,
switch (stream) { ros::Time stamp) {
case Stream::LEFT_RECTIFIED: { if (stream == Stream::LEFT || stream == Stream::RIGHT) {
if (is_published_[Stream::RIGHT_RECTIFIED]) { return;
SetIsPublished(Stream::RIGHT_RECTIFIED); } else if (stream == Stream::POINTS) {
} publishPoints(data, seq, stamp);
if (is_published_[Stream::DISPARITY]) { } else {
SetIsPublished(Stream::DISPARITY); publishCamera(stream, data, seq, stamp);
}
} break;
case Stream::RIGHT_RECTIFIED: {
if (is_published_[Stream::LEFT_RECTIFIED]) {
SetIsPublished(Stream::LEFT_RECTIFIED);
}
if (is_published_[Stream::DISPARITY]) {
SetIsPublished(Stream::DISPARITY);
}
} break;
case Stream::DISPARITY: {
if (is_published_[Stream::DISPARITY_NORMALIZED]) {
SetIsPublished(Stream::DISPARITY_NORMALIZED);
}
if (is_published_[Stream::POINTS]) {
SetIsPublished(Stream::POINTS);
}
} break;
case Stream::DISPARITY_NORMALIZED: {
} break;
case Stream::POINTS: {
if (is_published_[Stream::DEPTH]) {
SetIsPublished(Stream::DEPTH);
}
} break;
case Stream::DEPTH: {
} break;
default:
return;
} }
} }
void publishPoint(const Stream &stream) { int getStreamSubscribers(const Stream &stream) {
auto &&points_num = points_publisher_.getNumSubscribers(); if (stream == Stream::POINTS) {
if (points_num == 0 && is_published_[stream]) { return points_publisher_.getNumSubscribers();
SetIsPublished(stream);
api_->DisableStreamData(stream);
} else if (points_num > 0 && !is_published_[Stream::POINTS]) {
api_->EnableStreamData(Stream::POINTS);
api_->SetStreamCallback(
Stream::POINTS, [this](const api::StreamData &data) {
// ros::Time stamp = hardTimeToSoftTime(data.img->timestamp);
ros::Time stamp = checkUpTimeStamp(
data.img->timestamp, Stream::POINTS);
static std::size_t count = 0;
++count;
publishPoints(data, count, stamp);
});
is_published_[Stream::POINTS] = true;
} }
auto pub = camera_publishers_[stream];
if (pub)
return pub.getNumSubscribers();
return -1;
} }
void publishOthers(const Stream &stream) { void publishOthers(const Stream &stream) {
auto stream_num = camera_publishers_[stream].getNumSubscribers(); // std::cout << stream << "===============================" << std::endl;
if (stream_num == 0 && is_published_[stream]) { // int enable_tag = 0;
// Stop computing when was not subcribed // api_->EnableStreamData(stream, [&](const Stream &stream) {
SetIsPublished(stream); // enable_tag += getStreamSubscribers(stream);
api_->DisableStreamData(stream); // std::cout << "EnableStreamData callback test"
} else if (stream_num > 0 && !is_published_[stream]) { // << stream << "|| Sum:"
// Start computing and publishing when was subcribed // << getStreamSubscribers(stream) << std::endl;
// }, true);
if (getStreamSubscribers(stream) > 0 && !is_published_[stream]) {
// std::cout << stream
// <<" enableStreamData tag = 0 return" << std::endl;
// std::cout << "enable " << stream << std::endl;
api_->EnableStreamData(stream); api_->EnableStreamData(stream);
api_->SetStreamCallback( api_->SetStreamCallback(
stream, [this, stream](const api::StreamData &data) { stream, [this, stream](const api::StreamData &data) {
@ -631,81 +597,121 @@ class ROSWrapperNodelet : public nodelet::Nodelet {
data.img->timestamp, stream); data.img->timestamp, stream);
static std::size_t count = 0; static std::size_t count = 0;
++count; ++count;
publishCamera(stream, data, count, stamp); publishData(stream, data, count, stamp);
}); });
is_published_[stream] = true; is_published_[stream] = true;
return;
}
int disable_tag = 0;
api_->DisableStreamData(stream, [&](const Stream &stream) {
disable_tag += getStreamSubscribers(stream);
// std::cout << "DisableStreamData callback test: "
// << stream << "|| Sum:"<< getStreamSubscribers(stream) << std::endl;
}, true);
if (disable_tag == 0 && is_published_[stream]) {
api_->DisableStreamData(stream, [&](const Stream &stream) {
// std::cout << "disable " << stream << std::endl;
api_->SetStreamCallback(stream, nullptr);
is_published_[stream] = false;
});
return;
} }
} }
void publishTopics() { void publishTopics() {
if ((camera_publishers_[Stream::LEFT].getNumSubscribers() > 0 || std::vector<Stream> all_streams{
mono_publishers_[Stream::LEFT].getNumSubscribers() > 0) && Stream::RIGHT,
!is_published_[Stream::LEFT]) { Stream::LEFT,
api_->SetStreamCallback( Stream::LEFT_RECTIFIED,
Stream::LEFT, [&](const api::StreamData &data) { Stream::RIGHT_RECTIFIED,
++left_count_; Stream::DISPARITY,
if (left_count_ > 10) { Stream::DISPARITY_NORMALIZED,
// ros::Time stamp = hardTimeToSoftTime(data.img->timestamp); Stream::POINTS,
ros::Time stamp = checkUpTimeStamp( Stream::DEPTH
data.img->timestamp, Stream::LEFT); };
// static double img_time_prev = -1; static int sum = 0;
// NODELET_INFO_STREAM("ros_time_beg: " << FULL_PRECISION << int sum_c = 0;
// ros_time_beg for (auto &&stream : all_streams) {
// << ", img_time_elapsed: " << FULL_PRECISION sum_c += getStreamSubscribers(stream);
// << ((data.img->timestamp - img_time_beg) * 0.00001f)
// << ", img_time_diff: " << FULL_PRECISION
// << ((img_time_prev < 0) ? 0
// : (data.img->timestamp - img_time_prev) * 0.01f) << "
// ms");
// img_time_prev = data.img->timestamp;
publishCamera(Stream::LEFT, data, left_count_, stamp);
publishMono(Stream::LEFT, data, left_count_, stamp);
NODELET_DEBUG_STREAM(
Stream::LEFT << ", count: " << left_count_
<< ", frame_id: " << data.img->frame_id
<< ", timestamp: " << data.img->timestamp
<< ", exposure_time: " << data.img->exposure_time);
}
});
left_time_beg_ = ros::Time::now().toSec();
is_published_[Stream::LEFT] = true;
} }
if ((camera_publishers_[Stream::RIGHT].getNumSubscribers() > 0 || if (sum_c != sum) {
mono_publishers_[Stream::RIGHT].getNumSubscribers() > 0) && if (sum_c == 0) {
!is_published_[Stream::RIGHT]) { api_->Stop(Source::VIDEO_STREAMING);
api_->SetStreamCallback( for (auto &&stream : all_streams) {
Stream::RIGHT, [&](const api::StreamData &data) { is_published_[stream] = false;
++right_count_; }
if (right_count_ > 10) { api_->Start(Source::VIDEO_STREAMING);
// ros::Time stamp = hardTimeToSoftTime(data.img->timestamp);
ros::Time stamp = checkUpTimeStamp(
data.img->timestamp, Stream::RIGHT);
publishCamera(Stream::RIGHT, data, right_count_, stamp);
publishMono(Stream::RIGHT, data, right_count_, stamp);
NODELET_DEBUG_STREAM(
Stream::RIGHT << ", count: " << right_count_
<< ", frame_id: " << data.img->frame_id
<< ", timestamp: " << data.img->timestamp
<< ", exposure_time: " << data.img->exposure_time);
}
});
right_time_beg_ = ros::Time::now().toSec();
is_published_[Stream::RIGHT] = true;
}
std::vector<Stream> other_streams{
Stream::LEFT_RECTIFIED, Stream::RIGHT_RECTIFIED,
Stream::DISPARITY, Stream::DISPARITY_NORMALIZED,
Stream::POINTS, Stream::DEPTH};
for (auto &&stream : other_streams) {
if (stream != Stream::POINTS) {
publishOthers(stream);
} else { } else {
publishPoint(stream); if ((camera_publishers_[Stream::LEFT].getNumSubscribers() > 0 ||
mono_publishers_[Stream::LEFT].getNumSubscribers() > 0) &&
!is_published_[Stream::LEFT]) {
api_->SetStreamCallback(
Stream::LEFT, [&](const api::StreamData &data) {
++left_count_;
if (left_count_ > 10) {
// ros::Time stamp = hardTimeToSoftTime(data.img->timestamp);
ros::Time stamp = checkUpTimeStamp(
data.img->timestamp, Stream::LEFT);
// static double img_time_prev = -1;
// NODELET_INFO_STREAM("ros_time_beg: " << FULL_PRECISION <<
// ros_time_beg
// << ", img_time_elapsed: " << FULL_PRECISION
// << ((data.img->timestamp - img_time_beg) * 0.00001f)
// << ", img_time_diff: " << FULL_PRECISION
// << ((img_time_prev < 0) ? 0
// : (data.img->timestamp - img_time_prev) * 0.01f) << "
// ms");
// img_time_prev = data.img->timestamp;
publishCamera(Stream::LEFT, data, left_count_, stamp);
publishMono(Stream::LEFT, data, left_count_, stamp);
NODELET_DEBUG_STREAM(
Stream::LEFT << ", count: " << left_count_
<< ", frame_id: " << data.img->frame_id
<< ", timestamp: " << data.img->timestamp
<< ", exposure_time: " << data.img->exposure_time);
}
});
left_time_beg_ = ros::Time::now().toSec();
is_published_[Stream::LEFT] = true;
}
if ((camera_publishers_[Stream::RIGHT].getNumSubscribers() > 0 ||
mono_publishers_[Stream::RIGHT].getNumSubscribers() > 0) &&
!is_published_[Stream::RIGHT]) {
api_->SetStreamCallback(
Stream::RIGHT, [&](const api::StreamData &data) {
++right_count_;
if (right_count_ > 10) {
// ros::Time stamp = hardTimeToSoftTime(data.img->timestamp);
ros::Time stamp = checkUpTimeStamp(
data.img->timestamp, Stream::RIGHT);
publishCamera(Stream::RIGHT, data, right_count_, stamp);
publishMono(Stream::RIGHT, data, right_count_, stamp);
NODELET_DEBUG_STREAM(
Stream::RIGHT << ", count: " << right_count_
<< ", frame_id: " << data.img->frame_id
<< ", timestamp: " << data.img->timestamp
<< ", exposure_time: " << data.img->exposure_time);
}
});
right_time_beg_ = ros::Time::now().toSec();
is_published_[Stream::RIGHT] = true;
}
std::vector<Stream> other_streams{
Stream::LEFT_RECTIFIED, Stream::RIGHT_RECTIFIED,
Stream::DISPARITY, Stream::DISPARITY_NORMALIZED,
Stream::POINTS, Stream::DEPTH
};
for (auto &&stream : other_streams) {
publishOthers(stream);
}
} }
sum = sum_c;
} }
if (!is_motion_published_) { if (!is_motion_published_) {