Skip to content

Add: Async session creation for st40p GStreamer #1149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions ecosystem/gstreamer_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,13 @@ The `mtl_st40p_tx` plugin supports all pad capabilities (the data is not checked
- **Capabilities**: Any (GST_STATIC_CAPS_ANY)

**Arguments**
| Property Name | Type | Description | Range | Default Value |
|-------------------|------|--------------------------------------------------------------------|---------------|---------------|
| tx-framebuff-cnt | uint | Number of framebuffers to be used for transmission. | 0 to G_MAXUINT| 3 |
| tx-fps | uint | Framerate of the video to which the ancillary data is synchronized.| [Supported video fps fractions](#231-supported-video-fps-fractions) | 25/1 |
| tx-did | uint | Data ID for the ancillary data. | 0 to 255 | 0 |
| tx-sdid | uint | Secondary Data ID for the ancillary data. | 0 to 255 | 0 |
| Property Name | Type | Description | Range | Default Value |
|----------------------|---------|--------------------------------------------------------------------|---------------|---------------|
| tx-framebuff-cnt | uint | Number of framebuffers to be used for transmission. | 0 to G_MAXUINT| 3 |
| tx-fps | uint | Framerate of the video to which the ancillary data is synchronized.| [Supported video fps fractions](#231-supported-video-fps-fractions) | 25/1 |
| tx-did | uint | Data ID for the ancillary data. | 0 to 255 | 0 |
| tx-sdid | uint | Secondary Data ID for the ancillary data. | 0 to 255 | 0 |
| async-session-create | boolean | Improve initialization time by creating a session in a separate thread. All buffers that arrive before the session is ready will be dropped. | TRUE/FALSE | FALSE |

#### 5.1.2. Example GStreamer Pipeline for Transmission

Expand Down
20 changes: 17 additions & 3 deletions ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ static gboolean gst_mtl_st20p_tx_start(GstBaseSink* bsink) {
}

if (sink->async_session_create) {
pthread_mutex_init(&sink->session_mutex, NULL);
if (pthread_mutex_init(&sink->session_mutex, NULL)) {
GST_ERROR("Failed to initialize mutex");
return FALSE;
}

sink->session_ready = FALSE;
}

Expand Down Expand Up @@ -385,10 +389,20 @@ static gboolean gst_mtl_st20p_tx_sink_event(GstPad* pad, GstObject* parent,
gst_event_parse_caps(event, &caps);
if (sink->async_session_create) {
thread_data = malloc(sizeof(GstMtlSt20pTxThreadData));
if (!thread_data) {
GST_ERROR("Failed to allocate memory for thread data");
return FALSE;
}

thread_data->sink = sink;
thread_data->caps = gst_caps_ref(caps);
pthread_create(&sink->session_thread, NULL,
gst_mtl_st20p_tx_session_create_thread, thread_data);

if (pthread_create(&sink->session_thread, NULL,
gst_mtl_st20p_tx_session_create_thread, thread_data)) {
GST_ERROR("Failed to create session thread");
free(thread_data);
return FALSE;
}
} else {
ret = gst_mtl_st20p_tx_session_create(sink, caps);
if (!ret) {
Expand Down
17 changes: 16 additions & 1 deletion ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ static gboolean gst_mtl_st30p_tx_start(GstBaseSink* bsink) {
}

if (sink->async_session_create) {
pthread_mutex_init(&sink->session_mutex, NULL);
if (pthread_mutex_init(&sink->session_mutex, NULL)) {
GST_ERROR("Failed to initialize mutex");
return FALSE;
}

sink->session_ready = FALSE;
}

Expand Down Expand Up @@ -419,10 +423,21 @@ static gboolean gst_mtl_st30p_tx_sink_event(GstPad* pad, GstObject* parent,
gst_event_parse_caps(event, &caps);
if (sink->async_session_create) {
thread_data = malloc(sizeof(GstMtlSt30pTxThreadData));
if (!thread_data) {
GST_ERROR("Failed to allocate memory for thread data");
return FALSE;
}

thread_data->sink = sink;
thread_data->caps = gst_caps_ref(caps);
pthread_create(&sink->session_thread, NULL,
gst_mtl_st30p_tx_session_create_thread, thread_data);

if (&sink->session_thread == NULL) {
GST_ERROR("Failed to create session thread");
free(thread_data);
return FALSE;
}
} else {
ret = gst_mtl_st30p_tx_session_create(sink, caps);
if (!ret) {
Expand Down
9 changes: 9 additions & 0 deletions ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer
} else if (src->udw_size == 0) {
src->udw_size = udw_size;
src->anc_data = (char*)malloc(udw_size);
if(!src->anc_data) {
GST_ERROR("Failed to allocate memory for ancillary data");
return GST_FLOW_ERROR;
}

} else if (src->udw_size != udw_size) {
GST_INFO("Size of received ancillary data has changed");
if (src->anc_data) {
Expand All @@ -400,6 +405,10 @@ static GstFlowReturn gst_mtl_st40_rx_fill_buffer(Gst_Mtl_St40_Rx* src, GstBuffer
}
src->udw_size = udw_size;
src->anc_data = (char*)malloc(udw_size);
if(!src->anc_data) {
GST_ERROR("Failed to allocate memory for ancillary data");
return GST_FLOW_ERROR;
}
}

*buffer = gst_buffer_new_allocate(NULL, src->udw_size, NULL);
Expand Down
116 changes: 92 additions & 24 deletions ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
#endif

#include <gst/gst.h>
#include <pthread.h>
#include <unistd.h>

#include "gst_mtl_st40p_tx.h"
Expand Down Expand Up @@ -98,9 +99,15 @@ enum {
PROP_ST40P_TX_FRAMERATE,
PROP_ST40P_TX_DID,
PROP_ST40P_TX_SDID,
PROP_ST40P_TX_ASYNC_SESSION_CREATE,
PROP_MAX
};

/* Structure to pass arguments to the thread function */
typedef struct {
Gst_Mtl_St40p_Tx* sink;
} GstMtlSt40pTxThreadData;

/* pad template */
static GstStaticPadTemplate gst_mtl_st40p_tx_sink_pad_template =
GST_STATIC_PAD_TEMPLATE("sink", GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
Expand All @@ -125,6 +132,7 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent,
GstBuffer* buf);

static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink);

static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink);

static void gst_mtl_st40p_tx_class_init(Gst_Mtl_St40p_TxClass* klass) {
Expand Down Expand Up @@ -173,6 +181,12 @@ static void gst_mtl_st40p_tx_class_init(Gst_Mtl_St40p_TxClass* klass) {
g_param_spec_uint("tx-sdid", "Secondary Data ID",
"Secondary Data ID for the ancillary data", 0, 0xff, 0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

g_object_class_install_property(
gobject_class, PROP_ST40P_TX_ASYNC_SESSION_CREATE,
g_param_spec_boolean("async-session-create", "Async Session Create",
"Create TX session in a separate thread.", FALSE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}

static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink) {
Expand All @@ -189,7 +203,14 @@ static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink) {
return FALSE;
}

gst_mtl_st40p_tx_session_create(sink);
if (sink->async_session_create) {
if (pthread_mutex_init(&sink->session_mutex, NULL)) {
GST_ERROR("Failed to initialize mutex");
return FALSE;
}

sink->session_ready = FALSE;
}

gst_element_set_state(GST_ELEMENT(sink), GST_STATE_PLAYING);

Expand Down Expand Up @@ -238,6 +259,9 @@ static void gst_mtl_st40p_tx_set_property(GObject* object, guint prop_id,
case PROP_ST40P_TX_SDID:
self->sdid = g_value_get_uint(value);
break;
case PROP_ST40P_TX_ASYNC_SESSION_CREATE:
self->async_session_create = g_value_get_boolean(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
Expand Down Expand Up @@ -267,15 +291,17 @@ static void gst_mtl_st40p_tx_get_property(GObject* object, guint prop_id, GValue
case PROP_ST40P_TX_SDID:
g_value_set_uint(value, sink->sdid);
break;
case PROP_ST40P_TX_ASYNC_SESSION_CREATE:
g_value_set_boolean(value, sink->async_session_create);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}

/*
* Create MTL session tx handle and initialize the session with the parameters
* from caps negotiated by the pipeline.
* Create MTL session tx handle and initialize the session with the parameters.
*/
static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink) {
struct st40p_tx_ops ops_tx = {0};
Expand Down Expand Up @@ -334,11 +360,31 @@ static gboolean gst_mtl_st40p_tx_session_create(Gst_Mtl_St40p_Tx* sink) {
return FALSE;
}

if (sink->async_session_create) {
pthread_mutex_lock(&sink->session_mutex);
sink->session_ready = TRUE;
pthread_mutex_unlock(&sink->session_mutex);
}

return TRUE;
}

static void* gst_mtl_st40p_tx_session_create_thread(void* data) {
GstMtlSt40pTxThreadData* thread_data = (GstMtlSt40pTxThreadData*)data;
gboolean result = gst_mtl_st40p_tx_session_create(thread_data->sink);

if (!result) {
GST_ELEMENT_ERROR(thread_data->sink, RESOURCE, FAILED, (NULL),
("Failed to create TX session in worker thread"));
}

free(thread_data);
return NULL;
}

static gboolean gst_mtl_st40p_tx_sink_event(GstPad* pad, GstObject* parent,
GstEvent* event) {
GstMtlSt40pTxThreadData* thread_data;
Gst_Mtl_St40p_Tx* sink;
gint ret;

Expand All @@ -350,17 +396,28 @@ static gboolean gst_mtl_st40p_tx_sink_event(GstPad* pad, GstObject* parent,
ret = GST_EVENT_TYPE(event);

switch (GST_EVENT_TYPE(event)) {
case GST_EVENT_SEGMENT:
if (!sink->tx_handle) {
GST_ERROR("Tx handle not initialized");
return FALSE;
case GST_EVENT_STREAM_START:
if (sink->async_session_create) {
thread_data = malloc(sizeof(GstMtlSt40pTxThreadData));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle malloc fail and pthread_create fail cases.

if (!thread_data) {
GST_ERROR("Failed to allocate memory for thread data");
return FALSE;
}

thread_data->sink = sink;
if (pthread_create(&sink->session_thread, NULL,
gst_mtl_st40p_tx_session_create_thread, thread_data)) {
GST_ERROR("Failed to create session thread");
free(thread_data);
return FALSE;
}
} else {
ret = gst_mtl_st40p_tx_session_create(sink);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know how much later GST_EVENT_STREAM_START is send after gst_mtl_st40p_tx_start() was called? Are we ok with introducing this delay to the case when async_session_create == false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we absolutly are not
I was under the impression that this signal is being "sent" much faster, it is noooot after the packets starts flowing its being sent
i though that leaving this in signals would be fine but i don't think so no more will move him

if (!ret) {
GST_ERROR("Failed to create TX session");
return FALSE;
}
}
ret = gst_pad_event_default(pad, parent, event);
break;
case GST_EVENT_EOS:
ret = gst_pad_event_default(pad, parent, event);
gst_element_post_message(GST_ELEMENT(sink), gst_message_new_eos(GST_OBJECT(sink)));
break;
default:
ret = gst_pad_event_default(pad, parent, event);
break;
Expand Down Expand Up @@ -414,6 +471,18 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent,
gint bytes_to_write, bytes_to_write_cur;
void* cur_addr_buf;

if (sink->async_session_create) {
pthread_mutex_lock(&sink->session_mutex);
gboolean session_ready = sink->session_ready;
pthread_mutex_unlock(&sink->session_mutex);

if (!session_ready) {
GST_WARNING("Session not ready, dropping buffer");
gst_buffer_unref(buf);
return GST_FLOW_OK;
}
}

if (!sink->tx_handle) {
GST_ERROR("Tx handle not initialized");
return GST_FLOW_ERROR;
Expand All @@ -434,6 +503,7 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent,
GST_ERROR("Failed to get frame");
return GST_FLOW_ERROR;
}

cur_addr_buf = map_info.data + gst_buffer_get_size(buf) - bytes_to_write;
bytes_to_write_cur =
bytes_to_write > sink->frame_size ? sink->frame_size : bytes_to_write;
Expand All @@ -445,26 +515,24 @@ static GstFlowReturn gst_mtl_st40p_tx_chain(GstPad* pad, GstObject* parent,
}
gst_memory_unmap(gst_buffer_memory, &map_info);
}

gst_buffer_unref(buf);
return GST_FLOW_OK;
}

static void gst_mtl_st40p_tx_finalize(GObject* object) {
Gst_Mtl_St40p_Tx* sink = GST_MTL_ST40P_TX(object);

if (sink->tx_handle) {
if (st40p_tx_free(sink->tx_handle)) {
GST_ERROR("Failed to free tx handle");
return;
}
if (sink->async_session_create) {
if (sink->session_thread) pthread_join(sink->session_thread, NULL);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to deallocate GstMtlSt40pTxThreadData here, after join(), so as not to split the responsibility for memory control between 2 threads, but your way is valid too.

pthread_mutex_destroy(&sink->session_mutex);
}

if (sink->mtl_lib_handle) {
if (gst_mtl_common_deinit_handle(&sink->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
}
if (sink->tx_handle && st40p_tx_free(sink->tx_handle))
GST_ERROR("Failed to free tx handle");

if (sink->mtl_lib_handle && gst_mtl_common_deinit_handle(&sink->mtl_lib_handle))
GST_ERROR("Failed to uninitialize MTL library");
}

static gboolean plugin_init(GstPlugin* mtl_st40p_tx) {
Expand Down
5 changes: 5 additions & 0 deletions ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ struct _Gst_Mtl_St40p_Tx {
st40p_tx_handle tx_handle;
guint frame_size;

gboolean session_ready;
pthread_mutex_t session_mutex;
pthread_t session_thread;

/* arguments */
guint log_level;
GeneralArgs generalArgs; /* imtl initialization arguments */
Expand All @@ -70,6 +74,7 @@ struct _Gst_Mtl_St40p_Tx {
guint fps_n, fps_d;
guint did;
guint sdid;
gboolean async_session_create;
};

G_END_DECLS
Expand Down