diff --git a/ecosystem/gstreamer_plugin/README.md b/ecosystem/gstreamer_plugin/README.md index d8034a085..f23efb8d6 100644 --- a/ecosystem/gstreamer_plugin/README.md +++ b/ecosystem/gstreamer_plugin/README.md @@ -363,12 +363,13 @@ The `mtl_st40p_tx` plugin supports all pad capabilities (the data is not checked - **Capabilities**: Any (GST_STATIC_CAPS_ANY) **Arguments** -| Property Name | Type | Description | Range | Default Value | -|-------------------|------|--------------------------------------------------------------------|---------------|---------------| -| tx-framebuff-cnt | uint | Number of framebuffers to be used for transmission. | 0 to G_MAXUINT| 3 | -| tx-fps | uint | Framerate of the video to which the ancillary data is synchronized.| [Supported video fps fractions](#231-supported-video-fps-fractions) | 25/1 | -| tx-did | uint | Data ID for the ancillary data. | 0 to 255 | 0 | -| tx-sdid | uint | Secondary Data ID for the ancillary data. | 0 to 255 | 0 | +| Property Name | Type | Description | Range | Default Value | +|----------------------|---------|--------------------------------------------------------------------|---------------|---------------| +| tx-framebuff-cnt | uint | Number of framebuffers to be used for transmission. | 0 to G_MAXUINT| 3 | +| tx-fps | uint | Framerate of the video to which the ancillary data is synchronized.| [Supported video fps fractions](#231-supported-video-fps-fractions) | 25/1 | +| tx-did | uint | Data ID for the ancillary data. | 0 to 255 | 0 | +| tx-sdid | uint | Secondary Data ID for the ancillary data. | 0 to 255 | 0 | +| async-session-create | boolean | Improve initialization time by creating a session in a separate thread. All buffers that arrive before the session is ready will be dropped. | TRUE/FALSE | FALSE | #### 5.1.2. Example GStreamer Pipeline for Transmission diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c b/ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c index c4338cddf..8e112a86a 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c +++ b/ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c @@ -201,7 +201,11 @@ static gboolean gst_mtl_st20p_tx_start(GstBaseSink* bsink) { } if (sink->async_session_create) { - pthread_mutex_init(&sink->session_mutex, NULL); + if (pthread_mutex_init(&sink->session_mutex, NULL)) { + GST_ERROR("Failed to initialize mutex"); + return FALSE; + } + sink->session_ready = FALSE; } @@ -385,10 +389,20 @@ static gboolean gst_mtl_st20p_tx_sink_event(GstPad* pad, GstObject* parent, gst_event_parse_caps(event, &caps); if (sink->async_session_create) { thread_data = malloc(sizeof(GstMtlSt20pTxThreadData)); + if (!thread_data) { + GST_ERROR("Failed to allocate memory for thread data"); + return FALSE; + } + thread_data->sink = sink; thread_data->caps = gst_caps_ref(caps); - pthread_create(&sink->session_thread, NULL, - gst_mtl_st20p_tx_session_create_thread, thread_data); + + if (pthread_create(&sink->session_thread, NULL, + gst_mtl_st20p_tx_session_create_thread, thread_data)) { + GST_ERROR("Failed to create session thread"); + free(thread_data); + return FALSE; + } } else { ret = gst_mtl_st20p_tx_session_create(sink, caps); if (!ret) { diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c b/ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c index a6fdc116b..293aaee9a 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c +++ b/ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c @@ -201,7 +201,11 @@ static gboolean gst_mtl_st30p_tx_start(GstBaseSink* bsink) { } if (sink->async_session_create) { - pthread_mutex_init(&sink->session_mutex, NULL); + if (pthread_mutex_init(&sink->session_mutex, NULL)) { + GST_ERROR("Failed to initialize mutex"); + return FALSE; + } + sink->session_ready = FALSE; } @@ -419,10 +423,21 @@ static gboolean gst_mtl_st30p_tx_sink_event(GstPad* pad, GstObject* parent, gst_event_parse_caps(event, &caps); if (sink->async_session_create) { thread_data = malloc(sizeof(GstMtlSt30pTxThreadData)); + if (!thread_data) { + GST_ERROR("Failed to allocate memory for thread data"); + return FALSE; + } + thread_data->sink = sink; thread_data->caps = gst_caps_ref(caps); pthread_create(&sink->session_thread, NULL, gst_mtl_st30p_tx_session_create_thread, thread_data); + + if (&sink->session_thread == NULL) { + GST_ERROR("Failed to create session thread"); + free(thread_data); + return FALSE; + } } else { ret = gst_mtl_st30p_tx_session_create(sink, caps); if (!ret) { diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c index d4170914d..ce5b635bb 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c +++ b/ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c @@ -392,6 +392,11 @@ static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer } else if (src->udw_size == 0) { src->udw_size = udw_size; src->anc_data = (char*)malloc(udw_size); + if(!src->anc_data) { + GST_ERROR("Failed to allocate memory for ancillary data"); + return GST_FLOW_ERROR; + } + } else if (src->udw_size != udw_size) { GST_INFO("Size of received ancillary data has changed"); if (src->anc_data) { @@ -400,6 +405,10 @@ static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer } src->udw_size = udw_size; src->anc_data = (char*)malloc(udw_size); + if(!src->anc_data) { + GST_ERROR("Failed to allocate memory for ancillary data"); + return GST_FLOW_ERROR; + } } *buffer = gst_buffer_new_allocate(NULL, src->udw_size, NULL); diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c b/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c index 54c07e493..69642356b 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c +++ b/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c @@ -65,6 +65,7 @@ #endif #include +#include #include #include "gst_mtl_st40p_tx.h" @@ -98,9 +99,15 @@ enum { PROP_ST40P_TX_FRAMERATE, PROP_ST40P_TX_DID, PROP_ST40P_TX_SDID, + PROP_ST40P_TX_ASYNC_SESSION_CREATE, PROP_MAX }; +/* Structure to pass arguments to the thread function */ +typedef struct { + Gst_Mtl_St40p_Tx* sink; +} GstMtlSt40pTxThreadData; + /* pad template */ static GstStaticPadTemplate gst_mtl_st40p_tx_sink_pad_template = GST_STATIC_PAD_TEMPLATE("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY); @@ -125,6 +132,7 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent, GstBuffer* buf); static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink); + static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink); static void gst_mtl_st40p_tx_class_init(Gst_Mtl_St40p_TxClass* klass) { @@ -173,6 +181,12 @@ static void gst_mtl_st40p_tx_class_init(Gst_Mtl_St40p_TxClass* klass) { g_param_spec_uint("tx-sdid", "Secondary Data ID", "Secondary Data ID for the ancillary data", 0, 0xff, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property( + gobject_class, PROP_ST40P_TX_ASYNC_SESSION_CREATE, + g_param_spec_boolean("async-session-create", "Async Session Create", + "Create TX session in a separate thread.", FALSE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); } static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink) { @@ -189,7 +203,14 @@ static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink) { return FALSE; } - gst_mtl_st40p_tx_session_create(sink); + if (sink->async_session_create) { + if (pthread_mutex_init(&sink->session_mutex, NULL)) { + GST_ERROR("Failed to initialize mutex"); + return FALSE; + } + + sink->session_ready = FALSE; + } gst_element_set_state(GST_ELEMENT(sink), GST_STATE_PLAYING); @@ -238,6 +259,9 @@ static void gst_mtl_st40p_tx_set_property(GObject* object, guint prop_id, case PROP_ST40P_TX_SDID: self->sdid = g_value_get_uint(value); break; + case PROP_ST40P_TX_ASYNC_SESSION_CREATE: + self->async_session_create = g_value_get_boolean(value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -267,6 +291,9 @@ static void gst_mtl_st40p_tx_get_property(GObject* object, guint prop_id, GValue case PROP_ST40P_TX_SDID: g_value_set_uint(value, sink->sdid); break; + case PROP_ST40P_TX_ASYNC_SESSION_CREATE: + g_value_set_boolean(value, sink->async_session_create); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -274,8 +301,7 @@ static void gst_mtl_st40p_tx_get_property(GObject* object, guint prop_id, GValue } /* - * Create MTL session tx handle and initialize the session with the parameters - * from caps negotiated by the pipeline. + * Create MTL session tx handle and initialize the session with the parameters. */ static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink) { struct st40p_tx_ops ops_tx = {0}; @@ -334,11 +360,31 @@ static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink) { return FALSE; } + if (sink->async_session_create) { + pthread_mutex_lock(&sink->session_mutex); + sink->session_ready = TRUE; + pthread_mutex_unlock(&sink->session_mutex); + } + return TRUE; } +static void* gst_mtl_st40p_tx_session_create_thread(void* data) { + GstMtlSt40pTxThreadData* thread_data = (GstMtlSt40pTxThreadData*)data; + gboolean result = gst_mtl_st40p_tx_session_create(thread_data->sink); + + if (!result) { + GST_ELEMENT_ERROR(thread_data->sink, RESOURCE, FAILED, (NULL), + ("Failed to create TX session in worker thread")); + } + + free(thread_data); + return NULL; +} + static gboolean gst_mtl_st40p_tx_sink_event(GstPad* pad, GstObject* parent, GstEvent* event) { + GstMtlSt40pTxThreadData* thread_data; Gst_Mtl_St40p_Tx* sink; gint ret; @@ -350,17 +396,28 @@ static gboolean gst_mtl_st40p_tx_sink_event(GstPad* pad, GstObject* parent, ret = GST_EVENT_TYPE(event); switch (GST_EVENT_TYPE(event)) { - case GST_EVENT_SEGMENT: - if (!sink->tx_handle) { - GST_ERROR("Tx handle not initialized"); - return FALSE; + case GST_EVENT_STREAM_START: + if (sink->async_session_create) { + thread_data = malloc(sizeof(GstMtlSt40pTxThreadData)); + if (!thread_data) { + GST_ERROR("Failed to allocate memory for thread data"); + return FALSE; + } + + thread_data->sink = sink; + if (pthread_create(&sink->session_thread, NULL, + gst_mtl_st40p_tx_session_create_thread, thread_data)) { + GST_ERROR("Failed to create session thread"); + free(thread_data); + return FALSE; + } + } else { + ret = gst_mtl_st40p_tx_session_create(sink); + if (!ret) { + GST_ERROR("Failed to create TX session"); + return FALSE; + } } - ret = gst_pad_event_default(pad, parent, event); - break; - case GST_EVENT_EOS: - ret = gst_pad_event_default(pad, parent, event); - gst_element_post_message(GST_ELEMENT(sink), gst_message_new_eos(GST_OBJECT(sink))); - break; default: ret = gst_pad_event_default(pad, parent, event); break; @@ -414,6 +471,18 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent, gint bytes_to_write, bytes_to_write_cur; void* cur_addr_buf; + if (sink->async_session_create) { + pthread_mutex_lock(&sink->session_mutex); + gboolean session_ready = sink->session_ready; + pthread_mutex_unlock(&sink->session_mutex); + + if (!session_ready) { + GST_WARNING("Session not ready, dropping buffer"); + gst_buffer_unref(buf); + return GST_FLOW_OK; + } + } + if (!sink->tx_handle) { GST_ERROR("Tx handle not initialized"); return GST_FLOW_ERROR; @@ -434,6 +503,7 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent, GST_ERROR("Failed to get frame"); return GST_FLOW_ERROR; } + cur_addr_buf = map_info.data + gst_buffer_get_size(buf) - bytes_to_write; bytes_to_write_cur = bytes_to_write > sink->frame_size ? sink->frame_size : bytes_to_write; @@ -445,6 +515,7 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent, } gst_memory_unmap(gst_buffer_memory, &map_info); } + gst_buffer_unref(buf); return GST_FLOW_OK; } @@ -452,19 +523,16 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent, static void gst_mtl_st40p_tx_finalize(GObject* object) { Gst_Mtl_St40p_Tx* sink = GST_MTL_ST40P_TX(object); - if (sink->tx_handle) { - if (st40p_tx_free(sink->tx_handle)) { - GST_ERROR("Failed to free tx handle"); - return; - } + if (sink->async_session_create) { + if (sink->session_thread) pthread_join(sink->session_thread, NULL); + pthread_mutex_destroy(&sink->session_mutex); } - if (sink->mtl_lib_handle) { - if (gst_mtl_common_deinit_handle(&sink->mtl_lib_handle)) { - GST_ERROR("Failed to uninitialize MTL library"); - return; - } - } + if (sink->tx_handle && st40p_tx_free(sink->tx_handle)) + GST_ERROR("Failed to free tx handle"); + + if (sink->mtl_lib_handle && gst_mtl_common_deinit_handle(&sink->mtl_lib_handle)) + GST_ERROR("Failed to uninitialize MTL library"); } static gboolean plugin_init(GstPlugin* mtl_st40p_tx) { diff --git a/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.h b/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.h index 22b91c69e..027b77c35 100644 --- a/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.h +++ b/ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.h @@ -62,6 +62,10 @@ struct _Gst_Mtl_St40p_Tx { st40p_tx_handle tx_handle; guint frame_size; + gboolean session_ready; + pthread_mutex_t session_mutex; + pthread_t session_thread; + /* arguments */ guint log_level; GeneralArgs generalArgs; /* imtl initialization arguments */ @@ -70,6 +74,7 @@ struct _Gst_Mtl_St40p_Tx { guint fps_n, fps_d; guint did; guint sdid; + gboolean async_session_create; }; G_END_DECLS