Skip to content

Commit

Permalink
Clients now support callbacks and buffer read
Browse files Browse the repository at this point in the history
  • Loading branch information
grodansparadis committed Dec 17, 2024
1 parent 7da6b38 commit 5dfbf4a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 61 deletions.
141 changes: 95 additions & 46 deletions src/vscp/common/vscp-client-canal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,14 @@
#include "vscp-client-canal.h"
#include "vscphelper.h"



//#include <mustache.hpp>
// #include <mustache.hpp>
#include <nlohmann/json.hpp> // Needs C++11 -std=c++11

// for convenience
using json = nlohmann::json;
//using namespace kainjow::mustache;
// using namespace kainjow::mustache;

//#include <spdlog/sinks/rotating_file_sink.h>
// #include <spdlog/sinks/rotating_file_sink.h>
#include <spdlog/spdlog.h>

// Forward declaration
Expand All @@ -64,7 +62,17 @@ vscpClientCanal::vscpClientCanal()
m_bConnected = false; // Not connected
// m_tid = 0;
m_bRun = true;

vscp_clearVSCPFilter(&m_filterIn); // Accept all events

pthread_mutex_init(&m_mutexif, NULL);
pthread_mutex_init(&m_mutexReceiveQueue, NULL);

// sem_init(&m_semSendQueue, 0, 0);
sem_init(&m_semReceiveQueue, 0, 0);

// pthread_mutex_init(&m_mutexSendQueue, NULL);
pthread_mutex_init(&m_mutexReceiveQueue, NULL);

spdlog::trace("CANAL CLIENT: constructor vscp_client_canal object.");
}
Expand All @@ -76,7 +84,19 @@ vscpClientCanal::vscpClientCanal()
vscpClientCanal::~vscpClientCanal()
{
disconnect();

sem_destroy(&m_semReceiveQueue);

pthread_mutex_destroy(&m_mutexif);
pthread_mutex_destroy(&m_mutexReceiveQueue);

// Clear the input queue (if needed)
while (m_receiveQueue.size()) {
vscpEvent *pev = m_receiveQueue.front();
m_receiveQueue.pop_front();
vscp_deleteEvent(pev);
}

spdlog::trace("CANAL CLIENT: destructor vscp_client_canal object.");
}

Expand Down Expand Up @@ -133,13 +153,13 @@ vscpClientCanal::initFromJson(const std::string &config)
spdlog::error("CANAL CLIENT: JSON init: Name must be set.");
return false; // Must be set
}
spdlog::debug("CANAL CLIENT: JSON init: name={}.", (std::string)j["name"]);
spdlog::debug("CANAL CLIENT: JSON init: name={}.", (std::string) j["name"]);

if (!j["path"].is_string()) {
spdlog::error("CANAL CLIENT: JSON init: Path must be set.");
return false; // Must be set
}
spdlog::debug("CANAL CLIENT: JSON init: path={}.", (std::string)j["path"]);
spdlog::debug("CANAL CLIENT: JSON init: path={}.", (std::string) j["path"]);

if (j.contains("config")) {
if (!j["config"].is_string()) {
Expand All @@ -150,7 +170,7 @@ vscpClientCanal::initFromJson(const std::string &config)
else {
j["config"] = ""; // Set default
}
spdlog::debug("CANAL CLIENT: JSON init: config=\"{}\".", (std::string)j["config"]);
spdlog::debug("CANAL CLIENT: JSON init: config=\"{}\".", (std::string) j["config"]);

if (j.contains("flags")) {
if (j["flags"].is_string()) {
Expand All @@ -160,7 +180,7 @@ vscpClientCanal::initFromJson(const std::string &config)
else {
j["flags"] = 0; // Set default
}
spdlog::debug("CANAL CLIENT: JSON init: flags={}.", (uint32_t)j["flags"]);
spdlog::debug("CANAL CLIENT: JSON init: flags={}.", (uint32_t) j["flags"]);

if (j.contains("datarate")) {
if (j["datarate"].is_string()) {
Expand All @@ -170,7 +190,7 @@ vscpClientCanal::initFromJson(const std::string &config)
else {
j["datarate"] = 0; // Set default
}
spdlog::debug("CANAL CLIENT: JSON init: datarate={}.", (int)j["datarate"]);
spdlog::debug("CANAL CLIENT: JSON init: datarate={}.", (int) j["datarate"]);

setName(j["name"]);
return (init(j["path"], j["config"], j["flags"], j["datarate"]));
Expand Down Expand Up @@ -358,8 +378,8 @@ int
vscpClientCanal::receive(canalMsg &msg)
{
int rv;
//canalMsg canalMsg;
//uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
// canalMsg canalMsg;
// uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };

spdlog::debug("CANAL CLIENT: Poll for event ex.");

Expand Down Expand Up @@ -492,7 +512,7 @@ vscpClientCanal::clear()
int
vscpClientCanal::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t * /*prelease*/, uint8_t * /*pbuild*/)
{
//uint32_t ver = m_canalif.CanalGetDllVersion();
// uint32_t ver = m_canalif.CanalGetDllVersion();

return VSCP_ERROR_SUCCESS;
}
Expand All @@ -502,7 +522,7 @@ vscpClientCanal::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t
//

int
vscpClientCanal::getinterfaces(std::deque<std::string> & /*iflist*/ )
vscpClientCanal::getinterfaces(std::deque<std::string> & /*iflist*/)
{
// No interfaces available
return VSCP_ERROR_SUCCESS;
Expand Down Expand Up @@ -573,7 +593,7 @@ vscpClientCanal::setCallbackEv(std::function<void(vscpEvent &ev, void *pobj)> ca
}

spdlog::debug("CANAL CLIENT: ev callback set.");
CVscpClient::setCallbackEv(callback, pData);
CVscpClient::setCallbackEv(callback, pData);
return VSCP_ERROR_SUCCESS;
}

Expand Down Expand Up @@ -620,6 +640,7 @@ win_usleep(__int64 usec)
static void *
workerThread(void *pObj)
{
canalMsg msg;
uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
vscpClientCanal *pClient = (vscpClientCanal *) pObj;
VscpCanalDeviceIf *pif = (VscpCanalDeviceIf *) &(pClient->m_canalif);
Expand All @@ -631,48 +652,76 @@ workerThread(void *pObj)

while (pClient->m_bRun) {

spdlog::trace("CANAL CLIENT: worktread start.");

pthread_mutex_lock(&pClient->m_mutexif);
spdlog::trace("CANAL CLIENT: workertread start.");

// Check if there are events to fetch
int cnt;
if ((cnt = pClient->m_canalif.CanalDataAvailable())) {

while (cnt) {
canalMsg msg;
if (CANAL_ERROR_SUCCESS == pClient->m_canalif.CanalReceive(&msg)) {
spdlog::debug("CANAL CLIENT: workthread. Event recived");
if (pClient->isCallbackEvActive()) {
vscpEvent ev;
if (vscp_convertCanalToEvent(&ev, &msg, guid)) {
spdlog::trace("CANAL CLIENT: workthread. Event sent to ev callback");
pClient->m_callbackev(ev, pClient->getCallbackObj());
}
// int cnt;
// if ((cnt = pClient->m_canalif.CanalDataAvailable())) {

// while (cnt) {



if (CANAL_ERROR_SUCCESS == pClient->m_canalif.CanalBlockingReceive(&msg, 100)) {

spdlog::trace("CANAL CLIENT: workthread. Event received");

pthread_mutex_lock(&pClient->m_mutexif);

if (pClient->isCallbackEvActive()) {
vscpEvent ev;
if (vscp_convertCanalToEvent(&ev, &msg, guid)) {
if (vscp_doLevel2Filter(&ev, &pClient->m_filterIn)) {
spdlog::trace("CANAL CLIENT: workthread. Event sent to ev callback");
pClient->m_callbackev(ev, pClient->getCallbackObj());
}
if (pClient->isCallbackExActive()) {
vscpEventEx ex;
if (vscp_convertCanalToEventEx(&ex, &msg, guid)) {
spdlog::trace("CANAL CLIENT: workthread. Event sent to ex callback");
pClient->m_callbackex(ex, pClient->getCallbackObj());
}
}
}
else if (pClient->isCallbackExActive()) {
vscpEventEx ex;
if (vscp_convertCanalToEventEx(&ex, &msg, guid)) {
if (vscp_doLevel2FilterEx(&ex, &pClient->m_filterIn)) {
spdlog::trace("CANAL CLIENT: workthread. Event sent to ex callback");
pClient->m_callbackex(ex, pClient->getCallbackObj());
}
}
cnt--;
}
}
else {

pthread_mutex_unlock(&pClient->m_mutexif);
// No callback defined so save event in incoming queue

#ifndef WIN32
usleep(200);
#else
win_usleep(200);
#endif
vscpEvent *pev = new vscpEvent;
if (nullptr == pev) {
spdlog::critical("CANAL CLIENT: Memory problem.");
return NULL;
}
if (vscp_convertCanalToEvent(pev, &msg, guid)) {
if (vscp_doLevel2Filter(pev, &pClient->m_filterIn)) {
pthread_mutex_lock(&pClient->m_mutexReceiveQueue);
pClient->m_receiveQueue.push_back(pev);
sem_post(&pClient->m_semReceiveQueue);
pthread_mutex_unlock(&pClient->m_mutexReceiveQueue);
}
}
else {
vscp_deleteEvent(pev);
}
}

spdlog::trace("CANAL CLIENT: worktread end.");
pthread_mutex_unlock(&pClient->m_mutexif);

} // message received
} // while
// cnt--;
//}

// #ifndef WIN32
// usleep(200);
// #else
// win_usleep(200);
// #endif

spdlog::trace("CANAL CLIENT: workthread end.");

return NULL;
}
21 changes: 20 additions & 1 deletion src/vscp/common/vscp-client-canal.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
#define VSCPCLIENTCANAL_H__INCLUDED_

#include "vscp.h"
#include <vscphelper.h>
#include "vscp-client-base.h"
#include "vscpcanaldeviceif.h"

#include <pthread.h>

// When a callback is set and connect is called this object is shared
// with a workerthread that
// with a worker thread that

class vscpClientCanal : public CVscpClient {

Expand Down Expand Up @@ -226,10 +227,28 @@ class vscpClientCanal : public CVscpClient {

// Mutex that protect CANAL interface when callbacks are defined
pthread_mutex_t m_mutexif;
pthread_mutex_t m_mutexReceiveQueue;

/*!
Event object to indicate that there is an event in the
output queue
*/
sem_t m_semReceiveQueue;

/*!
If no callback is defined received events are connected in
this queue
*/
std::deque<vscpEvent *> m_receiveQueue;

// CANAL functionality
VscpCanalDeviceIf m_canalif;

/*!
Receive filter
*/
vscpEventFilter m_filterIn;

private:
/*!
True of dll connection is open
Expand Down
2 changes: 1 addition & 1 deletion src/vscp/common/vscp-client-mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ vscpClientMqtt::~vscpClientMqtt()
while (m_receiveQueue.size()) {
pev = m_receiveQueue.front();
m_receiveQueue.pop_front();
vscp_deleteEvent_v2(&pev);
vscp_deleteEvent(pev);
}

// Delete subscription objects
Expand Down
30 changes: 22 additions & 8 deletions src/vscp/common/vscp-client-socketcan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,32 @@ vscpClientSocketCan::vscpClientSocketCan()
vscpClientSocketCan::~vscpClientSocketCan()
{
disconnect();

pthread_mutex_destroy(&m_mutexSocket);

sem_destroy(&m_semSendQueue);
sem_destroy(&m_semReceiveQueue);

pthread_mutex_destroy(&m_mutexSendQueue);
pthread_mutex_destroy(&m_mutexReceiveQueue);

// Clear the input queue (if needed)
while (m_receiveList.size()) {
vscpEvent *pev = m_receiveList.front();
m_receiveList.pop_front();
vscp_deleteEvent(pev);
}
}

///////////////////////////////////////////////////////////////////////////////
// init
//

int
vscpClientSocketCan::init(const std::string &interface, const std::string &guid, unsigned long flags, uint32_t /*timeout*/)
vscpClientSocketCan::init(const std::string &interface,
const std::string &guid,
unsigned long flags,
uint32_t /*timeout*/)
{
m_interface = interface;
m_guid.getFromString(guid);
Expand Down Expand Up @@ -634,7 +645,7 @@ vscpClientSocketCan::setfilter(vscpEventFilter & /*filter*/)
// ((unsigned long) filter.mask_type << 8) | filter.mask_GUID[0];
// m_canalif.CanalSetMask(_mask);

return rv;
return rv;
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -663,7 +674,10 @@ vscpClientSocketCan::clear()
//

int
vscpClientSocketCan::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint8_t * /*prelease*/, uint8_t * /*pbuild*/)
vscpClientSocketCan::getversion(uint8_t * /*pmajor*/,
uint8_t * /*pminor*/,
uint8_t * /*prelease*/,
uint8_t * /*pbuild*/)
{
// uint32_t ver = m_canalif.CanalGetDllVersion();

Expand All @@ -675,7 +689,7 @@ vscpClientSocketCan::getversion(uint8_t * /*pmajor*/, uint8_t * /*pminor*/, uint
//

int
vscpClientSocketCan::getinterfaces(std::deque<std::string> &/*iflist*/)
vscpClientSocketCan::getinterfaces(std::deque<std::string> & /*iflist*/)
{
// No interfaces available
return VSCP_ERROR_SUCCESS;
Expand Down Expand Up @@ -792,13 +806,13 @@ workerThread(void *pData)
{
int mtu, enable_canfd = 1;
fd_set rdfs;
//struct timeval tv;
// struct timeval tv;
struct sockaddr_can addr;
struct ifreq ifr;
//struct cmsghdr *cmsg;
// struct cmsghdr *cmsg;
struct canfd_frame frame;
//char ctrlmsg[CMSG_SPACE(sizeof(struct timeval)) + CMSG_SPACE(sizeof(__u32))];
//const int canfd_on = 1;
// char ctrlmsg[CMSG_SPACE(sizeof(struct timeval)) + CMSG_SPACE(sizeof(__u32))];
// const int canfd_on = 1;

vscpClientSocketCan *pObj = (vscpClientSocketCan *) pData;
if (NULL == pObj) {
Expand Down
Loading

0 comments on commit 5dfbf4a

Please sign in to comment.