diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e9ce9d..06d7b5a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -141,6 +141,7 @@ if(WITH_API) list(APPEND MYNTEYE_SRCS src/api/api.cc src/api/synthetic.cc + src/api/processor/processor.cc ) endif() diff --git a/src/api/api.cc b/src/api/api.cc index d6585c1..614f94b 100644 --- a/src/api/api.cc +++ b/src/api/api.cc @@ -2,6 +2,8 @@ #include +#include + #include "mynteye/utils.h" #include "api/synthetic.h" @@ -31,7 +33,7 @@ Model API::GetModel() const { } bool API::Supports(const Stream &stream) const { - return device_->Supports(stream); + return synthetic_->Supports(stream); } bool API::Supports(const Capabilities &capability) const { @@ -92,6 +94,80 @@ bool API::RunOptionAction(const Option &option) const { return device_->RunOptionAction(option); } +void API::SetStreamCallback(const Stream &stream, stream_callback_t callback) { + synthetic_->SetStreamCallback(stream, callback); +} + +void API::SetMotionCallback(motion_callback_t callback) { + static auto callback_ = callback; + if (callback_) { + device_->SetMotionCallback( + [](const device::MotionData &data) { callback_({data.imu}); }); + } else { + device_->SetMotionCallback(nullptr); + } +} + +bool API::HasStreamCallback(const Stream &stream) const { + return synthetic_->HasStreamCallback(stream); +} + +bool API::HasMotionCallback() const { + return device_->HasMotionCallback(); +} + +void API::Start(const Source &source) { + if (source == Source::VIDEO_STREAMING) { + synthetic_->StartVideoStreaming(); + } else if (source == Source::MOTION_TRACKING) { + device_->StartMotionTracking(); + } else if (source == Source::ALL) { + Start(Source::VIDEO_STREAMING); + Start(Source::MOTION_TRACKING); + } else { + LOG(FATAL) << "Unsupported source :("; + } +} + +void API::Stop(const Source &source) { + if (source == Source::VIDEO_STREAMING) { + synthetic_->StopVideoStreaming(); + } else if (source == Source::MOTION_TRACKING) { + device_->StopMotionTracking(); + } else if (source == Source::ALL) { + Stop(Source::MOTION_TRACKING); + // Must stop motion tracking before video streaming and sleep a moment here + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + Stop(Source::VIDEO_STREAMING); + } else { + LOG(FATAL) << "Unsupported source :("; + } +} + +void API::WaitForStreams() { + synthetic_->WaitForStreams(); +} + +api::StreamData API::GetStreamData(const Stream &stream) { + return synthetic_->GetStreamData(stream); +} + +std::vector API::GetStreamDatas(const Stream &stream) { + return synthetic_->GetStreamDatas(stream); +} + +void API::EnableMotionDatas(std::size_t max_size) { + device_->EnableMotionDatas(max_size); +} + +std::vector API::GetMotionDatas() { + std::vector datas; + for (auto &&data : device_->GetMotionDatas()) { + datas.push_back({data.imu}); + } + return datas; +} + std::shared_ptr API::device() { return device_; } diff --git a/src/api/api.h b/src/api/api.h index 7a45f9f..d226e70 100644 --- a/src/api/api.h +++ b/src/api/api.h @@ -2,6 +2,9 @@ #define MYNTEYE_API_H_ #pragma once +#include + +#include #include #include #include @@ -14,8 +17,24 @@ MYNTEYE_BEGIN_NAMESPACE class Device; class Synthetic; +namespace api { + +struct MYNTEYE_API StreamData { + std::shared_ptr img; + cv::Mat frame; +}; + +struct MYNTEYE_API MotionData { + std::shared_ptr imu; +}; + +} // namespace api + class MYNTEYE_API API { public: + using stream_callback_t = std::function; + using motion_callback_t = std::function; + explicit API(std::shared_ptr device); /*virtual*/ ~API(); @@ -48,6 +67,24 @@ class MYNTEYE_API API { bool RunOptionAction(const Option &option) const; + void SetStreamCallback(const Stream &stream, stream_callback_t callback); + void SetMotionCallback(motion_callback_t callback); + + bool HasStreamCallback(const Stream &stream) const; + bool HasMotionCallback() const; + + void Start(const Source &source); + void Stop(const Source &source); + + void WaitForStreams(); + + api::StreamData GetStreamData(const Stream &stream); + std::vector GetStreamDatas(const Stream &stream); + + void EnableMotionDatas( + std::size_t max_size = std::numeric_limits::max()); + std::vector GetMotionDatas(); + std::shared_ptr device(); private: diff --git a/src/api/processor/object.h b/src/api/processor/object.h new file mode 100644 index 0000000..51faf7b --- /dev/null +++ b/src/api/processor/object.h @@ -0,0 +1,62 @@ +#ifndef MYNTEYE_OBJECT_H_ // NOLINT +#define MYNTEYE_OBJECT_H_ +#pragma once + +#include + +#include "mynteye/mynteye.h" + +MYNTEYE_BEGIN_NAMESPACE + +/** + * Input & output object. + */ +struct Object { + Object() = default; + virtual ~Object() = default; + + virtual Object *Clone() const = 0; + + template + static T *Cast(Object *obj) { + return dynamic_cast(obj); + } + + template + static const T *Cast(const Object *obj) { + return dynamic_cast(obj); + } +}; + +struct ObjMat : public Object { + ObjMat() = default; + explicit ObjMat(const cv::Mat &value) : value(value) {} + + cv::Mat value; + + Object *Clone() const { + ObjMat *mat = new ObjMat; + mat->value = value.clone(); + return mat; + } +}; + +struct ObjMat2 : public Object { + ObjMat2() = default; + ObjMat2(const cv::Mat &first, const cv::Mat &second) + : first(first), second(second) {} + + cv::Mat first; + cv::Mat second; + + Object *Clone() const { + ObjMat2 *mat2 = new ObjMat2; + mat2->first = first.clone(); + mat2->second = second.clone(); + return mat2; + } +}; + +MYNTEYE_END_NAMESPACE + +#endif // MYNTEYE_OBJECT_H_ NOLINT diff --git a/src/api/processor/processor.cc b/src/api/processor/processor.cc new file mode 100644 index 0000000..b3f6491 --- /dev/null +++ b/src/api/processor/processor.cc @@ -0,0 +1,180 @@ +#include "api/processor/processor.h" + +#include + +#include + +MYNTEYE_BEGIN_NAMESPACE + +Processor::Processor() + : activated_(false), + input_ready_(false), + idle_(true), + dropped_count_(0), + input_(nullptr), + output_(nullptr), + output_result_(nullptr), + pre_callback_(nullptr), + post_callback_(nullptr), + callback_(nullptr), + parent_(nullptr) { + VLOG(2) << __func__; +} + +Processor::~Processor() { + VLOG(2) << __func__; + Deactivate(); + input_.reset(nullptr); + output_.reset(nullptr); + output_result_.reset(nullptr); + childs_.clear(); +} + +std::string Processor::Name() { + return "Processor"; +} + +void Processor::AddChild(const std::shared_ptr &child) { + child->parent_ = this; + childs_.push_back(child); +} + +void Processor::RemoveChild(const std::shared_ptr &child) { + childs_.remove(child); +} + +std::list> Processor::GetChilds() { + return childs_; +} + +void Processor::SetPreProcessCallback(PreProcessCallback callback) { + pre_callback_ = std::move(callback); +} + +void Processor::SetPostProcessCallback(PostProcessCallback callback) { + post_callback_ = std::move(callback); +} + +void Processor::SetProcessCallback(ProcessCallback callback) { + callback_ = std::move(callback); +} + +void Processor::Activate() { + if (activated_) + return; + activated_ = true; + { + // Activate all parents + Processor *parent = parent_; + while (parent != nullptr) { + parent->Activate(); + parent = parent->parent_; + } + } + thread_ = std::thread(&Processor::Run, this); + // thread_.detach(); +} + +void Processor::Deactivate() { + if (!activated_) + return; + activated_ = false; + { + std::lock_guard lk(mtx_input_ready_); + input_ready_ = true; + } + cond_input_ready_.notify_all(); + thread_.join(); +} + +bool Processor::IsActivated() { + return activated_; +} + +bool Processor::IsIdle() { + std::lock_guard lk(mtx_state_); + return idle_; +} + +bool Processor::Process(const Object *const in) { + if (!activated_) + return false; + if (!idle_) { + std::lock_guard lk(mtx_state_); + if (!idle_) { + ++dropped_count_; + return false; + } + } + { + std::lock_guard lk(mtx_input_ready_); + input_.reset(in->Clone()); + input_ready_ = true; + } + cond_input_ready_.notify_all(); + return true; +} + +Object *Processor::GetOutput() { + std::lock_guard lk(mtx_result_); + return output_result_.get(); +} + +std::uint64_t Processor::GetDroppedCount() { + std::lock_guard lk(mtx_state_); + return dropped_count_; +} + +void Processor::Run() { + VLOG(2) << Name() << " thread start"; + while (true) { + std::unique_lock lk(mtx_input_ready_); + cond_input_ready_.wait(lk, [this] { return input_ready_; }); + + if (!activated_) { + SetIdle(true); + input_ready_ = false; + break; + } + SetIdle(false); + + if (!output_) { + output_.reset(OnCreateOutput()); + } + + if (pre_callback_) { + pre_callback_(input_.get()); + } + if (callback_) { + if (!callback_(input_.get(), output_.get(), parent_)) { + OnProcess(input_.get(), output_.get(), parent_); + } + } else { + OnProcess(input_.get(), output_.get(), parent_); + } + if (post_callback_) { + post_callback_(output_.get()); + } + { + std::unique_lock lk(mtx_result_); + output_result_.reset(output_->Clone()); + } + + if (!childs_.empty()) { + for (auto child : childs_) { + child->Process(output_.get()); + } + } + + SetIdle(true); + input_ready_ = false; + } + VLOG(2) << Name() << " thread end"; +} + +void Processor::SetIdle(bool idle) { + std::lock_guard lk(mtx_state_); + idle_ = idle; +} + +MYNTEYE_END_NAMESPACE diff --git a/src/api/processor/processor.h b/src/api/processor/processor.h new file mode 100644 index 0000000..d4a0906 --- /dev/null +++ b/src/api/processor/processor.h @@ -0,0 +1,98 @@ +#ifndef MYNTEYE_PROCESSOR_H_ // NOLINT +#define MYNTEYE_PROCESSOR_H_ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mynteye/mynteye.h" + +#include "api/processor/object.h" + +MYNTEYE_BEGIN_NAMESPACE + +class Processor /*: public std::enable_shared_from_this*/ { + public: + using PreProcessCallback = std::function; + using PostProcessCallback = std::function; + using ProcessCallback = std::function; + + Processor(); + virtual ~Processor(); + + virtual std::string Name(); + + void AddChild(const std::shared_ptr &child); + + void RemoveChild(const std::shared_ptr &child); + + std::list> GetChilds(); + + void SetPreProcessCallback(PreProcessCallback callback); + void SetPostProcessCallback(PostProcessCallback callback); + void SetProcessCallback(ProcessCallback callback); + + void Activate(); + void Deactivate(); + bool IsActivated(); + + bool IsIdle(); + + /** Returns dropped or not. */ + bool Process(const Object *const in); + + /** + * Returns the last output. + * @note Returns null if not output now. + */ + Object *GetOutput(); + + std::uint64_t GetDroppedCount(); + + protected: + virtual Object *OnCreateOutput() = 0; + virtual void OnProcess( + Object *const in, Object *const out, Processor *const parent) = 0; + + private: + /** Run in standalone thread. */ + void Run(); + + void SetIdle(bool idle); + + bool activated_; + + bool input_ready_; + std::mutex mtx_input_ready_; + std::condition_variable cond_input_ready_; + + bool idle_; + std::uint64_t dropped_count_; + std::mutex mtx_state_; + + std::unique_ptr input_; + std::unique_ptr output_; + + std::unique_ptr output_result_; + std::mutex mtx_result_; + + PreProcessCallback pre_callback_; + PostProcessCallback post_callback_; + ProcessCallback callback_; + + Processor *parent_; + std::list> childs_; + + std::thread thread_; +}; + +MYNTEYE_END_NAMESPACE + +#endif // MYNTEYE_PROCESSOR_H_ NOLINT diff --git a/src/api/synthetic.cc b/src/api/synthetic.cc index 446d06e..90f12e0 100644 --- a/src/api/synthetic.cc +++ b/src/api/synthetic.cc @@ -2,7 +2,8 @@ #include -#include "api/api.h" +#include "api/processor/processor.h" +#include "device/device.h" MYNTEYE_BEGIN_NAMESPACE @@ -14,4 +15,35 @@ Synthetic::~Synthetic() { VLOG(2) << __func__; } +bool Synthetic::Supports(const Stream &stream) const { + return api_->device()->Supports(stream); +} + +void Synthetic::SetStreamCallback( + const Stream &stream, stream_callback_t callback) { + UNUSED(stream) + UNUSED(callback) +} + +bool Synthetic::HasStreamCallback(const Stream &stream) const { + UNUSED(stream) + return false; +} + +void Synthetic::StartVideoStreaming() {} + +void Synthetic::StopVideoStreaming() {} + +void Synthetic::WaitForStreams() {} + +api::StreamData Synthetic::GetStreamData(const Stream &stream) { + UNUSED(stream) + return {}; +} + +std::vector Synthetic::GetStreamDatas(const Stream &stream) { + UNUSED(stream) + return {}; +} + MYNTEYE_END_NAMESPACE diff --git a/src/api/synthetic.h b/src/api/synthetic.h index 6c3a1ae..645a934 100644 --- a/src/api/synthetic.h +++ b/src/api/synthetic.h @@ -2,19 +2,40 @@ #define MYNTEYE_SYNTHETIC_H_ #pragma once -#include "mynteye/mynteye.h" +#include +#include + +#include "api/api.h" MYNTEYE_BEGIN_NAMESPACE class API; +class Processor; class Synthetic { public: + using stream_callback_t = API::stream_callback_t; + explicit Synthetic(API *api); ~Synthetic(); + bool Supports(const Stream &stream) const; + + void SetStreamCallback(const Stream &stream, stream_callback_t callback); + bool HasStreamCallback(const Stream &stream) const; + + void StartVideoStreaming(); + void StopVideoStreaming(); + + void WaitForStreams(); + + api::StreamData GetStreamData(const Stream &stream); + std::vector GetStreamDatas(const Stream &stream); + private: API *api_; + + std::vector> processors_; }; MYNTEYE_END_NAMESPACE diff --git a/src/device/device.h b/src/device/device.h index a504d6c..d22b1e5 100644 --- a/src/device/device.h +++ b/src/device/device.h @@ -29,6 +29,7 @@ class DeviceWriter; struct DeviceInfo; +class API; class Channels; class Motions; class Streams; @@ -104,10 +105,6 @@ class MYNTEYE_API Device { return device_; } - std::shared_ptr device_info() const { - return device_info_; - } - const StreamRequest &GetStreamRequest(const Capabilities &capability); virtual void StartVideoStreaming(); @@ -151,6 +148,7 @@ class MYNTEYE_API Device { return channels_; } + friend API; friend tools::DeviceWriter; };