Skip to content

Commit 2443579

Browse files
committed
Log callback moved to constructor
1 parent 4f9bfce commit 2443579

File tree

2 files changed

+22
-15
lines changed

2 files changed

+22
-15
lines changed

kafka/src/kafka/impl/configuration.cpp

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,13 @@
1616
#include <userver/yaml_config/yaml_config.hpp>
1717

1818
#include <kafka/impl/error_buffer.hpp>
19-
#include <kafka/impl/log_level.hpp>
2019

2120
USERVER_NAMESPACE_BEGIN
2221

2322
namespace kafka::impl {
2423

2524
namespace {
2625

27-
/// @brief Redirect `librdkafka` logs to `userver` logs.
28-
///
29-
/// @see
30-
/// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a06ade2ca41f32eb82c6f7e3d4acbe19f
31-
void KafkaLogCallback([[maybe_unused]] const rd_kafka_t*, int level, const char* fac, const char* buf) noexcept {
32-
LOG(userver::kafka::impl::convertRdKafkaLogLevelToLoggingLevel(level)) << fac << buf;
33-
}
34-
3526
template <class SupportedList>
3627
bool IsSupportedOption(const SupportedList& supported_options, const std::string& configured_option) {
3728
return utils::ContainsIf(supported_options, [&configured_option](std::string_view supported_option) {
@@ -54,8 +45,8 @@ void VerifyComponentNamePrefix(const std::string& component_name, const std::str
5445
// producer's component should start with kafka-producer, consumer's - with
5546
// kafka-consumer
5647
if (component_name.rfind(expected_prefix) != 0) {
57-
throw std::runtime_error{
58-
fmt::format("Component '{}' doesn't start with '{}'", component_name, expected_prefix)};
48+
throw std::runtime_error{fmt::format("Component '{}' doesn't start with '{}'", component_name, expected_prefix)
49+
};
5950
}
6051
}
6152

@@ -122,7 +113,8 @@ SecurityConfiguration Parse(const yaml_config::YamlConfig& config, formats::pars
122113

123114
security.security_protocol.emplace<SecurityConfiguration::SaslSsl>(SecurityConfiguration::SaslSsl{
124115
/*security_mechanism=*/mechanism,
125-
/*ssl_ca_location=*/config["ssl_ca_location"].As<std::string>()});
116+
/*ssl_ca_location=*/config["ssl_ca_location"].As<std::string>()
117+
});
126118
}
127119

128120
return security;
@@ -291,8 +283,6 @@ void Configuration::SetOption(const char* option, const char* value, T to_print)
291283
#pragma GCC diagnostic pop
292284
#endif
293285
if (err == RD_KAFKA_CONF_OK) {
294-
rd_kafka_conf_set_log_cb(conf_.GetHandle(), KafkaLogCallback);
295-
296286
LOG_INFO() << fmt::format("Kafka conf option: '{}' -> '{}'", option, to_print);
297287
return;
298288
}

kafka/src/kafka/impl/holders.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,31 @@
55
#include <fmt/format.h>
66
#include <librdkafka/rdkafka.h>
77

8+
#include <userver/logging/log.hpp>
89
#include <userver/utils/assert.hpp>
910

1011
#include <kafka/impl/error_buffer.hpp>
12+
#include <kafka/impl/log_level.hpp>
13+
14+
#include <fmt/format.h>
1115

1216
USERVER_NAMESPACE_BEGIN
1317

1418
namespace kafka::impl {
1519

20+
namespace {
21+
22+
/// @brief Redirect `librdkafka` logs to `userver` logs.
23+
///
24+
/// @see
25+
/// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a06ade2ca41f32eb82c6f7e3d4acbe19f
26+
void KafkaLogCallback([[maybe_unused]] const rd_kafka_t*, int level, const char* facility, const char* message)
27+
noexcept {
28+
LOG(convertRdKafkaLogLevelToLoggingLevel(level)) << fmt::format("facility: {}, message: {}", facility, message);
29+
}
30+
31+
} // namespace
32+
1633
template <class T, DeleterType<T> Deleter>
1734
HolderBase<T, Deleter>::HolderBase(T* data) : ptr_(data, Deleter) {}
1835

@@ -52,7 +69,7 @@ struct ConfHolder::Impl {
5269
HolderBase<rd_kafka_conf_s, &rd_kafka_conf_destroy> conf;
5370
};
5471

55-
ConfHolder::ConfHolder(rd_kafka_conf_t* conf) : impl_(conf) {}
72+
ConfHolder::ConfHolder(rd_kafka_conf_t* conf) : impl_(conf) { rd_kafka_conf_set_log_cb(conf, KafkaLogCallback); }
5673

5774
ConfHolder::ConfHolder(const ConfHolder& other) : impl_(rd_kafka_conf_dup(other.GetHandle())) {}
5875

0 commit comments

Comments
 (0)