From 425b18121684c682887f292cdd89728b94374c59 Mon Sep 17 00:00:00 2001 From: John Zhao Date: Thu, 31 May 2018 10:20:48 +0800 Subject: [PATCH] Improve lock stream datas --- src/device/device.cc | 18 +++++++----------- src/device/device.h | 3 --- src/internal/streams.cc | 17 +++++++++++++++-- src/internal/streams.h | 2 +- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/src/device/device.cc b/src/device/device.cc index 7433f33..a692c2a 100644 --- a/src/device/device.cc +++ b/src/device/device.cc @@ -337,14 +337,12 @@ void Device::WaitForStreams() { std::vector Device::GetStreamDatas(const Stream &stream) { CHECK(video_streaming_); CHECK_NOTNULL(streams_); - std::lock_guard _(mtx_streams_); return streams_->GetStreamDatas(stream); } device::StreamData Device::GetLatestStreamData(const Stream &stream) { CHECK(video_streaming_); CHECK_NOTNULL(streams_); - std::lock_guard _(mtx_streams_); return streams_->GetLatestStreamData(stream); } @@ -410,17 +408,15 @@ void Device::StartVideoStreaming() { --drop_count; return; } - std::lock_guard _(mtx_streams_); - streams_->PushStream(Capabilities::STEREO, data); - if (HasStreamCallback(Stream::LEFT)) { - auto &&stream_datas = streams_->stream_datas(Stream::LEFT); - if (stream_datas.size() > 0) { + if (streams_->PushStream(Capabilities::STEREO, data)) { + if (HasStreamCallback(Stream::LEFT)) { + auto &&stream_datas = streams_->stream_datas(Stream::LEFT); + // if (stream_datas.size() > 0) {} stream_callbacks_.at(Stream::LEFT)(stream_datas.back()); } - } - if (HasStreamCallback(Stream::RIGHT)) { - auto &&stream_datas = streams_->stream_datas(Stream::RIGHT); - if (stream_datas.size() > 0) { + if (HasStreamCallback(Stream::RIGHT)) { + auto &&stream_datas = streams_->stream_datas(Stream::RIGHT); + // if (stream_datas.size() > 0) {} stream_callbacks_.at(Stream::RIGHT)(stream_datas.back()); } } diff --git a/src/device/device.h b/src/device/device.h index e74da35..7998d18 100644 --- a/src/device/device.h +++ b/src/device/device.h @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -261,8 +260,6 @@ class MYNTEYE_API Device { std::map stream_config_requests_; - std::mutex mtx_streams_; - std::shared_ptr channels_; std::shared_ptr motions_; diff --git a/src/internal/streams.cc b/src/internal/streams.cc index 23bb66b..fccef49 100644 --- a/src/internal/streams.cc +++ b/src/internal/streams.cc @@ -132,12 +132,13 @@ void Streams::ConfigStream( stream_config_requests_[capability] = request; } -void Streams::PushStream(const Capabilities &capability, const void *data) { +bool Streams::PushStream(const Capabilities &capability, const void *data) { if (!HasStreamConfigRequest(capability)) { LOG(FATAL) << "Cannot push stream without stream config request"; } std::unique_lock lock(mtx_); auto &&request = GetStreamConfigRequest(capability); + bool pushed = false; switch (capability) { case Capabilities::STEREO: { // alloc left @@ -155,10 +156,12 @@ void Streams::PushStream(const Capabilities &capability, const void *data) { data, request, left_data.frame.get()); unpack_img_pixels_map_[Stream::RIGHT]( data, request, right_data.frame.get()); + pushed = true; } else { // discard left DiscardStreamData(Stream::LEFT); LOG(WARNING) << "Image packet is unaccepted, frame dropped"; + pushed = false; } } break; default: @@ -166,6 +169,7 @@ void Streams::PushStream(const Capabilities &capability, const void *data) { } if (HasKeyStreamDatas()) cv_.notify_one(); + return pushed; } void Streams::WaitForStreams() { @@ -204,10 +208,19 @@ Streams::stream_datas_t Streams::GetStreamDatas(const Stream &stream) { } Streams::stream_data_t Streams::GetLatestStreamData(const Stream &stream) { - return GetStreamDatas(stream).back(); + std::unique_lock lock(mtx_); + if (!HasStreamDatas(stream) || stream_datas_map_.at(stream).empty()) { + LOG(WARNING) << "There are no stream datas of " << stream + << ". Did you call WaitForStreams() before this?"; + return {}; + } + stream_datas_t datas = stream_datas_map_.at(stream); + stream_datas_map_[stream].clear(); + return datas.back(); } const Streams::stream_datas_t &Streams::stream_datas(const Stream &stream) { + std::unique_lock lock(mtx_); try { return stream_datas_map_.at(stream); } catch (const std::out_of_range &e) { diff --git a/src/internal/streams.h b/src/internal/streams.h index 6e4cf1d..eecb6b9 100644 --- a/src/internal/streams.h +++ b/src/internal/streams.h @@ -44,7 +44,7 @@ class Streams { void ConfigStream( const Capabilities &capability, const StreamRequest &request); - void PushStream(const Capabilities &capability, const void *data); + bool PushStream(const Capabilities &capability, const void *data); void WaitForStreams();