Added sending headers for Kafka messages in Producer.
melonaerial committed Feb 27, 2025
1 parent 903fcf3 commit 6e7567e
Showing 4 changed files with 46 additions and 8 deletions.
7 changes: 7 additions & 0 deletions kafka/include/userver/kafka/producer.hpp
@@ -1,6 +1,7 @@
#pragma once

#include <cstdint>
#include <unordered_map>

#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/engine/task/task_with_result.hpp>
@@ -44,6 +45,9 @@ struct Secret;
///
/// @see https://docs.confluent.io/platform/current/clients/producer.html
class Producer final {
public:
using Headers = std::unordered_map<std::string, std::string>;

public:
/// @brief Creates the Kafka Producer.
///
@@ -95,6 +99,7 @@ class Producer final {
const std::string& topic_name,
std::string_view key,
std::string_view message,
Headers headers = {},
std::optional<std::uint32_t> partition = std::nullopt
) const;

@@ -111,6 +116,7 @@
std::string topic_name,
std::string key,
std::string message,
Headers headers = {},
std::optional<std::uint32_t> partition = std::nullopt
) const;

@@ -124,6 +130,7 @@
const std::string& topic_name,
std::string_view key,
std::string_view message,
Headers headers = {},
std::optional<std::uint32_t> partition
) const;

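For context, the new `Headers` parameter is taken by value and defaults to an empty map, so existing call sites keep compiling unchanged. A minimal usage sketch of the updated Send() signature is shown below; the surrounding wiring, topic, key, and header names are assumptions for illustration, not part of this change:

```cpp
#include <string>
#include <string_view>

#include <userver/kafka/producer.hpp>

// Hypothetical caller of the updated API; topic, key, and header names are
// illustrative only.
void SendWithTraceId(const kafka::Producer& producer, std::string_view trace_id) {
    kafka::Producer::Headers headers{{"X-Trace-Id", std::string{trace_id}}};

    // Send(topic_name, key, message, headers = {}, partition = std::nullopt)
    producer.Send("orders", "order-42", R"({"status":"created"})", std::move(headers));
}
```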
18 changes: 17 additions & 1 deletion kafka/src/kafka/impl/producer_impl.cpp
@@ -126,10 +126,11 @@ DeliveryResult ProducerImpl::Send(
const std::string& topic_name,
std::string_view key,
std::string_view message,
const Headers& headers,
std::optional<std::uint32_t> partition
) const {
LOG_INFO() << fmt::format("Message to topic '{}' is requested to send", topic_name);
auto delivery_result_future = ScheduleMessageDelivery(topic_name, key, message, partition);
auto delivery_result_future = ScheduleMessageDelivery(topic_name, key, message, headers, partition);

WaitUntilDeliveryReported(delivery_result_future);

@@ -140,6 +141,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
const std::string& topic_name,
std::string_view key,
std::string_view message,
const Headers& headers,
std::optional<std::uint32_t> partition
) const {
auto waiter = std::make_unique<DeliveryWaiter>();
@@ -173,13 +175,26 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
#pragma clang diagnostic ignored "-Wgnu-statement-expression"
#endif

// We assume that the message takes ownership of the headers if rd_kafka_producev()
// succeeds. See the librdkafka RD_KAFKA_V_HEADERS reference for details
struct KafkaHeaders final {
using Ptr = std::unique_ptr<rd_kafka_headers_t, KafkaHeaders>;
void operator()(rd_kafka_headers_t* headers) const { rd_kafka_headers_destroy(headers); }
};

KafkaHeaders::Ptr kafka_headers{rd_kafka_headers_new(headers.size())};
for (const auto& [header, value] : headers) {
rd_kafka_header_add(kafka_headers.get(), header.c_str(), header.size(), value.c_str(), value.size());
}

// NOLINTBEGIN(clang-analyzer-cplusplus.NewDeleteLeaks,cppcoreguidelines-pro-type-const-cast)
const rd_kafka_resp_err_t enqueue_error = rd_kafka_producev(
producer_.GetHandle(),
RD_KAFKA_V_TOPIC(topic_name.c_str()),
RD_KAFKA_V_KEY(key.data(), key.size()),
RD_KAFKA_V_VALUE(const_cast<char*>(message.data()), message.size()),
RD_KAFKA_V_MSGFLAGS(0),
RD_KAFKA_V_HEADERS(kafka_headers.get()),
RD_KAFKA_V_PARTITION(partition.value_or(RD_KAFKA_PARTITION_UA)),
RD_KAFKA_V_OPAQUE(waiter.get()),
RD_KAFKA_V_END
@@ -191,6 +206,7 @@ engine::Future<DeliveryResult> ProducerImpl::ScheduleMessageDelivery(
#endif

if (enqueue_error == RD_KAFKA_RESP_ERR_NO_ERROR) {
[[maybe_unused]] const auto released = kafka_headers.release();
[[maybe_unused]] auto _ = waiter.release();
} else {
LOG_WARNING(
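The `release()` on success follows the librdkafka ownership contract for RD_KAFKA_V_HEADERS: rd_kafka_producev() takes ownership of the headers only when enqueueing succeeds; otherwise the caller must destroy them. A stripped-down sketch of that contract, independent of the userver wrappers (topic and header values are illustrative):

```cpp
#include <librdkafka/rdkafka.h>

// Sketch of the ownership rule relied on in ScheduleMessageDelivery():
// on RD_KAFKA_RESP_ERR_NO_ERROR the message owns the headers, so they must
// not be destroyed by the caller; on any other result they are still ours.
void ProduceWithHeaders(rd_kafka_t* producer) {
    rd_kafka_headers_t* headers = rd_kafka_headers_new(1);
    rd_kafka_header_add(headers, "X-Trace-Id", -1, "abc123", -1);

    const rd_kafka_resp_err_t err = rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC("orders"),
        RD_KAFKA_V_VALUE(const_cast<char*>("payload"), 7),
        RD_KAFKA_V_HEADERS(headers),
        RD_KAFKA_V_END
    );

    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        rd_kafka_headers_destroy(headers);  // still owned by the caller on failure
    }
    // On success librdkafka owns `headers`; destroying them here would double-free.
}
```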
6 changes: 6 additions & 0 deletions kafka/src/kafka/impl/producer_impl.hpp
@@ -3,6 +3,7 @@
#include <chrono>
#include <cstdint>
#include <optional>
#include <unordered_map>

#include <librdkafka/rdkafka.h>

@@ -20,6 +21,9 @@ namespace kafka::impl {
class Configuration;

class ProducerImpl final {
public:
using Headers = std::unordered_map<std::string, std::string>;

public:
explicit ProducerImpl(Configuration&& configuration);

@@ -31,6 +35,7 @@ class ProducerImpl final {
const std::string& topic_name,
std::string_view key,
std::string_view message,
const Headers& headers,
std::optional<std::uint32_t> partition
) const;

@@ -51,6 +56,7 @@
const std::string& topic_name,
std::string_view key,
std::string_view message,
const Headers& headers,
std::optional<std::uint32_t> partition
) const;

23 changes: 16 additions & 7 deletions kafka/src/kafka/producer.cpp
@@ -86,11 +86,16 @@ void Producer::Send(
const std::string& topic_name,
std::string_view key,
std::string_view message,
Headers headers,
std::optional<std::uint32_t> partition
) const {
utils::Async(producer_task_processor_, "producer_send", [this, &topic_name, key, message, partition] {
SendImpl(topic_name, key, message, partition);
}).Get();
utils::Async(
producer_task_processor_,
"producer_send",
[this, &topic_name, key, message, headers = std::move(headers), partition] {
SendImpl(topic_name, key, message, headers, partition);
}
).Get();
}

engine::TaskWithResult<void> Producer::SendAsync(
@@ -102,9 +107,12 @@ engine::TaskWithResult<void> Producer::SendAsync(
return utils::Async(
producer_task_processor_,
"producer_send_async",
[this, topic_name = std::move(topic_name), key = std::move(key), message = std::move(message), partition] {
SendImpl(topic_name, key, message, partition);
}
[this,
topic_name = std::move(topic_name),
key = std::move(key),
message = std::move(message),
headers = std::move(headers),
partition] { SendImpl(topic_name, key, message, headers, partition); }
);
}

@@ -114,11 +122,12 @@ void Producer::SendImpl(
const std::string& topic_name,
std::string_view key,
std::string_view message,
const Headers& headers,
std::optional<std::uint32_t> partition
) const {
tracing::Span::CurrentSpan().AddTag("kafka_producer", name_);

const impl::DeliveryResult delivery_result = producer_->Send(topic_name, key, message, partition);
const impl::DeliveryResult delivery_result = producer_->Send(topic_name, key, message, headers, partition);
if (!delivery_result.IsSuccess()) {
ThrowSendError(delivery_result);
}
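A note on the capture list in the synchronous Send(): the call blocks on Get(), so topic_name can safely be captured by reference, while the value-typed headers map is moved into the lambda so the spawned task owns it for its whole lifetime. A simplified sketch of that pattern (names are illustrative):

```cpp
#include <string>
#include <unordered_map>

#include <userver/engine/task/task_processor_fwd.hpp>
#include <userver/utils/async.hpp>

// Simplified sketch of the capture-by-move pattern used in Producer::Send.
void SendSketch(engine::TaskProcessor& task_processor, const std::string& topic_name) {
    std::unordered_map<std::string, std::string> headers{{"X-Trace-Id", "abc123"}};

    utils::Async(task_processor, "producer_send",
                 [&topic_name, headers = std::move(headers)] {
                     // Here the real code would forward topic_name and headers to SendImpl().
                     (void)topic_name;
                     (void)headers;
                 })
        .Get();  // blocking, so the reference capture of topic_name stays valid
}
```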
