pleorasink: use async queues and improve debug messages

This commit is contained in:
Joshua M. Doe 2021-08-02 14:44:51 -04:00
parent 63295f7290
commit 02dd4126ac
4 changed files with 58 additions and 68 deletions

View File

@ -255,9 +255,6 @@ gst_pleorasink_init (GstPleoraSink * sink)
sink->source = new GstStreamingChannelSource (); sink->source = new GstStreamingChannelSource ();
sink->source->SetSink (sink); sink->source->SetSink (sink);
sink->device = new PvSoftDeviceGEV (); sink->device = new PvSoftDeviceGEV ();
g_mutex_init (&sink->mutex);
g_cond_init (&sink->cond);
} }
void void
@ -396,9 +393,6 @@ gst_pleorasink_dispose (GObject * object)
sink->source = NULL; sink->source = NULL;
} }
g_mutex_clear (&sink->mutex);
g_cond_clear (&sink->cond);
g_free (sink->address); g_free (sink->address);
G_OBJECT_CLASS (gst_pleorasink_parent_class)->dispose (object); G_OBJECT_CLASS (gst_pleorasink_parent_class)->dispose (object);
@ -695,7 +689,6 @@ GstFlowReturn
gst_pleorasink_render (GstBaseSink * basesink, GstBuffer * buffer) gst_pleorasink_render (GstBaseSink * basesink, GstBuffer * buffer)
{ {
GstPleoraSink *sink = GST_PLEORASINK (basesink); GstPleoraSink *sink = GST_PLEORASINK (basesink);
GST_LOG_OBJECT (sink, "Rendering buffer");
if (sink->stop_requested) { if (sink->stop_requested) {
GST_DEBUG_OBJECT (sink, "stop requested, flushing"); GST_DEBUG_OBJECT (sink, "stop requested, flushing");
@ -713,10 +706,7 @@ gst_pleorasink_unlock (GstBaseSink * basesink)
{ {
GstPleoraSink *sink = GST_PLEORASINK (basesink); GstPleoraSink *sink = GST_PLEORASINK (basesink);
g_mutex_lock (&sink->mutex);
sink->stop_requested = TRUE; sink->stop_requested = TRUE;
g_cond_signal (&sink->cond);
g_mutex_unlock (&sink->mutex);
return TRUE; return TRUE;
} }

View File

@ -62,8 +62,6 @@ struct _GstPleoraSink
gboolean camera_connected; gboolean camera_connected;
GstVideoInfo vinfo; GstVideoInfo vinfo;
GMutex mutex;
GCond cond;
gboolean acquisition_started; gboolean acquisition_started;
gboolean stop_requested; gboolean stop_requested;

View File

@ -29,10 +29,24 @@ GST_DEBUG_CATEGORY_EXTERN (pleorasink_debug);
#define KLV_CHUNKID 0xFEDC #define KLV_CHUNKID 0xFEDC
GstStreamingChannelSource::GstStreamingChannelSource () GstStreamingChannelSource::GstStreamingChannelSource ()
: mAcquisitionBuffer (NULL), mBufferCount (0), mBufferValid (FALSE), : mBufferCount (0),
mChunkModeActive(TRUE), mChunkKlvEnabled(TRUE), mKlvChunkSize(0) mChunkModeActive(TRUE), mChunkKlvEnabled(TRUE), mKlvChunkSize(0),
mStreamingStarted(false)
{ {
mInputQueue = g_async_queue_new ();
mOutputQueue = g_async_queue_new ();
}
void GstStreamingChannelSource::OnStreamingStart()
{
GST_DEBUG_OBJECT(mSink, "Controller has requested streaming start");
mStreamingStarted = true;
}
void GstStreamingChannelSource::OnStreamingStop()
{
GST_DEBUG_OBJECT(mSink, "Controller has requested streaming stop");
mStreamingStarted = false;
} }
void void
@ -128,8 +142,12 @@ gboolean GstStreamingChannelSource::GetKlvEnabled()
PvBuffer * GstStreamingChannelSource::AllocBuffer () PvBuffer * GstStreamingChannelSource::AllocBuffer ()
{ {
if (mBufferCount < mSink->num_internal_buffers) { if (mBufferCount < mSink->num_internal_buffers) {
GST_LOG_OBJECT(mSink, "Allocating buffer #%d", mBufferCount);
PvBuffer *buf = new PvBuffer;
buf->SetID(mBufferCount);
mBufferCount++; mBufferCount++;
return new PvBuffer;
return buf;
} }
return NULL; return NULL;
} }
@ -142,39 +160,21 @@ void GstStreamingChannelSource::FreeBuffer (PvBuffer * aBuffer)
PvResult GstStreamingChannelSource::QueueBuffer (PvBuffer * aBuffer) PvResult GstStreamingChannelSource::QueueBuffer (PvBuffer * aBuffer)
{ {
g_mutex_lock (&mSink->mutex); GST_LOG_OBJECT(mSink, "Pushing buffer #%d to input queue", aBuffer->GetID());
if (mAcquisitionBuffer == NULL) { g_async_queue_push(mInputQueue, aBuffer);
// No buffer queued, accept it
mAcquisitionBuffer = aBuffer;
mBufferValid = FALSE;
g_mutex_unlock (&mSink->mutex);
return PvResult::Code::OK; return PvResult::Code::OK;
} }
g_mutex_unlock (&mSink->mutex);
return PvResult::Code::BUSY;
}
PvResult GstStreamingChannelSource::RetrieveBuffer(PvBuffer** aBuffer) PvResult GstStreamingChannelSource::RetrieveBuffer(PvBuffer** aBuffer)
{ {
gint64 end_time; guint64 timeout_ms = 50;
*aBuffer = (PvBuffer*)(g_async_queue_timeout_pop(mOutputQueue, timeout_ms * 1000));
g_mutex_lock (&mSink->mutex); if (!*aBuffer) {
// WAIT for buffer GST_WARNING_OBJECT(mSink, "No buffers available in output queue after %llu ms, possibly slow video framerate", timeout_ms);
end_time = g_get_monotonic_time () + 50 * G_TIME_SPAN_MILLISECOND;
while ((mAcquisitionBuffer == NULL || !mBufferValid)
&& !mSink->stop_requested) {
if (!g_cond_wait_until (&mSink->cond, &mSink->mutex, end_time)) {
// No buffer queued for acquisition
g_mutex_unlock (&mSink->mutex);
return PvResult::Code::NO_AVAILABLE_DATA; return PvResult::Code::NO_AVAILABLE_DATA;
} }
}
// Remove buffer from 1-deep pipeline
*aBuffer = mAcquisitionBuffer;
mAcquisitionBuffer = NULL;
mBufferValid = FALSE;
g_mutex_unlock (&mSink->mutex);
GST_LOG_OBJECT (mSink, "Returning buffer #%llu from output queue to GEV streaming thread", (*aBuffer)->GetID());
return PvResult::Code::OK; return PvResult::Code::OK;
} }
@ -237,22 +237,21 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf)
{ {
GByteArray * klv_byte_array = NULL; GByteArray * klv_byte_array = NULL;
GST_LOG_OBJECT (mSink, "SetBuffer"); PvBuffer* pvBuffer;
g_mutex_lock (&mSink->mutex); guint64 timeout_ms = 50;
pvBuffer = (PvBuffer*)(g_async_queue_timeout_pop (mInputQueue, timeout_ms * 1000));
if (mAcquisitionBuffer == NULL) { if (!pvBuffer) {
GST_WARNING_OBJECT (mSink, "No PvBuffer available to fill, dropping frame"); if (mStreamingStarted) {
g_mutex_unlock (&mSink->mutex); GST_WARNING_OBJECT(mSink, "No free buffers, dropping frame. No consumers connected, or insufficient network bandwidth. Try increasing num-internal-buffers and/or packet-size.");
}
else {
GST_LOG_OBJECT(mSink, "Dropping frame as no controller has requested streaming to start");
}
return; return;
} }
if (mBufferValid) { GST_LOG_OBJECT(mSink, "Got buffer #%llu from input queue to fill with video data", pvBuffer->GetID());
GST_WARNING_OBJECT (mSink,
"Buffer already filled, dropping incoming frame");
g_mutex_unlock (&mSink->mutex);
return;
}
if (mChunkKlvEnabled) { if (mChunkKlvEnabled) {
klv_byte_array = GetKlvByteArray (buf); klv_byte_array = GetKlvByteArray (buf);
@ -263,31 +262,31 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf)
} }
} }
ResizeBufferIfNeeded (mAcquisitionBuffer); ResizeBufferIfNeeded (pvBuffer);
/* TODO: avoid memcpy (when strides align) by attaching to PvBuffer */ /* TODO: avoid memcpy (when strides align) by attaching to PvBuffer */
GstMapInfo minfo; GstMapInfo minfo;
gst_buffer_map (buf, &minfo, GST_MAP_READ); gst_buffer_map (buf, &minfo, GST_MAP_READ);
guint8 *dst = mAcquisitionBuffer->GetDataPointer (); guint8 *dst = pvBuffer->GetDataPointer ();
if (!dst) { if (!dst) {
GST_ERROR_OBJECT (mSink, "Have buffer to fill, but data pointer is invalid"); GST_ERROR_OBJECT (mSink, "Have buffer to fill, but data pointer is invalid");
g_mutex_unlock (&mSink->mutex); //g_mutex_unlock (&mSink->mutex);
return; return;
} }
g_assert (mAcquisitionBuffer->GetSize () >= minfo.size); g_assert (pvBuffer->GetSize () >= minfo.size);
/* TODO: fix stride if needed */ /* TODO: fix stride if needed */
memcpy (dst, minfo.data, minfo.size); memcpy (dst, minfo.data, minfo.size);
gst_buffer_unmap (buf, &minfo); gst_buffer_unmap (buf, &minfo);
mAcquisitionBuffer->ResetChunks(); pvBuffer->ResetChunks();
mAcquisitionBuffer->SetChunkLayoutID(CHUNKLAYOUTID); pvBuffer->SetChunkLayoutID(CHUNKLAYOUTID);
if (mChunkKlvEnabled && klv_byte_array && klv_byte_array->len > 0) { if (mChunkKlvEnabled && klv_byte_array && klv_byte_array->len > 0) {
PvResult pvRes; PvResult pvRes;
pvRes = mAcquisitionBuffer->AddChunk (KLV_CHUNKID, (uint8_t*)klv_byte_array->data, klv_byte_array->len); pvRes = pvBuffer->AddChunk (KLV_CHUNKID, (uint8_t*)klv_byte_array->data, klv_byte_array->len);
if (pvRes.IsOK ()) { if (pvRes.IsOK ()) {
GST_LOG_OBJECT (mSink, "Added KLV as chunk data (len=%d)", klv_byte_array->len); GST_LOG_OBJECT (mSink, "Added KLV as chunk data (len=%d)", klv_byte_array->len);
} else { } else {
@ -301,10 +300,8 @@ GstStreamingChannelSource::SetBuffer (GstBuffer * buf)
g_byte_array_unref (klv_byte_array); g_byte_array_unref (klv_byte_array);
} }
mBufferValid = TRUE; GST_LOG_OBJECT(mSink, "Pushing buffer #%d to output queue", pvBuffer->GetID());
g_cond_signal (&mSink->cond); g_async_queue_push(mOutputQueue, pvBuffer);
g_mutex_unlock (&mSink->mutex);
} }
GByteArray * GstStreamingChannelSource::GetKlvByteArray (GstBuffer * buf) GByteArray * GstStreamingChannelSource::GetKlvByteArray (GstBuffer * buf)

View File

@ -26,6 +26,9 @@ class GstStreamingChannelSource:public PvStreamingChannelSourceDefault
public: public:
GstStreamingChannelSource (); GstStreamingChannelSource ();
void OnStreamingStart();
void OnStreamingStop();
void SetSink (GstPleoraSink * sink); void SetSink (GstPleoraSink * sink);
void SetCaps (GstCaps * caps); void SetCaps (GstCaps * caps);
void ResizeBufferIfNeeded (PvBuffer * aBuffer); void ResizeBufferIfNeeded (PvBuffer * aBuffer);
@ -55,8 +58,8 @@ public:
private: private:
GstPleoraSink * mSink; GstPleoraSink * mSink;
PvBuffer *mAcquisitionBuffer; GAsyncQueue* mInputQueue;
gboolean mBufferValid; GAsyncQueue* mOutputQueue;
gint mBufferCount; gint mBufferCount;
gint mWidth; gint mWidth;
@ -67,4 +70,6 @@ private:
bool mChunkKlvEnabled; bool mChunkKlvEnabled;
gint mKlvChunkSize; gint mKlvChunkSize;
bool mStreamingStarted;
}; };