diff --git a/src/mynteye/api/api.cc b/src/mynteye/api/api.cc index 136cac9..d52f198 100644 --- a/src/mynteye/api/api.cc +++ b/src/mynteye/api/api.cc @@ -441,7 +441,11 @@ void API::Stop(const Source &source) { } void API::WaitForStreams() { - synthetic_->WaitForStreams(); + if (correspondence_) { + correspondence_->WaitForStreams(); + } else { + synthetic_->WaitForStreams(); + } } void API::EnableStreamData(const Stream &stream) { @@ -453,11 +457,19 @@ void API::DisableStreamData(const Stream &stream) { } api::StreamData API::GetStreamData(const Stream &stream) { - return synthetic_->GetStreamData(stream); + if (correspondence_ && correspondence_->Watch(stream)) { + return correspondence_->GetStreamData(stream); + } else { + return synthetic_->GetStreamData(stream); + } } std::vector API::GetStreamDatas(const Stream &stream) { - return synthetic_->GetStreamDatas(stream); + if (correspondence_ && correspondence_->Watch(stream)) { + return correspondence_->GetStreamDatas(stream); + } else { + return synthetic_->GetStreamDatas(stream); + } } void API::EnableMotionDatas(std::size_t max_size) { diff --git a/src/mynteye/api/correspondence.cc b/src/mynteye/api/correspondence.cc index 344e7e7..5da3030 100644 --- a/src/mynteye/api/correspondence.cc +++ b/src/mynteye/api/correspondence.cc @@ -22,23 +22,59 @@ Correspondence::Correspondence(const std::shared_ptr &device, const Stream &stream) : device_(device), stream_(stream) { VLOG(2) << __func__; + // set matched stream to be watched too, + // aim to make stream and matched stream correspondence + if (stream_ == Stream::LEFT) { + stream_match_ = Stream::RIGHT; + } else if (stream_ == Stream::RIGHT) { + stream_match_ = Stream::LEFT; + } else if (stream_ == Stream::LEFT_RECTIFIED) { + stream_match_ = Stream::RIGHT_RECTIFIED; + } else if (stream_ == Stream::RIGHT_RECTIFIED) { + stream_match_ = Stream::LEFT_RECTIFIED; + } else { + stream_match_ = Stream::LAST; + } + EnableStreamMatch(); + + auto framerate = device_->GetOptionValue(Option::FRAME_RATE); + stream_interval_us_ = 1000000.f / framerate; + stream_interval_us_half_ = 0.5f * stream_interval_us_; + VLOG(2) << "framerate: " << framerate + << ", interval_us: " << stream_interval_us_; } Correspondence::~Correspondence() { VLOG(2) << __func__; } +bool Correspondence::Watch(const Stream &stream) const { + if (stream == stream_) return true; + if (stream_match_enabled_ && stream == stream_match_) return true; + return false; +} + void Correspondence::OnStreamDataCallback( const Stream &stream, const api::StreamData &data) { // LOG(INFO) << __func__ << ", " << stream // << ", id: " << data.frame_id << ", stamp: " << data.img->timestamp; + if (!Watch(stream)) { + return; // unwatched + } + std::lock_guard _(mtx_stream_datas_); + if (stream == stream_) { + stream_datas_.push_back(data); + } else if (/*stream_match_enabled_ && */stream == stream_match_) { + stream_datas_match_.push_back(data); + } + NotifyStreamDataReady(); } void Correspondence::OnMotionDataCallback(const device::MotionData &data) { // LOG(INFO) << __func__ << ", id: " << data.imu->frame_id // << ", stamp: " << data.imu->timestamp; { - std::lock_guard _(mtx_motion_datas_); + std::lock_guard _(mtx_motion_datas_); motion_datas_.push_back(data); } if (motion_callback_) { @@ -51,8 +87,60 @@ void Correspondence::SetMotionCallback(API::motion_callback_t callback) { motion_callback_ = callback; } +void Correspondence::WaitForStreams() { + if (stream_ == Stream::LEFT || stream_ == Stream::RIGHT) { + // Wait native stream ready, avoid get these stream empty + // Todo: determine native stream according to device + WaitStreamDataReady(); + return; + } + device_->WaitForStreams(); +} + +api::StreamData Correspondence::GetStreamData(const Stream &stream) { + auto datas = GetStreamDatas(stream); + return datas.empty() ? api::StreamData{} : datas.back(); +} + +std::vector Correspondence::GetStreamDatas( + const Stream &stream) { + if (!Watch(stream)) { + LOG(ERROR) << "Get unwatched stream data of " << stream; + return {}; + } + + std::lock_guard _(mtx_stream_datas_); + static std::uint32_t stream_count_ = 0; + static std::uint32_t stream_match_count_ = 0; + + if (stream == stream_) { + auto datas = std::move(stream_datas_); + + if (stream_count_ < 10) { + ++stream_count_; + } else { + // get stream, but not get matched stream, disable it + if (stream_match_count_ == 0) { + DisableStreamMatch(); + } + } + + return datas; + } else if (/*stream_match_enabled_ && */stream == stream_match_) { + auto datas = std::move(stream_datas_match_); + + if (stream_match_count_ < 10) { + ++stream_match_count_; + } + + return datas; + } + + return {}; +} + std::vector Correspondence::GetMotionDatas() { - std::lock_guard _(mtx_motion_datas_); + std::lock_guard _(mtx_motion_datas_); std::vector datas; for (auto &&data : motion_datas_) { datas.push_back({data.imu}); @@ -61,4 +149,53 @@ std::vector Correspondence::GetMotionDatas() { return datas; } +void Correspondence::EnableStreamMatch() { + stream_match_enabled_ = true; +} + +void Correspondence::DisableStreamMatch() { + stream_match_enabled_ = false; + stream_datas_match_.clear(); +} + +void Correspondence::WaitStreamDataReady() { + std::unique_lock lock(mtx_stream_datas_); + auto ready = std::bind(&Correspondence::IsStreamDataReady, this); + bool ok = cond_stream_datas_.wait_for(lock, std::chrono::seconds(3), ready); + if (!ok) { + LOG(FATAL) << "Timeout waiting for key frames. Please use USB 3.0, and not " + "in virtual machine."; + } +} + +void Correspondence::NotifyStreamDataReady() { + cond_stream_datas_.notify_one(); +} + +bool Correspondence::IsStreamDataReady() { + if (stream_datas_.empty()) return false; + if (motion_datas_.empty()) return false; + + std::uint64_t img_stamp = 0; + { + std::lock_guard _(mtx_stream_datas_); + auto data = stream_datas_.front(); + if (data.img == nullptr) { + LOG(FATAL) << "stream data image info is empty!"; + } + img_stamp = data.img->timestamp; + } + std::uint64_t imu_stamp = 0; + { + std::lock_guard _(mtx_motion_datas_); + auto data = motion_datas_.back(); + if (data.imu == nullptr) { + LOG(FATAL) << "motion data imu info is empty!"; + } + imu_stamp = data.imu->timestamp; + } + + return img_stamp + stream_interval_us_half_ < imu_stamp; +} + MYNTEYE_END_NAMESPACE diff --git a/src/mynteye/api/correspondence.h b/src/mynteye/api/correspondence.h index 3191e36..c415731 100644 --- a/src/mynteye/api/correspondence.h +++ b/src/mynteye/api/correspondence.h @@ -15,6 +15,7 @@ #define MYNTEYE_API_CONFIG_H_ #pragma once +#include #include #include #include @@ -29,19 +30,43 @@ class Correspondence { Correspondence(const std::shared_ptr &device, const Stream &stream); ~Correspondence(); + bool Watch(const Stream &stream) const; + void OnStreamDataCallback(const Stream &stream, const api::StreamData &data); void OnMotionDataCallback(const device::MotionData &data); void SetMotionCallback(API::motion_callback_t callback); + void WaitForStreams(); + api::StreamData GetStreamData(const Stream &stream); + std::vector GetStreamDatas(const Stream &stream); std::vector GetMotionDatas(); private: + void EnableStreamMatch(); + void DisableStreamMatch(); + + void WaitStreamDataReady(); + void NotifyStreamDataReady(); + + bool IsStreamDataReady(); + std::shared_ptr device_; Stream stream_; + Stream stream_match_; + bool stream_match_enabled_; + + float stream_interval_us_; + float stream_interval_us_half_; + API::motion_callback_t motion_callback_; std::vector motion_datas_; - std::mutex mtx_motion_datas_; + std::recursive_mutex mtx_motion_datas_; + + std::vector stream_datas_; + std::vector stream_datas_match_; + std::recursive_mutex mtx_stream_datas_; + std::condition_variable_any cond_stream_datas_; }; MYNTEYE_END_NAMESPACE