From 7e55f78418d2fe0df76e7e46729d5a00b645f8f2 Mon Sep 17 00:00:00 2001 From: John Zhao Date: Thu, 31 May 2018 15:39:42 +0800 Subject: [PATCH] Add async callback support --- src/api/api.cc | 5 +- src/api/synthetic.cc | 6 ++- src/device/device.cc | 59 +++++++++++++++------- src/device/device.h | 19 +++++++- src/internal/async_callback.h | 59 ++++++++++++++++++++++ src/internal/async_callback_impl.h | 78 ++++++++++++++++++++++++++++++ 6 files changed, 203 insertions(+), 23 deletions(-) create mode 100644 src/internal/async_callback.h create mode 100644 src/internal/async_callback_impl.h diff --git a/src/api/api.cc b/src/api/api.cc index 2c28146..9da5d30 100644 --- a/src/api/api.cc +++ b/src/api/api.cc @@ -139,8 +139,9 @@ void API::SetStreamCallback(const Stream &stream, stream_callback_t callback) { void API::SetMotionCallback(motion_callback_t callback) { static auto callback_ = callback; if (callback_) { - device_->SetMotionCallback( - [](const device::MotionData &data) { callback_({data.imu}); }); + device_->SetMotionCallback([](const device::MotionData &data) { + callback_({data.imu}); + } /*, true*/); } else { device_->SetMotionCallback(nullptr); } diff --git a/src/api/synthetic.cc b/src/api/synthetic.cc index 5f85118..447c0a9 100644 --- a/src/api/synthetic.cc +++ b/src/api/synthetic.cc @@ -104,14 +104,16 @@ void Synthetic::StartVideoStreaming() { if (it->second == MODE_NATIVE) { auto &&stream = it->first; device->SetStreamCallback( - stream, [this, stream](const device::StreamData &data) { + stream, + [this, stream](const device::StreamData &data) { auto &&stream_data = data2api(data); ProcessNativeStream(stream, stream_data); // Need mutex if set callback after start if (HasStreamCallback(stream)) { stream_callbacks_.at(stream)(stream_data); } - }); + }, + true); } } device->Start(Source::VIDEO_STREAMING); diff --git a/src/device/device.cc b/src/device/device.cc index a692c2a..f5de75c 100644 --- a/src/device/device.cc +++ b/src/device/device.cc @@ -20,6 +20,7 @@ #include #include "device/device_s.h" +#include "internal/async_callback.h" #include "internal/channels.h" #include "internal/config.h" #include "internal/motions.h" @@ -272,20 +273,32 @@ bool Device::RunOptionAction(const Option &option) const { } void Device::SetStreamCallback( - const Stream &stream, stream_callback_t callback) { + const Stream &stream, stream_callback_t callback, bool async) { if (!Supports(stream)) { LOG(WARNING) << "Unsupported stream: " << stream; return; } if (callback) { stream_callbacks_[stream] = callback; + if (async) + stream_async_callbacks_[stream] = + std::make_shared( + to_string(stream), callback); } else { stream_callbacks_.erase(stream); + stream_async_callbacks_.erase(stream); } } -void Device::SetMotionCallback(motion_callback_t callback) { +void Device::SetMotionCallback(motion_callback_t callback, bool async) { motion_callback_ = callback; + if (callback) { + if (async) + motion_async_callback_ = + std::make_shared("motion", callback); + } else { + motion_async_callback_ = nullptr; + } } bool Device::HasStreamCallback(const Stream &stream) const { @@ -409,16 +422,8 @@ void Device::StartVideoStreaming() { return; } if (streams_->PushStream(Capabilities::STEREO, data)) { - if (HasStreamCallback(Stream::LEFT)) { - auto &&stream_datas = streams_->stream_datas(Stream::LEFT); - // if (stream_datas.size() > 0) {} - stream_callbacks_.at(Stream::LEFT)(stream_datas.back()); - } - if (HasStreamCallback(Stream::RIGHT)) { - auto &&stream_datas = streams_->stream_datas(Stream::RIGHT); - // if (stream_datas.size() > 0) {} - stream_callbacks_.at(Stream::RIGHT)(stream_datas.back()); - } + CallbackPushedStreamData(Stream::LEFT); + CallbackPushedStreamData(Stream::RIGHT); } }); } else { @@ -446,11 +451,8 @@ void Device::StartMotionTracking() { LOG(WARNING) << "Cannot start motion tracking without first stopping it"; return; } - motions_->SetMotionCallback([this](const device::MotionData &data) { - if (motion_callback_) { - motion_callback_(data); - } - }); + motions_->SetMotionCallback( + std::bind(&Device::CallbackMotionData, this, std::placeholders::_1)); motions_->StartMotionTracking(); motion_tracking_ = true; } @@ -508,4 +510,27 @@ void Device::ReadAllInfos() { } } +void Device::CallbackPushedStreamData(const Stream &stream) { + if (HasStreamCallback(stream)) { + auto &&datas = streams_->stream_datas(stream); + // if (datas.size() > 0) {} + auto &&data = datas.back(); + if (stream_async_callbacks_.find(stream) != stream_async_callbacks_.end()) { + stream_async_callbacks_.at(stream)->PushData(data); + } else { + stream_callbacks_.at(stream)(data); + } + } +} + +void Device::CallbackMotionData(const device::MotionData &data) { + if (HasMotionCallback()) { + if (motion_async_callback_) { + motion_async_callback_->PushData(data); + } else { + motion_callback_(data); + } + } +} + MYNTEYE_END_NAMESPACE diff --git a/src/device/device.h b/src/device/device.h index 7998d18..9fc86d0 100644 --- a/src/device/device.h +++ b/src/device/device.h @@ -46,6 +46,9 @@ class Channels; class Motions; class Streams; +template +class AsyncCallback; + /** * The Device class to communicate with MYNT® EYE device. */ @@ -58,6 +61,11 @@ class MYNTEYE_API Device { using stream_callbacks_t = std::map; + using stream_async_callback_t = AsyncCallback; + using motion_async_callback_t = AsyncCallback; + using stream_async_callback_ptr_t = std::shared_ptr; + using motion_async_callback_ptr_t = std::shared_ptr; + Device(const Model &model, std::shared_ptr device); virtual ~Device(); @@ -175,11 +183,12 @@ class MYNTEYE_API Device { /** * Set the callback of stream. */ - void SetStreamCallback(const Stream &stream, stream_callback_t callback); + void SetStreamCallback( + const Stream &stream, stream_callback_t callback, bool async = false); /** * Set the callback of motion. */ - void SetMotionCallback(motion_callback_t callback); + void SetMotionCallback(motion_callback_t callback, bool async = false); /** * Has the callback of stream. @@ -256,6 +265,9 @@ class MYNTEYE_API Device { stream_callbacks_t stream_callbacks_; motion_callback_t motion_callback_; + std::map stream_async_callbacks_; + motion_async_callback_ptr_t motion_async_callback_; + std::shared_ptr streams_; std::map stream_config_requests_; @@ -266,6 +278,9 @@ class MYNTEYE_API Device { void ReadAllInfos(); + void CallbackPushedStreamData(const Stream &stream); + void CallbackMotionData(const device::MotionData &data); + std::shared_ptr channels() { return channels_; } diff --git a/src/internal/async_callback.h b/src/internal/async_callback.h new file mode 100644 index 0000000..ffcc7e7 --- /dev/null +++ b/src/internal/async_callback.h @@ -0,0 +1,59 @@ +// Copyright 2018 Slightech Co., Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef MYNTEYE_INTERNAL_ASYNC_CALLBACK_H_ // NOLINT +#define MYNTEYE_INTERNAL_ASYNC_CALLBACK_H_ +#pragma once + +#include +#include +#include +#include +#include + +#include "mynteye/mynteye.h" + +MYNTEYE_BEGIN_NAMESPACE + +template +class AsyncCallback { + public: + using callback_t = std::function; + + AsyncCallback(std::string name, callback_t callback); + ~AsyncCallback(); + + void PushData(Data data); + + private: + void Run(); + + std::string name_; + + callback_t callback_; + + std::mutex mtx_; + std::condition_variable cv_; + + bool running_; + std::thread thread_; + + Data data_; + std::uint32_t count_; +}; + +MYNTEYE_END_NAMESPACE + +#include "internal/async_callback_impl.h" + +#endif // MYNTEYE_INTERNAL_ASYNC_CALLBACK_H_ NOLINT diff --git a/src/internal/async_callback_impl.h b/src/internal/async_callback_impl.h new file mode 100644 index 0000000..dfdc0bd --- /dev/null +++ b/src/internal/async_callback_impl.h @@ -0,0 +1,78 @@ +// Copyright 2018 Slightech Co., Ltd. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef MYNTEYE_INTERNAL_ASYNC_CALLBACK_IMPL_H_ // NOLINT +#define MYNTEYE_INTERNAL_ASYNC_CALLBACK_IMPL_H_ +#pragma once + +#include + +#include +#include + +MYNTEYE_BEGIN_NAMESPACE + +template +AsyncCallback::AsyncCallback(std::string name, callback_t callback) + : name_(std::move(name)), callback_(std::move(callback)), count_(0) { + VLOG(2) << __func__; + running_ = true; + thread_ = std::thread(&AsyncCallback::Run, this); +} + +template +AsyncCallback::~AsyncCallback() { + VLOG(2) << __func__; + { + std::lock_guard _(mtx_); + running_ = false; + ++count_; + } + cv_.notify_one(); + if (thread_.joinable()) { + thread_.join(); + } +} + +template +void AsyncCallback::PushData(Data data) { + std::lock_guard _(mtx_); + data_ = data; + ++count_; + cv_.notify_one(); +} + +template +void AsyncCallback::Run() { + VLOG(2) << "AsyncCallback(" << name_ << ") thread start"; + while (true) { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return count_ > 0; }); + + if (!running_) + break; + + if (callback_) + callback_(data_); + + if (VLOG_IS_ON(2) && count_ > 1) { + VLOG(2) << "AsyncCallback(" << name_ << ") dropped " << (count_ - 1); + } + count_ = 0; + } + VLOG(2) << "AsyncCallback(" << name_ << ") thread end"; +} + +MYNTEYE_END_NAMESPACE + +#endif // MYNTEYE_INTERNAL_ASYNC_CALLBACK_IMPL_H_ NOLINT