Skip to content

Commit

Permalink
Added active flag for subscribe/publish topics that allow disable/ena…
Browse files Browse the repository at this point in the history
…ble of it's use without undefining them
  • Loading branch information
grodansparadis committed Mar 28, 2024
1 parent 9b31d98 commit 96077a9
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 71 deletions.
188 changes: 117 additions & 71 deletions src/vscp/common/vscp_client_mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#else
// Name change from 1.6 (after 1.5.8)
// As it looks it is not installed by deb script
//#include <mqtt3_protocol.h>
// #include <mqtt3_protocol.h>
#endif
#endif

Expand Down Expand Up @@ -86,7 +86,7 @@ password_callback(char *buf, int size, int rwflag, void *userdata)

memset(buf, 0, size);
strncpy(buf, pClient->getPassword().c_str(), size);
return (int)strlen(buf);
return (int) strlen(buf);
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -514,6 +514,7 @@ publishTopic::publishTopic(const std::string &topic, enumMqttMsgFormat format, i
m_topic = topic;
m_qos = qos;
m_bRetain = bretain;
m_bActive = true;
m_format = format;
#if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6)
m_properties = properties;
Expand Down Expand Up @@ -550,6 +551,7 @@ subscribeTopic::subscribeTopic(const std::string &topic, enumMqttMsgFormat forma
m_topic = topic;
m_qos = qos;
m_v5_options = v5_options;
m_bActive = true;
m_format = format;
#if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6)
m_properties = properties;
Expand Down Expand Up @@ -587,20 +589,21 @@ vscpClientMqtt::vscpClientMqtt(void)
m_mapMqttIntOptions["receive-maximum"] = 20;
m_mapMqttIntOptions["send-maximum"] = 20;

m_bConnected = false; // Not connected
m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event
m_bindInterface = ""; // No bind interface
m_mosq = nullptr; // No mosquitto connection
m_publish_format = jsonfmt; // Publish inm JSON if not configured to do something else
m_subscribe_format = autofmt; // Automatically detect payload format
m_bRun = true; // Run to the Hills...
m_host = "localhost"; // tcp://localhost:1883
m_port = 1883; // Default port
m_clientid = ""; // No client id set
m_username = ""; // No username set
m_password = ""; // No password set
m_keepAlive = 30; // 30 seconds for keepalive
m_bCleanSession = false; // Do not start with a clean session
m_bConnected = false; // Not connected
m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event
m_bindInterface = ""; // No bind interface
m_mosq = nullptr; // No mosquitto connection
m_publish_format = jsonfmt; // Publish inm JSON if not configured to do something else
m_subscribe_format = autofmt; // Automatically detect payload format
m_bRun = true; // Run to the Hills...
m_host = "localhost"; // tcp://localhost:1883
m_port = 1883; // Default port
m_clientid = ""; // No client id set
m_username = ""; // No username set
m_password = ""; // No password set
m_keepAlive = 30; // 30 seconds for keepalive
m_bCleanSession = false; // Do not start with a clean session
m_qos = 0; // No quality of service

m_bTLS = false;
m_tls_cafile = "";
Expand Down Expand Up @@ -846,9 +849,10 @@ vscpClientMqtt::initFromJson(const std::string &config)
// {rnd} mustasch is replaces with hex random value
spdlog::debug("json mqtt init: 'client id' set to {}.", m_clientid);
if (m_clientid.length() > MQTT_MAX_CLIENTID_LENGTH) {
spdlog::warn("json mqtt init: 'client id' is to long {0} length={1} (Standard say max 23 characters but longer is OK with most brokers).",
m_clientid,
m_clientid.length());
spdlog::warn("json mqtt init: 'client id' is to long {0} length={1} (Standard say max 23 characters but longer "
"is OK with most brokers).",
m_clientid,
m_clientid.length());
}
}

Expand Down Expand Up @@ -1001,21 +1005,49 @@ vscpClientMqtt::initFromJson(const std::string &config)
if (jj.contains("guid-filter") && j["guid-filter"].is_string()) {
std::string str = jj["guid-filter"].get<std::string>();
vscp_getGuidFromStringToArray(m_filter.filter_GUID, str);
spdlog::debug("json mqtt init: 'guid-filter' set to {0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.",
m_filter.filter_GUID[0], m_filter.filter_GUID[1], m_filter.filter_GUID[2], m_filter.filter_GUID[3],
m_filter.filter_GUID[4], m_filter.filter_GUID[5], m_filter.filter_GUID[6], m_filter.filter_GUID[7],
m_filter.filter_GUID[8], m_filter.filter_GUID[9], m_filter.filter_GUID[10], m_filter.filter_GUID[11],
m_filter.filter_GUID[12], m_filter.filter_GUID[13], m_filter.filter_GUID[14], m_filter.filter_GUID[15]);
spdlog::debug(
"json mqtt init: 'guid-filter' set to "
"{0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.",
m_filter.filter_GUID[0],
m_filter.filter_GUID[1],
m_filter.filter_GUID[2],
m_filter.filter_GUID[3],
m_filter.filter_GUID[4],
m_filter.filter_GUID[5],
m_filter.filter_GUID[6],
m_filter.filter_GUID[7],
m_filter.filter_GUID[8],
m_filter.filter_GUID[9],
m_filter.filter_GUID[10],
m_filter.filter_GUID[11],
m_filter.filter_GUID[12],
m_filter.filter_GUID[13],
m_filter.filter_GUID[14],
m_filter.filter_GUID[15]);
}

if (jj.contains("guid-mask") && j["guid-mask"].is_string()) {
std::string str = jj["guid-mask"].get<std::string>();
vscp_getGuidFromStringToArray(m_filter.mask_GUID, str);
spdlog::debug("json mqtt init: 'guid-mask' set to {0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.",
m_filter.mask_GUID[0], m_filter.mask_GUID[1], m_filter.mask_GUID[2], m_filter.mask_GUID[3],
m_filter.mask_GUID[4], m_filter.mask_GUID[5], m_filter.mask_GUID[6], m_filter.mask_GUID[7],
m_filter.mask_GUID[8], m_filter.mask_GUID[9], m_filter.mask_GUID[10], m_filter.mask_GUID[11],
m_filter.mask_GUID[12], m_filter.mask_GUID[13], m_filter.mask_GUID[14], m_filter.mask_GUID[15]);
spdlog::debug(
"json mqtt init: 'guid-mask' set to "
"{0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.",
m_filter.mask_GUID[0],
m_filter.mask_GUID[1],
m_filter.mask_GUID[2],
m_filter.mask_GUID[3],
m_filter.mask_GUID[4],
m_filter.mask_GUID[5],
m_filter.mask_GUID[6],
m_filter.mask_GUID[7],
m_filter.mask_GUID[8],
m_filter.mask_GUID[9],
m_filter.mask_GUID[10],
m_filter.mask_GUID[11],
m_filter.mask_GUID[12],
m_filter.mask_GUID[13],
m_filter.mask_GUID[14],
m_filter.mask_GUID[15]);
}
}

Expand Down Expand Up @@ -1174,14 +1206,14 @@ vscpClientMqtt::initFromJson(const std::string &config)
the broker will not publish the message back to the client.
*/
if (std::string::npos != str.find("NO_LOCAL")) {
#ifndef WIN32
#ifndef WIN32
#if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6)
v5_options |= MQTT_SUB_OPT_NO_LOCAL;
#else
v5_options |= 0x04;
#endif
#else
v5_options |= 0x04; // TODO Check if this is the right option
v5_options |= 0x04; // TODO Check if this is the right option
#endif
}
/*
Expand All @@ -1199,7 +1231,7 @@ vscpClientMqtt::initFromJson(const std::string &config)
v5_options |= 0x08;
#endif
#else
v5_options |= 0x08; // TODO Check if this is the right option
v5_options |= 0x08; // TODO Check if this is the right option
#endif
}
/*
Expand All @@ -1217,7 +1249,7 @@ vscpClientMqtt::initFromJson(const std::string &config)
v5_options |= 0x00;
#endif
#else
v5_options |= 0x00; // TODO Check if this is the right option
v5_options |= 0x00; // TODO Check if this is the right option
#endif
}
/*
Expand All @@ -1234,7 +1266,7 @@ vscpClientMqtt::initFromJson(const std::string &config)
v5_options |= 0x10;
#endif
#else
v5_options |= 0x10; // TODO Check if this is the right option
v5_options |= 0x10; // TODO Check if this is the right option
#endif
}
/*
Expand All @@ -1250,7 +1282,7 @@ vscpClientMqtt::initFromJson(const std::string &config)
v5_options |= 0x20;
#endif
#else
v5_options |= 0x20; // TODO Check if this is the right option
v5_options |= 0x20; // TODO Check if this is the right option
#endif
}

Expand Down Expand Up @@ -1439,8 +1471,8 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)
else {
// String: send 0,20,3,,,,0:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15,0,1,35<CR><LF>
std::string s = (char *) pmsg->payload;
size_t n = std::count(s.begin(), s.end(), ',');
if (n<6) {
size_t n = std::count(s.begin(), s.end(), ',');
if (n < 6) {
// This is not a VSCP event on string format
spdlog::trace("Payload is not VSCP event.");
return false;
Expand Down Expand Up @@ -1601,7 +1633,7 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)
spdlog::critical("Memory problem.");
return false;
}

pEvent->pdata = nullptr;

if (!vscp_copyEvent(pEvent, &ev)) {
Expand Down Expand Up @@ -1733,7 +1765,7 @@ vscpClientMqtt::init(void)
#if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6)
if (MOSQ_ERR_SUCCESS != mosquitto_will_set_v5(m_mosq,
strTopic.c_str(),
(int)m_will_payload.length(),
(int) m_will_payload.length(),
m_will_payload.c_str(),
m_will_qos,
m_will_bretain,
Expand All @@ -1752,7 +1784,7 @@ vscpClientMqtt::init(void)
else {
if (MOSQ_ERR_SUCCESS != mosquitto_will_set(m_mosq,
strTopic.c_str(),
(int)m_will_payload.length(),
(int) m_will_payload.length(),
m_will_payload.c_str(),
m_will_qos,
m_will_bretain)) {
Expand Down Expand Up @@ -1986,8 +2018,7 @@ vscpClientMqtt::connect(void)
strTopic = subtemplate.render(data);
}

spdlog::trace("Publish will: Topic: {0} Payload: empty qos=1 retain=true",
strTopic);
spdlog::trace("Publish will: Topic: {0} Payload: empty qos=1 retain=true", strTopic);

if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq,
NULL, // msg id
Expand All @@ -2007,17 +2038,24 @@ vscpClientMqtt::connect(void)

subscribeTopic *psubtopic = (*it);

/*!
if the subscription is not active we do not subscribe.
Normally all are active but tools like vscp-works-qt (session)
can inactivate sessions temporarily. This is never saved
persistently.
*/
if (!psubtopic->isActive()) {
continue;
}

// Fix subscribe topics
mustache subtemplate{ psubtopic->getTopic() };
data data;
// data.set("guid", m_guid.getAsString());
std::string subscribe_topic = subtemplate.render(data);

// Subscribe to specified topic
rv = mosquitto_subscribe(m_mosq,
/*m_mqtt_id*/ nullptr,
subscribe_topic.c_str(),
m_qos);
rv = mosquitto_subscribe(m_mosq, nullptr, subscribe_topic.c_str(), m_qos);
if (MOSQ_ERR_SUCCESS != rv) {
spdlog::error("Failed to subscribed to topic '{0}' - rv={1} {2}.", subscribe_topic, rv, mosquitto_strerror(rv));
}
Expand All @@ -2026,23 +2064,23 @@ vscpClientMqtt::connect(void)
// Start worker thread if a callback has been defined
if ((nullptr != m_evcallback) || (nullptr != m_excallback)) {
int rv = pthread_create(&m_tid, nullptr, workerThread, this);
switch(rv) {
switch (rv) {

case EAGAIN:
spdlog::error("Failed to start MQTT callback thread - Insufficient resources to create another thread.");
break;
case EAGAIN:
spdlog::error("Failed to start MQTT callback thread - Insufficient resources to create another thread.");
break;

case EINVAL:
spdlog::error("Failed to start MQTT callback thread - Invalid settings in attr");
break;
case EINVAL:
spdlog::error("Failed to start MQTT callback thread - Invalid settings in attr");
break;

case EPERM:
spdlog::error("Failed to start MQTT callback thread - No permission to set the scheduling policy");
break;
case EPERM:
spdlog::error("Failed to start MQTT callback thread - No permission to set the scheduling policy");
break;

default:
spdlog::debug("Started MQTT callback thread");
break;
default:
spdlog::debug("Started MQTT callback thread");
break;
}
}

Expand Down Expand Up @@ -2109,6 +2147,17 @@ vscpClientMqtt::send(vscpEvent &ev)
++it) {

publishTopic *ppublish = (*it);

/*!
if the publish item is not active we do not publish.
Normally all are active but tools like vscp-works-qt (session)
can inactivate sessions temporarily. This is never saved
persistently.
*/
if (!ppublish->isActive()) {
continue;
}

memset(payload, 0, sizeof(payload));

if (ppublish->getFormat() == jsonfmt) {
Expand Down Expand Up @@ -2198,7 +2247,6 @@ vscpClientMqtt::send(vscpEvent &ev)
mustache subtemplate{ topic_template };
data data;
cguid evguid(ev.GUID); // Event GUID


// Event GUID
data.set("guid", evguid.getAsString());
Expand Down Expand Up @@ -2237,7 +2285,6 @@ vscpClientMqtt::send(vscpEvent &ev)
data.set(vscp_str_format("xifguid[%d]", i), vscp_str_format("%02X", m_ifguid.getAt(i)));
}


// Event data
data.set("sizedata", vscp_str_format("%d", ev.sizeData));
data.set("xsizedata", vscp_str_format("%04X", ev.sizeData));
Expand All @@ -2253,7 +2300,6 @@ vscpClientMqtt::send(vscpEvent &ev)
data.set("class", vscp_str_format("%d", ev.vscp_class));
data.set("type", vscp_str_format("%d", ev.vscp_type));


data.set("xclass", vscp_str_format("%02x", ev.vscp_class));
data.set("xtype", vscp_str_format("%02x", ev.vscp_type));

Expand Down Expand Up @@ -2349,10 +2395,10 @@ vscpClientMqtt::send(vscpEvent &ev)
}

spdlog::trace("Publish send ev: Topic: {0} qos={1} retain={2}",
strTopic,
/*(unsigned char *)payload,*/
ppublish->getQos(),
ppublish->getRetain());
strTopic,
/*(unsigned char *)payload,*/
ppublish->getQos(),
ppublish->getRetain());

if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq,
NULL, // msg id
Expand Down Expand Up @@ -2476,7 +2522,7 @@ vscpClientMqtt::send(vscpEventEx &ex)
mustache subtemplate{ topic_template };
data data;
cguid evguid(ex.GUID); // Event GUID

// Event GUID
data.set("guid", evguid.getAsString());

Expand Down Expand Up @@ -2636,10 +2682,10 @@ vscpClientMqtt::send(vscpEventEx &ex)
}

spdlog::trace("Publish send ex: Topic: {0} qos={1} retain={2}",
strTopic,
/*(unsigned char *)payload,*/
ppublish->getQos(),
ppublish->getRetain());
strTopic,
/*(unsigned char *)payload,*/
ppublish->getQos(),
ppublish->getRetain());

if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq,
NULL, // msg id
Expand Down
Loading

0 comments on commit 96077a9

Please sign in to comment.