From 96077a979f5d9d8ecf74e044ba035e3821c13e2f Mon Sep 17 00:00:00 2001 From: Ake Hedman Date: Thu, 28 Mar 2024 22:34:59 +0100 Subject: [PATCH] Added active flag for subscribe/publish topics that allow disable/enable of it's use without undefining them --- src/vscp/common/vscp_client_mqtt.cpp | 188 +++++++++++++++++---------- src/vscp/common/vscp_client_mqtt.h | 33 +++++ 2 files changed, 150 insertions(+), 71 deletions(-) diff --git a/src/vscp/common/vscp_client_mqtt.cpp b/src/vscp/common/vscp_client_mqtt.cpp index d2b63f120..51b8fa7b8 100644 --- a/src/vscp/common/vscp_client_mqtt.cpp +++ b/src/vscp/common/vscp_client_mqtt.cpp @@ -42,7 +42,7 @@ #else // Name change from 1.6 (after 1.5.8) // As it looks it is not installed by deb script -//#include +// #include #endif #endif @@ -86,7 +86,7 @@ password_callback(char *buf, int size, int rwflag, void *userdata) memset(buf, 0, size); strncpy(buf, pClient->getPassword().c_str(), size); - return (int)strlen(buf); + return (int) strlen(buf); } /////////////////////////////////////////////////////////////////////////////// @@ -514,6 +514,7 @@ publishTopic::publishTopic(const std::string &topic, enumMqttMsgFormat format, i m_topic = topic; m_qos = qos; m_bRetain = bretain; + m_bActive = true; m_format = format; #if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6) m_properties = properties; @@ -550,6 +551,7 @@ subscribeTopic::subscribeTopic(const std::string &topic, enumMqttMsgFormat forma m_topic = topic; m_qos = qos; m_v5_options = v5_options; + m_bActive = true; m_format = format; #if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6) m_properties = properties; @@ -587,20 +589,21 @@ vscpClientMqtt::vscpClientMqtt(void) m_mapMqttIntOptions["receive-maximum"] = 20; m_mapMqttIntOptions["send-maximum"] = 20; - m_bConnected = false; // Not connected - m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event - m_bindInterface = ""; // No bind interface - m_mosq = nullptr; // No mosquitto connection - m_publish_format = jsonfmt; // Publish inm JSON if not configured to do something else - m_subscribe_format = autofmt; // Automatically detect payload format - m_bRun = true; // Run to the Hills... - m_host = "localhost"; // tcp://localhost:1883 - m_port = 1883; // Default port - m_clientid = ""; // No client id set - m_username = ""; // No username set - m_password = ""; // No password set - m_keepAlive = 30; // 30 seconds for keepalive - m_bCleanSession = false; // Do not start with a clean session + m_bConnected = false; // Not connected + m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event + m_bindInterface = ""; // No bind interface + m_mosq = nullptr; // No mosquitto connection + m_publish_format = jsonfmt; // Publish inm JSON if not configured to do something else + m_subscribe_format = autofmt; // Automatically detect payload format + m_bRun = true; // Run to the Hills... + m_host = "localhost"; // tcp://localhost:1883 + m_port = 1883; // Default port + m_clientid = ""; // No client id set + m_username = ""; // No username set + m_password = ""; // No password set + m_keepAlive = 30; // 30 seconds for keepalive + m_bCleanSession = false; // Do not start with a clean session + m_qos = 0; // No quality of service m_bTLS = false; m_tls_cafile = ""; @@ -846,9 +849,10 @@ vscpClientMqtt::initFromJson(const std::string &config) // {rnd} mustasch is replaces with hex random value spdlog::debug("json mqtt init: 'client id' set to {}.", m_clientid); if (m_clientid.length() > MQTT_MAX_CLIENTID_LENGTH) { - spdlog::warn("json mqtt init: 'client id' is to long {0} length={1} (Standard say max 23 characters but longer is OK with most brokers).", - m_clientid, - m_clientid.length()); + spdlog::warn("json mqtt init: 'client id' is to long {0} length={1} (Standard say max 23 characters but longer " + "is OK with most brokers).", + m_clientid, + m_clientid.length()); } } @@ -1001,21 +1005,49 @@ vscpClientMqtt::initFromJson(const std::string &config) if (jj.contains("guid-filter") && j["guid-filter"].is_string()) { std::string str = jj["guid-filter"].get(); vscp_getGuidFromStringToArray(m_filter.filter_GUID, str); - spdlog::debug("json mqtt init: 'guid-filter' set to {0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.", - m_filter.filter_GUID[0], m_filter.filter_GUID[1], m_filter.filter_GUID[2], m_filter.filter_GUID[3], - m_filter.filter_GUID[4], m_filter.filter_GUID[5], m_filter.filter_GUID[6], m_filter.filter_GUID[7], - m_filter.filter_GUID[8], m_filter.filter_GUID[9], m_filter.filter_GUID[10], m_filter.filter_GUID[11], - m_filter.filter_GUID[12], m_filter.filter_GUID[13], m_filter.filter_GUID[14], m_filter.filter_GUID[15]); + spdlog::debug( + "json mqtt init: 'guid-filter' set to " + "{0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.", + m_filter.filter_GUID[0], + m_filter.filter_GUID[1], + m_filter.filter_GUID[2], + m_filter.filter_GUID[3], + m_filter.filter_GUID[4], + m_filter.filter_GUID[5], + m_filter.filter_GUID[6], + m_filter.filter_GUID[7], + m_filter.filter_GUID[8], + m_filter.filter_GUID[9], + m_filter.filter_GUID[10], + m_filter.filter_GUID[11], + m_filter.filter_GUID[12], + m_filter.filter_GUID[13], + m_filter.filter_GUID[14], + m_filter.filter_GUID[15]); } if (jj.contains("guid-mask") && j["guid-mask"].is_string()) { std::string str = jj["guid-mask"].get(); vscp_getGuidFromStringToArray(m_filter.mask_GUID, str); - spdlog::debug("json mqtt init: 'guid-mask' set to {0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.", - m_filter.mask_GUID[0], m_filter.mask_GUID[1], m_filter.mask_GUID[2], m_filter.mask_GUID[3], - m_filter.mask_GUID[4], m_filter.mask_GUID[5], m_filter.mask_GUID[6], m_filter.mask_GUID[7], - m_filter.mask_GUID[8], m_filter.mask_GUID[9], m_filter.mask_GUID[10], m_filter.mask_GUID[11], - m_filter.mask_GUID[12], m_filter.mask_GUID[13], m_filter.mask_GUID[14], m_filter.mask_GUID[15]); + spdlog::debug( + "json mqtt init: 'guid-mask' set to " + "{0:x}:{1:x}:{2:x}:{3:x}:{4:x}:{5:x}:{6:x}:{7:x}:{8:x}:{9:x}:{10:x}:{11:x}:{12:x}:{13:x}:{14:x}:{15:x}.", + m_filter.mask_GUID[0], + m_filter.mask_GUID[1], + m_filter.mask_GUID[2], + m_filter.mask_GUID[3], + m_filter.mask_GUID[4], + m_filter.mask_GUID[5], + m_filter.mask_GUID[6], + m_filter.mask_GUID[7], + m_filter.mask_GUID[8], + m_filter.mask_GUID[9], + m_filter.mask_GUID[10], + m_filter.mask_GUID[11], + m_filter.mask_GUID[12], + m_filter.mask_GUID[13], + m_filter.mask_GUID[14], + m_filter.mask_GUID[15]); } } @@ -1174,14 +1206,14 @@ vscpClientMqtt::initFromJson(const std::string &config) the broker will not publish the message back to the client. */ if (std::string::npos != str.find("NO_LOCAL")) { -#ifndef WIN32 +#ifndef WIN32 #if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6) v5_options |= MQTT_SUB_OPT_NO_LOCAL; #else v5_options |= 0x04; #endif #else - v5_options |= 0x04; // TODO Check if this is the right option + v5_options |= 0x04; // TODO Check if this is the right option #endif } /* @@ -1199,7 +1231,7 @@ vscpClientMqtt::initFromJson(const std::string &config) v5_options |= 0x08; #endif #else - v5_options |= 0x08; // TODO Check if this is the right option + v5_options |= 0x08; // TODO Check if this is the right option #endif } /* @@ -1217,7 +1249,7 @@ vscpClientMqtt::initFromJson(const std::string &config) v5_options |= 0x00; #endif #else - v5_options |= 0x00; // TODO Check if this is the right option + v5_options |= 0x00; // TODO Check if this is the right option #endif } /* @@ -1234,7 +1266,7 @@ vscpClientMqtt::initFromJson(const std::string &config) v5_options |= 0x10; #endif #else - v5_options |= 0x10; // TODO Check if this is the right option + v5_options |= 0x10; // TODO Check if this is the right option #endif } /* @@ -1250,7 +1282,7 @@ vscpClientMqtt::initFromJson(const std::string &config) v5_options |= 0x20; #endif #else - v5_options |= 0x20; // TODO Check if this is the right option + v5_options |= 0x20; // TODO Check if this is the right option #endif } @@ -1439,8 +1471,8 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) else { // String: send 0,20,3,,,,0:1:2:3:4:5:6:7:8:9:10:11:12:13:14:15,0,1,35 std::string s = (char *) pmsg->payload; - size_t n = std::count(s.begin(), s.end(), ','); - if (n<6) { + size_t n = std::count(s.begin(), s.end(), ','); + if (n < 6) { // This is not a VSCP event on string format spdlog::trace("Payload is not VSCP event."); return false; @@ -1601,7 +1633,7 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) spdlog::critical("Memory problem."); return false; } - + pEvent->pdata = nullptr; if (!vscp_copyEvent(pEvent, &ev)) { @@ -1733,7 +1765,7 @@ vscpClientMqtt::init(void) #if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6) if (MOSQ_ERR_SUCCESS != mosquitto_will_set_v5(m_mosq, strTopic.c_str(), - (int)m_will_payload.length(), + (int) m_will_payload.length(), m_will_payload.c_str(), m_will_qos, m_will_bretain, @@ -1752,7 +1784,7 @@ vscpClientMqtt::init(void) else { if (MOSQ_ERR_SUCCESS != mosquitto_will_set(m_mosq, strTopic.c_str(), - (int)m_will_payload.length(), + (int) m_will_payload.length(), m_will_payload.c_str(), m_will_qos, m_will_bretain)) { @@ -1986,8 +2018,7 @@ vscpClientMqtt::connect(void) strTopic = subtemplate.render(data); } - spdlog::trace("Publish will: Topic: {0} Payload: empty qos=1 retain=true", - strTopic); + spdlog::trace("Publish will: Topic: {0} Payload: empty qos=1 retain=true", strTopic); if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq, NULL, // msg id @@ -2007,6 +2038,16 @@ vscpClientMqtt::connect(void) subscribeTopic *psubtopic = (*it); + /*! + if the subscription is not active we do not subscribe. + Normally all are active but tools like vscp-works-qt (session) + can inactivate sessions temporarily. This is never saved + persistently. + */ + if (!psubtopic->isActive()) { + continue; + } + // Fix subscribe topics mustache subtemplate{ psubtopic->getTopic() }; data data; @@ -2014,10 +2055,7 @@ vscpClientMqtt::connect(void) std::string subscribe_topic = subtemplate.render(data); // Subscribe to specified topic - rv = mosquitto_subscribe(m_mosq, - /*m_mqtt_id*/ nullptr, - subscribe_topic.c_str(), - m_qos); + rv = mosquitto_subscribe(m_mosq, nullptr, subscribe_topic.c_str(), m_qos); if (MOSQ_ERR_SUCCESS != rv) { spdlog::error("Failed to subscribed to topic '{0}' - rv={1} {2}.", subscribe_topic, rv, mosquitto_strerror(rv)); } @@ -2026,23 +2064,23 @@ vscpClientMqtt::connect(void) // Start worker thread if a callback has been defined if ((nullptr != m_evcallback) || (nullptr != m_excallback)) { int rv = pthread_create(&m_tid, nullptr, workerThread, this); - switch(rv) { + switch (rv) { - case EAGAIN: - spdlog::error("Failed to start MQTT callback thread - Insufficient resources to create another thread."); - break; + case EAGAIN: + spdlog::error("Failed to start MQTT callback thread - Insufficient resources to create another thread."); + break; - case EINVAL: - spdlog::error("Failed to start MQTT callback thread - Invalid settings in attr"); - break; + case EINVAL: + spdlog::error("Failed to start MQTT callback thread - Invalid settings in attr"); + break; - case EPERM: - spdlog::error("Failed to start MQTT callback thread - No permission to set the scheduling policy"); - break; + case EPERM: + spdlog::error("Failed to start MQTT callback thread - No permission to set the scheduling policy"); + break; - default: - spdlog::debug("Started MQTT callback thread"); - break; + default: + spdlog::debug("Started MQTT callback thread"); + break; } } @@ -2109,6 +2147,17 @@ vscpClientMqtt::send(vscpEvent &ev) ++it) { publishTopic *ppublish = (*it); + + /*! + if the publish item is not active we do not publish. + Normally all are active but tools like vscp-works-qt (session) + can inactivate sessions temporarily. This is never saved + persistently. + */ + if (!ppublish->isActive()) { + continue; + } + memset(payload, 0, sizeof(payload)); if (ppublish->getFormat() == jsonfmt) { @@ -2198,7 +2247,6 @@ vscpClientMqtt::send(vscpEvent &ev) mustache subtemplate{ topic_template }; data data; cguid evguid(ev.GUID); // Event GUID - // Event GUID data.set("guid", evguid.getAsString()); @@ -2237,7 +2285,6 @@ vscpClientMqtt::send(vscpEvent &ev) data.set(vscp_str_format("xifguid[%d]", i), vscp_str_format("%02X", m_ifguid.getAt(i))); } - // Event data data.set("sizedata", vscp_str_format("%d", ev.sizeData)); data.set("xsizedata", vscp_str_format("%04X", ev.sizeData)); @@ -2253,7 +2300,6 @@ vscpClientMqtt::send(vscpEvent &ev) data.set("class", vscp_str_format("%d", ev.vscp_class)); data.set("type", vscp_str_format("%d", ev.vscp_type)); - data.set("xclass", vscp_str_format("%02x", ev.vscp_class)); data.set("xtype", vscp_str_format("%02x", ev.vscp_type)); @@ -2349,10 +2395,10 @@ vscpClientMqtt::send(vscpEvent &ev) } spdlog::trace("Publish send ev: Topic: {0} qos={1} retain={2}", - strTopic, - /*(unsigned char *)payload,*/ - ppublish->getQos(), - ppublish->getRetain()); + strTopic, + /*(unsigned char *)payload,*/ + ppublish->getQos(), + ppublish->getRetain()); if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq, NULL, // msg id @@ -2476,7 +2522,7 @@ vscpClientMqtt::send(vscpEventEx &ex) mustache subtemplate{ topic_template }; data data; cguid evguid(ex.GUID); // Event GUID - + // Event GUID data.set("guid", evguid.getAsString()); @@ -2636,10 +2682,10 @@ vscpClientMqtt::send(vscpEventEx &ex) } spdlog::trace("Publish send ex: Topic: {0} qos={1} retain={2}", - strTopic, - /*(unsigned char *)payload,*/ - ppublish->getQos(), - ppublish->getRetain()); + strTopic, + /*(unsigned char *)payload,*/ + ppublish->getQos(), + ppublish->getRetain()); if (MOSQ_ERR_SUCCESS != (rv = mosquitto_publish(m_mosq, NULL, // msg id diff --git a/src/vscp/common/vscp_client_mqtt.h b/src/vscp/common/vscp_client_mqtt.h index a422228df..45412a348 100644 --- a/src/vscp/common/vscp_client_mqtt.h +++ b/src/vscp/common/vscp_client_mqtt.h @@ -223,11 +223,16 @@ class publishTopic { bool isRetain(void) { return m_bRetain; }; void setRetain(bool bRetain) { m_bRetain = bRetain; }; + /// Getters/setters for bActive + bool isActive(void) { return m_bActive; }; + void setActive(bool bActive) { m_bActive = bActive; }; + /// getters/setter for format void setFormat(enumMqttMsgFormat format) { m_format = format; }; enumMqttMsgFormat getFormat(void) { return m_format; }; private: + /// Publish topic std::string m_topic; @@ -240,6 +245,13 @@ class publishTopic { /// publish topic message retain flag bool m_bRetain; + /*! + Used by routines that allow to deactivate publish topics + on the fly. This is used in the vscp-works-qt session window + True on default !!! NOT SAVED !!! + */ + bool m_bActive; + /*! Publish format json (default), xml, string, binary */ @@ -275,6 +287,10 @@ class subscribeTopic { int getQos(void) { return m_qos; }; void setQos(int qos) { m_qos = qos; }; + /// Getters/setters for bActive + bool isActive(void) { return m_bActive; }; + void setActive(bool bActive) { m_bActive = bActive; }; + /// getters/setter for format void setFormat(enumMqttMsgFormat format) { m_format = format; }; enumMqttMsgFormat getFormat(void) { return m_format; }; @@ -292,6 +308,13 @@ class subscribeTopic { /// version 5 options int m_v5_options; + /*! + Used by routines that allow to deactivate subscriptions + on the fly. This is used in the vscp-works-qt session window + True on default !!! NOT SAVED !!! + */ + bool m_bActive; + /*! Subscribe format auto(default),json,xml,string,binary */ @@ -623,6 +646,16 @@ class vscpClientMqtt : public CVscpClient { */ void setFuncParentCallbackMessage(LPFN_PARENT_CALLBACK_MESSAGE func) { m_parentCallbackMessage = func; }; + /*! + Get pointer to subscribe topic list + */ + std::list *getSubscribeList(void) { return &m_mqtt_subscribeTopicList; }; + + /*! + Get pointer to publish topic list + */ + std::list *getPublishList(void) { return &m_mqtt_publishTopicList; }; + public: // Timeout in milliseconds for host connection. uint32_t m_timeoutConnection;