diff --git a/src/vscp/common/vscp-client-socketcan.cpp b/src/vscp/common/vscp-client-socketcan.cpp index f45dc1699..aae80fd4b 100644 --- a/src/vscp/common/vscp-client-socketcan.cpp +++ b/src/vscp/common/vscp-client-socketcan.cpp @@ -882,7 +882,7 @@ workerThread(void *pData) setsockopt(pObj->m_socket, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv, sizeof(struct timeval)); if (bind(pObj->m_socket, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - spdlog::error("SOCKETCAN client: wrkthread socketcan client: Error in socket bind. Terminating!"); + spdlog::error("SOCKETCAN client: workthread socketcan client: Error in socket bind. Terminating!"); close(pObj->m_socket); sleep(2); // continue; diff --git a/src/vscp/common/vscp-client-tcp.cpp b/src/vscp/common/vscp-client-tcp.cpp index 577f23537..9b13349b2 100644 --- a/src/vscp/common/vscp-client-tcp.cpp +++ b/src/vscp/common/vscp-client-tcp.cpp @@ -48,6 +48,14 @@ vscpClientTcp::vscpClientTcp() m_guidif.clear(); m_bPolling = false; m_obid = 0; + + vscp_clearVSCPFilter(&m_filterIn); // Accept all events + vscp_clearVSCPFilter(&m_filterOut); // Send all events + + sem_init(&m_semReceiveQueue, 0, 0); + + pthread_mutex_init(&m_mutexTcpIpObject, NULL); + pthread_mutex_init(&m_mutexReceiveQueue, NULL); } /////////////////////////////////////////////////////////////////////////////// @@ -59,6 +67,11 @@ vscpClientTcp::~vscpClientTcp() // Just to be sure disconnect(); + sem_destroy(&m_semReceiveQueue); + + pthread_mutex_destroy(&m_mutexTcpIpObject); + pthread_mutex_destroy(&m_mutexReceiveQueue); + // Clear the input queue (if needed) // while (m_inputQue.size()) { // vscpEvent *pev = m_inputQue.front(); @@ -96,17 +109,17 @@ vscpClientTcp::getConfigAsJson(void) j["keyfile"] = m_keyfile; j["pwkeyfile"] = m_pwKeyfile; - // Filter + // Filter In - j["priority-filter"] = m_filter.filter_priority; - j["priority-mask"] = m_filter.mask_priority; - j["class-filter"] = m_filter.filter_class; - j["class-mask"] = m_filter.mask_class; - j["type-filter"] = m_filter.filter_type; - j["type-mask"] = m_filter.mask_type; - vscp_writeGuidArrayToString(str, m_filter.filter_GUID); + j["priority-filter"] = m_filterIn.filter_priority; + j["priority-mask"] = m_filterIn.mask_priority; + j["class-filter"] = m_filterIn.filter_class; + j["class-mask"] = m_filterIn.mask_class; + j["type-filter"] = m_filterIn.filter_type; + j["type-mask"] = m_filterIn.mask_type; + vscp_writeGuidArrayToString(str, m_filterIn.filter_GUID); j["guid-filter"] = str; - vscp_writeGuidArrayToString(str, m_filter.mask_GUID); + vscp_writeGuidArrayToString(str, m_filterIn.mask_GUID); j["guid-mask"] = str; return j.dump(); @@ -190,37 +203,37 @@ vscpClientTcp::initFromJson(const std::string &config) // Filter if (j.contains("priority-filter")) { - m_filter.filter_priority = j["priority-filter"].get(); + m_filterIn.filter_priority = j["priority-filter"].get(); } if (j.contains("priority-mask")) { - m_filter.mask_priority = j["priority-mask"].get(); + m_filterIn.mask_priority = j["priority-mask"].get(); } if (j.contains("class-filter")) { - m_filter.filter_class = j["class-filter"].get(); + m_filterIn.filter_class = j["class-filter"].get(); } if (j.contains("class-mask")) { - m_filter.mask_class = j["class-mask"].get(); + m_filterIn.mask_class = j["class-mask"].get(); } if (j.contains("type-filter")) { - m_filter.filter_type = j["type-filter"].get(); + m_filterIn.filter_type = j["type-filter"].get(); } if (j.contains("type-mask")) { - m_filter.mask_type = j["type-mask"].get(); + m_filterIn.mask_type = j["type-mask"].get(); } if (j.contains("guid-filter")) { std::string str = j["guid-filter"].get(); - vscp_getGuidFromStringToArray(m_filter.filter_GUID, str); + vscp_getGuidFromStringToArray(m_filterIn.filter_GUID, str); } if (j.contains("guid-mask")) { std::string str = j["guid-mask"].get(); - vscp_getGuidFromStringToArray(m_filter.mask_GUID, str); + vscp_getGuidFromStringToArray(m_filterIn.mask_GUID, str); } } catch (...) { @@ -477,9 +490,9 @@ vscpClientTcp::setfilter(vscpEventFilter &filter) return m_tcp.doCmdFilter(&filter); } else { - m_mutexReceive.lock(); + pthread_mutex_lock(&m_mutexTcpIpObject); return m_tcpReceive.doCmdFilter(&filter); - m_mutexReceive.unlock(); + pthread_mutex_unlock(&m_mutexTcpIpObject); } } @@ -617,11 +630,31 @@ workerThread(vscpClientTcp *pObj) while (pObj->m_bRun) { - pObj->m_mutexReceive.lock(); + pthread_mutex_lock(&pObj->m_mutexTcpIpObject); - // m_pif->rcvloopRead(500); if (VSCP_ERROR_SUCCESS == m_pifReceive->doCmdBlockingReceive(&ev)) { - pObj->sendToCallbacks(&ev); + + if (vscp_doLevel2Filter(&ev, &pObj->m_filterIn)) { + + if (pObj->isCallbackEvActive()) { + pObj->m_callbackev(ev, pObj->getCallbackObj()); + } + + if (pObj->isCallbackExActive()) { + vscpEventEx ex; + if (vscp_convertEventToEventEx(&ex, &ev)) { + pObj->m_callbackex(ex, pObj->getCallbackObj()); + } + } + + // Add to input queue only if no callback set + if (!pObj->isCallbackEvActive() || !pObj->isCallbackExActive()) { + pthread_mutex_lock(&pObj->m_mutexReceiveQueue); + pObj->m_receiveList.push_back(&ev); + sem_post(&pObj->m_semReceiveQueue); + pthread_mutex_unlock(&pObj->m_mutexReceiveQueue); + } + } vscp_deleteEvent(&ev); } @@ -629,6 +662,6 @@ workerThread(vscpClientTcp *pObj) pObj->m_bRun = false; } - pObj->m_mutexReceive.unlock(); + pthread_mutex_unlock(&pObj->m_mutexTcpIpObject); } } \ No newline at end of file diff --git a/src/vscp/common/vscp-client-tcp.h b/src/vscp/common/vscp-client-tcp.h index 576d9f627..11f5142fd 100644 --- a/src/vscp/common/vscp-client-tcp.h +++ b/src/vscp/common/vscp-client-tcp.h @@ -44,8 +44,8 @@ class vscpClientTcp : public CVscpClient { tcp:// or stcp:// (SSL connection) @param strUsername Username used to login on remote host. @param strPassword Password used to login on remote host. - @param bPolling If true only one connection will be opended to the remote server - on which polling for events will be done. If false one connection will be opended + @param bPolling If true only one connection will be opened to the remote server + on which polling for events will be done. If false one connection will be opened for send and one for receive. The polling is intended for low end devices which only accepts one client at the time. @return Return VSCP_ERROR_SUCCESS of OK and error code else. @@ -257,11 +257,18 @@ class vscpClientTcp : public CVscpClient { void sendToCallbacks(vscpEvent *pev); public: - /// Flag for workerthread run as long it's true + /// Flag for worker thread run as long it's true bool m_bRun; + // Event lists + // std::list m_sendList; + std::list m_receiveList; + /// Mutex to protect receive tcp/ip object - std::mutex m_mutexReceive; + pthread_mutex_t m_mutexTcpIpObject; + + /// Mutex to protect receive queue + pthread_mutex_t m_mutexReceiveQueue; /*! Event object to indicate that there is an event in the @@ -269,13 +276,17 @@ class vscpClientTcp : public CVscpClient { */ sem_t m_semReceiveQueue; + /// Filters for input/output + vscpEventFilter m_filterIn; + vscpEventFilter m_filterOut; + /// Used for channel id (prevent sent events from being received) uint32_t m_obid; private: /*! The main interface (sending) is always opened (both in poll and - standard mode). The Receive interface is opended only in normal mode + standard mode). The Receive interface is opened only in normal mode and do just connect - log in - enable receive loop. Received events will be sent on the defined callbacks. */ @@ -286,12 +297,13 @@ class vscpClientTcp : public CVscpClient { /// Receiving interface VscpRemoteTcpIf m_tcpReceive; - // Filter used for both channels - vscpEventFilter m_filter; - // Interface on remote host cguid m_guidif; + // ------------------------------------------------------------------------ + + + /// Workerthread std::thread *m_pworkerthread; @@ -312,6 +324,8 @@ class vscpClientTcp : public CVscpClient { /// If true the remote host interface will be polled. bool m_bPolling; + + // ------------------------------------------------------------------------ // TLS / SSL // ------------------------------------------------------------------------