Skip to content

Commit

Permalink
Added usleep for windows
Browse files Browse the repository at this point in the history
  • Loading branch information
grodansparadis committed Dec 17, 2024
1 parent d663d5b commit 7da6b38
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 95 deletions.
120 changes: 29 additions & 91 deletions src/vscp/common/vscp-client-mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,13 @@
#include <spdlog/spdlog.h>

#include <mustache.hpp>
#include <nlohmann/json.hpp>
#include <nlohmann/json.hpp>

// for convenience
using json = nlohmann::json;
using namespace kainjow::mustache;

// Forward declaration
static void *
workerThread(void *pObj);

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -544,6 +542,7 @@ publishTopic::publishTopic(const std::string &topic, enumMqttMsgFormat format, i

publishTopic::~publishTopic()
{

#if LIBMOSQUITTO_MAJOR > 1 || (LIBMOSQUITTO_MAJOR == 1 && LIBMOSQUITTO_MINOR >= 6)
mosquitto_property_free_all(&m_properties);
#endif
Expand Down Expand Up @@ -607,8 +606,8 @@ vscpClientMqtt::vscpClientMqtt(void)
m_mapMqttIntOptions["send-maximum"] = 20;

#ifndef WIN32
m_tid = 0; // pthread
#endif
m_tid = 0; // pthread
#endif
m_bConnected = false; // Not connected
m_bJsonMeasurementAdd = true; // Add measurement block to JSON publish event
m_bindInterface = ""; // No bind interface
Expand Down Expand Up @@ -662,7 +661,10 @@ vscpClientMqtt::vscpClientMqtt(void)
return;
}

pthread_mutex_init(&m_mutexif, nullptr);
sem_init(&m_semReceiveQueue, 0, 0);

pthread_mutex_init(&m_mutexif, NULL);
pthread_mutex_init(&m_mutexReceiveQueue, NULL);
}

///////////////////////////////////////////////////////////////////////////////
Expand All @@ -681,7 +683,10 @@ vscpClientMqtt::~vscpClientMqtt()
// Clean up the lib
mosquitto_lib_cleanup();

sem_destroy(&m_semReceiveQueue);

pthread_mutex_destroy(&m_mutexif);
pthread_mutex_destroy(&m_mutexReceiveQueue);

vscpEvent *pev = nullptr;
while (m_receiveQueue.size()) {
Expand Down Expand Up @@ -1284,8 +1289,8 @@ vscpClientMqtt::initFromJson(const std::string &config)
addSubscription(topic, format, qos, v5_options);

} // object
} // for
} // sub
} // for
} // sub

if (j.contains("bescape-pub-topics") && j["bescape-pub-topics"].is_boolean()) {
m_bEscapesPubTopics = j["bescape-pub-topics"].get<bool>();
Expand Down Expand Up @@ -1374,8 +1379,8 @@ vscpClientMqtt::initFromJson(const std::string &config)
addPublish(topic, format, qos, bretain);

} // obj
} // for
} // pub
} // for
} // pub

// v5
if (j.contains("v5") && j["v5"].is_object()) {
Expand Down Expand Up @@ -1517,7 +1522,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)

// Save event in incoming queue
if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) {
pthread_mutex_lock(&m_mutexReceiveQueue);
m_receiveQueue.push_back(pEvent);
sem_post(&m_semReceiveQueue);
pthread_mutex_unlock(&m_mutexReceiveQueue);
}
}
}
Expand Down Expand Up @@ -1556,7 +1564,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)

// Save event in incoming queue
if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) {
pthread_mutex_lock(&m_mutexReceiveQueue);
m_receiveQueue.push_back(pEvent);
sem_post(&m_semReceiveQueue);
pthread_mutex_unlock(&m_mutexReceiveQueue);
}
}
}
Expand Down Expand Up @@ -1595,7 +1606,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)

// Save event in incoming queue
if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) {
pthread_mutex_lock(&m_mutexReceiveQueue);
m_receiveQueue.push_back(pEvent);
sem_post(&m_semReceiveQueue);
pthread_mutex_unlock(&m_mutexReceiveQueue);
}
}
}
Expand Down Expand Up @@ -1637,7 +1651,10 @@ vscpClientMqtt::handleMessage(const struct mosquitto_message *pmsg)

// Save event in incoming queue
if (m_receiveQueue.size() < MQTT_MAX_INQUEUE_SIZE) {
pthread_mutex_lock(&m_mutexReceiveQueue);
m_receiveQueue.push_back(pEvent);
sem_post(&m_semReceiveQueue);
pthread_mutex_unlock(&m_mutexReceiveQueue);
}
}
}
Expand Down Expand Up @@ -2076,30 +2093,6 @@ vscpClientMqtt::connect(void)
}
}

// Start worker thread if a callback has been defined
if (isCallbackEvActive()|| isCallbackExActive()) {
int rv = pthread_create(&m_tid, nullptr, workerThread, this);
switch (rv) {

case EAGAIN:
spdlog::error(
"MQTT CLIENT: Failed to start MQTT callback thread - Insufficient resources to create another thread.");
break;

case EINVAL:
spdlog::error("MQTT CLIENT: Failed to start MQTT callback thread - Invalid settings in attr");
break;

case EPERM:
spdlog::error("MQTT CLIENT: Failed to start MQTT callback thread - No permission to set the scheduling policy");
break;

default:
spdlog::debug("MQTT CLIENT: Started MQTT callback thread");
break;
}
}

return VSCP_ERROR_SUCCESS;
}

Expand Down Expand Up @@ -2219,7 +2212,7 @@ vscpClientMqtt::send(vscpEvent &ev)
spdlog::error("MQTT CLIENT: sendEvent: Failed to add measurement info to event.");
}
} // OK to insert extra info
} // is measurement
} // is measurement

lenPayload = strPayload.length();
strncpy((char *) payload, strPayload.c_str(), lenPayload);
Expand Down Expand Up @@ -2477,7 +2470,7 @@ vscpClientMqtt::send(vscpEventEx &ex)
spdlog::error("MQTT CLIENT: sendEvent: Failed to add measurement info to event.");
}
} // OK to insert extra info
} // is measurement
} // is measurement

lenPayload = strPayload.length();
strncpy((char *) payload, strPayload.c_str(), sizeof(payload));
Expand Down Expand Up @@ -3003,58 +2996,3 @@ win_usleep(__int64 usec)
CloseHandle(timer);
}
#endif

///////////////////////////////////////////////////////////////////////////////
// Workerthread
//
// This thread call the appropriate callback when events are received
//

static void *
workerThread(void *pObj)
{
//uint8_t guid[] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
vscpClientMqtt *pClient = (vscpClientMqtt *) pObj;
// if (nullptr == pif) return nullptr;

while (pClient->m_bRun) {

// pthread_mutex_lock(&pif->m_mutexif);

// Check if there are events to fetch
// int cnt;
/* if ((cnt = pClient->m_canalif.CanalDataAvailable())) {
while (cnt) {
canalMsg msg;
if ( CANAL_ERROR_SUCCESS ==
pClient->m_canalif.CanalReceive(&msg) ) { if ( nullptr !=
pClient->m_evcallback ) { vscpEvent ev; if
(vscp_convertCanalToEvent(&ev, &msg, guid) ) {
pClient->m_evcallback(ev);
}
}
if ( nullptr != pClient->m_excallback ) {
vscpEventEx ex;
if (vscp_convertCanalToEventEx(&ex,
&msg,
guid) ) {
pClient->m_excallback(ex);
}
}
}
cnt--;
}
} */

// pthread_mutex_unlock(&pif->m_mutexif);
#ifndef WIN32
usleep(200);
#else
win_usleep(200);
#endif
}

return nullptr;
}
5 changes: 3 additions & 2 deletions src/vscp/common/vscp-client-mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ class vscpClientMqtt : public CVscpClient {
@param timeout Timeout in milliseconds. Default is 100 ms.
@return Return VSCP_ERROR_SUCCESS of OK and error code else.
*/
virtual int receiveBlocking(vscpEvent &ev, long timeout = 100 );
virtual int receiveBlocking(vscpEvent &ev, long timeout = 100);

/*!
Receive VSCP event ex from remote host
Expand All @@ -419,7 +419,7 @@ class vscpClientMqtt : public CVscpClient {
@param timeout Timeout in milliseconds. Default is 100 ms.
@return Return VSCP_ERROR_SUCCESS of OK and error code else.
*/
virtual int receiveBlocking(vscpEventEx &ex, long timeout = 100 );
virtual int receiveBlocking(vscpEventEx &ex, long timeout = 100);

/*!
Receive CAN(AL) message from remote host
Expand Down Expand Up @@ -728,6 +728,7 @@ class vscpClientMqtt : public CVscpClient {
Mutex that protect CANAL interface when callbacks are defined
*/
pthread_mutex_t m_mutexif;
pthread_mutex_t m_mutexReceiveQueue;

/*!
Event object to indicate that there is an event in the
Expand Down
6 changes: 4 additions & 2 deletions src/vscp/common/vscp-client-tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,9 @@ vscpClientTcp::setfilter(vscpEventFilter &filter)
int
vscpClientTcp::getcount(uint16_t *pcount)
{
if (NULL == pcount)
if (NULL == pcount) {
return VSCP_ERROR_INVALID_POINTER;
}
*pcount = m_tcp.doCmdDataAvailable();
return VSCP_ERROR_SUCCESS;
}
Expand Down Expand Up @@ -621,8 +622,9 @@ workerThread(vscpClientTcp *pObj)
ev.pdata = NULL;

// Check pointer
if (nullptr == pObj)
if (nullptr == pObj) {
return;
}

VscpRemoteTcpIf *m_pifReceive = pObj->getTcpReceive();

Expand Down
16 changes: 16 additions & 0 deletions src/vscp/common/vscphelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,22 @@ vscp_sem_wait(sem_t *sem, uint32_t waitms)
}
#endif

#ifdef WIN32
static void
vscp_usleep(__int64 usec)
{
HANDLE timer;
LARGE_INTEGER ft;

ft.QuadPart = -(10 * usec); // Convert to 100 nanosecond interval, negative value indicates relative time

timer = CreateWaitableTimer(nullptr, TRUE, nullptr);
SetWaitableTimer(timer, &ft, 0, nullptr, nullptr, 0);
WaitForSingleObject(timer, INFINITE);
CloseHandle(timer);
}
#endif

///////////////////////////////////////////////////////////////////////////////
// vscp_almostEqualRelativeFloat
//
Expand Down
9 changes: 9 additions & 0 deletions src/vscp/common/vscphelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ int
vscp_sem_wait(sem_t *sem, uint32_t waitms);
#endif

/*!
Sleep for a number of microseconds
@param usec Number of microseconds to sleep
*/
#ifdef WIN32
static void
vscp_usleep(__int64 usec);
#endif

/*
* Check two floats for equality
* @param A Float to compare
Expand Down

0 comments on commit 7da6b38

Please sign in to comment.