feat(correspondence): wait img when imu correspondence

This commit is contained in:
John Zhao 2019-02-22 12:15:18 +08:00
parent 73cca48f57
commit ab52fd5280
3 changed files with 180 additions and 6 deletions

View File

@ -441,8 +441,12 @@ void API::Stop(const Source &source) {
} }
void API::WaitForStreams() { void API::WaitForStreams() {
if (correspondence_) {
correspondence_->WaitForStreams();
} else {
synthetic_->WaitForStreams(); synthetic_->WaitForStreams();
} }
}
void API::EnableStreamData(const Stream &stream) { void API::EnableStreamData(const Stream &stream) {
synthetic_->EnableStreamData(stream); synthetic_->EnableStreamData(stream);
@ -453,12 +457,20 @@ void API::DisableStreamData(const Stream &stream) {
} }
api::StreamData API::GetStreamData(const Stream &stream) { api::StreamData API::GetStreamData(const Stream &stream) {
if (correspondence_ && correspondence_->Watch(stream)) {
return correspondence_->GetStreamData(stream);
} else {
return synthetic_->GetStreamData(stream); return synthetic_->GetStreamData(stream);
} }
}
std::vector<api::StreamData> API::GetStreamDatas(const Stream &stream) { std::vector<api::StreamData> API::GetStreamDatas(const Stream &stream) {
if (correspondence_ && correspondence_->Watch(stream)) {
return correspondence_->GetStreamDatas(stream);
} else {
return synthetic_->GetStreamDatas(stream); return synthetic_->GetStreamDatas(stream);
} }
}
void API::EnableMotionDatas(std::size_t max_size) { void API::EnableMotionDatas(std::size_t max_size) {
if (correspondence_) return; // not cache them if (correspondence_) return; // not cache them

View File

@ -22,23 +22,59 @@ Correspondence::Correspondence(const std::shared_ptr<Device> &device,
const Stream &stream) const Stream &stream)
: device_(device), stream_(stream) { : device_(device), stream_(stream) {
VLOG(2) << __func__; VLOG(2) << __func__;
// set matched stream to be watched too,
// aim to make stream and matched stream correspondence
if (stream_ == Stream::LEFT) {
stream_match_ = Stream::RIGHT;
} else if (stream_ == Stream::RIGHT) {
stream_match_ = Stream::LEFT;
} else if (stream_ == Stream::LEFT_RECTIFIED) {
stream_match_ = Stream::RIGHT_RECTIFIED;
} else if (stream_ == Stream::RIGHT_RECTIFIED) {
stream_match_ = Stream::LEFT_RECTIFIED;
} else {
stream_match_ = Stream::LAST;
}
EnableStreamMatch();
auto framerate = device_->GetOptionValue(Option::FRAME_RATE);
stream_interval_us_ = 1000000.f / framerate;
stream_interval_us_half_ = 0.5f * stream_interval_us_;
VLOG(2) << "framerate: " << framerate
<< ", interval_us: " << stream_interval_us_;
} }
Correspondence::~Correspondence() { Correspondence::~Correspondence() {
VLOG(2) << __func__; VLOG(2) << __func__;
} }
bool Correspondence::Watch(const Stream &stream) const {
if (stream == stream_) return true;
if (stream_match_enabled_ && stream == stream_match_) return true;
return false;
}
void Correspondence::OnStreamDataCallback( void Correspondence::OnStreamDataCallback(
const Stream &stream, const api::StreamData &data) { const Stream &stream, const api::StreamData &data) {
// LOG(INFO) << __func__ << ", " << stream // LOG(INFO) << __func__ << ", " << stream
// << ", id: " << data.frame_id << ", stamp: " << data.img->timestamp; // << ", id: " << data.frame_id << ", stamp: " << data.img->timestamp;
if (!Watch(stream)) {
return; // unwatched
}
std::lock_guard<std::recursive_mutex> _(mtx_stream_datas_);
if (stream == stream_) {
stream_datas_.push_back(data);
} else if (/*stream_match_enabled_ && */stream == stream_match_) {
stream_datas_match_.push_back(data);
}
NotifyStreamDataReady();
} }
void Correspondence::OnMotionDataCallback(const device::MotionData &data) { void Correspondence::OnMotionDataCallback(const device::MotionData &data) {
// LOG(INFO) << __func__ << ", id: " << data.imu->frame_id // LOG(INFO) << __func__ << ", id: " << data.imu->frame_id
// << ", stamp: " << data.imu->timestamp; // << ", stamp: " << data.imu->timestamp;
{ {
std::lock_guard<std::mutex> _(mtx_motion_datas_); std::lock_guard<std::recursive_mutex> _(mtx_motion_datas_);
motion_datas_.push_back(data); motion_datas_.push_back(data);
} }
if (motion_callback_) { if (motion_callback_) {
@ -51,8 +87,60 @@ void Correspondence::SetMotionCallback(API::motion_callback_t callback) {
motion_callback_ = callback; motion_callback_ = callback;
} }
void Correspondence::WaitForStreams() {
if (stream_ == Stream::LEFT || stream_ == Stream::RIGHT) {
// Wait native stream ready, avoid get these stream empty
// Todo: determine native stream according to device
WaitStreamDataReady();
return;
}
device_->WaitForStreams();
}
api::StreamData Correspondence::GetStreamData(const Stream &stream) {
auto datas = GetStreamDatas(stream);
return datas.empty() ? api::StreamData{} : datas.back();
}
std::vector<api::StreamData> Correspondence::GetStreamDatas(
const Stream &stream) {
if (!Watch(stream)) {
LOG(ERROR) << "Get unwatched stream data of " << stream;
return {};
}
std::lock_guard<std::recursive_mutex> _(mtx_stream_datas_);
static std::uint32_t stream_count_ = 0;
static std::uint32_t stream_match_count_ = 0;
if (stream == stream_) {
auto datas = std::move(stream_datas_);
if (stream_count_ < 10) {
++stream_count_;
} else {
// get stream, but not get matched stream, disable it
if (stream_match_count_ == 0) {
DisableStreamMatch();
}
}
return datas;
} else if (/*stream_match_enabled_ && */stream == stream_match_) {
auto datas = std::move(stream_datas_match_);
if (stream_match_count_ < 10) {
++stream_match_count_;
}
return datas;
}
return {};
}
std::vector<api::MotionData> Correspondence::GetMotionDatas() { std::vector<api::MotionData> Correspondence::GetMotionDatas() {
std::lock_guard<std::mutex> _(mtx_motion_datas_); std::lock_guard<std::recursive_mutex> _(mtx_motion_datas_);
std::vector<api::MotionData> datas; std::vector<api::MotionData> datas;
for (auto &&data : motion_datas_) { for (auto &&data : motion_datas_) {
datas.push_back({data.imu}); datas.push_back({data.imu});
@ -61,4 +149,53 @@ std::vector<api::MotionData> Correspondence::GetMotionDatas() {
return datas; return datas;
} }
void Correspondence::EnableStreamMatch() {
stream_match_enabled_ = true;
}
void Correspondence::DisableStreamMatch() {
stream_match_enabled_ = false;
stream_datas_match_.clear();
}
void Correspondence::WaitStreamDataReady() {
std::unique_lock<std::recursive_mutex> lock(mtx_stream_datas_);
auto ready = std::bind(&Correspondence::IsStreamDataReady, this);
bool ok = cond_stream_datas_.wait_for(lock, std::chrono::seconds(3), ready);
if (!ok) {
LOG(FATAL) << "Timeout waiting for key frames. Please use USB 3.0, and not "
"in virtual machine.";
}
}
void Correspondence::NotifyStreamDataReady() {
cond_stream_datas_.notify_one();
}
bool Correspondence::IsStreamDataReady() {
if (stream_datas_.empty()) return false;
if (motion_datas_.empty()) return false;
std::uint64_t img_stamp = 0;
{
std::lock_guard<std::recursive_mutex> _(mtx_stream_datas_);
auto data = stream_datas_.front();
if (data.img == nullptr) {
LOG(FATAL) << "stream data image info is empty!";
}
img_stamp = data.img->timestamp;
}
std::uint64_t imu_stamp = 0;
{
std::lock_guard<std::recursive_mutex> _(mtx_motion_datas_);
auto data = motion_datas_.back();
if (data.imu == nullptr) {
LOG(FATAL) << "motion data imu info is empty!";
}
imu_stamp = data.imu->timestamp;
}
return img_stamp + stream_interval_us_half_ < imu_stamp;
}
MYNTEYE_END_NAMESPACE MYNTEYE_END_NAMESPACE

View File

@ -15,6 +15,7 @@
#define MYNTEYE_API_CONFIG_H_ #define MYNTEYE_API_CONFIG_H_
#pragma once #pragma once
#include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
@ -29,19 +30,43 @@ class Correspondence {
Correspondence(const std::shared_ptr<Device> &device, const Stream &stream); Correspondence(const std::shared_ptr<Device> &device, const Stream &stream);
~Correspondence(); ~Correspondence();
bool Watch(const Stream &stream) const;
void OnStreamDataCallback(const Stream &stream, const api::StreamData &data); void OnStreamDataCallback(const Stream &stream, const api::StreamData &data);
void OnMotionDataCallback(const device::MotionData &data); void OnMotionDataCallback(const device::MotionData &data);
void SetMotionCallback(API::motion_callback_t callback); void SetMotionCallback(API::motion_callback_t callback);
void WaitForStreams();
api::StreamData GetStreamData(const Stream &stream);
std::vector<api::StreamData> GetStreamDatas(const Stream &stream);
std::vector<api::MotionData> GetMotionDatas(); std::vector<api::MotionData> GetMotionDatas();
private: private:
void EnableStreamMatch();
void DisableStreamMatch();
void WaitStreamDataReady();
void NotifyStreamDataReady();
bool IsStreamDataReady();
std::shared_ptr<Device> device_; std::shared_ptr<Device> device_;
Stream stream_; Stream stream_;
Stream stream_match_;
bool stream_match_enabled_;
float stream_interval_us_;
float stream_interval_us_half_;
API::motion_callback_t motion_callback_; API::motion_callback_t motion_callback_;
std::vector<device::MotionData> motion_datas_; std::vector<device::MotionData> motion_datas_;
std::mutex mtx_motion_datas_; std::recursive_mutex mtx_motion_datas_;
std::vector<api::StreamData> stream_datas_;
std::vector<api::StreamData> stream_datas_match_;
std::recursive_mutex mtx_stream_datas_;
std::condition_variable_any cond_stream_datas_;
}; };
MYNTEYE_END_NAMESPACE MYNTEYE_END_NAMESPACE