Skip to content

Commit

Permalink
Added queue event collection to tcp/ip client
Browse files Browse the repository at this point in the history
  • Loading branch information
grodansparadis committed Dec 16, 2024
1 parent 38c4e01 commit d663d5b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/vscp/common/vscp-client-socketcan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ workerThread(void *pData)
setsockopt(pObj->m_socket, SOL_SOCKET, SO_RCVTIMEO, (const char *) &tv, sizeof(struct timeval));

if (bind(pObj->m_socket, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
spdlog::error("SOCKETCAN client: wrkthread socketcan client: Error in socket bind. Terminating!");
spdlog::error("SOCKETCAN client: workthread socketcan client: Error in socket bind. Terminating!");
close(pObj->m_socket);
sleep(2);
// continue;
Expand Down
79 changes: 56 additions & 23 deletions src/vscp/common/vscp-client-tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ vscpClientTcp::vscpClientTcp()
m_guidif.clear();
m_bPolling = false;
m_obid = 0;

vscp_clearVSCPFilter(&m_filterIn); // Accept all events
vscp_clearVSCPFilter(&m_filterOut); // Send all events

sem_init(&m_semReceiveQueue, 0, 0);

pthread_mutex_init(&m_mutexTcpIpObject, NULL);
pthread_mutex_init(&m_mutexReceiveQueue, NULL);
}

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -59,6 +67,11 @@ vscpClientTcp::~vscpClientTcp()
// Just to be sure
disconnect();

sem_destroy(&m_semReceiveQueue);

pthread_mutex_destroy(&m_mutexTcpIpObject);
pthread_mutex_destroy(&m_mutexReceiveQueue);

// Clear the input queue (if needed)
// while (m_inputQue.size()) {
// vscpEvent *pev = m_inputQue.front();
Expand Down Expand Up @@ -96,17 +109,17 @@ vscpClientTcp::getConfigAsJson(void)
j["keyfile"] = m_keyfile;
j["pwkeyfile"] = m_pwKeyfile;

// Filter
// Filter In

j["priority-filter"] = m_filter.filter_priority;
j["priority-mask"] = m_filter.mask_priority;
j["class-filter"] = m_filter.filter_class;
j["class-mask"] = m_filter.mask_class;
j["type-filter"] = m_filter.filter_type;
j["type-mask"] = m_filter.mask_type;
vscp_writeGuidArrayToString(str, m_filter.filter_GUID);
j["priority-filter"] = m_filterIn.filter_priority;
j["priority-mask"] = m_filterIn.mask_priority;
j["class-filter"] = m_filterIn.filter_class;
j["class-mask"] = m_filterIn.mask_class;
j["type-filter"] = m_filterIn.filter_type;
j["type-mask"] = m_filterIn.mask_type;
vscp_writeGuidArrayToString(str, m_filterIn.filter_GUID);
j["guid-filter"] = str;
vscp_writeGuidArrayToString(str, m_filter.mask_GUID);
vscp_writeGuidArrayToString(str, m_filterIn.mask_GUID);
j["guid-mask"] = str;

return j.dump();
Expand Down Expand Up @@ -190,37 +203,37 @@ vscpClientTcp::initFromJson(const std::string &config)
// Filter

if (j.contains("priority-filter")) {
m_filter.filter_priority = j["priority-filter"].get<uint8_t>();
m_filterIn.filter_priority = j["priority-filter"].get<uint8_t>();
}

if (j.contains("priority-mask")) {
m_filter.mask_priority = j["priority-mask"].get<uint8_t>();
m_filterIn.mask_priority = j["priority-mask"].get<uint8_t>();
}

if (j.contains("class-filter")) {
m_filter.filter_class = j["class-filter"].get<uint16_t>();
m_filterIn.filter_class = j["class-filter"].get<uint16_t>();
}

if (j.contains("class-mask")) {
m_filter.mask_class = j["class-mask"].get<uint16_t>();
m_filterIn.mask_class = j["class-mask"].get<uint16_t>();
}

if (j.contains("type-filter")) {
m_filter.filter_type = j["type-filter"].get<uint16_t>();
m_filterIn.filter_type = j["type-filter"].get<uint16_t>();
}

if (j.contains("type-mask")) {
m_filter.mask_type = j["type-mask"].get<uint16_t>();
m_filterIn.mask_type = j["type-mask"].get<uint16_t>();
}

if (j.contains("guid-filter")) {
std::string str = j["guid-filter"].get<std::string>();
vscp_getGuidFromStringToArray(m_filter.filter_GUID, str);
vscp_getGuidFromStringToArray(m_filterIn.filter_GUID, str);
}

if (j.contains("guid-mask")) {
std::string str = j["guid-mask"].get<std::string>();
vscp_getGuidFromStringToArray(m_filter.mask_GUID, str);
vscp_getGuidFromStringToArray(m_filterIn.mask_GUID, str);
}
}
catch (...) {
Expand Down Expand Up @@ -477,9 +490,9 @@ vscpClientTcp::setfilter(vscpEventFilter &filter)
return m_tcp.doCmdFilter(&filter);
}
else {
m_mutexReceive.lock();
pthread_mutex_lock(&m_mutexTcpIpObject);
return m_tcpReceive.doCmdFilter(&filter);
m_mutexReceive.unlock();
pthread_mutex_unlock(&m_mutexTcpIpObject);
}
}

Expand Down Expand Up @@ -617,18 +630,38 @@ workerThread(vscpClientTcp *pObj)

while (pObj->m_bRun) {

pObj->m_mutexReceive.lock();
pthread_mutex_lock(&pObj->m_mutexTcpIpObject);

// m_pif->rcvloopRead(500);
if (VSCP_ERROR_SUCCESS == m_pifReceive->doCmdBlockingReceive(&ev)) {
pObj->sendToCallbacks(&ev);

if (vscp_doLevel2Filter(&ev, &pObj->m_filterIn)) {

if (pObj->isCallbackEvActive()) {
pObj->m_callbackev(ev, pObj->getCallbackObj());
}

if (pObj->isCallbackExActive()) {
vscpEventEx ex;
if (vscp_convertEventToEventEx(&ex, &ev)) {
pObj->m_callbackex(ex, pObj->getCallbackObj());
}
}

// Add to input queue only if no callback set
if (!pObj->isCallbackEvActive() || !pObj->isCallbackExActive()) {
pthread_mutex_lock(&pObj->m_mutexReceiveQueue);
pObj->m_receiveList.push_back(&ev);
sem_post(&pObj->m_semReceiveQueue);
pthread_mutex_unlock(&pObj->m_mutexReceiveQueue);
}
}
vscp_deleteEvent(&ev);
}

if (!m_pifReceive->isConnected()) {
pObj->m_bRun = false;
}

pObj->m_mutexReceive.unlock();
pthread_mutex_unlock(&pObj->m_mutexTcpIpObject);
}
}
30 changes: 22 additions & 8 deletions src/vscp/common/vscp-client-tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class vscpClientTcp : public CVscpClient {
tcp:// or stcp:// (SSL connection)
@param strUsername Username used to login on remote host.
@param strPassword Password used to login on remote host.
@param bPolling If true only one connection will be opended to the remote server
on which polling for events will be done. If false one connection will be opended
@param bPolling If true only one connection will be opened to the remote server
on which polling for events will be done. If false one connection will be opened
for send and one for receive. The polling is intended for low end devices which
only accepts one client at the time.
@return Return VSCP_ERROR_SUCCESS of OK and error code else.
Expand Down Expand Up @@ -257,25 +257,36 @@ class vscpClientTcp : public CVscpClient {
void sendToCallbacks(vscpEvent *pev);

public:
/// Flag for workerthread run as long it's true
/// Flag for worker thread run as long it's true
bool m_bRun;

// Event lists
// std::list<vscpEvent *> m_sendList;
std::list<vscpEvent *> m_receiveList;

/// Mutex to protect receive tcp/ip object
std::mutex m_mutexReceive;
pthread_mutex_t m_mutexTcpIpObject;

/// Mutex to protect receive queue
pthread_mutex_t m_mutexReceiveQueue;

/*!
Event object to indicate that there is an event in the
output queue
*/
sem_t m_semReceiveQueue;

/// Filters for input/output
vscpEventFilter m_filterIn;
vscpEventFilter m_filterOut;

/// Used for channel id (prevent sent events from being received)
uint32_t m_obid;

private:
/*!
The main interface (sending) is always opened (both in poll and
standard mode). The Receive interface is opended only in normal mode
standard mode). The Receive interface is opened only in normal mode
and do just connect - log in - enable receive loop. Received events
will be sent on the defined callbacks.
*/
Expand All @@ -286,12 +297,13 @@ class vscpClientTcp : public CVscpClient {
/// Receiving interface
VscpRemoteTcpIf m_tcpReceive;

// Filter used for both channels
vscpEventFilter m_filter;

// Interface on remote host
cguid m_guidif;

// ------------------------------------------------------------------------



/// Workerthread
std::thread *m_pworkerthread;

Expand All @@ -312,6 +324,8 @@ class vscpClientTcp : public CVscpClient {
/// If true the remote host interface will be polled.
bool m_bPolling;



// ------------------------------------------------------------------------
// TLS / SSL
// ------------------------------------------------------------------------
Expand Down

0 comments on commit d663d5b

Please sign in to comment.