Skip to content

Commit 7802b47

Browse files
author
Володин Кирилл Сергеевич
committed
Fix comments
1 parent ac54f7b commit 7802b47

File tree

9 files changed

+91
-45
lines changed

9 files changed

+91
-45
lines changed

kafka/include/userver/kafka/consumer_scope.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace kafka {
1111
namespace impl {
1212

1313
class Consumer;
14+
struct OffsetRange;
1415

1516
} // namespace impl
1617

@@ -118,15 +119,14 @@ class ConsumerScope final {
118119
/// @warning This is a blocking call
119120
/// @param topic The name of the Kafka topic.
120121
/// @param partition The partition number of the Kafka topic.
121-
/// @return A pair of integers representing the minimum and maximum offsets for the given topic and partition.
122-
/// The first value is the minimum offset, and the second is the maximum offset.
123-
std::pair<std::int64_t, std::int64_t> GetMinMaxOffset(const std::string& topic, std::int32_t partition);
122+
/// @return A struct with minimum and maximum offsets for the given topic and partition.
123+
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;
124124

125125
/// @brief Retrieves the partition IDs for the specified Kafka topic.
126126
/// @warning This is a blocking call
127127
/// @param topic The name of the Kafka topic.
128128
/// @return A vector of partition IDs for the given topic.
129-
std::vector<std::int32_t> GetPartitionsId(const std::string& topic);
129+
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;
130130

131131
private:
132132
friend class impl::Consumer;

kafka/include/userver/kafka/impl/consumer.hpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ConsumerImpl;
1818

1919
struct ConsumerConfiguration;
2020
struct Secret;
21+
struct OffsetRange;
2122

2223
/// @brief Parameters Consumer uses in runtime.
2324
/// The struct is used only for documentation purposes, Consumer can be
@@ -96,6 +97,14 @@ class Consumer final {
9697
/// understanding
9798
void AsyncCommit();
9899

100+
/// @brief Retrieves the low and high offsets for the specified kafka topic and partition.
101+
/// @see ConsumerScope::GetOffsetRange for better commitment process
102+
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;
103+
104+
/// @brief Retrieves the partition IDs for the specified kafka topic.
105+
/// @see ConsumerScope::GetPartitionIds for better commitment process
106+
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;
107+
99108
/// @brief Adds consumer name to current span.
100109
void ExtendCurrentSpan() const;
101110

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#pragma once
2+
3+
USERVER_NAMESPACE_BEGIN
4+
5+
namespace kafka::impl {
6+
7+
struct OffsetRange final {
8+
std::int64_t high{};
9+
std::int64_t low{};
10+
};
11+
12+
} // namespace kafka::impl
13+
14+
USERVER_NAMESPACE_END

kafka/src/kafka/consumer_scope.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ void ConsumerScope::Stop() noexcept { consumer_.Stop(); }
1616

1717
void ConsumerScope::AsyncCommit() { consumer_.AsyncCommit(); }
1818

19-
std::pair<std::int64_t, std::int64_t> ConsumerScope::GetMinMaxOffset(const std::string& topic, std::int32_t partition) {
20-
return consumer_.GetMinMaxOffset(topic, partition);
19+
OffsetRange ConsumerScope::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
20+
return consumer_.GetOffsetRange(topic, partition);
2121
}
2222

23-
std::vector<std::int32_t> ConsumerScope::GetPartitionsId(const std::string& topic){
24-
return consumer_.GetPartitionsId(topic);
23+
std::vector<std::uint32_t> ConsumerScope::GetPartitionIds(const std::string& topic) const {
24+
return consumer_.GetPartitionIds(topic);
2525
}
2626

2727
} // namespace kafka

kafka/src/kafka/impl/consumer.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <userver/engine/sleep.hpp>
88
#include <userver/formats/json/value_builder.hpp>
99
#include <userver/kafka/impl/configuration.hpp>
10+
#include <userver/kafka/impl/offset.hpp>
1011
#include <userver/kafka/impl/stats.hpp>
1112
#include <userver/testsuite/testpoint.hpp>
1213
#include <userver/tracing/span.hpp>
@@ -166,6 +167,14 @@ void Consumer::AsyncCommit() {
166167
}).Get();
167168
}
168169

170+
OffsetRange Consumer::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
171+
consumer_->GetOffsetRange(topic, partition);
172+
}
173+
174+
std::vector<std::uint32_t> Consumer::GetPartitionIds(const std::string& topic) const {
175+
consumer_->GetPartitionIds(topic);
176+
}
177+
169178
void Consumer::Stop() noexcept {
170179
if (processing_.exchange(false) && poll_task_.IsValid()) {
171180
UINVARIANT(consumer_, "Stopping already stopped consumer");

kafka/src/kafka/impl/consumer_impl.cpp

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
#include <fmt/format.h>
66
#include <fmt/ranges.h>
77

8-
#include <boost/range/iterator_range.hpp>
9-
108
#include <userver/kafka/impl/configuration.hpp>
9+
#include <userver/kafka/impl/offset.hpp>
1110
#include <userver/kafka/impl/stats.hpp>
1211
#include <userver/logging/log.hpp>
1312
#include <userver/testsuite/testpoint.hpp>
1413
#include <userver/tracing/span.hpp>
1514
#include <userver/utils/fast_scope_guard.hpp>
1615
#include <userver/utils/span.hpp>
1716

17+
#include <kafka/impl/error_buffer.hpp>
1818
#include <kafka/impl/holders_aliases.hpp>
1919
#include <kafka/impl/log_level.hpp>
2020

@@ -294,51 +294,57 @@ void ConsumerImpl::Commit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*
294294

295295
void ConsumerImpl::AsyncCommit() { rd_kafka_commit(consumer_.GetHandle(), nullptr, /*async=*/1); }
296296

297-
std::pair<std::int64_t, std::int64_t> ConsumerImpl::GetMinMaxOffset(const std::string& topic, std::int32_t partition) {
298-
std::int64_t offset_high{-1};
299-
std::int64_t offset_low{-1};
300-
297+
OffsetRange ConsumerImpl::GetOffsetRange(const std::string& topic, std::int32_t partition) const {
298+
OffsetRange offset_range;
301299
auto err = rd_kafka_query_watermark_offsets(
302-
consumer_.GetHandle(), topic.data(), partition, &offset_low, &offset_high, /*timeout_ms=*/-1
300+
consumer_.GetHandle(),
301+
topic.с_str(),
302+
partition,
303+
&offset_range.low,
304+
&offset_range.high,
305+
/*timeout_ms=*/-1
303306
);
304307
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
305-
LOG_ERROR() << fmt::format("Failed to get offsets: {}", rd_kafka_err2str(err));
306-
return {-1, -1};
308+
PrintErrorAndThrow("get offsets", rd_kafka_err2str(err));
307309
}
308310

309-
if (offset_high == RD_KAFKA_OFFSET_INVALID) {
310-
LOG_ERROR() << fmt::format("Invalid offset for topic {} partition {}", topic, partition);
311-
return {-1, -1};
311+
if (offset_range.low == RD_KAFKA_OFFSET_INVALID || offset_range.high == RD_KAFKA_OFFSET_INVALID) {
312+
PrintErrorAndThrow("get offsets", fmt::format("invalid offset for topic {} partition {}", topic, partition));
312313
}
313314

314-
return {offset_low, offset_high};
315+
return offset_range;
315316
}
316317

317-
std::vector<std::int32_t> ConsumerImpl::GetPartitionsId(const std::string& topic) {
318-
const rd_kafka_metadata_t* raw_metadata{nullptr};
319-
auto err = rd_kafka_metadata(consumer_.GetHandle(), 0, nullptr, &raw_metadata, /*timeout_ms=*/-1);
320-
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
321-
LOG_ERROR() << fmt::format("Failed to fetch metadata: ", rd_kafka_err2str(err));
322-
return {};
323-
}
324-
MetadataHolder metadata{raw_metadata};
325-
326-
auto topics = boost::make_iterator_range(metadata->topics, metadata->topics + metadata->topic_cnt);
327-
auto* topic_it = std::find_if(topics.begin(), topics.end(), [&topic](const rd_kafka_metadata_topic& topic_raw) {
328-
return topic == topic_raw.topic;
329-
});
318+
std::vector<std::uint32_t> ConsumerImpl::GetPartitionIds(const std::string& topic) const {
319+
MetadataHolder metadata{[&consumer_] {
320+
const rd_kafka_metadata_t* raw_metadata{nullptr};
321+
auto err = rd_kafka_metadata(consumer_.GetHandle(), 0, nullptr, &raw_metadata, /*timeout_ms=*/-1);
322+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
323+
PrintErrorAndThrow("fetch metadata", rd_kafka_err2str(err));
324+
}
325+
return raw_metadata;
326+
}()};
327+
328+
userver::utils::span<const rd_kafka_metadata_topic> topics{
329+
metadata->topics, static_cast<std::size_t>(metadata->topic_cnt)};
330+
const auto* topic_it =
331+
std::find_if(topics.begin(), topics.end(), [&topic](const rd_kafka_metadata_topic& topic_raw) {
332+
return topic == topic_raw.topic;
333+
});
330334
if (topic_it == topics.end()) {
331-
LOG_ERROR() << fmt::format("Topic not found: {}", topic);
332-
return {};
335+
PrintErrorAndThrow("find topic", fmt::format("{} not found", topic));
333336
}
334337

335-
std::vector<std::int32_t> partitions_id;
336-
for (const auto& partition :
337-
boost::make_iterator_range(topic_it->partitions, topic_it->partitions + topic_it->partition_cnt)) {
338-
partitions_id.push_back(partition.id);
338+
userver::utils::span<const rd_kafka_metadata_partition> partitions{
339+
topic_it->partitions, static_cast<std::size_t>(topic_it->partition_cnt)};
340+
std::vector<std::uint32_t> partition_ids;
341+
partition_ids.reserve(partitions.size());
342+
343+
for (const auto& partition : partitions) {
344+
partition_ids.push_back(partition.id);
339345
}
340346

341-
return partitions_id;
347+
return partition_ids;
342348
}
343349

344350
EventHolder ConsumerImpl::PollEvent() {

kafka/src/kafka/impl/consumer_impl.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ namespace kafka::impl {
1818

1919
struct Stats;
2020
struct TopicStats;
21+
struct OffsetRange;
2122

2223
/// @brief Consumer implementation based on `librdkafka`.
2324
/// @warning All methods calls the `librdkafka` functions that very often uses
@@ -37,11 +38,11 @@ class ConsumerImpl final {
3738
/// @brief Schedules the commitment task.
3839
void AsyncCommit();
3940

40-
/// @brief Retrieves the minimum and maximum offsets for the specified Kafka topic and partition.
41-
std::pair<std::int64_t, std::int64_t> GetMinMaxOffset(const std::string& topic, std::int32_t partition);
41+
/// @brief Retrieves the low and high offsets for the specified kafka topic and partition.
42+
OffsetRange GetOffsetRange(const std::string& topic, std::int32_t partition) const;
4243

43-
/// @brief Retrieves the partition IDs for the specified Kafka topic.
44-
std::vector<std::int32_t> GetPartitionsId(const std::string& topic);
44+
/// @brief Retrieves the partition IDs for the specified kafka topic.
45+
std::vector<std::uint32_t> GetPartitionIds(const std::string& topic) const;
4546

4647
/// @brief Effectively calls `PollMessage` until `deadline` is reached
4748
/// and no more than `max_batch_size` messages polled.

kafka/src/kafka/impl/error_buffer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ void PrintErrorAndThrow(std::string_view failed_action, const ErrorBuffer& err_b
1818
throw std::runtime_error{full_error};
1919
}
2020

21+
void PrintErrorAndThrow(std::string_view failed_action, std::string_view reason) {
22+
const auto full_error = fmt::format("Failed to {}: {}", failed_action, reason);
23+
LOG_ERROR() << full_error;
24+
throw std::runtime_error{full_error};
25+
}
26+
2127
} // namespace kafka::impl
2228

2329
USERVER_NAMESPACE_END

kafka/src/kafka/impl/error_buffer.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ constexpr std::size_t kErrorBufferSize = 512;
1717
using ErrorBuffer = std::array<char, kErrorBufferSize>;
1818

1919
[[noreturn]] void PrintErrorAndThrow(std::string_view failed_action, const ErrorBuffer& err_buf);
20+
[[noreturn]] void PrintErrorAndThrow(std::string_view failed_action, std::string_view reason);
2021

2122
} // namespace kafka::impl
2223

0 commit comments

Comments
 (0)