diff --git a/src/vscp/common/vscp-client-mqtt.cpp b/src/vscp/common/vscp-client-mqtt.cpp index fe2e8c887..cde1ca334 100644 --- a/src/vscp/common/vscp-client-mqtt.cpp +++ b/src/vscp/common/vscp-client-mqtt.cpp @@ -55,15 +55,13 @@ #include #include -#include +#include // for convenience using json = nlohmann::json; using namespace kainjow::mustache; // Forward declaration -static void * -workerThread(void *pObj); // ---------------------------------------------------------------------------- @@ -544,6 +542,7 @@ publishTopic::publishTopic(const std::string &topic, enumMqttMsgFormat format, i publishTopic::~publishTopic() { + #if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6) mosquitto_property_free_all(&m_properties); #endif @@ -607,8 +606,8 @@ vscpClientMqtt::vscpClientMqtt(void) m_mapMqttIntOptions["send-maximum"] = 20; #ifndef WIN32 - m_tid = 0; // pthread -#endif + m_tid = 0; // pthread +#endif m_bConnected = false; // Not connected m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event m_bindInterface = ""; // No bind interface @@ -662,7 +661,10 @@ vscpClientMqtt::vscpClientMqtt(void) return; } - pthread_mutex_init(&m_mutexif, nullptr); + sem_init(&m_semReceiveQueue, 0, 0); + + pthread_mutex_init(&m_mutexif, NULL); + pthread_mutex_init(&m_mutexReceiveQueue, NULL); } /////////////////////////////////////////////////////////////////////////////// @@ -681,7 +683,10 @@ vscpClientMqtt::~vscpClientMqtt() // Clean up the lib mosquitto_lib_cleanup(); + sem_destroy(&m_semReceiveQueue); + pthread_mutex_destroy(&m_mutexif); + pthread_mutex_destroy(&m_mutexReceiveQueue); vscpEvent *pev = nullptr; while (m_receiveQueue.size()) { @@ -1284,8 +1289,8 @@ vscpClientMqtt::initFromJson(const std::string &config) addSubscription(topic, format, qos, v5_options); } // object - } // for - } // sub + } // for + } // sub if (j.contains("bescape-pub-topics") && j["bescape-pub-topics"].is_boolean()) { m_bEscapesPubTopics = j["bescape-pub-topics"].get(); @@ -1374,8 +1379,8 @@ vscpClientMqtt::initFromJson(const std::string &config) addPublish(topic, format, qos, bretain); } // obj - } // for - } // pub + } // for + } // pub // v5 if (j.contains("v5") && j["v5"].is_object()) { @@ -1517,7 +1522,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) // Save event in incoming queue if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) { + pthread_mutex_lock(&m_mutexReceiveQueue); m_receiveQueue.push_back(pEvent); + sem_post(&m_semReceiveQueue); + pthread_mutex_unlock(&m_mutexReceiveQueue); } } } @@ -1556,7 +1564,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) // Save event in incoming queue if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) { + pthread_mutex_lock(&m_mutexReceiveQueue); m_receiveQueue.push_back(pEvent); + sem_post(&m_semReceiveQueue); + pthread_mutex_unlock(&m_mutexReceiveQueue); } } } @@ -1595,7 +1606,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) // Save event in incoming queue if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) { + pthread_mutex_lock(&m_mutexReceiveQueue); m_receiveQueue.push_back(pEvent); + sem_post(&m_semReceiveQueue); + pthread_mutex_unlock(&m_mutexReceiveQueue); } } } @@ -1637,7 +1651,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg) // Save event in incoming queue if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) { + pthread_mutex_lock(&m_mutexReceiveQueue); m_receiveQueue.push_back(pEvent); + sem_post(&m_semReceiveQueue); + pthread_mutex_unlock(&m_mutexReceiveQueue); } } } @@ -2076,30 +2093,6 @@ vscpClientMqtt::connect(void) } } - // Start worker thread if a callback has been defined - if (isCallbackEvActive()|| isCallbackExActive()) { - int rv = pthread_create(&m_tid, nullptr, workerThread, this); - switch (rv) { - - case EAGAIN: - spdlog::error( - "MQTT CLIENT: Failed to start MQTT callback thread - Insufficient resources to create another thread."); - break; - - case EINVAL: - spdlog::error("MQTT CLIENT: Failed to start MQTT callback thread - Invalid settings in attr"); - break; - - case EPERM: - spdlog::error("MQTT CLIENT: Failed to start MQTT callback thread - No permission to set the scheduling policy"); - break; - - default: - spdlog::debug("MQTT CLIENT: Started MQTT callback thread"); - break; - } - } - return VSCP_ERROR_SUCCESS; } @@ -2219,7 +2212,7 @@ vscpClientMqtt::send(vscpEvent &ev) spdlog::error("MQTT CLIENT: sendEvent: Failed to add measurement info to event."); } } // OK to insert extra info - } // is measurement + } // is measurement lenPayload = strPayload.length(); strncpy((char *) payload, strPayload.c_str(), lenPayload); @@ -2477,7 +2470,7 @@ vscpClientMqtt::send(vscpEventEx &ex) spdlog::error("MQTT CLIENT: sendEvent: Failed to add measurement info to event."); } } // OK to insert extra info - } // is measurement + } // is measurement lenPayload = strPayload.length(); strncpy((char *) payload, strPayload.c_str(), sizeof(payload)); @@ -3003,58 +2996,3 @@ win_usleep(__int64 usec) CloseHandle(timer); } #endif - -/////////////////////////////////////////////////////////////////////////////// -// Workerthread -// -// This thread call the appropriate callback when events are received -// - -static void * -workerThread(void *pObj) -{ - //uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; - vscpClientMqtt *pClient = (vscpClientMqtt *) pObj; - // if (nullptr == pif) return nullptr; - - while (pClient->m_bRun) { - - // pthread_mutex_lock(&pif->m_mutexif); - - // Check if there are events to fetch - // int cnt; - /* if ((cnt = pClient->m_canalif.CanalDataAvailable())) { - - while (cnt) { - canalMsg msg; - if ( CANAL_ERROR_SUCCESS == - pClient->m_canalif.CanalReceive(&msg) ) { if ( nullptr != - pClient->m_evcallback ) { vscpEvent ev; if - (vscp_convertCanalToEvent(&ev, &msg, guid) ) { - pClient->m_evcallback(ev); - } - } - if ( nullptr != pClient->m_excallback ) { - vscpEventEx ex; - if (vscp_convertCanalToEventEx(&ex, - &msg, - guid) ) { - pClient->m_excallback(ex); - } - } - } - cnt--; - } - - } */ - - // pthread_mutex_unlock(&pif->m_mutexif); -#ifndef WIN32 - usleep(200); -#else - win_usleep(200); -#endif - } - - return nullptr; -} diff --git a/src/vscp/common/vscp-client-mqtt.h b/src/vscp/common/vscp-client-mqtt.h index 2fcc5b997..2986eb3b8 100644 --- a/src/vscp/common/vscp-client-mqtt.h +++ b/src/vscp/common/vscp-client-mqtt.h @@ -405,7 +405,7 @@ class vscpClientMqtt : public CVscpClient { @param timeout Timeout in milliseconds. Default is 100 ms. @return Return VSCP_ERROR_SUCCESS of OK and error code else. */ - virtual int receiveBlocking(vscpEvent &ev, long timeout = 100 ); + virtual int receiveBlocking(vscpEvent &ev, long timeout = 100); /*! Receive VSCP event ex from remote host @@ -419,7 +419,7 @@ class vscpClientMqtt : public CVscpClient { @param timeout Timeout in milliseconds. Default is 100 ms. @return Return VSCP_ERROR_SUCCESS of OK and error code else. */ - virtual int receiveBlocking(vscpEventEx &ex, long timeout = 100 ); + virtual int receiveBlocking(vscpEventEx &ex, long timeout = 100); /*! Receive CAN(AL) message from remote host @@ -728,6 +728,7 @@ class vscpClientMqtt : public CVscpClient { Mutex that protect CANAL interface when callbacks are defined */ pthread_mutex_t m_mutexif; + pthread_mutex_t m_mutexReceiveQueue; /*! Event object to indicate that there is an event in the diff --git a/src/vscp/common/vscp-client-tcp.cpp b/src/vscp/common/vscp-client-tcp.cpp index 9b13349b2..5d707b7e8 100644 --- a/src/vscp/common/vscp-client-tcp.cpp +++ b/src/vscp/common/vscp-client-tcp.cpp @@ -503,8 +503,9 @@ vscpClientTcp::setfilter(vscpEventFilter &filter) int vscpClientTcp::getcount(uint16_t *pcount) { - if (NULL == pcount) + if (NULL == pcount) { return VSCP_ERROR_INVALID_POINTER; + } *pcount = m_tcp.doCmdDataAvailable(); return VSCP_ERROR_SUCCESS; } @@ -621,8 +622,9 @@ workerThread(vscpClientTcp *pObj) ev.pdata = NULL; // Check pointer - if (nullptr == pObj) + if (nullptr == pObj) { return; + } VscpRemoteTcpIf *m_pifReceive = pObj->getTcpReceive(); diff --git a/src/vscp/common/vscphelper.cpp b/src/vscp/common/vscphelper.cpp index 3dd5c2cc8..93584e7be 100644 --- a/src/vscp/common/vscphelper.cpp +++ b/src/vscp/common/vscphelper.cpp @@ -275,6 +275,22 @@ vscp_sem_wait(sem_t *sem, uint32_t waitms) } #endif +#ifdef WIN32 +static void +vscp_usleep(__int64 usec) +{ + HANDLE timer; + LARGE_INTEGER ft; + + ft.QuadPart = -(10 * usec); // Convert to 100 nanosecond interval, negative value indicates relative time + + timer = CreateWaitableTimer(nullptr, TRUE, nullptr); + SetWaitableTimer(timer, &ft, 0, nullptr, nullptr, 0); + WaitForSingleObject(timer, INFINITE); + CloseHandle(timer); +} +#endif + /////////////////////////////////////////////////////////////////////////////// // vscp_almostEqualRelativeFloat // diff --git a/src/vscp/common/vscphelper.h b/src/vscp/common/vscphelper.h index 936190803..0bfbac228 100644 --- a/src/vscp/common/vscphelper.h +++ b/src/vscp/common/vscphelper.h @@ -262,6 +262,15 @@ int vscp_sem_wait(sem_t *sem, uint32_t waitms); #endif +/*! + Sleep for a number of microseconds + @param usec Number of microseconds to sleep +*/ +#ifdef WIN32 +static void +vscp_usleep(__int64 usec); +#endif + /* * Check two floats for equality * @param A Float to compare