From 02dd4126acef8782a3ca3d34aa20497009bbb914 Mon Sep 17 00:00:00 2001 From: "Joshua M. Doe" Date: Mon, 2 Aug 2021 14:44:51 -0400 Subject: [PATCH] pleorasink: use async queues and improve debug messages --- sys/pleora/gstpleorasink.cpp | 10 --- sys/pleora/gstpleorasink.h | 2 - sys/pleora/streamingchannelsource.cpp | 105 +++++++++++++------------- sys/pleora/streamingchannelsource.h | 9 ++- 4 files changed, 58 insertions(+), 68 deletions(-) diff --git a/sys/pleora/gstpleorasink.cpp b/sys/pleora/gstpleorasink.cpp index db01fc2..027daf3 100644 --- a/sys/pleora/gstpleorasink.cpp +++ b/sys/pleora/gstpleorasink.cpp @@ -255,9 +255,6 @@ gst_pleorasink_init (GstPleoraSink * sink) sink->source = new GstStreamingChannelSource (); sink->source->SetSink (sink); sink->device = new PvSoftDeviceGEV (); - - g_mutex_init (&sink->mutex); - g_cond_init (&sink->cond); } void @@ -396,9 +393,6 @@ gst_pleorasink_dispose (GObject * object) sink->source = NULL; } - g_mutex_clear (&sink->mutex); - g_cond_clear (&sink->cond); - g_free (sink->address); G_OBJECT_CLASS (gst_pleorasink_parent_class)->dispose (object); @@ -695,7 +689,6 @@ GstFlowReturn gst_pleorasink_render (GstBaseSink * basesink, GstBuffer * buffer) { GstPleoraSink *sink = GST_PLEORASINK (basesink); - GST_LOG_OBJECT (sink, "Rendering buffer"); if (sink->stop_requested) { GST_DEBUG_OBJECT (sink, "stop requested, flushing"); @@ -713,10 +706,7 @@ gst_pleorasink_unlock (GstBaseSink * basesink) { GstPleoraSink *sink = GST_PLEORASINK (basesink); - g_mutex_lock (&sink->mutex); sink->stop_requested = TRUE; - g_cond_signal (&sink->cond); - g_mutex_unlock (&sink->mutex); return TRUE; } diff --git a/sys/pleora/gstpleorasink.h b/sys/pleora/gstpleorasink.h index 9ac40b0..dd5891d 100644 --- a/sys/pleora/gstpleorasink.h +++ b/sys/pleora/gstpleorasink.h @@ -62,8 +62,6 @@ struct _GstPleoraSink gboolean camera_connected; GstVideoInfo vinfo; - GMutex mutex; - GCond cond; gboolean acquisition_started; gboolean stop_requested; diff --git a/sys/pleora/streamingchannelsource.cpp b/sys/pleora/streamingchannelsource.cpp index 9c4e626..23e7287 100644 --- a/sys/pleora/streamingchannelsource.cpp +++ b/sys/pleora/streamingchannelsource.cpp @@ -29,10 +29,24 @@ GST_DEBUG_CATEGORY_EXTERN (pleorasink_debug); #define KLV_CHUNKID 0xFEDC GstStreamingChannelSource::GstStreamingChannelSource () -: mAcquisitionBuffer (NULL), mBufferCount (0), mBufferValid (FALSE), - mChunkModeActive(TRUE), mChunkKlvEnabled(TRUE), mKlvChunkSize(0) +: mBufferCount (0), + mChunkModeActive(TRUE), mChunkKlvEnabled(TRUE), mKlvChunkSize(0), + mStreamingStarted(false) { + mInputQueue = g_async_queue_new (); + mOutputQueue = g_async_queue_new (); +} +void GstStreamingChannelSource::OnStreamingStart() +{ + GST_DEBUG_OBJECT(mSink, "Controller has requested streaming start"); + mStreamingStarted = true; +} + +void GstStreamingChannelSource::OnStreamingStop() +{ + GST_DEBUG_OBJECT(mSink, "Controller has requested streaming stop"); + mStreamingStarted = false; } void @@ -128,8 +142,12 @@ gboolean GstStreamingChannelSource::GetKlvEnabled() PvBuffer * GstStreamingChannelSource::AllocBuffer () { if (mBufferCount < mSink->num_internal_buffers) { + GST_LOG_OBJECT(mSink, "Allocating buffer #%d", mBufferCount); + PvBuffer *buf = new PvBuffer; + buf->SetID(mBufferCount); mBufferCount++; - return new PvBuffer; + + return buf; } return NULL; } @@ -142,39 +160,21 @@ void GstStreamingChannelSource::FreeBuffer (PvBuffer * aBuffer) PvResult GstStreamingChannelSource::QueueBuffer (PvBuffer * aBuffer) { - g_mutex_lock (&mSink->mutex); - if (mAcquisitionBuffer == NULL) { - // No buffer queued, accept it - mAcquisitionBuffer = aBuffer; - mBufferValid = FALSE; - g_mutex_unlock (&mSink->mutex); - return PvResult::Code::OK; - } - g_mutex_unlock (&mSink->mutex); - return PvResult::Code::BUSY; + GST_LOG_OBJECT(mSink, "Pushing buffer #%d to input queue", aBuffer->GetID()); + g_async_queue_push(mInputQueue, aBuffer); + return PvResult::Code::OK; } -PvResult GstStreamingChannelSource::RetrieveBuffer (PvBuffer ** aBuffer) +PvResult GstStreamingChannelSource::RetrieveBuffer(PvBuffer** aBuffer) { - gint64 end_time; - - g_mutex_lock (&mSink->mutex); - // WAIT for buffer - end_time = g_get_monotonic_time () + 50 * G_TIME_SPAN_MILLISECOND; - while ((mAcquisitionBuffer == NULL || !mBufferValid) - && !mSink->stop_requested) { - if (!g_cond_wait_until (&mSink->cond, &mSink->mutex, end_time)) { - // No buffer queued for acquisition - g_mutex_unlock (&mSink->mutex); - return PvResult::Code::NO_AVAILABLE_DATA; - } + guint64 timeout_ms = 50; + *aBuffer = (PvBuffer*)(g_async_queue_timeout_pop(mOutputQueue, timeout_ms * 1000)); + if (!*aBuffer) { + GST_WARNING_OBJECT(mSink, "No buffers available in output queue after %llu ms, possibly slow video framerate", timeout_ms); + return PvResult::Code::NO_AVAILABLE_DATA; } - // Remove buffer from 1-deep pipeline - *aBuffer = mAcquisitionBuffer; - mAcquisitionBuffer = NULL; - mBufferValid = FALSE; - g_mutex_unlock (&mSink->mutex); + GST_LOG_OBJECT (mSink, "Returning buffer #%llu from output queue to GEV streaming thread", (*aBuffer)->GetID()); return PvResult::Code::OK; } @@ -237,22 +237,21 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf) { GByteArray * klv_byte_array = NULL; - GST_LOG_OBJECT (mSink, "SetBuffer"); + PvBuffer* pvBuffer; - g_mutex_lock (&mSink->mutex); - - if (mAcquisitionBuffer == NULL) { - GST_WARNING_OBJECT (mSink, "No PvBuffer available to fill, dropping frame"); - g_mutex_unlock (&mSink->mutex); + guint64 timeout_ms = 50; + pvBuffer = (PvBuffer*)(g_async_queue_timeout_pop (mInputQueue, timeout_ms * 1000)); + if (!pvBuffer) { + if (mStreamingStarted) { + GST_WARNING_OBJECT(mSink, "No free buffers, dropping frame. No consumers connected, or insufficient network bandwidth. Try increasing num-internal-buffers and/or packet-size."); + } + else { + GST_LOG_OBJECT(mSink, "Dropping frame as no controller has requested streaming to start"); + } return; } - if (mBufferValid) { - GST_WARNING_OBJECT (mSink, - "Buffer already filled, dropping incoming frame"); - g_mutex_unlock (&mSink->mutex); - return; - } + GST_LOG_OBJECT(mSink, "Got buffer #%llu from input queue to fill with video data", pvBuffer->GetID()); if (mChunkKlvEnabled) { klv_byte_array = GetKlvByteArray (buf); @@ -263,31 +262,31 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf) } } - ResizeBufferIfNeeded (mAcquisitionBuffer); + ResizeBufferIfNeeded (pvBuffer); /* TODO: avoid memcpy (when strides align) by attaching to PvBuffer */ GstMapInfo minfo; gst_buffer_map (buf, &minfo, GST_MAP_READ); - guint8 *dst = mAcquisitionBuffer->GetDataPointer (); + guint8 *dst = pvBuffer->GetDataPointer (); if (!dst) { GST_ERROR_OBJECT (mSink, "Have buffer to fill, but data pointer is invalid"); - g_mutex_unlock (&mSink->mutex); + //g_mutex_unlock (&mSink->mutex); return; } - g_assert (mAcquisitionBuffer->GetSize () >= minfo.size); + g_assert (pvBuffer->GetSize () >= minfo.size); /* TODO: fix stride if needed */ memcpy (dst, minfo.data, minfo.size); gst_buffer_unmap (buf, &minfo); - mAcquisitionBuffer->ResetChunks(); - mAcquisitionBuffer->SetChunkLayoutID(CHUNKLAYOUTID); + pvBuffer->ResetChunks(); + pvBuffer->SetChunkLayoutID(CHUNKLAYOUTID); if (mChunkKlvEnabled && klv_byte_array && klv_byte_array->len > 0) { PvResult pvRes; - pvRes = mAcquisitionBuffer->AddChunk (KLV_CHUNKID, (uint8_t*)klv_byte_array->data, klv_byte_array->len); + pvRes = pvBuffer->AddChunk (KLV_CHUNKID, (uint8_t*)klv_byte_array->data, klv_byte_array->len); if (pvRes.IsOK ()) { GST_LOG_OBJECT (mSink, "Added KLV as chunk data (len=%d)", klv_byte_array->len); } else { @@ -301,10 +300,8 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf) g_byte_array_unref (klv_byte_array); } - mBufferValid = TRUE; - g_cond_signal (&mSink->cond); - - g_mutex_unlock (&mSink->mutex); + GST_LOG_OBJECT(mSink, "Pushing buffer #%d to output queue", pvBuffer->GetID()); + g_async_queue_push(mOutputQueue, pvBuffer); } GByteArray * GstStreamingChannelSource::GetKlvByteArray (GstBuffer * buf) diff --git a/sys/pleora/streamingchannelsource.h b/sys/pleora/streamingchannelsource.h index 1da196f..dc02cd3 100644 --- a/sys/pleora/streamingchannelsource.h +++ b/sys/pleora/streamingchannelsource.h @@ -26,6 +26,9 @@ class GstStreamingChannelSource:public PvStreamingChannelSourceDefault public: GstStreamingChannelSource (); + void OnStreamingStart(); + void OnStreamingStop(); + void SetSink (GstPleoraSink * sink); void SetCaps (GstCaps * caps); void ResizeBufferIfNeeded (PvBuffer * aBuffer); @@ -55,8 +58,8 @@ public: private: GstPleoraSink * mSink; - PvBuffer *mAcquisitionBuffer; - gboolean mBufferValid; + GAsyncQueue* mInputQueue; + GAsyncQueue* mOutputQueue; gint mBufferCount; gint mWidth; @@ -67,4 +70,6 @@ private: bool mChunkKlvEnabled; gint mKlvChunkSize; + + bool mStreamingStarted; }; \ No newline at end of file