Skip to content

Commit

Permalink
enh(SQLLogger): #4529 (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-f committed Apr 13, 2024
1 parent 9af63ee commit 016faef
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 51 deletions.
31 changes: 17 additions & 14 deletions Data/include/Poco/Data/SQLChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,13 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
/// Returns true is value is "true", "t", "yes" or "y".
/// Case insensitive.

size_t logTofile(AutoPtr<FileChannel>& pFileChannel, const std::string& fileName, bool clear = false);
size_t logToFile(bool clear = false);
/// Logs cached entries to a file. Called in case DB insertions fail.

size_t logLocal(const std::string, Message::Priority prio = Message::PRIO_ERROR);
/// Adds the message to the local SQLChannel log queue, and logs it to the file.
/// Typically used to log DB connection/execution erors.

std::string maskPwd();
/// Masks the password in the connection
/// string, if detected. This is not a
Expand All @@ -243,17 +247,17 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable

mutable Poco::FastMutex _mutex;

std::string _connector;
std::string _connect;
SessionPtr _pSession;
std::string _sql;
std::string _name;
std::string _table;
bool _tableChanged;
int _timeout;
std::atomic<int> _minBatch;
int _maxBatch;
bool _bulk;
std::string _connector;
std::string _connect;
SessionPtr _pSession;
std::string _sql;
std::string _name;
std::string _table;
bool _tableChanged;
int _timeout;
std::atomic<int> _minBatch;
int _maxBatch;
bool _bulk;
std::atomic<bool> _throw;

// members for log entry cache
Expand All @@ -267,7 +271,6 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
Poco::NotificationQueue _logQueue;
std::unique_ptr<Poco::Thread> _pDBThread;
std::atomic<bool> _reconnect;
std::atomic<bool> _running;
std::atomic<bool> _stop;
std::atomic<size_t> _logged;
StrategyPtr _pArchiveStrategy;
Expand All @@ -293,7 +296,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const

inline bool SQLChannel::isRunning() const
{
return _running;
return _pDBThread && _pDBThread->isRunning();
}


Expand Down
84 changes: 47 additions & 37 deletions Data/src/SQLChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ SQLChannel::SQLChannel():
_tid(),
_priority(),
_reconnect(false),
_running(false),
_stop(false),
_logged(0)
{
Expand Down Expand Up @@ -97,7 +96,6 @@ SQLChannel::SQLChannel(const std::string& connector,
_priority(),
_pDBThread(new Thread),
_reconnect(true),
_running(false),
_stop(false),
_logged(0)
{
Expand Down Expand Up @@ -157,12 +155,13 @@ void SQLChannel::open()
_pSession = new Session(_connector, _connect, _timeout / 1000);
if (_pSession->hasProperty("maxFieldSize")) _pSession->setProperty("maxFieldSize", 8192);
if (_pSession->hasProperty("autoBind")) _pSession->setFeature("autoBind", true);
_logger.information("Connected to %s: %s", _connector, maskPwd());
if (!_stop) _logger.information("Connected to %s: %s", _connector, maskPwd());
else logLocal(Poco::format("Connected to %s: %s", _connector, maskPwd()));
return;
}
catch (DataException& ex)
{
_logger.error(ex.displayText());
logLocal(ex.displayText());
}
}
_pSession = nullptr;
Expand All @@ -173,7 +172,19 @@ void SQLChannel::open()
void SQLChannel::close()
{
wait(_timeout);
_pSession = nullptr;
if (_pSession)
{
_pSession->close();
_pSession = nullptr;
}
}


size_t SQLChannel::logLocal(const std::string message, Message::Priority prio)
{
Message msg("SQLChannel"s, message, prio);
log(msg);
return logToFile(_pFileChannel);
}


Expand All @@ -189,7 +200,7 @@ size_t SQLChannel::logSync()
{
return execSQL();
}
catch (Exception&)
catch (...)
{
if (_throw) throw;
}
Expand Down Expand Up @@ -222,7 +233,7 @@ bool SQLChannel::processOne(int minBatch)
}
ret = true;
}
if (_source.size() >= _minBatch) logSync();
if (_source.size() >= minBatch) logSync();

return ret;
}
Expand All @@ -243,25 +254,32 @@ void SQLChannel::run()
if (_reconnect && sleepTime < 12800)
sleepTime *= 2;
}
processOne(_minBatch);
sleepTime = 100;

if (!_reconnect)
{
processOne(_minBatch);
sleepTime = 100;
}
}
catch (Poco::Exception& ex)
{
_logger.error(ex.displayText());
if (!_stop) _logger.error(ex.displayText());
else logLocal(ex.displayText());
}
catch (std::exception& ex)
{
_logger.error(ex.what());
if (!_stop) _logger.error(ex.what());
else logLocal(ex.what());
}
catch (...)
{
_logger.error("SQLChannel::run(): unknown exception");
if (!_stop) _logger.error("SQLChannel::run(): unknown exception"s);
else logLocal("SQLChannel::run(): unknown exception"s);
}
_running = true;
Thread::sleep(100);
if (_stop) break;
Thread::sleep(sleepTime);
}
_running = false;
while (_logQueue.size()) processOne();
}


Expand All @@ -271,21 +289,20 @@ void SQLChannel::stop()
{
_reconnect = false;
_stop = true;
while (_pDBThread->isRunning()) Thread::sleep(10);
_pDBThread->join();
while (_logQueue.size())
processOne();
}
}


void SQLChannel::reconnect()
{
_reconnect = true;
if (!_pDBThread)
{
_pDBThread.reset(new Thread);
_pDBThread->start(*this);
}
_reconnect = true;
}


Expand Down Expand Up @@ -455,16 +472,16 @@ std::string SQLChannel::getProperty(const std::string& name) const
}


size_t SQLChannel::logTofile(AutoPtr<FileChannel>& pFileChannel, const std::string& fileName, bool clear)
size_t SQLChannel::logToFile(bool clear)
{
static std::vector<std::string> names;
if (names.size() != _source.size())
names.resize(_source.size(), Poco::replace(_name, "'", "''"));

std::size_t n = 0;

if (!pFileChannel) pFileChannel = new FileChannel(fileName);
if (pFileChannel)
if (!_pFileChannel && !_file.empty()) _pFileChannel = new FileChannel(_file);
if (_pFileChannel)
{
std::string sql;
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
Expand All @@ -488,7 +505,7 @@ size_t SQLChannel::logTofile(AutoPtr<FileChannel>& pFileChannel, const std::stri
{
os << ";\n";
Message msg(_source[0], os.str(), Message::PRIO_ERROR);
pFileChannel->log(msg);
_pFileChannel->log(msg);
os.str(""); sql.clear();
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
batch = 0;
Expand All @@ -501,7 +518,7 @@ size_t SQLChannel::logTofile(AutoPtr<FileChannel>& pFileChannel, const std::stri
os << ",\n";
}
Message msg(_source[0], os.str(), Message::PRIO_ERROR);
pFileChannel->log(msg);
_pFileChannel->log(msg);
n = _source.size();
if (clear && n)
{
Expand Down Expand Up @@ -555,11 +572,9 @@ size_t SQLChannel::execSQL()
use(_text, bulk),
use(_dateTime, bulk), now;
}
// most likely bulk mode not supported,
// log and try again
catch (Poco::InvalidAccessException& ex)
// most likely bulk mode not supported, try again
catch (Poco::InvalidAccessException&)
{
_logger.log(ex);
(*_pSession) << _sql,
use(_source),
use(names),
Expand Down Expand Up @@ -588,26 +603,22 @@ size_t SQLChannel::execSQL()
}
catch (Poco::Exception& ex)
{
_logger.error(ex.displayText());
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
logLocal(ex.displayText());
close();
_reconnect = true;
}
catch (std::exception& ex)
{
_logger.error(ex.what());
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
logLocal(ex.what());
close();
_reconnect = true;
}
}
else
{
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
n = logToFile(_pFileChannel);
}

if (n)
{
_logged += n;
Expand All @@ -627,14 +638,13 @@ std::size_t SQLChannel::wait(int ms)
{
Stopwatch sw;
sw.start();
int processed = _logQueue.size();
while (_logQueue.size())
{
Thread::sleep(10);
if (ms && sw.elapsed() * 1000 > ms)
break;
}
return processed - _logQueue.size();
return _logQueue.size();
}


Expand Down

0 comments on commit 016faef

Please sign in to comment.