diff --git a/src/vscp/common/vscp-client-canal.cpp b/src/vscp/common/vscp-client-canal.cpp index dbaf4900f..3ddd60980 100644 --- a/src/vscp/common/vscp-client-canal.cpp +++ b/src/vscp/common/vscp-client-canal.cpp @@ -38,16 +38,14 @@ #include "vscp-client-canal.h" #include "vscphelper.h" - - -//#include +// #include #include // Needs C++11 -std=c++11 // for convenience using json = nlohmann::json; -//using namespace kainjow::mustache; +// using namespace kainjow::mustache; -//#include +// #include #include // Forward declaration @@ -64,7 +62,17 @@ vscpClientCanal::vscpClientCanal() m_bConnected = false; // Not connected // m_tid = 0; m_bRun = true; + + vscp_clearVSCPFilter(&m_filterIn); // Accept all events + pthread_mutex_init(&m_mutexif, NULL); + pthread_mutex_init(&m_mutexReceiveQueue, NULL); + + // sem_init(&m_semSendQueue, 0, 0); + sem_init(&m_semReceiveQueue, 0, 0); + + // pthread_mutex_init(&m_mutexSendQueue, NULL); + pthread_mutex_init(&m_mutexReceiveQueue, NULL); spdlog::trace("CANAL CLIENT: constructor vscp_client_canal object."); } @@ -76,7 +84,19 @@ vscpClientCanal::vscpClientCanal() vscpClientCanal::~vscpClientCanal() { disconnect(); + + sem_destroy(&m_semReceiveQueue); + pthread_mutex_destroy(&m_mutexif); + pthread_mutex_destroy(&m_mutexReceiveQueue); + + // Clear the input queue (if needed) + while (m_receiveQueue.size()) { + vscpEvent *pev = m_receiveQueue.front(); + m_receiveQueue.pop_front(); + vscp_deleteEvent(pev); + } + spdlog::trace("CANAL CLIENT: destructor vscp_client_canal object."); } @@ -133,13 +153,13 @@ vscpClientCanal::initFromJson(const std::string &config) spdlog::error("CANAL CLIENT: JSON init: Name must be set."); return false; // Must be set } - spdlog::debug("CANAL CLIENT: JSON init: name={}.", (std::string)j["name"]); + spdlog::debug("CANAL CLIENT: JSON init: name={}.", (std::string) j["name"]); if (!j["path"].is_string()) { spdlog::error("CANAL CLIENT: JSON init: Path must be set."); return false; // Must be set } - spdlog::debug("CANAL CLIENT: JSON init: path={}.", (std::string)j["path"]); + spdlog::debug("CANAL CLIENT: JSON init: path={}.", (std::string) j["path"]); if (j.contains("config")) { if (!j["config"].is_string()) { @@ -150,7 +170,7 @@ vscpClientCanal::initFromJson(const std::string &config) else { j["config"] = ""; // Set default } - spdlog::debug("CANAL CLIENT: JSON init: config=\"{}\".", (std::string)j["config"]); + spdlog::debug("CANAL CLIENT: JSON init: config=\"{}\".", (std::string) j["config"]); if (j.contains("flags")) { if (j["flags"].is_string()) { @@ -160,7 +180,7 @@ vscpClientCanal::initFromJson(const std::string &config) else { j["flags"] = 0; // Set default } - spdlog::debug("CANAL CLIENT: JSON init: flags={}.", (uint32_t)j["flags"]); + spdlog::debug("CANAL CLIENT: JSON init: flags={}.", (uint32_t) j["flags"]); if (j.contains("datarate")) { if (j["datarate"].is_string()) { @@ -170,7 +190,7 @@ vscpClientCanal::initFromJson(const std::string &config) else { j["datarate"] = 0; // Set default } - spdlog::debug("CANAL CLIENT: JSON init: datarate={}.", (int)j["datarate"]); + spdlog::debug("CANAL CLIENT: JSON init: datarate={}.", (int) j["datarate"]); setName(j["name"]); return (init(j["path"], j["config"], j["flags"], j["datarate"])); @@ -358,8 +378,8 @@ int vscpClientCanal::receive(canalMsg &msg) { int rv; - //canalMsg canalMsg; - //uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + // canalMsg canalMsg; + // uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; spdlog::debug("CANAL CLIENT: Poll for event ex."); @@ -492,7 +512,7 @@ vscpClientCanal::clear() int vscpClientCanal::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t * /*prelease*/, uint8_t * /*pbuild*/) { - //uint32_t ver = m_canalif.CanalGetDllVersion(); + // uint32_t ver = m_canalif.CanalGetDllVersion(); return VSCP_ERROR_SUCCESS; } @@ -502,7 +522,7 @@ vscpClientCanal::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t // int -vscpClientCanal::getinterfaces(std::deque & /*iflist*/ ) +vscpClientCanal::getinterfaces(std::deque & /*iflist*/) { // No interfaces available return VSCP_ERROR_SUCCESS; @@ -573,7 +593,7 @@ vscpClientCanal::setCallbackEv(std::function ca } spdlog::debug("CANAL CLIENT: ev callback set."); - CVscpClient::setCallbackEv(callback, pData); + CVscpClient::setCallbackEv(callback, pData); return VSCP_ERROR_SUCCESS; } @@ -620,6 +640,7 @@ win_usleep(__int64 usec) static void * workerThread(void *pObj) { + canalMsg msg; uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; vscpClientCanal *pClient = (vscpClientCanal *) pObj; VscpCanalDeviceIf *pif = (VscpCanalDeviceIf *) &(pClient->m_canalif); @@ -631,48 +652,76 @@ workerThread(void *pObj) while (pClient->m_bRun) { - spdlog::trace("CANAL CLIENT: worktread start."); - - pthread_mutex_lock(&pClient->m_mutexif); + spdlog::trace("CANAL CLIENT: workertread start."); // Check if there are events to fetch - int cnt; - if ((cnt = pClient->m_canalif.CanalDataAvailable())) { - - while (cnt) { - canalMsg msg; - if (CANAL_ERROR_SUCCESS == pClient->m_canalif.CanalReceive(&msg)) { - spdlog::debug("CANAL CLIENT: workthread. Event recived"); - if (pClient->isCallbackEvActive()) { - vscpEvent ev; - if (vscp_convertCanalToEvent(&ev, &msg, guid)) { - spdlog::trace("CANAL CLIENT: workthread. Event sent to ev callback"); - pClient->m_callbackev(ev, pClient->getCallbackObj()); - } + // int cnt; + // if ((cnt = pClient->m_canalif.CanalDataAvailable())) { + + // while (cnt) { + + + + if (CANAL_ERROR_SUCCESS == pClient->m_canalif.CanalBlockingReceive(&msg, 100)) { + + spdlog::trace("CANAL CLIENT: workthread. Event received"); + + pthread_mutex_lock(&pClient->m_mutexif); + + if (pClient->isCallbackEvActive()) { + vscpEvent ev; + if (vscp_convertCanalToEvent(&ev, &msg, guid)) { + if (vscp_doLevel2Filter(&ev, &pClient->m_filterIn)) { + spdlog::trace("CANAL CLIENT: workthread. Event sent to ev callback"); + pClient->m_callbackev(ev, pClient->getCallbackObj()); } - if (pClient->isCallbackExActive()) { - vscpEventEx ex; - if (vscp_convertCanalToEventEx(&ex, &msg, guid)) { - spdlog::trace("CANAL CLIENT: workthread. Event sent to ex callback"); - pClient->m_callbackex(ex, pClient->getCallbackObj()); - } + } + } + else if (pClient->isCallbackExActive()) { + vscpEventEx ex; + if (vscp_convertCanalToEventEx(&ex, &msg, guid)) { + if (vscp_doLevel2FilterEx(&ex, &pClient->m_filterIn)) { + spdlog::trace("CANAL CLIENT: workthread. Event sent to ex callback"); + pClient->m_callbackex(ex, pClient->getCallbackObj()); } } - cnt--; } - } + else { - pthread_mutex_unlock(&pClient->m_mutexif); + // No callback defined so save event in incoming queue -#ifndef WIN32 - usleep(200); -#else - win_usleep(200); -#endif + vscpEvent *pev = new vscpEvent; + if (nullptr == pev) { + spdlog::critical("CANAL CLIENT: Memory problem."); + return NULL; + } + if (vscp_convertCanalToEvent(pev, &msg, guid)) { + if (vscp_doLevel2Filter(pev, &pClient->m_filterIn)) { + pthread_mutex_lock(&pClient->m_mutexReceiveQueue); + pClient->m_receiveQueue.push_back(pev); + sem_post(&pClient->m_semReceiveQueue); + pthread_mutex_unlock(&pClient->m_mutexReceiveQueue); + } + } + else { + vscp_deleteEvent(pev); + } + } - spdlog::trace("CANAL CLIENT: worktread end."); + pthread_mutex_unlock(&pClient->m_mutexif); + } // message received } // while + // cnt--; + //} + + // #ifndef WIN32 + // usleep(200); + // #else + // win_usleep(200); + // #endif + + spdlog::trace("CANAL CLIENT: workthread end."); return NULL; } diff --git a/src/vscp/common/vscp-client-canal.h b/src/vscp/common/vscp-client-canal.h index fdce00b8e..2aa6260b7 100644 --- a/src/vscp/common/vscp-client-canal.h +++ b/src/vscp/common/vscp-client-canal.h @@ -27,13 +27,14 @@ #define VSCPCLIENTCANAL_H__INCLUDED_ #include "vscp.h" +#include #include "vscp-client-base.h" #include "vscpcanaldeviceif.h" #include // When a callback is set and connect is called this object is shared -// with a workerthread that +// with a worker thread that class vscpClientCanal : public CVscpClient { @@ -226,10 +227,28 @@ class vscpClientCanal : public CVscpClient { // Mutex that protect CANAL interface when callbacks are defined pthread_mutex_t m_mutexif; + pthread_mutex_t m_mutexReceiveQueue; + + /*! + Event object to indicate that there is an event in the + output queue + */ + sem_t m_semReceiveQueue; + + /*! + If no callback is defined received events are connected in + this queue + */ + std::deque m_receiveQueue; // CANAL functionality VscpCanalDeviceIf m_canalif; + /*! + Receive filter + */ + vscpEventFilter m_filterIn; + private: /*! True of dll connection is open diff --git a/src/vscp/common/vscp-client-mqtt.cpp b/src/vscp/common/vscp-client-mqtt.cpp index cde1ca334..c2445c799 100644 --- a/src/vscp/common/vscp-client-mqtt.cpp +++ b/src/vscp/common/vscp-client-mqtt.cpp @@ -692,7 +692,7 @@ vscpClientMqtt::~vscpClientMqtt() while (m_receiveQueue.size()) { pev = m_receiveQueue.front(); m_receiveQueue.pop_front(); - vscp_deleteEvent_v2(&pev); + vscp_deleteEvent(pev); } // Delete subscription objects diff --git a/src/vscp/common/vscp-client-socketcan.cpp b/src/vscp/common/vscp-client-socketcan.cpp index aae80fd4b..23a37a346 100644 --- a/src/vscp/common/vscp-client-socketcan.cpp +++ b/src/vscp/common/vscp-client-socketcan.cpp @@ -153,6 +153,7 @@ vscpClientSocketCan::vscpClientSocketCan() vscpClientSocketCan::~vscpClientSocketCan() { disconnect(); + pthread_mutex_destroy(&m_mutexSocket); sem_destroy(&m_semSendQueue); @@ -160,6 +161,13 @@ vscpClientSocketCan::~vscpClientSocketCan() pthread_mutex_destroy(&m_mutexSendQueue); pthread_mutex_destroy(&m_mutexReceiveQueue); + + // Clear the input queue (if needed) + while (m_receiveList.size()) { + vscpEvent *pev = m_receiveList.front(); + m_receiveList.pop_front(); + vscp_deleteEvent(pev); + } } /////////////////////////////////////////////////////////////////////////////// @@ -167,7 +175,10 @@ vscpClientSocketCan::~vscpClientSocketCan() // int -vscpClientSocketCan::init(const std::string &interface, const std::string &guid, unsigned long flags, uint32_t /*timeout*/) +vscpClientSocketCan::init(const std::string &interface, + const std::string &guid, + unsigned long flags, + uint32_t /*timeout*/) { m_interface = interface; m_guid.getFromString(guid); @@ -634,7 +645,7 @@ vscpClientSocketCan::setfilter(vscpEventFilter & /*filter*/) // ((unsigned long) filter.mask_type << 8) | filter.mask_GUID[0]; // m_canalif.CanalSetMask(_mask); - return rv; + return rv; } /////////////////////////////////////////////////////////////////////////////// @@ -663,7 +674,10 @@ vscpClientSocketCan::clear() // int -vscpClientSocketCan::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t * /*prelease*/, uint8_t * /*pbuild*/) +vscpClientSocketCan::getversion(uint8_t * /*pmajor*/, + uint8_t * /*pminor*/, + uint8_t * /*prelease*/, + uint8_t * /*pbuild*/) { // uint32_t ver = m_canalif.CanalGetDllVersion(); @@ -675,7 +689,7 @@ vscpClientSocketCan::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint // int -vscpClientSocketCan::getinterfaces(std::deque &/*iflist*/) +vscpClientSocketCan::getinterfaces(std::deque & /*iflist*/) { // No interfaces available return VSCP_ERROR_SUCCESS; @@ -792,13 +806,13 @@ workerThread(void *pData) { int mtu, enable_canfd = 1; fd_set rdfs; - //struct timeval tv; + // struct timeval tv; struct sockaddr_can addr; struct ifreq ifr; - //struct cmsghdr *cmsg; + // struct cmsghdr *cmsg; struct canfd_frame frame; - //char ctrlmsg[CMSG_SPACE(sizeof(struct timeval)) + CMSG_SPACE(sizeof(__u32))]; - //const int canfd_on = 1; + // char ctrlmsg[CMSG_SPACE(sizeof(struct timeval)) + CMSG_SPACE(sizeof(__u32))]; + // const int canfd_on = 1; vscpClientSocketCan *pObj = (vscpClientSocketCan *) pData; if (NULL == pObj) { diff --git a/src/vscp/common/vscp-client-tcp.cpp b/src/vscp/common/vscp-client-tcp.cpp index 5d707b7e8..ed2308b7a 100644 --- a/src/vscp/common/vscp-client-tcp.cpp +++ b/src/vscp/common/vscp-client-tcp.cpp @@ -73,11 +73,11 @@ vscpClientTcp::~vscpClientTcp() pthread_mutex_destroy(&m_mutexReceiveQueue); // Clear the input queue (if needed) - // while (m_inputQue.size()) { - // vscpEvent *pev = m_inputQue.front(); - // m_inputQue.pop_front(); - // vscp_deleteEvent(pev); - // } + while (m_receiveList.size()) { + vscpEvent *pev = m_receiveList.front(); + m_receiveList.pop_front(); + vscp_deleteEvent(pev); + } } ///////////////////////////////////////////////////////////////////////////////