From 24df96c66ae5a6b50617afe1e9b96d3386f027a5 Mon Sep 17 00:00:00 2001 From: John Patton Date: Tue, 13 Nov 2018 09:18:34 -0700 Subject: [PATCH 01/11] added better logging when a thread dies --- util/include/logger.h | 14 +++++++------- util/include/threadbaseclass.h | 4 +++- util/src/threadbaseclass.cpp | 32 +++++++++++++++++++++++++------- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/util/include/logger.h b/util/include/logger.h index 452cb7c2..53ed317d 100644 --- a/util/include/logger.h +++ b/util/include/logger.h @@ -26,32 +26,32 @@ namespace util { /** * \brief glassutil logging class * - * The CLogit class encapsulates the logic and functionality needed + * The Logger class encapsulates the logic and functionality needed * to write logging information to disk. */ class Logger { public: /** - * \brief CLogit constructor + * \brief Logger constructor * - * The constructor for the CLogit class. + * The constructor for the Logger class. */ Logger(); /** - * \brief CLogit destructor + * \brief Logger destructor * - * The destructor for the CLogit class. + * The destructor for the Logger class. */ ~Logger(); /** - * \brief CLogit disable logging function + * \brief Logger disable logging function */ static void disable(); /** - * \brief CLogit enable logging function + * \brief Logger enable logging function */ static void enable(); diff --git a/util/include/threadbaseclass.h b/util/include/threadbaseclass.h index 618b13a3..c31b78a1 100644 --- a/util/include/threadbaseclass.h +++ b/util/include/threadbaseclass.h @@ -215,9 +215,11 @@ class ThreadBaseClass : public util::BaseClass { * This function retrieves the oldest time any of the the health statuses of * the work threads was updated as healthy by the setThreadHealth function * + * \param oldestThreadId - An optional pointer to a std::thread::id that + * holds the id of the oldest thread getAllLastHealthy checked. * \return A std::time_t containing the last check time */ - std::time_t getAllLastHealthy(); + std::time_t getAllLastHealthy(std::thread::id* oldestThreadId = NULL); /** * \brief ThreadBaseClass work function diff --git a/util/src/threadbaseclass.cpp b/util/src/threadbaseclass.cpp index e86c6eae..fa7b0eda 100644 --- a/util/src/threadbaseclass.cpp +++ b/util/src/threadbaseclass.cpp @@ -211,14 +211,25 @@ bool ThreadBaseClass::healthCheck() { + getThreadName() + ")"); return (false); } + std::thread::id oldestThreadId; + int lastCheckInterval = (std::time(nullptr) - + getAllLastHealthy(&oldestThreadId)); - int lastCheckInterval = (std::time(nullptr) - getAllLastHealthy()); if (lastCheckInterval > getHealthCheckInterval()) { + // convert threadid to size_t + // thread id's are not directly printable + // so use the same method spdlog uses since we want this for + // logging anyway + size_t tid = + static_cast(std::hash()(oldestThreadId)); + glass3::util::Logger::log( "error", "ThreadBaseClass::healthCheck():" - " lastCheckInterval for at least one thread in" - + getThreadName() + " exceeds health check interval ( " + " lastCheckInterval for at least one thread in " + + getThreadName() + " (" + + std::to_string(tid) + + ") exceeds health check interval ( " + std::to_string(lastCheckInterval) + " > " + std::to_string(getHealthCheckInterval()) + " )"); @@ -293,13 +304,14 @@ void ThreadBaseClass::workLoop() { } // ---------------------------------------------------------getAllLastHealthy -std::time_t ThreadBaseClass::getAllLastHealthy() { +std::time_t ThreadBaseClass::getAllLastHealthy( + std::thread::id* oldestThreadId) { // don't bother if we've not got any threads if (getNumThreads() <= 0) { return (0); } - // empty check + // empty check (either no threads or not tracking status) if (m_ThreadHealthMap.size() == 0) { return (0); } @@ -309,7 +321,7 @@ std::time_t ThreadBaseClass::getAllLastHealthy() { return (0); } - // init oldest time to now + // init oldest time to now, everything should be older than now double oldestTime = std::time(nullptr); // go through all work threads @@ -321,8 +333,14 @@ std::time_t ThreadBaseClass::getAllLastHealthy() { // get the thread status double healthTime = static_cast(StatusItr->second); - // at least one thread did not respond + // Only report the oldest time if (healthTime < oldestTime) { + // remember the oldest thread if we're asked + if(oldestThreadId != NULL) { + *oldestThreadId = StatusItr->first; + } + + // remember the oldest time oldestTime = healthTime; } } From e8c6b3e8473ef87c6860f97fe4e51832e49c2d58 Mon Sep 17 00:00:00 2001 From: John Patton Date: Tue, 13 Nov 2018 14:17:24 -0700 Subject: [PATCH 02/11] added mutex protection to m_mLastTimeSiteLookedUp, changed so that known stations are not re-looked up --- glasscore/glasslib/src/SiteList.cpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/glasscore/glasslib/src/SiteList.cpp b/glasscore/glasslib/src/SiteList.cpp index 8e37fb5b..0603dca0 100644 --- a/glasscore/glasslib/src/SiteList.cpp +++ b/glasscore/glasslib/src/SiteList.cpp @@ -322,15 +322,22 @@ std::shared_ptr CSiteList::getSite(std::string site, std::string comp, scnl += "." + loc; } + // lookup the site from the map + std::shared_ptr foundSite = getSite(scnl); + if (foundSite != NULL) { + return (foundSite); + } + + // if we didn't find the site in the // send request for information about this station - // this section is above the getSite() call so that - // we could be constantly requesting new station information, - // which seems useful. if (m_iHoursBeforeLookingUp >= 0) { // what time is it time_t tNow; std::time(&tNow); + // lock while we are searching / editing m_mLastTimeSiteLookedUp + std::lock_guard guard(m_SiteListMutex); + // get what time this station has been looked up before int tLookup = 0; auto itsite = m_mLastTimeSiteLookedUp.find(scnl); @@ -364,12 +371,6 @@ std::shared_ptr CSiteList::getSite(std::string site, std::string comp, } } - // lookup the site from the map - std::shared_ptr foundSite = getSite(scnl); - if (foundSite != NULL) { - return (foundSite); - } - // site not found return (NULL); } From 6edceb16ca849c34d507e28c9ee7e72e12776b0d Mon Sep 17 00:00:00 2001 From: John Patton Date: Wed, 14 Nov 2018 14:54:41 -0700 Subject: [PATCH 03/11] updated to work with latest hazdevbroker --- CMakeLists.txt | 5 +- cmake/include_rapidjson.cmake | 7 +++ glass-broker-app/CMakeLists.txt | 3 ++ glass-broker-app/brokerInput/brokerInput.cpp | 49 +++++++++++++++++++- glass-broker-app/brokerInput/brokerInput.h | 5 ++ util/include/threadbaseclass.h | 4 +- util/src/threadbaseclass.cpp | 23 ++------- 7 files changed, 70 insertions(+), 26 deletions(-) create mode 100644 cmake/include_rapidjson.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index dde953cd..38dc07c4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -179,7 +179,7 @@ if (BUILD_GLASS-BROKER-APP) if (GIT_CLONE_PUBLIC) ExternalProject_Add(HazdevBroker GIT_REPOSITORY https://github.com/usgs/hazdev-broker.git - #GIT_TAG working + GIT_TAG v0.2.0 TIMEOUT 10 CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${INSTALL_LOCATION} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} @@ -191,7 +191,7 @@ if (BUILD_GLASS-BROKER-APP) else() ExternalProject_Add(HazdevBroker GIT_REPOSITORY git@github.com:usgs/hazdev-broker.git - #GIT_TAG working + GIT_TAG v0.2.0 TIMEOUT 10 CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${INSTALL_LOCATION} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} @@ -352,6 +352,7 @@ if (BUILD_GLASS-BROKER-APP) CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${INSTALL_LOCATION} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DCMAKE_MODULE_PATH=${CMAKE_MODULE_PATH} + -DRAPIDJSON_PATH=${RAPIDJSON_PATH} -DLIBRDKAFKA_PATH=${LIBRDKAFKA_PATH} -DLIBRDKAFKA_C_LIB=${LIBRDKAFKA_C_LIB} -DLIBRDKAFKA_CPP_LIB=${LIBRDKAFKA_CPP_LIB} diff --git a/cmake/include_rapidjson.cmake b/cmake/include_rapidjson.cmake new file mode 100644 index 00000000..9d17f57d --- /dev/null +++ b/cmake/include_rapidjson.cmake @@ -0,0 +1,7 @@ +# include_rapidjson.cmake - a CMake script that finds the rapidjson +# include files +# +# rapidjson +set(RAPIDJSON_PATH "${CMAKE_CURRENT_LIST_DIR}/lib/rapidjson" CACHE PATH "Path to rapidjson") + +include_directories(${RAPIDJSON_PATH}) diff --git a/glass-broker-app/CMakeLists.txt b/glass-broker-app/CMakeLists.txt index 992b3e46..3aff7cc9 100644 --- a/glass-broker-app/CMakeLists.txt +++ b/glass-broker-app/CMakeLists.txt @@ -21,6 +21,9 @@ include(${CMAKE_DIR}/base.cmake) # SuperEasyJSON include(${CMAKE_DIR}/include_SuperEasyJSON.cmake) +# rapidjson +include(${CMAKE_DIR}/include_rapidjson.cmake) + # detection-formats include(${CMAKE_DIR}/include_DetectionFormats.cmake) diff --git a/glass-broker-app/brokerInput/brokerInput.cpp b/glass-broker-app/brokerInput/brokerInput.cpp index b723421b..2da1c5af 100644 --- a/glass-broker-app/brokerInput/brokerInput.cpp +++ b/glass-broker-app/brokerInput/brokerInput.cpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace glass3 { @@ -18,6 +19,7 @@ brokerInput::brokerInput() "brokerInput::brokerInput(): Construction."); m_Consumer = NULL; + m_iHeartbeatInterval = std::numeric_limits::quiet_NaN(); clear(); } @@ -26,6 +28,7 @@ brokerInput::brokerInput() brokerInput::brokerInput(const std::shared_ptr &config) : glass3::input::Input() { m_Consumer = NULL; + m_iHeartbeatInterval = std::numeric_limits::quiet_NaN(); // do basic construction clear(); @@ -128,6 +131,25 @@ bool brokerInput::setup(std::shared_ptr config) { } } + // optional directory to write heartbeat files to + std::string heartbeatDirectory = ""; + if (config->HasKey("HeartbeatDirectory")) { + heartbeatDirectory = (*config)["HeartbeatDirectory"].ToString(); + glass3::util::Logger::log( + "info", + "brokerInput::setup(): Using HeartbeatDirectory: " + + heartbeatDirectory + "."); + } + + // optional interval to check for heartbeats + if (config->HasKey("HeartbeatInterval")) { + m_iHeartbeatInterval = (*config)["HeartbeatInterval"].ToInt(); + glass3::util::Logger::log( + "info", + "brokerInput::setup(): Using HeartbeatInterval: " + + std::to_string(m_iHeartbeatInterval) + "."); + } + // set up consumer if (m_Consumer != NULL) { delete (m_Consumer); @@ -135,6 +157,7 @@ bool brokerInput::setup(std::shared_ptr config) { // create new consumer m_Consumer = new hazdevbroker::Consumer(); + m_Consumer->setHeartbeatDirectory(heartbeatDirectory); // set up logging m_Consumer->setLogCallback( @@ -172,8 +195,32 @@ std::string brokerInput::fetchRawData(std::string* pOutType) { *pOutType = std::string(JSON_TYPE); // make sure we have a consumer - if (m_Consumer == NULL) + if (m_Consumer == NULL) { return (""); + } + + // if we are checking heartbeat times + if (!(std::isnan(m_iHeartbeatInterval))) { + // get current time in seconds + int64_t timeNow = std::time(NULL); + + // get last heartbeat time + int64_t lastHB = m_Consumer->getLastHeartbeatTime(); + + // calculate elapsed time + int64_t elapsedTime = timeNow - lastHB; + + // has it been too long since the last heartbeat? + if (elapsedTime > m_iHeartbeatInterval) { + glass3::util::Logger::log("error", + "No Heartbeat Message seen from topic(s) in " + + std::to_string(m_iHeartbeatInterval) + " seconds! (" + + std::to_string(elapsedTime) + ")"); + + // reset last heartbeat time so that we don't fill the log + m_Consumer->setLastHeartbeatTime(timeNow); + } + } // get message from the consumer std::string message = m_Consumer->pollString(100); diff --git a/glass-broker-app/brokerInput/brokerInput.h b/glass-broker-app/brokerInput/brokerInput.h index 8fe66c81..32306e14 100644 --- a/glass-broker-app/brokerInput/brokerInput.h +++ b/glass-broker-app/brokerInput/brokerInput.h @@ -100,6 +100,11 @@ class brokerInput : public glass3::input::Input { * \brief the hazdevbroker consumer object to get messages from kafka */ hazdevbroker::Consumer * m_Consumer; + + /** + * \brief the interval in seconds to expect hazdevbroker heartbeats. + */ + int m_iHeartbeatInterval; }; } // namespace glass3 #endif // BROKERINPUT_H diff --git a/util/include/threadbaseclass.h b/util/include/threadbaseclass.h index c31b78a1..618b13a3 100644 --- a/util/include/threadbaseclass.h +++ b/util/include/threadbaseclass.h @@ -215,11 +215,9 @@ class ThreadBaseClass : public util::BaseClass { * This function retrieves the oldest time any of the the health statuses of * the work threads was updated as healthy by the setThreadHealth function * - * \param oldestThreadId - An optional pointer to a std::thread::id that - * holds the id of the oldest thread getAllLastHealthy checked. * \return A std::time_t containing the last check time */ - std::time_t getAllLastHealthy(std::thread::id* oldestThreadId = NULL); + std::time_t getAllLastHealthy(); /** * \brief ThreadBaseClass work function diff --git a/util/src/threadbaseclass.cpp b/util/src/threadbaseclass.cpp index fa7b0eda..fed4741b 100644 --- a/util/src/threadbaseclass.cpp +++ b/util/src/threadbaseclass.cpp @@ -211,25 +211,14 @@ bool ThreadBaseClass::healthCheck() { + getThreadName() + ")"); return (false); } - std::thread::id oldestThreadId; - int lastCheckInterval = (std::time(nullptr) - - getAllLastHealthy(&oldestThreadId)); + int lastCheckInterval = (std::time(nullptr) - getAllLastHealthy()); if (lastCheckInterval > getHealthCheckInterval()) { - // convert threadid to size_t - // thread id's are not directly printable - // so use the same method spdlog uses since we want this for - // logging anyway - size_t tid = - static_cast(std::hash()(oldestThreadId)); - glass3::util::Logger::log( "error", "ThreadBaseClass::healthCheck():" " lastCheckInterval for at least one thread in " - + getThreadName() + " (" - + std::to_string(tid) - + ") exceeds health check interval ( " + + getThreadName() + " exceeds health check interval ( " + std::to_string(lastCheckInterval) + " > " + std::to_string(getHealthCheckInterval()) + " )"); @@ -304,8 +293,7 @@ void ThreadBaseClass::workLoop() { } // ---------------------------------------------------------getAllLastHealthy -std::time_t ThreadBaseClass::getAllLastHealthy( - std::thread::id* oldestThreadId) { +std::time_t ThreadBaseClass::getAllLastHealthy() { // don't bother if we've not got any threads if (getNumThreads() <= 0) { return (0); @@ -335,11 +323,6 @@ std::time_t ThreadBaseClass::getAllLastHealthy( // Only report the oldest time if (healthTime < oldestTime) { - // remember the oldest thread if we're asked - if(oldestThreadId != NULL) { - *oldestThreadId = StatusItr->first; - } - // remember the oldest time oldestTime = healthTime; } From a7fde175b596723e7f3c5bfaf4a0a435e76d3afb Mon Sep 17 00:00:00 2001 From: John Patton Date: Thu, 15 Nov 2018 13:44:49 -0700 Subject: [PATCH 04/11] updated to use new hazdevbroker heartbeat functionality --- glass-app/fileInput/fileInput.cpp | 5 ++ glass-app/fileInput/fileInput.h | 11 +++ glass-app/fileOutput/fileOutput.cpp | 4 + glass-app/fileOutput/fileOutput.h | 11 +++ .../brokerOutput/brokerOutput.cpp | 27 +++++++ glass-broker-app/brokerOutput/brokerOutput.h | 23 +++++- glass-broker-app/brokerOutput/outputTopic.cpp | 13 ++++ glass-broker-app/brokerOutput/outputTopic.h | 8 ++ output/include/output.h | 20 +++++ output/src/output.cpp | 78 +++++++++++-------- output/tests/output_unittest.cpp | 51 ++++++------ util/include/baseclass.h | 13 ++-- util/include/cache.h | 11 +++ util/include/queue.h | 11 +++ util/include/threadpool.h | 11 +++ util/src/cache.cpp | 5 ++ util/src/queue.cpp | 5 ++ util/src/threadbaseclass.cpp | 2 +- util/src/threadpool.cpp | 4 + 19 files changed, 249 insertions(+), 64 deletions(-) diff --git a/glass-app/fileInput/fileInput.cpp b/glass-app/fileInput/fileInput.cpp index 02696322..4f2de84c 100644 --- a/glass-app/fileInput/fileInput.cpp +++ b/glass-app/fileInput/fileInput.cpp @@ -361,4 +361,9 @@ void fileInput::setShutdownWait(int waitTime) { int fileInput::getShutdownWait() { return (m_iShutdownWait); } + +// ---------------------------------------------------------getMutex +std::mutex & fileInput::getMutex() { + return (m_Mutex); +} } // namespace glass3 diff --git a/glass-app/fileInput/fileInput.h b/glass-app/fileInput/fileInput.h index 60ac98d6..5e5614fe 100644 --- a/glass-app/fileInput/fileInput.h +++ b/glass-app/fileInput/fileInput.h @@ -243,6 +243,17 @@ class fileInput : public glass3::input::Input { * \brief the count of data read from the input file */ int m_iDataCount; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace glass3 #endif // FILEINPUT_H diff --git a/glass-app/fileOutput/fileOutput.cpp b/glass-app/fileOutput/fileOutput.cpp index 1ecaf495..e6801ca2 100644 --- a/glass-app/fileOutput/fileOutput.cpp +++ b/glass-app/fileOutput/fileOutput.cpp @@ -288,4 +288,8 @@ bool fileOutput::getTimestampFileName() { return (m_bTimestampFileName); } +// ---------------------------------------------------------getMutex +std::mutex & fileOutput::getMutex() { + return (m_Mutex); +} } // namespace glass3 diff --git a/glass-app/fileOutput/fileOutput.h b/glass-app/fileOutput/fileOutput.h index 2bdebcc9..88bd01c3 100644 --- a/glass-app/fileOutput/fileOutput.h +++ b/glass-app/fileOutput/fileOutput.h @@ -134,6 +134,17 @@ class fileOutput : public glass3::output::output { * an epoch timestamp in the file name. */ std::atomic m_bTimestampFileName; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace glass3 #endif // FILEOUTPUT_H diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index c5b44c63..6208b2c8 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "outputTopic.h" @@ -158,6 +159,16 @@ bool brokerOutput::setup(std::shared_ptr config) { + topicConfig + "."); } + // heartbeat interval + int64_t heartbeatInterval = std::numeric_limits::quiet_NaN(); + if (config->HasKey("HeartbeatInterval")) { + heartbeatInterval = (*config)["HeartbeatInterval"].ToInt(); + glass3::util::Logger::log( + "info", + "brokerOutput::setup(): Using HeartbeatInterval: " + + std::to_string(heartbeatInterval) + "."); + } + // clear out any old producer if (m_OutputProducer != NULL) { delete (m_OutputProducer); @@ -165,6 +176,7 @@ bool brokerOutput::setup(std::shared_ptr config) { // create new producer m_OutputProducer = new hazdevbroker::Producer(); + m_OutputProducer->setHeartbeatInterval(heartbeatInterval); // set up logging m_OutputProducer->setLogCallback( @@ -441,6 +453,16 @@ void brokerOutput::sendToOutputTopics(const std::string &message) { } } +// ---------------------------------------------------------sendHeartbeat +void brokerOutput::sendHeartbeat() { + // send heartbeats to each topic + // for each topic + for (auto aTopic : m_vOutputTopics) { + // send it + aTopic->heartbeat(); + } +} + // ---------------------------------------------------------logProducer void brokerOutput::logProducer(const std::string &message) { // log whatever the producer wanted us to log @@ -459,4 +481,9 @@ const std::string brokerOutput::getStationFileName() { std::lock_guard guard(getMutex()); return (m_sStationFileName); } + +// ---------------------------------------------------------getMutex +std::mutex & brokerOutput::getMutex() { + return (m_Mutex); +} } // namespace glass3 diff --git a/glass-broker-app/brokerOutput/brokerOutput.h b/glass-broker-app/brokerOutput/brokerOutput.h index 009b4e5c..feb98ea5 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.h +++ b/glass-broker-app/brokerOutput/brokerOutput.h @@ -93,8 +93,8 @@ class brokerOutput : public glass3::output::output { /** * \brief the function for producer logging - * This function is used by the HazDevBroker producer to log messages - * with neic-glass3's logging system. + * This function is used by the HazDevBroker producer to log messages + * with neic-glass3's logging system. * \param message - A string containing the logging message */ void logProducer(const std::string &message); @@ -118,6 +118,14 @@ class brokerOutput : public glass3::output::output { */ void sendToOutputTopics(const std::string &message); + /** + * \brief Send heartbeats + * + * This function is used to optionally send heartbeat messages to kafka via + * the hazdev-broker producer. + */ + void sendHeartbeat() override; + private: /** * \brief The hazdevbroker producer used to send messages to kafka @@ -138,6 +146,17 @@ class brokerOutput : public glass3::output::output { * \brief the std::string containing the station file name. */ std::string m_sStationFileName; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace glass3 #endif // BROKEROUTPUT_H diff --git a/glass-broker-app/brokerOutput/outputTopic.cpp b/glass-broker-app/brokerOutput/outputTopic.cpp index dcd42e88..5eadc330 100644 --- a/glass-broker-app/brokerOutput/outputTopic.cpp +++ b/glass-broker-app/brokerOutput/outputTopic.cpp @@ -188,4 +188,17 @@ void outputTopic::send(const std::string &message) { // send it m_OutputProducer->sendString(m_OutputTopic, message); } + +// ---------------------------------------------------------heartbeat +void outputTopic::heartbeat() { + if (m_OutputProducer == NULL) { + return; + } + if (m_OutputTopic == NULL) { + return; + } + + // send it + m_OutputProducer->sendHeartbeat(m_OutputTopic); +} } // namespace glass3 diff --git a/glass-broker-app/brokerOutput/outputTopic.h b/glass-broker-app/brokerOutput/outputTopic.h index c687efec..b18cd1f9 100644 --- a/glass-broker-app/brokerOutput/outputTopic.h +++ b/glass-broker-app/brokerOutput/outputTopic.h @@ -79,6 +79,14 @@ class outputTopic { */ void send(const std::string &message); + /** + * \brief outputTopic heartbeat function + * The function sends generates a heartbeat using the producer pointer and + * the configured topic. Note that the producer takes care of deciding + * whether it has been long enough to generate a heartbeat + */ + void heartbeat(); + protected: /** * \brief the top of the bounds rectangle in degrees of latitude. diff --git a/output/include/output.h b/output/include/output.h index 69939de7..33ca32ce 100644 --- a/output/include/output.h +++ b/output/include/output.h @@ -460,6 +460,14 @@ class output : public glass3::util::iOutput, virtual void sendOutput(const std::string &type, const std::string &id, const std::string &message) = 0; + /** + * \brief Send heartbeats + * + * This function is optionally used by an overriding class to implement a + * specific heartbeat method, i.e. to disk, memory, socket, kafka, etc. + */ + virtual void sendHeartbeat(); + private: /** * \brief A std::vector of integers containing the times in seconds @@ -579,6 +587,18 @@ class output : public glass3::util::iOutput, * perform output. */ glass3::util::ThreadPool *m_ThreadPool; + + private: + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace output } // namespace glass3 diff --git a/output/src/output.cpp b/output/src/output.cpp index 9866ba50..f08230c4 100644 --- a/output/src/output.cpp +++ b/output/src/output.cpp @@ -64,37 +64,43 @@ output::output() // ---------------------------------------------------------~output output::~output() { - glass3::util::Logger::log("debug", "output::~output(): Destruction."); - - // stop the output threads - stop(); - - // clear everything - clear(); - - // cleanup - getMutex().lock(); - // cppcheck-suppress nullPointerRedundantCheck - if (m_TrackingCache != NULL) { - m_TrackingCache->clear(); - delete (m_TrackingCache); - m_TrackingCache = NULL; - } +glass3::util::Logger::log("debug", "output::~output(): Destruction."); + try { + // stop the output threads + stop(); + + // clear everything + clear(); + + // cleanup + getMutex().lock(); + // cppcheck-suppress nullPointerRedundantCheck + if (m_TrackingCache != NULL) { + m_TrackingCache->clear(); + delete (m_TrackingCache); + m_TrackingCache = NULL; + } - // cppcheck-suppress nullPointerRedundantCheck - if (m_OutputQueue != NULL) { - m_OutputQueue->clear(); - delete (m_OutputQueue); - m_OutputQueue = NULL; - } + // cppcheck-suppress nullPointerRedundantCheck + if (m_OutputQueue != NULL) { + m_OutputQueue->clear(); + delete (m_OutputQueue); + m_OutputQueue = NULL; + } - // cppcheck-suppress nullPointerRedundantCheck - if (m_LookupQueue != NULL) { - m_LookupQueue->clear(); - delete (m_LookupQueue); - m_LookupQueue = NULL; + // cppcheck-suppress nullPointerRedundantCheck + if (m_LookupQueue != NULL) { + m_LookupQueue->clear(); + delete (m_LookupQueue); + m_LookupQueue = NULL; + } + getMutex().unlock(); + } catch (const std::exception& e) { + glass3::util::Logger::log( + "warning", + "output::~output()(): Exception " + + std::string(e.what())); } - getMutex().unlock(); } // configuration @@ -234,9 +240,6 @@ void output::clear() { setPubOnExpiration(false); clearPubTimes(); setSiteListRequestInterval(-1); - - // finally do baseclass clear - glass3::util::BaseClass::clear(); } // ---------------------------------------------------------sendToOutput @@ -625,6 +628,10 @@ glass3::util::WorkState output::work() { } } + // send any optional hearbeat messages (implementation specific via + // overriding sendHeartbeat) + m_ThreadPool->addJob(std::bind(&output::sendHeartbeat, this)); + // null check if ((m_OutputQueue == NULL) || (m_LookupQueue == NULL)) { // no message queues means we've got big problems @@ -1182,6 +1189,10 @@ bool output::isDataFinished(std::shared_ptr data) { return (true); } +// ---------------------------------------------------------sendHeartbeat +void output::sendHeartbeat() { +} + // ---------------------------------------------------------setSiteListRequestInterval void output::setSiteListRequestInterval(int delay) { m_iSiteListRequestInterval = delay; @@ -1241,5 +1252,10 @@ void output::clearPubTimes() { std::lock_guard guard(getMutex()); m_PublicationTimes.clear(); } + +// ---------------------------------------------------------getMutex +std::mutex & output::getMutex() { + return (m_Mutex); +} } // namespace output } // namespace glass3 diff --git a/output/tests/output_unittest.cpp b/output/tests/output_unittest.cpp index 94ce947c..c6bed245 100644 --- a/output/tests/output_unittest.cpp +++ b/output/tests/output_unittest.cpp @@ -261,13 +261,16 @@ TEST(Output, Construction) { OutputStub outputObject; // assert that this is an input thread - ASSERT_STREQ(outputObject.getThreadName().c_str(), "output")<< "check output thread name"; + ASSERT_STREQ(outputObject.getThreadName().c_str(), "output")<< + "check output thread name"; // assert the thread sleeptime - ASSERT_EQ(outputObject.getSleepTime(), SLEEPTIME)<< "check output thread sleep time"; + ASSERT_EQ(outputObject.getSleepTime(), SLEEPTIME)<< + "check output thread sleep time"; // assert report interval - ASSERT_EQ(outputObject.getReportInterval(), REPORTINTERVAL)<< "output thread report interval"; + ASSERT_EQ(outputObject.getReportInterval(), REPORTINTERVAL)<< + "output thread report interval"; // assert class is not set up ASSERT_FALSE(outputObject.getSetup())<< "output thread is not set up"; @@ -331,7 +334,8 @@ TEST(Output, Configuration) { ASSERT_EQ(pubTimes[1], PUBDELAY2)<< "second pub time correct"; // assert pub on expire - ASSERT_EQ(outputObject->getPubOnExpiration(), PUBONEXPIRE)<< "pub on expire correct"; + ASSERT_EQ(outputObject->getPubOnExpiration(), PUBONEXPIRE)<< + "pub on expire correct"; // assert site list delay ASSERT_EQ(outputObject->getSiteListRequestInterval(), SITELISTDELAY)<< @@ -349,8 +353,8 @@ TEST(Output, Configuration) { } TEST(Output, ThreadTests) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH),true); OutputStub* outputObject = new OutputStub(); @@ -477,8 +481,8 @@ TEST(Output, TrackingTests) { } TEST(Output, OutputTest) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -532,8 +536,8 @@ TEST(Output, OutputTest) { } TEST(Output, UpdateTest) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -637,12 +641,11 @@ TEST(Output, UpdateTest) { // assert that second update was not created ASSERT_EQ(outputObject->messages.size(), 2)<< "update2 not created"; - } TEST(Output, CancelTest) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -694,8 +697,8 @@ TEST(Output, CancelTest) { } TEST(Output, RetractTest) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -753,8 +756,8 @@ TEST(Output, RetractTest) { } TEST(Output, ExpireTest) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -799,7 +802,7 @@ TEST(Output, ExpireTest) { // assert that output was created ASSERT_EQ(outputObject->messages.size(), 1)<< "output created"; - //remove output for update + // remove output for update // std::remove(output2file.c_str()); std::string event2expirefile = std::string(TESTPATH) + "/" @@ -831,8 +834,8 @@ TEST(Output, ExpireTest) { } TEST(Output, StationRequestTest) { - // glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); @@ -857,8 +860,8 @@ TEST(Output, StationRequestTest) { std::string stationrequestfile = std::string(TESTPATH) + "/" + std::string(SITEREQUESTFILE); -// std::string hypo2file = std::string(TESTPATH) + "/" -// + std::string(HYPO2FILE); + // std::string hypo2file = std::string(TESTPATH) + "/" + // + std::string(HYPO2FILE); std::shared_ptr stationrequest = GetDataFromFile( stationrequestfile); @@ -908,8 +911,8 @@ TEST(Output, StationListTest) { } TEST(Output, FailTests) { - //glass3::util::log_init("outputtest", spdlog::level::debug, std::string(TESTPATH), - // true); + // glass3::util::log_init("outputtest", spdlog::level::debug, + // std::string(TESTPATH), true); OutputStub* outputObject = new OutputStub(); // create configfilestring diff --git a/util/include/baseclass.h b/util/include/baseclass.h index 253a6e85..bdb960f0 100644 --- a/util/include/baseclass.h +++ b/util/include/baseclass.h @@ -122,12 +122,6 @@ class BaseClass { void setDefaultAuthor(const std::string &author); protected: - /** - * \brief Retrieves a reference to the class member containing the mutex - * used to control access to class members - */ - std::mutex & getMutex(); - /** * \brief A shared pointer to the json::Object that holds the configuration */ @@ -151,6 +145,13 @@ class BaseClass { */ std::string m_DefaultAuthor; + private: + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + /** * \brief A mutex to control access to BaseClass members */ diff --git a/util/include/cache.h b/util/include/cache.h index 54bfcfc6..433abc6d 100644 --- a/util/include/cache.h +++ b/util/include/cache.h @@ -132,6 +132,17 @@ class Cache : public util::BaseClass { * getNextFromCache() */ std::map>::iterator m_CacheDumpItr; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace util } // namespace glass3 diff --git a/util/include/queue.h b/util/include/queue.h index f299c34a..1c930f61 100644 --- a/util/include/queue.h +++ b/util/include/queue.h @@ -82,6 +82,17 @@ class Queue : public util::BaseClass { * \brief the std::Queue used to store the Queue */ std::queue> m_DataQueue; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace util } // namespace glass3 diff --git a/util/include/threadpool.h b/util/include/threadpool.h index 83fa818c..061d3353 100644 --- a/util/include/threadpool.h +++ b/util/include/threadpool.h @@ -104,6 +104,17 @@ class ThreadPool : public util::ThreadBaseClass { * functions to process */ std::queue> m_JobQueue; + + /** + * \brief Retrieves a reference to the class member containing the mutex + * used to control access to class members + */ + std::mutex & getMutex(); + + /** + * \brief A mutex to control access to class members + */ + std::mutex m_Mutex; }; } // namespace util } // namespace glass3 diff --git a/util/src/cache.cpp b/util/src/cache.cpp index 8b33629b..509f7bf4 100644 --- a/util/src/cache.cpp +++ b/util/src/cache.cpp @@ -188,5 +188,10 @@ int Cache::size() { return (cachesize); } + +// ---------------------------------------------------------getMutex +std::mutex & Cache::getMutex() { + return (m_Mutex); +} } // namespace util } // namespace glass3 diff --git a/util/src/queue.cpp b/util/src/queue.cpp index 7122f24b..819d1a4f 100644 --- a/util/src/queue.cpp +++ b/util/src/queue.cpp @@ -72,6 +72,11 @@ int Queue::size() { return (queuesize); } + +// ---------------------------------------------------------getMutex +std::mutex & Queue::getMutex() { + return (m_Mutex); +} } // namespace util } // namespace glass3 diff --git a/util/src/threadbaseclass.cpp b/util/src/threadbaseclass.cpp index fed4741b..897a81e5 100644 --- a/util/src/threadbaseclass.cpp +++ b/util/src/threadbaseclass.cpp @@ -134,7 +134,7 @@ bool ThreadBaseClass::stop() { for (int i = 0; i < m_WorkThreads.size(); i++) { try { m_WorkThreads[i].join(); - } catch (const std::system_error& e) { + } catch (const std::exception& e) { glass3::util::Logger::log( "warning", "ThreadBaseClass::stop(): Exception " diff --git a/util/src/threadpool.cpp b/util/src/threadpool.cpp index def9ccb4..a8d67470 100644 --- a/util/src/threadpool.cpp +++ b/util/src/threadpool.cpp @@ -85,5 +85,9 @@ int ThreadPool::getJobQueueSize() { return (queuesize); } +// ---------------------------------------------------------getMutex +std::mutex & ThreadPool::getMutex() { + return (m_Mutex); +} } // namespace util } // namespace glass3 From ca907666fa26aa1a4830eae155aad93938f2274f Mon Sep 17 00:00:00 2001 From: John Patton Date: Thu, 15 Nov 2018 14:07:48 -0700 Subject: [PATCH 05/11] fix for always publishing heartbeats problem --- glass-broker-app/brokerInput/brokerInput.cpp | 40 ++++++++++--------- glass-broker-app/brokerInput/brokerInput.h | 2 +- .../brokerOutput/brokerOutput.cpp | 22 +++++----- 3 files changed, 33 insertions(+), 31 deletions(-) diff --git a/glass-broker-app/brokerInput/brokerInput.cpp b/glass-broker-app/brokerInput/brokerInput.cpp index 2da1c5af..0fe893f2 100644 --- a/glass-broker-app/brokerInput/brokerInput.cpp +++ b/glass-broker-app/brokerInput/brokerInput.cpp @@ -19,7 +19,7 @@ brokerInput::brokerInput() "brokerInput::brokerInput(): Construction."); m_Consumer = NULL; - m_iHeartbeatInterval = std::numeric_limits::quiet_NaN(); + m_iBrokerHeartbeatInterval = std::numeric_limits::quiet_NaN(); clear(); } @@ -28,7 +28,7 @@ brokerInput::brokerInput() brokerInput::brokerInput(const std::shared_ptr &config) : glass3::input::Input() { m_Consumer = NULL; - m_iHeartbeatInterval = std::numeric_limits::quiet_NaN(); + m_iBrokerHeartbeatInterval = std::numeric_limits::quiet_NaN(); // do basic construction clear(); @@ -131,10 +131,20 @@ bool brokerInput::setup(std::shared_ptr config) { } } + // set up consumer + if (m_Consumer != NULL) { + delete (m_Consumer); + } + + // create new consumer + m_Consumer = new hazdevbroker::Consumer(); + // optional directory to write heartbeat files to std::string heartbeatDirectory = ""; if (config->HasKey("HeartbeatDirectory")) { - heartbeatDirectory = (*config)["HeartbeatDirectory"].ToString(); + std::string heartbeatDirectory = + (*config)["HeartbeatDirectory"].ToString(); + m_Consumer->setHeartbeatDirectory(heartbeatDirectory); glass3::util::Logger::log( "info", "brokerInput::setup(): Using HeartbeatDirectory: " @@ -142,23 +152,15 @@ bool brokerInput::setup(std::shared_ptr config) { } // optional interval to check for heartbeats - if (config->HasKey("HeartbeatInterval")) { - m_iHeartbeatInterval = (*config)["HeartbeatInterval"].ToInt(); + if (config->HasKey("BrokerHeartbeatInterval")) { + m_iBrokerHeartbeatInterval = + (*config)["BrokerHeartbeatInterval"].ToInt(); glass3::util::Logger::log( "info", - "brokerInput::setup(): Using HeartbeatInterval: " - + std::to_string(m_iHeartbeatInterval) + "."); + "brokerInput::setup(): Using BrokerHeartbeatInterval: " + + std::to_string(m_iBrokerHeartbeatInterval) + "."); } - // set up consumer - if (m_Consumer != NULL) { - delete (m_Consumer); - } - - // create new consumer - m_Consumer = new hazdevbroker::Consumer(); - m_Consumer->setHeartbeatDirectory(heartbeatDirectory); - // set up logging m_Consumer->setLogCallback( std::bind(&brokerInput::logConsumer, this, std::placeholders::_1)); @@ -200,7 +202,7 @@ std::string brokerInput::fetchRawData(std::string* pOutType) { } // if we are checking heartbeat times - if (!(std::isnan(m_iHeartbeatInterval))) { + if (!(std::isnan(m_iBrokerHeartbeatInterval))) { // get current time in seconds int64_t timeNow = std::time(NULL); @@ -211,10 +213,10 @@ std::string brokerInput::fetchRawData(std::string* pOutType) { int64_t elapsedTime = timeNow - lastHB; // has it been too long since the last heartbeat? - if (elapsedTime > m_iHeartbeatInterval) { + if (elapsedTime > m_iBrokerHeartbeatInterval) { glass3::util::Logger::log("error", "No Heartbeat Message seen from topic(s) in " + - std::to_string(m_iHeartbeatInterval) + " seconds! (" + + std::to_string(m_iBrokerHeartbeatInterval) + " seconds! (" + std::to_string(elapsedTime) + ")"); // reset last heartbeat time so that we don't fill the log diff --git a/glass-broker-app/brokerInput/brokerInput.h b/glass-broker-app/brokerInput/brokerInput.h index 32306e14..5a26bc7b 100644 --- a/glass-broker-app/brokerInput/brokerInput.h +++ b/glass-broker-app/brokerInput/brokerInput.h @@ -104,7 +104,7 @@ class brokerInput : public glass3::input::Input { /** * \brief the interval in seconds to expect hazdevbroker heartbeats. */ - int m_iHeartbeatInterval; + int m_iBrokerHeartbeatInterval; }; } // namespace glass3 #endif // BROKERINPUT_H diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index 6208b2c8..0e391a6c 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -159,16 +159,6 @@ bool brokerOutput::setup(std::shared_ptr config) { + topicConfig + "."); } - // heartbeat interval - int64_t heartbeatInterval = std::numeric_limits::quiet_NaN(); - if (config->HasKey("HeartbeatInterval")) { - heartbeatInterval = (*config)["HeartbeatInterval"].ToInt(); - glass3::util::Logger::log( - "info", - "brokerOutput::setup(): Using HeartbeatInterval: " - + std::to_string(heartbeatInterval) + "."); - } - // clear out any old producer if (m_OutputProducer != NULL) { delete (m_OutputProducer); @@ -176,7 +166,17 @@ bool brokerOutput::setup(std::shared_ptr config) { // create new producer m_OutputProducer = new hazdevbroker::Producer(); - m_OutputProducer->setHeartbeatInterval(heartbeatInterval); + + // heartbeat interval + if (config->HasKey("BrokerHeartbeatInterval")) { + int64_t brokerHeartbeatInterval = + (*config)["BrokerHeartbeatInterval"].ToInt(); + m_OutputProducer->setHeartbeatInterval(brokerHeartbeatInterval); + glass3::util::Logger::log( + "info", + "brokerOutput::setup(): Using BrokerHeartbeatInterval: " + + std::to_string(brokerHeartbeatInterval) + "."); + } // set up logging m_OutputProducer->setLogCallback( From a1f48cde0453ad6d1ece736c71d37e799c4d8bcb Mon Sep 17 00:00:00 2001 From: John Patton Date: Thu, 15 Nov 2018 14:13:27 -0700 Subject: [PATCH 06/11] temporarially disabled auto broker heartbeats --- glass-broker-app/brokerOutput/brokerOutput.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index 0e391a6c..0670da7d 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -457,10 +457,10 @@ void brokerOutput::sendToOutputTopics(const std::string &message) { void brokerOutput::sendHeartbeat() { // send heartbeats to each topic // for each topic - for (auto aTopic : m_vOutputTopics) { - // send it - aTopic->heartbeat(); - } + // for (auto aTopic : m_vOutputTopics) { + // send it + // aTopic->heartbeat(); + // } } // ---------------------------------------------------------logProducer From 5abacf97f375705bbb9798e43fcc26818f8cbb99 Mon Sep 17 00:00:00 2001 From: John Patton Date: Thu, 15 Nov 2018 15:43:21 -0700 Subject: [PATCH 07/11] switched to fixed hazdevbroker --- CMakeLists.txt | 4 ++-- glass-broker-app/brokerOutput/brokerOutput.cpp | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 38dc07c4..9ff2924b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -179,7 +179,7 @@ if (BUILD_GLASS-BROKER-APP) if (GIT_CLONE_PUBLIC) ExternalProject_Add(HazdevBroker GIT_REPOSITORY https://github.com/usgs/hazdev-broker.git - GIT_TAG v0.2.0 + GIT_TAG v0.2.1 TIMEOUT 10 CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${INSTALL_LOCATION} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} @@ -191,7 +191,7 @@ if (BUILD_GLASS-BROKER-APP) else() ExternalProject_Add(HazdevBroker GIT_REPOSITORY git@github.com:usgs/hazdev-broker.git - GIT_TAG v0.2.0 + GIT_TAG v0.2.1 TIMEOUT 10 CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${INSTALL_LOCATION} -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index 0670da7d..a30443dd 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -169,7 +169,7 @@ bool brokerOutput::setup(std::shared_ptr config) { // heartbeat interval if (config->HasKey("BrokerHeartbeatInterval")) { - int64_t brokerHeartbeatInterval = + int brokerHeartbeatInterval = (*config)["BrokerHeartbeatInterval"].ToInt(); m_OutputProducer->setHeartbeatInterval(brokerHeartbeatInterval); glass3::util::Logger::log( @@ -457,10 +457,10 @@ void brokerOutput::sendToOutputTopics(const std::string &message) { void brokerOutput::sendHeartbeat() { // send heartbeats to each topic // for each topic - // for (auto aTopic : m_vOutputTopics) { - // send it - // aTopic->heartbeat(); - // } + for (auto aTopic : m_vOutputTopics) { + // send it + aTopic->heartbeat(); + } } // ---------------------------------------------------------logProducer From 3d1d041e0ffbbe0341c7198b2c35beb7700de16f Mon Sep 17 00:00:00 2001 From: John Patton Date: Fri, 16 Nov 2018 15:05:40 -0700 Subject: [PATCH 08/11] work to fix random mutex crash during object destruction --- glass-app/fileInput/fileInput.cpp | 4 - glass-app/fileOutput/fileOutput.cpp | 5 -- glass-app/tests/input_unittest.cpp | 1 + glass-app/tests/output_unittest.cpp | 1 + glass-broker-app/brokerInput/brokerInput.cpp | 7 +- .../brokerOutput/brokerOutput.cpp | 19 ++--- glasscore/glasslib/src/Correlation.cpp | 1 - glasscore/glasslib/src/CorrelationList.cpp | 1 - glasscore/glasslib/src/Glass.cpp | 2 - glasscore/glasslib/src/Hypo.cpp | 1 - glasscore/glasslib/src/HypoList.cpp | 2 - glasscore/glasslib/src/Node.cpp | 1 - glasscore/glasslib/src/Pick.cpp | 1 - glasscore/glasslib/src/PickList.cpp | 12 ++- glasscore/glasslib/src/Site.cpp | 1 - glasscore/glasslib/src/SiteList.cpp | 1 - glasscore/glasslib/src/Trigger.cpp | 1 - glasscore/glasslib/src/Web.cpp | 1 - glasscore/glasslib/src/WebList.cpp | 1 - glasscore/tests/web_unittest.cpp | 2 +- input/src/input.cpp | 14 ++-- input/tests/input_unittest.cpp | 8 ++ output/src/output.cpp | 59 ++++++-------- output/tests/output_unittest.cpp | 20 +++++ process/src/associator.cpp | 8 +- util/include/threadbaseclass.h | 2 + util/src/baseclass.cpp | 1 - util/src/cache.cpp | 1 - util/src/queue.cpp | 1 - util/src/threadbaseclass.cpp | 81 ++++++++++--------- util/src/threadpool.cpp | 2 - 31 files changed, 124 insertions(+), 138 deletions(-) diff --git a/glass-app/fileInput/fileInput.cpp b/glass-app/fileInput/fileInput.cpp index 4f2de84c..e187a6ed 100644 --- a/glass-app/fileInput/fileInput.cpp +++ b/glass-app/fileInput/fileInput.cpp @@ -40,10 +40,6 @@ fileInput::fileInput(const std::shared_ptr &config) // ---------------------------------------------------------~fileInput fileInput::~fileInput() { - glass3::util::Logger::log("debug", "fileInput::~fileInput(): Destruction."); - - // stop the input thread - stop(); } // ---------------------------------------------------------setup diff --git a/glass-app/fileOutput/fileOutput.cpp b/glass-app/fileOutput/fileOutput.cpp index e6801ca2..1f27700f 100644 --- a/glass-app/fileOutput/fileOutput.cpp +++ b/glass-app/fileOutput/fileOutput.cpp @@ -42,11 +42,6 @@ fileOutput::fileOutput(const std::shared_ptr &config) // ---------------------------------------------------------~fileOutput fileOutput::~fileOutput() { - glass3::util::Logger::log("debug", - "fileOutput::~fileOutput(): Destruction."); - - // stop the input thread - stop(); } // ---------------------------------------------------------setup diff --git a/glass-app/tests/input_unittest.cpp b/glass-app/tests/input_unittest.cpp index 6ac3e713..16b12e4a 100644 --- a/glass-app/tests/input_unittest.cpp +++ b/glass-app/tests/input_unittest.cpp @@ -117,6 +117,7 @@ class InputTest : public ::testing::Test { #endif // cleanup input thread + InputThread->stop(); delete (InputThread); if (InputConfig != NULL) delete (InputConfig); diff --git a/glass-app/tests/output_unittest.cpp b/glass-app/tests/output_unittest.cpp index a4d7d237..71cadedd 100644 --- a/glass-app/tests/output_unittest.cpp +++ b/glass-app/tests/output_unittest.cpp @@ -463,6 +463,7 @@ TEST_F(OutputTest, Output) { // assert that the file is not there ASSERT_TRUE(std::ifstream(retract3file).good()) << "retract output file created"; + OutputThread->stop(); } /* diff --git a/glass-broker-app/brokerInput/brokerInput.cpp b/glass-broker-app/brokerInput/brokerInput.cpp index 0fe893f2..caf439ed 100644 --- a/glass-broker-app/brokerInput/brokerInput.cpp +++ b/glass-broker-app/brokerInput/brokerInput.cpp @@ -42,12 +42,7 @@ brokerInput::brokerInput(const std::shared_ptr &config) // ---------------------------------------------------------~brokerInput brokerInput::~brokerInput() { - glass3::util::Logger::log("debug", - "brokerInput::~brokerInput(): Destruction."); - - // stop the brokerInput thread - stop(); - + // cleanup if (m_Consumer != NULL) { delete (m_Consumer); } diff --git a/glass-broker-app/brokerOutput/brokerOutput.cpp b/glass-broker-app/brokerOutput/brokerOutput.cpp index a30443dd..09e90277 100644 --- a/glass-broker-app/brokerOutput/brokerOutput.cpp +++ b/glass-broker-app/brokerOutput/brokerOutput.cpp @@ -54,11 +54,13 @@ brokerOutput::brokerOutput(const std::shared_ptr &config) // ---------------------------------------------------------~brokerOutput brokerOutput::~brokerOutput() { - glass3::util::Logger::log("debug", - "brokerOutput::~brokerOutput(): Destruction."); - - // stop the input thread - stop(); + // cleanup + for (auto aTopic : m_vOutputTopics) { + if (aTopic != NULL) { + delete (aTopic); + } + } + m_vOutputTopics.clear(); if(m_OutputProducer != NULL) { delete(m_OutputProducer); @@ -67,13 +69,6 @@ brokerOutput::~brokerOutput() { if(m_StationRequestTopic != NULL) { delete(m_StationRequestTopic); } - - for (auto aTopic : m_vOutputTopics) { - if (aTopic != NULL) { - delete (aTopic); - } - } - m_vOutputTopics.clear(); } // ---------------------------------------------------------setup diff --git a/glasscore/glasslib/src/Correlation.cpp b/glasscore/glasslib/src/Correlation.cpp index 10b91aa4..7185a073 100644 --- a/glasscore/glasslib/src/Correlation.cpp +++ b/glasscore/glasslib/src/Correlation.cpp @@ -300,7 +300,6 @@ CCorrelation::CCorrelation(std::shared_ptr correlation, // ---------------------------------------------------------~CCorrelation CCorrelation::~CCorrelation() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/CorrelationList.cpp b/glasscore/glasslib/src/CorrelationList.cpp index 2a7d9c9b..b3a5a3b7 100644 --- a/glasscore/glasslib/src/CorrelationList.cpp +++ b/glasscore/glasslib/src/CorrelationList.cpp @@ -30,7 +30,6 @@ CCorrelationList::CCorrelationList() { // ---------------------------------------------------------~CCorrelationList CCorrelationList::~CCorrelationList() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/Glass.cpp b/glasscore/glasslib/src/Glass.cpp index d6592c34..7dc4d50a 100644 --- a/glasscore/glasslib/src/Glass.cpp +++ b/glasscore/glasslib/src/Glass.cpp @@ -126,8 +126,6 @@ CGlass::~CGlass() { if (m_pDetectionProcessor) { delete (m_pDetectionProcessor); } - - clear(); } // -------------------------------------------------------receiveExternalMessage diff --git a/glasscore/glasslib/src/Hypo.cpp b/glasscore/glasslib/src/Hypo.cpp index e418173e..4f0ddb09 100644 --- a/glasscore/glasslib/src/Hypo.cpp +++ b/glasscore/glasslib/src/Hypo.cpp @@ -308,7 +308,6 @@ CHypo::CHypo(std::shared_ptr corr, // ---------------------------------------------------------~CHypo CHypo::~CHypo() { - clear(); } // ------------------------------------------------------addCorrelationReference diff --git a/glasscore/glasslib/src/HypoList.cpp b/glasscore/glasslib/src/HypoList.cpp index 6ec17e1b..73fff5e9 100644 --- a/glasscore/glasslib/src/HypoList.cpp +++ b/glasscore/glasslib/src/HypoList.cpp @@ -43,8 +43,6 @@ CHypoList::CHypoList(int numThreads, int sleepTime, int checkInterval) // ---------------------------------------------------------~CHypoList CHypoList::~CHypoList() { - // clean up everything else - clear(); } // ---------------------------------------------------------addHypo diff --git a/glasscore/glasslib/src/Node.cpp b/glasscore/glasslib/src/Node.cpp index e1b42ac8..7df21e85 100644 --- a/glasscore/glasslib/src/Node.cpp +++ b/glasscore/glasslib/src/Node.cpp @@ -62,7 +62,6 @@ CNode::CNode(std::string name, double lat, double lon, double z, // ---------------------------------------------------------~CNode CNode::~CNode() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/Pick.cpp b/glasscore/glasslib/src/Pick.cpp index 084ec496..23871122 100644 --- a/glasscore/glasslib/src/Pick.cpp +++ b/glasscore/glasslib/src/Pick.cpp @@ -251,7 +251,6 @@ CPick::CPick(std::shared_ptr pick, CSiteList *pSiteList) { // ---------------------------------------------------------~CPick CPick::~CPick() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/PickList.cpp b/glasscore/glasslib/src/PickList.cpp index 6c868056..12ba1a8d 100644 --- a/glasscore/glasslib/src/PickList.cpp +++ b/glasscore/glasslib/src/PickList.cpp @@ -36,8 +36,6 @@ CPickList::CPickList(int numThreads, int sleepTime, int checkInterval) // ---------------------------------------------------------~CPickList CPickList::~CPickList() { - // clean up everything else - clear(); } // ---------------------------------------------------------~clear @@ -447,6 +445,9 @@ glass3::util::WorkState CPickList::work() { return (glass3::util::WorkState::OK); } + // signal that the thread is still alive after pick parsing + setThreadHealth(); + // check if pick is duplicate std::shared_ptr existingPick = getDuplicate( newPick->getTPick(), newPick->getSite()->getSCNL(), @@ -533,6 +534,9 @@ glass3::util::WorkState CPickList::work() { // done modifying the multiset m_PickListMutex.unlock(); + // signal that the thread is still alive after pick insertion + setThreadHealth(); + // Attempt to associate the pick CGlass::getHypoList()->associateData(pick); @@ -559,6 +563,10 @@ glass3::util::WorkState CPickList::work() { } } + // signal that the thread is still alive, since association might have + // taken awhile + setThreadHealth(); + // Attempt nucleation unless we were told not to. if (bNucleateThisPick == true) { pick->nucleate(); diff --git a/glasscore/glasslib/src/Site.cpp b/glasscore/glasslib/src/Site.cpp index 27188b7d..4d020785 100644 --- a/glasscore/glasslib/src/Site.cpp +++ b/glasscore/glasslib/src/Site.cpp @@ -264,7 +264,6 @@ bool CSite::initialize(std::string sta, std::string comp, std::string net, // ---------------------------------------------------------~CSite CSite::~CSite() { - clear(); } // --------------------------------------------------------clear diff --git a/glasscore/glasslib/src/SiteList.cpp b/glasscore/glasslib/src/SiteList.cpp index 0603dca0..7df1ec0e 100644 --- a/glasscore/glasslib/src/SiteList.cpp +++ b/glasscore/glasslib/src/SiteList.cpp @@ -30,7 +30,6 @@ CSiteList::CSiteList(int numThreads, int sleepTime, int checkInterval) // ---------------------------------------------------------~CSiteList CSiteList::~CSiteList() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/Trigger.cpp b/glasscore/glasslib/src/Trigger.cpp index 36724c47..067fe186 100644 --- a/glasscore/glasslib/src/Trigger.cpp +++ b/glasscore/glasslib/src/Trigger.cpp @@ -28,7 +28,6 @@ CTrigger::CTrigger(double lat, double lon, double z, double ot, // ---------------------------------------------------------~CTrigger CTrigger::~CTrigger() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/Web.cpp b/glasscore/glasslib/src/Web.cpp index ffda599c..81ffe86f 100644 --- a/glasscore/glasslib/src/Web.cpp +++ b/glasscore/glasslib/src/Web.cpp @@ -76,7 +76,6 @@ CWeb::CWeb(std::string name, double thresh, int numDetect, int numNucleate, // ---------------------------------------------------------~CWeb CWeb::~CWeb() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/glasslib/src/WebList.cpp b/glasscore/glasslib/src/WebList.cpp index 54150783..76b9870d 100644 --- a/glasscore/glasslib/src/WebList.cpp +++ b/glasscore/glasslib/src/WebList.cpp @@ -20,7 +20,6 @@ CWebList::CWebList(int numThreads) { // ---------------------------------------------------------~CWebList CWebList::~CWebList() { - clear(); } // ---------------------------------------------------------clear diff --git a/glasscore/tests/web_unittest.cpp b/glasscore/tests/web_unittest.cpp index 3209b922..b39e75c8 100644 --- a/glasscore/tests/web_unittest.cpp +++ b/glasscore/tests/web_unittest.cpp @@ -313,7 +313,7 @@ TEST(WebTest, GlobalTest) { phasename2.c_str()); // cleanup - delete (testSiteList); + // delete (testSiteList); } // test creating a regional/local grid diff --git a/input/src/input.cpp b/input/src/input.cpp index 1c59e531..a9875e8e 100644 --- a/input/src/input.cpp +++ b/input/src/input.cpp @@ -54,11 +54,6 @@ Input::Input(std::shared_ptr config) // ---------------------------------------------------------~Input Input::~Input() { - glass3::util::Logger::log("debug", "Input::~Input(): Destruction."); - - // stop the Input thread - stop(); - if (m_DataQueue != NULL) { // clear the queue m_DataQueue->clear(); @@ -66,14 +61,17 @@ Input::~Input() { delete (m_DataQueue); } - if (m_GPickParser != NULL) + if (m_GPickParser != NULL) { delete (m_GPickParser); + } - if (m_JSONParser != NULL) + if (m_JSONParser != NULL) { delete (m_JSONParser); + } - if (m_CCParser != NULL) + if (m_CCParser != NULL) { delete (m_CCParser); + } } // ---------------------------------------------------------setup diff --git a/input/tests/input_unittest.cpp b/input/tests/input_unittest.cpp index d8b9b3a2..664a7ed4 100644 --- a/input/tests/input_unittest.cpp +++ b/input/tests/input_unittest.cpp @@ -111,6 +111,8 @@ TEST(InputTest, Configuration) { // assert queue max size is -1 ASSERT_EQ(TestInput.getInputDataMaxSize(), QUEUESIZE)<< "queue max size check"; + + TestInput.stop(); } // tests to see if input can process gicks @@ -137,6 +139,8 @@ TEST(InputTest, GPickTest) { // check that the right ammount of data is in the queue ASSERT_EQ(TestInput.getInputDataCount(), DATACOUNT)<< "queue size check"; + + TestInput.stop(); } // tests to see if input can process ccData @@ -163,6 +167,8 @@ TEST(InputTest, CCTest) { // check that the right ammount of data is in the queue ASSERT_EQ(TestInput.getInputDataCount(), DATACOUNT)<< "queue size check"; + + TestInput.stop(); } // tests to see if input can process json data @@ -189,4 +195,6 @@ TEST(InputTest, JSONTest) { // check that the right ammount of data is in the queue ASSERT_EQ(TestInput.getInputDataCount(), DATACOUNT)<< "queue size check"; + + TestInput.stop(); } diff --git a/output/src/output.cpp b/output/src/output.cpp index f08230c4..605e6798 100644 --- a/output/src/output.cpp +++ b/output/src/output.cpp @@ -31,8 +31,6 @@ namespace output { // ---------------------------------------------------------output output::output() : glass3::util::ThreadBaseClass("output") { - glass3::util::Logger::log("debug", "output::output(): Construction."); - std::time(&tLastWorkReport); std::time(&m_tLastSiteRequest); @@ -64,42 +62,31 @@ output::output() // ---------------------------------------------------------~output output::~output() { -glass3::util::Logger::log("debug", "output::~output(): Destruction."); - try { - // stop the output threads - stop(); - - // clear everything - clear(); - - // cleanup - getMutex().lock(); - // cppcheck-suppress nullPointerRedundantCheck - if (m_TrackingCache != NULL) { - m_TrackingCache->clear(); - delete (m_TrackingCache); - m_TrackingCache = NULL; - } + // cleanup + // cppcheck-suppress nullPointerRedundantCheck + if (m_TrackingCache != NULL) { + m_TrackingCache->clear(); + delete (m_TrackingCache); + m_TrackingCache = NULL; + } - // cppcheck-suppress nullPointerRedundantCheck - if (m_OutputQueue != NULL) { - m_OutputQueue->clear(); - delete (m_OutputQueue); - m_OutputQueue = NULL; - } + // cppcheck-suppress nullPointerRedundantCheck + if (m_OutputQueue != NULL) { + m_OutputQueue->clear(); + delete (m_OutputQueue); + m_OutputQueue = NULL; + } - // cppcheck-suppress nullPointerRedundantCheck - if (m_LookupQueue != NULL) { - m_LookupQueue->clear(); - delete (m_LookupQueue); - m_LookupQueue = NULL; - } - getMutex().unlock(); - } catch (const std::exception& e) { - glass3::util::Logger::log( - "warning", - "output::~output()(): Exception " - + std::string(e.what())); + // cppcheck-suppress nullPointerRedundantCheck + if (m_LookupQueue != NULL) { + m_LookupQueue->clear(); + delete (m_LookupQueue); + m_LookupQueue = NULL; + } + + if (m_ThreadPool != NULL) { + delete(m_ThreadPool); + m_ThreadPool = NULL; } } diff --git a/output/tests/output_unittest.cpp b/output/tests/output_unittest.cpp index c6bed245..74fbdaec 100644 --- a/output/tests/output_unittest.cpp +++ b/output/tests/output_unittest.cpp @@ -350,6 +350,8 @@ TEST(Output, Configuration) { std::string author = std::string(TESTAUTHOR); ASSERT_STREQ(outputObject->getDefaultAuthor().c_str(), author.c_str())<< "check author"; + + outputObject->stop(); } TEST(Output, ThreadTests) { @@ -478,6 +480,8 @@ TEST(Output, TrackingTests) { // clear outputThread.clearTrackingData(); ASSERT_FALSE(outputThread.haveTrackingData(std::string(ID3))); + + outputThread.stop(); } TEST(Output, OutputTest) { @@ -533,6 +537,8 @@ TEST(Output, OutputTest) { // check the output data against the input CheckData(senthypo, outputdetection); + + outputObject->stop(); } TEST(Output, UpdateTest) { @@ -641,6 +647,8 @@ TEST(Output, UpdateTest) { // assert that second update was not created ASSERT_EQ(outputObject->messages.size(), 2)<< "update2 not created"; + + outputObject->stop(); } TEST(Output, CancelTest) { @@ -694,6 +702,8 @@ TEST(Output, CancelTest) { // assert that the file is not there ASSERT_EQ(outputObject->messages.size(), 0)<< "no output"; + + outputObject->stop(); } TEST(Output, RetractTest) { @@ -753,6 +763,8 @@ TEST(Output, RetractTest) { // assert that output was created ASSERT_EQ(outputObject->messages.size(), 2)<< "retract created"; + + outputObject->stop(); } TEST(Output, ExpireTest) { @@ -831,6 +843,8 @@ TEST(Output, ExpireTest) { // check the output data against the update CheckData(sentexpirehypo2, outputexpiredetection2); + + outputObject->stop(); } TEST(Output, StationRequestTest) { @@ -874,6 +888,8 @@ TEST(Output, StationRequestTest) { // assert that request was created ASSERT_EQ(outputObject->messages.size(), 1)<< "request created"; + + outputObject->stop(); } TEST(Output, StationListTest) { @@ -908,6 +924,8 @@ TEST(Output, StationListTest) { // assert that sitelist was created ASSERT_EQ(outputObject->messages.size(), 1)<< "site list created"; + + outputObject->stop(); } TEST(Output, FailTests) { @@ -955,4 +973,6 @@ TEST(Output, FailTests) { std::shared_ptr badData2 = GetDataFromString( std::string(CONFIGFAIL1)); outputObject->sendToOutput(badData2); + + outputObject->stop(); } diff --git a/process/src/associator.cpp b/process/src/associator.cpp index f5905cf5..0e164ed9 100644 --- a/process/src/associator.cpp +++ b/process/src/associator.cpp @@ -43,18 +43,12 @@ Associator::Associator(glass3::util::iInput* inputint, // ---------------------------------------------------------~Associator Associator::~Associator() { - glass3::util::Logger::log("debug", - "associator::~Associator(): Destruction."); - - // stop the processing thread - stop(); - m_Input = NULL; m_Output = NULL; // clean up message queue - m_MessageQueue->clear(); if (m_MessageQueue != NULL) { + m_MessageQueue->clear(); delete (m_MessageQueue); } } diff --git a/util/include/threadbaseclass.h b/util/include/threadbaseclass.h index 618b13a3..8f0439f9 100644 --- a/util/include/threadbaseclass.h +++ b/util/include/threadbaseclass.h @@ -313,6 +313,8 @@ class ThreadBaseClass : public util::BaseClass { */ std::atomic m_iSleepTimeMS; + bool m_bTerminate; + // constants /** * \brief default health check interval in seconds diff --git a/util/src/baseclass.cpp b/util/src/baseclass.cpp index 7958862a..0b4b5712 100644 --- a/util/src/baseclass.cpp +++ b/util/src/baseclass.cpp @@ -15,7 +15,6 @@ BaseClass::BaseClass() { // ---------------------------------------------------------~BaseClass BaseClass::~BaseClass() { - clear(); } // ---------------------------------------------------------setup diff --git a/util/src/cache.cpp b/util/src/cache.cpp index 509f7bf4..a80467fc 100644 --- a/util/src/cache.cpp +++ b/util/src/cache.cpp @@ -20,7 +20,6 @@ Cache::Cache() { // ---------------------------------------------------------~Cache Cache::~Cache() { - clear(); } // ---------------------------------------------------------clear diff --git a/util/src/queue.cpp b/util/src/queue.cpp index 819d1a4f..b3be19b6 100644 --- a/util/src/queue.cpp +++ b/util/src/queue.cpp @@ -16,7 +16,6 @@ Queue::Queue() { // ---------------------------------------------------------~Queue Queue::~Queue() { - clear(); } // ---------------------------------------------------------clear diff --git a/util/src/threadbaseclass.cpp b/util/src/threadbaseclass.cpp index 897a81e5..e2810e87 100644 --- a/util/src/threadbaseclass.cpp +++ b/util/src/threadbaseclass.cpp @@ -23,6 +23,7 @@ ThreadBaseClass::ThreadBaseClass() setHealthCheckInterval(k_iHeathCheckIntervalDefault); setNumThreads(k_iNumThreadsDefault); setThreadHealth(); + m_bTerminate = false; // set to default inter-loop sleep setSleepTime(k_iSleepTimeDefault); @@ -40,6 +41,7 @@ ThreadBaseClass::ThreadBaseClass(std::string threadName, int sleepTimeMS, setHealthCheckInterval(checkInterval); setNumThreads(numThreads); setThreadHealth(); + m_bTerminate = false; // set to provided inter-loop sleep setSleepTime(sleepTimeMS); @@ -49,13 +51,9 @@ ThreadBaseClass::ThreadBaseClass(std::string threadName, int sleepTimeMS, // ---------------------------------------------------------~ThreadBaseClass ThreadBaseClass::~ThreadBaseClass() { - try { - stop(); - } catch (const std::exception& e) { - glass3::util::Logger::log( - "warning", - "ThreadBaseClass::~ThreadBaseClass()(): Exception " - + std::string(e.what())); + m_bTerminate = true; + for (int i = 0; i < m_WorkThreads.size(); i++) { + m_WorkThreads[i].join(); } } @@ -110,42 +108,49 @@ bool ThreadBaseClass::start() { // ---------------------------------------------------------stop bool ThreadBaseClass::stop() { - // don't bother if we've not got any threads - if (getNumThreads() <= 0) { - return (false); - } - if (m_WorkThreads.size() <= 0) { - return (false); - } - - // check if we're running - if (getWorkThreadsState() != glass3::util::ThreadState::Started) { - glass3::util::Logger::log( - "warning", - "ThreadBaseClass::stop(): Work Thread is not running, " - "or is already stopping. (" + getThreadName() + ")"); - return (false); - } - - // we're stopping - setWorkThreadsState(glass3::util::ThreadState::Stopping); + try { + // don't bother if we've not got any threads + if (getNumThreads() <= 0) { + return (false); + } + if (m_WorkThreads.size() <= 0) { + return (false); + } - // wait for threads to finish - for (int i = 0; i < m_WorkThreads.size(); i++) { - try { - m_WorkThreads[i].join(); - } catch (const std::exception& e) { + // check if we're running + if (getWorkThreadsState() != glass3::util::ThreadState::Started) { glass3::util::Logger::log( "warning", - "ThreadBaseClass::stop(): Exception " - + std::string(e.what()) + " joining work thread #" - + std::to_string(i) + "(" + getThreadName() + ")"); + "ThreadBaseClass::stop(): Work Thread is not running, " + "or is already stopping. (" + getThreadName() + ")"); + return (false); } - } - m_WorkThreads.clear(); + // we're stopping + setWorkThreadsState(glass3::util::ThreadState::Stopping); + + // wait for threads to finish + for (int i = 0; i < m_WorkThreads.size(); i++) { + try { + m_WorkThreads[i].join(); + } catch (const std::exception& e) { + glass3::util::Logger::log( + "warning", + "ThreadBaseClass::stop(): Exception " + + std::string(e.what()) + " joining work thread #" + + std::to_string(i) + "(" + getThreadName() + ")"); + } + } - setWorkThreadsState(glass3::util::ThreadState::Stopped); + m_WorkThreads.clear(); + + setWorkThreadsState(glass3::util::ThreadState::Stopped); + } catch (const std::system_error& e) { + glass3::util::Logger::log( + "warning", + "ThreadBaseClass::stop(): System Error Exception " + + std::string(e.what())); + } // done glass3::util::Logger::log( @@ -235,7 +240,7 @@ void ThreadBaseClass::workLoop() { setWorkThreadsState(glass3::util::ThreadState::Started); // run until told to stop - while (true) { + while (!m_bTerminate) { // signal that we're still running setThreadHealth(); glass3::util::WorkState workState; diff --git a/util/src/threadpool.cpp b/util/src/threadpool.cpp index a8d67470..80a88a35 100644 --- a/util/src/threadpool.cpp +++ b/util/src/threadpool.cpp @@ -19,8 +19,6 @@ ThreadPool::ThreadPool(std::string poolName, int numThreads, int sleepTime, // ---------------------------------------------------------~ThreadPool ThreadPool::~ThreadPool() { - // stop the threads. - stop(); } // ---------------------------------------------------------addJob From 5251c03e3cc41cab9622fcb44917e2484021c93c Mon Sep 17 00:00:00 2001 From: John Patton Date: Mon, 26 Nov 2018 08:27:35 -0700 Subject: [PATCH 09/11] adjusted logging, fixed bad error message --- glass-broker-app/brokerInput/brokerInput.cpp | 9 +++++---- util/src/threadpool.cpp | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/glass-broker-app/brokerInput/brokerInput.cpp b/glass-broker-app/brokerInput/brokerInput.cpp index caf439ed..fc758801 100644 --- a/glass-broker-app/brokerInput/brokerInput.cpp +++ b/glass-broker-app/brokerInput/brokerInput.cpp @@ -19,7 +19,7 @@ brokerInput::brokerInput() "brokerInput::brokerInput(): Construction."); m_Consumer = NULL; - m_iBrokerHeartbeatInterval = std::numeric_limits::quiet_NaN(); + m_iBrokerHeartbeatInterval = -1; clear(); } @@ -28,7 +28,7 @@ brokerInput::brokerInput() brokerInput::brokerInput(const std::shared_ptr &config) : glass3::input::Input() { m_Consumer = NULL; - m_iBrokerHeartbeatInterval = std::numeric_limits::quiet_NaN(); + m_iBrokerHeartbeatInterval = -1; // do basic construction clear(); @@ -197,7 +197,7 @@ std::string brokerInput::fetchRawData(std::string* pOutType) { } // if we are checking heartbeat times - if (!(std::isnan(m_iBrokerHeartbeatInterval))) { + if (m_iBrokerHeartbeatInterval >= 0) { // get current time in seconds int64_t timeNow = std::time(NULL); @@ -210,7 +210,8 @@ std::string brokerInput::fetchRawData(std::string* pOutType) { // has it been too long since the last heartbeat? if (elapsedTime > m_iBrokerHeartbeatInterval) { glass3::util::Logger::log("error", - "No Heartbeat Message seen from topic(s) in " + + "brokerInput::fetchRawData: No Heartbeat Message seen from" + " topic(s) in " + std::to_string(m_iBrokerHeartbeatInterval) + " seconds! (" + std::to_string(elapsedTime) + ")"); diff --git a/util/src/threadpool.cpp b/util/src/threadpool.cpp index 80a88a35..20699e3c 100644 --- a/util/src/threadpool.cpp +++ b/util/src/threadpool.cpp @@ -28,7 +28,7 @@ void ThreadPool::addJob(std::function newjob) { getMutex().unlock(); glass3::util::Logger::log( - "debug", + "trace", "ThreadPool::addJob(): Added Job.(" + getThreadName() + ")"); } @@ -54,7 +54,7 @@ glass3::util::WorkState ThreadPool::work() { getMutex().unlock(); glass3::util::Logger::log( - "debug", + "trace", "ThreadPool::jobLoop(): Found Job.(" + getThreadName() + ")"); // run the job @@ -69,7 +69,7 @@ glass3::util::WorkState ThreadPool::work() { } glass3::util::Logger::log( - "debug", + "trace", "ThreadPool::jobLoop(): Finished Job.(" + getThreadName() + ")"); // work was successful From eb45492244f63225ddd0549f4b224cb901e41d81 Mon Sep 17 00:00:00 2001 From: John Patton Date: Mon, 26 Nov 2018 13:12:13 -0700 Subject: [PATCH 10/11] updated application documentation --- doc/gen-travel-times-app.md | 76 +++++++++++++++++++ doc/glass-app.md | 100 +++++++++++++++++++++++++ doc/glass-broker-app.md | 142 ++++++++++++++++++++++++++++++++++++ doc/index.md | 14 ++-- 4 files changed, 326 insertions(+), 6 deletions(-) create mode 100644 doc/gen-travel-times-app.md create mode 100644 doc/glass-app.md create mode 100644 doc/glass-broker-app.md diff --git a/doc/gen-travel-times-app.md b/doc/gen-travel-times-app.md new file mode 100644 index 00000000..a180e431 --- /dev/null +++ b/doc/gen-travel-times-app.md @@ -0,0 +1,76 @@ +# gen-travel-times-app + +gen-travel-times-app is an application that uses the glasscore traveltime libraries to generate the traveltime lookup files (.trv) used by neic-glass3 from a model file. Please note that this application is currently not optimized, and is **extremely** slow. + +## Building + +To build gen-travel-times-app, set the `BUILD_GEN-TRAVELTMES-APP` option equal to true (1) in the cmake command or GUI. + +## Configuration + +An example configuration for gen-travel-times-app is available in the [gen-travel-times-app params directory](https://github.com/usgs/neic-glass3/tree/master/gen-travel-times-app/params) + +### gen-travel-times.d + +```json +{ + "Configuration": "gen-travel-times-app", + "FileExtension": ".trv", + "OutputPath": "./", + "Model": "./params/ak135_mod.d", + "Branches": [ + { + "Cmd": "GenerateTraveltime", + "Branch": "P", + "Rays": [ + "Pup", + "P", + "Pdiff" + ], + "DeltaTimeWarp": { + "MinimumDistance": 0.0, + "MaximumDistance": 360.0, + "SlopeDecayConstant": 0.10, + "SlopeZero": 0.05, + "SlopeInfinite": 1.0 + }, + "DepthTimeWarp": { + "MinimumDepth": -10.0, + "MaximumDepth": 800.0, + "SlopeDecayConstant": 0.10, + "SlopeZero": 1.0, + "SlopeInfinite": 10.0 + } + } + ] +} +``` + +* **FileExtension** - The file extension to use for travel time files +* **OutputPath** - The output directory to write travel time files to +* **Model** - The earth model file to use +* **Branches** - The list of travel time files to generate +* **Branch** - The branch name for the travel time file +* **Rays** - The rays (phases) to use in generating the file +* **DeltaTimeWarp** - The distance warp for this travel time file +* **MinimumDistance** - Start of warp in degrees +* **MaximumDistance** - End of warp in degrees +* **SlopeDecayConstant** - The decay exponent for the warp +* **SlopeZero** - The warp slope value at minimum +* **SlopeInfinite** - The warp slope value at maximum +* **DepthTimeWarp** - The depth warp for this travel time file +* **MinimumDepth** - Start of warp in kilometers +* **MaximumDepth** - End of warp in kilometers +* **SlopeDecayConstant** - The decay exponent for the warp +* **SlopeZero** - The warp slope value at minimum +* **SlopeInfinite** - The warp slope value at maximum + +## Documentation + +Further documentation of the gen-traveltimes-app software is available [here](https://usgs.github.io/neic-glass3/gen-travel-times-app/html/) + +## Running + +To run gen-travel-times-app, use the following command: `gen-travel-times-app [logname]` where `` is the required path the the gen-travel-times.d configuration fileand `[logname]` is an optional command that when present specifies the log file name and enables logging. + +gen-travel-times-app uses the environment variable GLASS_LOG to define the location to write log files \ No newline at end of file diff --git a/doc/glass-app.md b/doc/glass-app.md new file mode 100644 index 00000000..1d639d18 --- /dev/null +++ b/doc/glass-app.md @@ -0,0 +1,100 @@ +# glass-app + +glass-app is an implementation of the neic-glass3 libraries that reads input from a file system directory, and writes output to a file system directory, with a static stationlist and configuration. + +## Building + +To build glass-app, set the `BUILD_GLASS-APP` option equal to true (1) in the cmake command or GUI. + +## Configuration + +An example configuration for glass-app is available in the [glass-app params directory](https://github.com/usgs/neic-glass3/tree/master/glass-app/params) + +### Glass-App + +```json +{ + "Configuration":"glass-app", + "LogLevel":"debug", + "ConfigDirectory":"./params", + "InitializeFile":"initialize.d", + "StationList":"stationlist.d", + "GridFiles":[ + "ak_grid.d", + ], + "InputConfig":"input.d", + "OutputConfig":"output.d" +} +``` + +* **LogLevel** - Sets the minimum logging level, trace, debug, info, warning, error, or criticalerror +* **ConfigDirectory** - The path to directory containing the other glass subcomponent configuration files +* **InitializeFile** - Configuration file containing the neic-glass3 Algorithm configuration +* **StationList** - File containing the initial neic-glass3 station list +* **GridFiles** - One or more files defining detection grids used by neic-glass3 +* **InputConfig** - Configuration file containing the file input configuration +* **OutputConfig** - Configuration file containing the file output configuration + +### input.d + +```json +{ + "Configuration":"GlassInput", + "InputDirectory":"./input", + "ErrorDirectory":"./error", + "ArchiveDirectory":"./archive", + "Format":"gpick", + "QueueMaxSize":1000, + "ShutdownWhenNoData":true, + "ShutdownWait":300, + "DefaultAgencyID":"US", + "DefaultAuthor":"glassConverter" +} +``` + +* **InputDirectory** - The directory to read input files from +* **ErrorDirectory** - The optional directory to archive erroneous input files to. +* **ArchiveDirectory** - The optional directory to archive input files to. +* **Format** - The format to accept. glass-app currently understands the gpick, jsonpick, jsonhypo, and ccdata (dat) formats. Note that the only way to use multiple inputs (Picks, Correlations, and Detections at the same time) +* **QueueMaxSize** - The maximum size of the input queue +* **ShutdownWhenNoData** - Optional Flag indicating whether to shut down when there is no more input data +* **ShutdownWait** - The time in seconds to wait before shutting down due to there being no input data +* **DefaultAgencyID** - The default agency identifier to use when converting data to json +* **DefaultAuthor** - The default author to use when converting data to json + +### Output + +```json +{ + "Configuration":"GlassOutput", + "PublicationTimes":[20,180], + "PublishOnExpiration":true, + "OutputDirectory":"./output", + "OutputFormat":"json", + "TimeStampFileName":true, + "OutputAgencyID":"US", + "OutputAuthor":"glass" +} +``` + +* **PublicationTimes** - The time(s), in seconds since the detections was first found, to publish +* **PublishOnExpiration** - Flag indicating whether to always publish a final version of a detection when it expires out of glass +* **OutputDirectory** - The directory to write output to +* **OutputFormat** - The format to write output in for now, the only format is json +* **TimeStampFileName** - Optional flag to define whether to timestamp output file names, defaults to true +* **OutputAgencyID** - The agency identifier to use when generating output data +* **OutputAuthor** - The author to use when generating output data + +### neic-glass3 Algorithm + +For neic-glass3 algorithmic configuration, see [neic-glass3 Configuration](https://github.com/usgs/neic-glass3/blob/master/doc/GlassConfiguration.md). + +## Documentation + +Further documentation of the glass-app software is available [here](https://usgs.github.io/neic-glass3/glass-app/html/) + +## Running + +To run glass-app, use the following command: `glass-app [logname] [noconsole]` where `` is the required path the the glass.d configuration file, `[logname]` is an optional command that defining an alternate name for the glass-app log file, and `[noconsole]` is an optional command specifying that glass-app should not write messages to the console. + +glass-app uses the environment variable GLASS_LOG to define the location to write log files \ No newline at end of file diff --git a/doc/glass-broker-app.md b/doc/glass-broker-app.md new file mode 100644 index 00000000..4658b452 --- /dev/null +++ b/doc/glass-broker-app.md @@ -0,0 +1,142 @@ +# glass-broker-app + +glass-broker-app is an implementation of the neic-glass3 libraries that reads +input, station, and configuration updates from +[HazDev Broker](https://github.com/usgs/hazdev-broker) topics, and writes output +and station information requests to HazDev Broker topics. + +## Building + +To build glass-broker-app, set the `BUILD_GLASS-BROKER-APP` option equal to +true (1) in the cmake command or GUI. Building glass-broker-app requires +dependencies, such as [hazdev-broker](https://github.com/usgs/hazdev-broker) +and [librdkafka](https://github.com/edenhill/librdkafka). + +## Configuration + +An example configuration for glass-broker-app is available in the +[glass-broker-app params directory](https://github.com/usgs/neic-glass3/tree/master/glass-broker-app/params) + +### glass.d + +```json +{ + "Configuration":"glass-broker-app", + "LogLevel":"debug", + "ConfigDirectory":"./params", + "StationList":"stationlist.d", + "InitializeFile":"initialize.d", + "GridFiles":[ + "ak_grid.d", + ], + "InputConfig":"input.d", + "OutputConfig":"output.d" +} +``` + +* **LogLevel** - Sets the minimum logging level, trace, debug, info, warning, error, or criticalerror +* **ConfigDirectory** - The path to directory containing the other glass subcomponent configuration files +* **InitializeFile** - Configuration file containing the neic-glass3 Algorithm configuration +* **StationList** - File containing the initial neic-glass3 station list +* **GridFiles** - One or more files defining detection grids used by neic-glass3 +* **InputConfig** - Configuration file containing the broker input configuration +* **OutputConfig** - Configuration file containing the broker output configuration + +### input.d + +```json +{ + "Configuration":"GlassInput", + "HazdevBrokerConfig": { + "Type":"ConsumerConfig", + "Properties":{ + "client.id":"glass3Default", + "group.id":"1", + "metadata.broker.list":"", + "enable.auto.commit":"false" + } + }, + "HazdevBrokerTopicConfig": { + "Type":"TopicConfig", + "Properties":{ + "auto.commit.enable":"false", + "auto.offset.reset":"latest" + } + }, + "Topics":["Dev-RayPicker-1", "Station-Data"], + "HeartbeatDirectory":"./", + "BrokerHeartbeatInterval":300, + "QueueMaxSize":1000, + "DefaultAgencyID":"US", + "DefaultAuthor":"glassConverter" +} +``` + +* **HazdevBrokerConfig** - The HazDev Broker configuration to use for input, see [HazDev-Broker](https://github.com/usgs/hazdev-broker) +* **HazdevBrokerTopicConfig** - The HazDev Broker topic configuration to use, see [HazDev-Broker](https://github.com/usgs/hazdev-broker) +* **Topics** - The HazDev Broker topic(s) to receive input data from +* **HeartbeatDirectory** - An optional key defining where HazDev Broker heartbeat files should be written, if not defined, heartbeat files will not be written. +* **BrokerHeartbeatInterval** - An optional key defining the interval in seconds to expect HazDev Broker heartbeats, if not defined, heatbeats are not expected. +* **QueueMaxSize** - The maximum size of the input queue +* **DefaultAgencyID** - The default agency identifier to use when converting data to json +* **DefaultAuthor** - The default author to use when converting data to json + +### output.d + +```json +{ + "Configuration":"GlassOutput", + "PublishOnExpiration":false, + "PublicationTimes":[20,180], + "HazdevBrokerConfig": { + "Type":"ProducerConfig", + "Properties":{ + "client.id":"glass3Default", + "group.id":"0", + "metadata.broker.list":"", + "retries":"0" + } + }, + "OutputTopics":[ + { + "TopicName":"OK", + "TopLatitude":38.0, + "LeftLongitude":-101.0, + "BottomLatitude":33.0, + "RightLongitude":-94.0 + }, + { "TopicName":"DefaultWorld" + } + ], + "StationRequestTopic":"Station-Lookup", + "SiteListDelay":7200, + "StationFile":"./params/stationlist.d", + "OutputAgencyID":"US", + "OutputAuthor":"glass" +} +``` + +* **PublishOnExpiration** - Flag indicating whether to always publish a final version of a detection when it expires out of glass +* **PublicationTimes** - The time(s), in seconds since the detections was first found, to publish +* **HazdevBrokerConfig** - The HazDev Broker configuration to use for output, see [HazDev-Broker](https://github.com/usgs/hazdev-broker) +* **OutputTopics** - The list of HazDev Broker topics to write output to. +* **TopicName** - +* **TopLatitude** - +* **LeftLongitude** - +* **BottomLatitude** - +* **RightLongitude** - +* **StationRequestTopic** - Optional HazDev Broker topic to request station information. +* **SiteListDelay** - Optional delay between writing updated station files to disk +* **StationFile** - Optional file name of updated station file +* **OutputAgencyID** - The agency identifier to use when generating output data +* **OutputAuthor** - The author to use when generating output data + +### neic-glass3 Algorithm + +For neic-glass3 algorithmic configuration, see [GLASS 3 Configuration](https://github.com/usgs/neic-glass3/blob/master/doc/GlassConfiguration.md). + +## Running + +To run glass-broker-app, use the following command: `glass-broker-app [logname] [noconsole]` where `` is the required path to the glass.d configuration file, `[logname]` is an optional command that defining an alternate name for the glass-broker-app log file, and `[noconsole]` is an optional command specifying that glass-broker-app should not write messages to the console. + +glass-broker-app uses the environment variable GLASS_LOG to define the location to write log files \ No newline at end of file diff --git a/doc/index.md b/doc/index.md index 9b3f9986..388a5061 100644 --- a/doc/index.md +++ b/doc/index.md @@ -1,8 +1,6 @@ -# Welcome to the neic-glass3 Code Documentation pages +# neic-glass3 Code Documentation pages -Please note that this documentation site is **Under Construction** and not all pages have been completed yet. - -## Library Documentation: +## Library Documentation * [GlassCore](https://usgs.github.io/neic-glass3/glasscore/html/) * [Log](https://usgs.github.io/neic-glass3/log/html/) @@ -12,11 +10,15 @@ Please note that this documentation site is **Under Construction** and not all p * [Util](https://usgs.github.io/neic-glass3/util/html/) ## Application Documentation -* [gen-traveltimes-app](https://usgs.github.io/neic-glass3/gen-travel-times-app/html/) -* [glass-app](https://usgs.github.io/neic-glass3/glass-app/html/) + +* [gen-traveltimes-app](gen-travel-times-app.md) +* [glass-app](glass-app.md) +* [glass-broker-app](glass-broker-app.md) ## Configuration Documentation + * [Glass Configuration](GlassConfiguration.md) ## Wiki + * [Glass Wiki](https://github.com/usgs/neic-glass3/wiki) From 78ed20a955779007cd8acdce840a0955157f3ce6 Mon Sep 17 00:00:00 2001 From: John Patton Date: Mon, 26 Nov 2018 13:22:33 -0700 Subject: [PATCH 11/11] upped version number --- cmake/version.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/version.cmake b/cmake/version.cmake index 7a6b018e..34e95dfa 100644 --- a/cmake/version.cmake +++ b/cmake/version.cmake @@ -1,4 +1,4 @@ # version.cmake - a CMake script that defines the overall project version set (PROJECT_VERSION_MAJOR 1) set (PROJECT_VERSION_MINOR 0) -set (PROJECT_VERSION_PATCH 1) +set (PROJECT_VERSION_PATCH 2)