Add processor and update api

This commit is contained in:
John Zhao 2018-04-26 14:44:47 +08:00
parent 8f10285773
commit b97baae06d
9 changed files with 512 additions and 7 deletions

View File

@ -141,6 +141,7 @@ if(WITH_API)
list(APPEND MYNTEYE_SRCS
src/api/api.cc
src/api/synthetic.cc
src/api/processor/processor.cc
)
endif()

View File

@ -2,6 +2,8 @@
#include <glog/logging.h>
#include <thread>
#include "mynteye/utils.h"
#include "api/synthetic.h"
@ -31,7 +33,7 @@ Model API::GetModel() const {
}
bool API::Supports(const Stream &stream) const {
return device_->Supports(stream);
return synthetic_->Supports(stream);
}
bool API::Supports(const Capabilities &capability) const {
@ -92,6 +94,80 @@ bool API::RunOptionAction(const Option &option) const {
return device_->RunOptionAction(option);
}
void API::SetStreamCallback(const Stream &stream, stream_callback_t callback) {
synthetic_->SetStreamCallback(stream, callback);
}
void API::SetMotionCallback(motion_callback_t callback) {
static auto callback_ = callback;
if (callback_) {
device_->SetMotionCallback(
[](const device::MotionData &data) { callback_({data.imu}); });
} else {
device_->SetMotionCallback(nullptr);
}
}
bool API::HasStreamCallback(const Stream &stream) const {
return synthetic_->HasStreamCallback(stream);
}
bool API::HasMotionCallback() const {
return device_->HasMotionCallback();
}
void API::Start(const Source &source) {
if (source == Source::VIDEO_STREAMING) {
synthetic_->StartVideoStreaming();
} else if (source == Source::MOTION_TRACKING) {
device_->StartMotionTracking();
} else if (source == Source::ALL) {
Start(Source::VIDEO_STREAMING);
Start(Source::MOTION_TRACKING);
} else {
LOG(FATAL) << "Unsupported source :(";
}
}
void API::Stop(const Source &source) {
if (source == Source::VIDEO_STREAMING) {
synthetic_->StopVideoStreaming();
} else if (source == Source::MOTION_TRACKING) {
device_->StopMotionTracking();
} else if (source == Source::ALL) {
Stop(Source::MOTION_TRACKING);
// Must stop motion tracking before video streaming and sleep a moment here
std::this_thread::sleep_for(std::chrono::milliseconds(10));
Stop(Source::VIDEO_STREAMING);
} else {
LOG(FATAL) << "Unsupported source :(";
}
}
void API::WaitForStreams() {
synthetic_->WaitForStreams();
}
api::StreamData API::GetStreamData(const Stream &stream) {
return synthetic_->GetStreamData(stream);
}
std::vector<api::StreamData> API::GetStreamDatas(const Stream &stream) {
return synthetic_->GetStreamDatas(stream);
}
void API::EnableMotionDatas(std::size_t max_size) {
device_->EnableMotionDatas(max_size);
}
std::vector<api::MotionData> API::GetMotionDatas() {
std::vector<api::MotionData> datas;
for (auto &&data : device_->GetMotionDatas()) {
datas.push_back({data.imu});
}
return datas;
}
std::shared_ptr<Device> API::device() {
return device_;
}

View File

@ -2,6 +2,9 @@
#define MYNTEYE_API_H_
#pragma once
#include <opencv2/core/core.hpp>
#include <limits>
#include <memory>
#include <string>
#include <vector>
@ -14,8 +17,24 @@ MYNTEYE_BEGIN_NAMESPACE
class Device;
class Synthetic;
namespace api {
struct MYNTEYE_API StreamData {
std::shared_ptr<ImgData> img;
cv::Mat frame;
};
struct MYNTEYE_API MotionData {
std::shared_ptr<ImuData> imu;
};
} // namespace api
class MYNTEYE_API API {
public:
using stream_callback_t = std::function<void(const api::StreamData &data)>;
using motion_callback_t = std::function<void(const api::MotionData &data)>;
explicit API(std::shared_ptr<Device> device);
/*virtual*/ ~API();
@ -48,6 +67,24 @@ class MYNTEYE_API API {
bool RunOptionAction(const Option &option) const;
void SetStreamCallback(const Stream &stream, stream_callback_t callback);
void SetMotionCallback(motion_callback_t callback);
bool HasStreamCallback(const Stream &stream) const;
bool HasMotionCallback() const;
void Start(const Source &source);
void Stop(const Source &source);
void WaitForStreams();
api::StreamData GetStreamData(const Stream &stream);
std::vector<api::StreamData> GetStreamDatas(const Stream &stream);
void EnableMotionDatas(
std::size_t max_size = std::numeric_limits<std::size_t>::max());
std::vector<api::MotionData> GetMotionDatas();
std::shared_ptr<Device> device();
private:

View File

@ -0,0 +1,62 @@
#ifndef MYNTEYE_OBJECT_H_ // NOLINT
#define MYNTEYE_OBJECT_H_
#pragma once
#include <opencv2/core/core.hpp>
#include "mynteye/mynteye.h"
MYNTEYE_BEGIN_NAMESPACE
/**
* Input & output object.
*/
struct Object {
Object() = default;
virtual ~Object() = default;
virtual Object *Clone() const = 0;
template <typename T>
static T *Cast(Object *obj) {
return dynamic_cast<T *>(obj);
}
template <typename T>
static const T *Cast(const Object *obj) {
return dynamic_cast<const T *>(obj);
}
};
struct ObjMat : public Object {
ObjMat() = default;
explicit ObjMat(const cv::Mat &value) : value(value) {}
cv::Mat value;
Object *Clone() const {
ObjMat *mat = new ObjMat;
mat->value = value.clone();
return mat;
}
};
struct ObjMat2 : public Object {
ObjMat2() = default;
ObjMat2(const cv::Mat &first, const cv::Mat &second)
: first(first), second(second) {}
cv::Mat first;
cv::Mat second;
Object *Clone() const {
ObjMat2 *mat2 = new ObjMat2;
mat2->first = first.clone();
mat2->second = second.clone();
return mat2;
}
};
MYNTEYE_END_NAMESPACE
#endif // MYNTEYE_OBJECT_H_ NOLINT

View File

@ -0,0 +1,180 @@
#include "api/processor/processor.h"
#include <glog/logging.h>
#include <utility>
MYNTEYE_BEGIN_NAMESPACE
Processor::Processor()
: activated_(false),
input_ready_(false),
idle_(true),
dropped_count_(0),
input_(nullptr),
output_(nullptr),
output_result_(nullptr),
pre_callback_(nullptr),
post_callback_(nullptr),
callback_(nullptr),
parent_(nullptr) {
VLOG(2) << __func__;
}
Processor::~Processor() {
VLOG(2) << __func__;
Deactivate();
input_.reset(nullptr);
output_.reset(nullptr);
output_result_.reset(nullptr);
childs_.clear();
}
std::string Processor::Name() {
return "Processor";
}
void Processor::AddChild(const std::shared_ptr<Processor> &child) {
child->parent_ = this;
childs_.push_back(child);
}
void Processor::RemoveChild(const std::shared_ptr<Processor> &child) {
childs_.remove(child);
}
std::list<std::shared_ptr<Processor>> Processor::GetChilds() {
return childs_;
}
void Processor::SetPreProcessCallback(PreProcessCallback callback) {
pre_callback_ = std::move(callback);
}
void Processor::SetPostProcessCallback(PostProcessCallback callback) {
post_callback_ = std::move(callback);
}
void Processor::SetProcessCallback(ProcessCallback callback) {
callback_ = std::move(callback);
}
void Processor::Activate() {
if (activated_)
return;
activated_ = true;
{
// Activate all parents
Processor *parent = parent_;
while (parent != nullptr) {
parent->Activate();
parent = parent->parent_;
}
}
thread_ = std::thread(&Processor::Run, this);
// thread_.detach();
}
void Processor::Deactivate() {
if (!activated_)
return;
activated_ = false;
{
std::lock_guard<std::mutex> lk(mtx_input_ready_);
input_ready_ = true;
}
cond_input_ready_.notify_all();
thread_.join();
}
bool Processor::IsActivated() {
return activated_;
}
bool Processor::IsIdle() {
std::lock_guard<std::mutex> lk(mtx_state_);
return idle_;
}
bool Processor::Process(const Object *const in) {
if (!activated_)
return false;
if (!idle_) {
std::lock_guard<std::mutex> lk(mtx_state_);
if (!idle_) {
++dropped_count_;
return false;
}
}
{
std::lock_guard<std::mutex> lk(mtx_input_ready_);
input_.reset(in->Clone());
input_ready_ = true;
}
cond_input_ready_.notify_all();
return true;
}
Object *Processor::GetOutput() {
std::lock_guard<std::mutex> lk(mtx_result_);
return output_result_.get();
}
std::uint64_t Processor::GetDroppedCount() {
std::lock_guard<std::mutex> lk(mtx_state_);
return dropped_count_;
}
void Processor::Run() {
VLOG(2) << Name() << " thread start";
while (true) {
std::unique_lock<std::mutex> lk(mtx_input_ready_);
cond_input_ready_.wait(lk, [this] { return input_ready_; });
if (!activated_) {
SetIdle(true);
input_ready_ = false;
break;
}
SetIdle(false);
if (!output_) {
output_.reset(OnCreateOutput());
}
if (pre_callback_) {
pre_callback_(input_.get());
}
if (callback_) {
if (!callback_(input_.get(), output_.get(), parent_)) {
OnProcess(input_.get(), output_.get(), parent_);
}
} else {
OnProcess(input_.get(), output_.get(), parent_);
}
if (post_callback_) {
post_callback_(output_.get());
}
{
std::unique_lock<std::mutex> lk(mtx_result_);
output_result_.reset(output_->Clone());
}
if (!childs_.empty()) {
for (auto child : childs_) {
child->Process(output_.get());
}
}
SetIdle(true);
input_ready_ = false;
}
VLOG(2) << Name() << " thread end";
}
void Processor::SetIdle(bool idle) {
std::lock_guard<std::mutex> lk(mtx_state_);
idle_ = idle;
}
MYNTEYE_END_NAMESPACE

View File

@ -0,0 +1,98 @@
#ifndef MYNTEYE_PROCESSOR_H_ // NOLINT
#define MYNTEYE_PROCESSOR_H_
#pragma once
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "mynteye/mynteye.h"
#include "api/processor/object.h"
MYNTEYE_BEGIN_NAMESPACE
class Processor /*: public std::enable_shared_from_this<Processor>*/ {
public:
using PreProcessCallback = std::function<void(Object *const)>;
using PostProcessCallback = std::function<void(Object *const)>;
using ProcessCallback = std::function<bool(
Object *const in, Object *const out, Processor *const parent)>;
Processor();
virtual ~Processor();
virtual std::string Name();
void AddChild(const std::shared_ptr<Processor> &child);
void RemoveChild(const std::shared_ptr<Processor> &child);
std::list<std::shared_ptr<Processor>> GetChilds();
void SetPreProcessCallback(PreProcessCallback callback);
void SetPostProcessCallback(PostProcessCallback callback);
void SetProcessCallback(ProcessCallback callback);
void Activate();
void Deactivate();
bool IsActivated();
bool IsIdle();
/** Returns dropped or not. */
bool Process(const Object *const in);
/**
* Returns the last output.
* @note Returns null if not output now.
*/
Object *GetOutput();
std::uint64_t GetDroppedCount();
protected:
virtual Object *OnCreateOutput() = 0;
virtual void OnProcess(
Object *const in, Object *const out, Processor *const parent) = 0;
private:
/** Run in standalone thread. */
void Run();
void SetIdle(bool idle);
bool activated_;
bool input_ready_;
std::mutex mtx_input_ready_;
std::condition_variable cond_input_ready_;
bool idle_;
std::uint64_t dropped_count_;
std::mutex mtx_state_;
std::unique_ptr<Object> input_;
std::unique_ptr<Object> output_;
std::unique_ptr<Object> output_result_;
std::mutex mtx_result_;
PreProcessCallback pre_callback_;
PostProcessCallback post_callback_;
ProcessCallback callback_;
Processor *parent_;
std::list<std::shared_ptr<Processor>> childs_;
std::thread thread_;
};
MYNTEYE_END_NAMESPACE
#endif // MYNTEYE_PROCESSOR_H_ NOLINT

View File

@ -2,7 +2,8 @@
#include <glog/logging.h>
#include "api/api.h"
#include "api/processor/processor.h"
#include "device/device.h"
MYNTEYE_BEGIN_NAMESPACE
@ -14,4 +15,35 @@ Synthetic::~Synthetic() {
VLOG(2) << __func__;
}
bool Synthetic::Supports(const Stream &stream) const {
return api_->device()->Supports(stream);
}
void Synthetic::SetStreamCallback(
const Stream &stream, stream_callback_t callback) {
UNUSED(stream)
UNUSED(callback)
}
bool Synthetic::HasStreamCallback(const Stream &stream) const {
UNUSED(stream)
return false;
}
void Synthetic::StartVideoStreaming() {}
void Synthetic::StopVideoStreaming() {}
void Synthetic::WaitForStreams() {}
api::StreamData Synthetic::GetStreamData(const Stream &stream) {
UNUSED(stream)
return {};
}
std::vector<api::StreamData> Synthetic::GetStreamDatas(const Stream &stream) {
UNUSED(stream)
return {};
}
MYNTEYE_END_NAMESPACE

View File

@ -2,19 +2,40 @@
#define MYNTEYE_SYNTHETIC_H_
#pragma once
#include "mynteye/mynteye.h"
#include <memory>
#include <vector>
#include "api/api.h"
MYNTEYE_BEGIN_NAMESPACE
class API;
class Processor;
class Synthetic {
public:
using stream_callback_t = API::stream_callback_t;
explicit Synthetic(API *api);
~Synthetic();
bool Supports(const Stream &stream) const;
void SetStreamCallback(const Stream &stream, stream_callback_t callback);
bool HasStreamCallback(const Stream &stream) const;
void StartVideoStreaming();
void StopVideoStreaming();
void WaitForStreams();
api::StreamData GetStreamData(const Stream &stream);
std::vector<api::StreamData> GetStreamDatas(const Stream &stream);
private:
API *api_;
std::vector<std::shared_ptr<Processor>> processors_;
};
MYNTEYE_END_NAMESPACE

View File

@ -29,6 +29,7 @@ class DeviceWriter;
struct DeviceInfo;
class API;
class Channels;
class Motions;
class Streams;
@ -104,10 +105,6 @@ class MYNTEYE_API Device {
return device_;
}
std::shared_ptr<DeviceInfo> device_info() const {
return device_info_;
}
const StreamRequest &GetStreamRequest(const Capabilities &capability);
virtual void StartVideoStreaming();
@ -151,6 +148,7 @@ class MYNTEYE_API Device {
return channels_;
}
friend API;
friend tools::DeviceWriter;
};