Skip to content

Commit 77d17ea

Browse files
author
Володин Кирилл Сергеевич
committed
Fix comments
1 parent 0e11284 commit 77d17ea

File tree

8 files changed

+56
-19
lines changed

8 files changed

+56
-19
lines changed

kafka/include/userver/kafka/consumer_scope.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ class ConsumerScope final {
116116
void AsyncCommit();
117117

118118
/// @brief Retrieves the minimum and maximum offsets for the specified topic and partition.
119-
/// @throws GetOffsetRangeException if the offsets could not retrieve or if the returned offsets are invalid
119+
/// @throws GetOffsetRangeException if the offsets could not be retrieve or if the returned offsets are invalid
120120
/// @warning This is a blocking call
121121
/// @param topic The name of the topic.
122122
/// @param partition The partition number of the topic.

kafka/include/userver/kafka/exceptions.hpp

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22

3+
#include <cstdint>
34
#include <stdexcept>
5+
#include <string_view>
46

57
USERVER_NAMESPACE_BEGIN
68

@@ -68,25 +70,41 @@ class UnknownPartitionException final : public SendException {
6870
UnknownPartitionException();
6971
};
7072

71-
class GetOffsetRangeException : public std::runtime_error {
73+
/// @brief Exception thrown when there is an error retrieving the offset range.
74+
class OffsetRangeException : public std::runtime_error {
7275
public:
7376
using std::runtime_error::runtime_error;
7477

75-
GetOffsetRangeException(const char* what) : std::runtime_error(what) {}
78+
public:
79+
OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition);
80+
};
81+
82+
class OffsetRangeTimeoutException final : public OffsetRangeException {
83+
static constexpr const char* kWhat = "Timeout while fetching offsets.";
84+
85+
public:
86+
OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition);
7687
};
7788

7889
class TopicNotFoundException final : public std::runtime_error {
7990
public:
8091
using std::runtime_error::runtime_error;
81-
82-
TopicNotFoundException(const char* what) : std::runtime_error(what) {}
8392
};
8493

94+
/// @brief Exception thrown when fetching metadata.
8595
class GetMetadataException final : public std::runtime_error {
8696
public:
8797
using std::runtime_error::runtime_error;
8898

89-
GetMetadataException(const char* what) : std::runtime_error(what) {}
99+
public:
100+
GetMetadataException(std::string_view what, std::string_view topic);
101+
};
102+
103+
class GetMetadataTimeoutException final : public std::runtime_error {
104+
static constexpr const char* kWhat = "Timeout while getting metadata.";
105+
106+
public:
107+
GetMetadataTimeoutException(std::string_view topic);
90108
};
91109

92110
} // namespace kafka

kafka/include/userver/kafka/impl/consumer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ class Consumer final {
5555
/// @param topics stands for topics list that consumer subscribes to
5656
/// after ConsumerScope::Start called
5757
/// @param consumer_task_processor -- task processor for message batches
58-
/// @param consumer_operation_task_processor -- task processor for consumer operation
58+
/// @param consumer_blocking_task_processor -- task processor for consumer blocking operation
5959
/// polling
6060
/// All callbacks are invoked in `main_task_processor`
6161
Consumer(
6262
const std::string& name,
6363
const std::vector<std::string>& topics,
6464
engine::TaskProcessor& consumer_task_processor,
65-
engine::TaskProcessor& consumer_operation_task_processor,
65+
engine::TaskProcessor& consumer_blocking_task_processor,
6666
engine::TaskProcessor& main_task_processor,
6767
const ConsumerConfiguration& consumer_configuration,
6868
const Secret& secrets,

kafka/include/userver/kafka/offset_range.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ USERVER_NAMESPACE_BEGIN
66

77
namespace kafka {
88

9+
/// @brief Represents the range of offsets.
910
struct OffsetRange final {
1011
///@brief The low watermark offset. It indicates the earliest available offset in Kafka.
1112
std::uint32_t low{};

kafka/src/kafka/consumer_component.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ ConsumerComponent::ConsumerComponent(
2222
config.Name(),
2323
config["topics"].As<std::vector<std::string>>(),
2424
context.GetTaskProcessor("consumer-task-processor"),
25-
context.GetTaskProcessor("consumer-operation-task-processor"),
25+
context.GetTaskProcessor("consumer-blocking-task-processor"),
2626
context.GetTaskProcessor("main-task-processor"),
2727
config.As<impl::ConsumerConfiguration>(),
2828
context.FindComponent<components::Secdist>().Get().Get<impl::BrokerSecrets>().GetSecretByComponentName(

kafka/src/kafka/exceptions.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <userver/kafka/exceptions.hpp>
22

3+
#include <fmt/format.h>
4+
35
USERVER_NAMESPACE_BEGIN
46

57
namespace kafka {
@@ -19,6 +21,18 @@ UnknownTopicException::UnknownTopicException() : SendException(kWhat) {}
1921

2022
UnknownPartitionException::UnknownPartitionException() : SendException(kWhat) {}
2123

24+
OffsetRangeException::OffsetRangeException(std::string_view what, std::string_view topic, std::uint32_t partition)
25+
: std::runtime_error(fmt::format("{} topic: '{}', partition: {}", what, topic, partition)) {}
26+
27+
OffsetRangeTimeoutException::OffsetRangeTimeoutException(std::string_view topic, std::uint32_t partition)
28+
: OffsetRangeException(kWhat, topic, partition) {}
29+
30+
GetMetadataException::GetMetadataException(std::string_view what, std::string_view topic)
31+
: std::runtime_error(fmt::format("{} topic: '{}'", what, topic)) {}
32+
33+
GetMetadataTimeoutException::GetMetadataTimeoutException(std::string_view topic)
34+
: std::runtime_error(fmt::format("{} topic: '{}'", kWhat, topic)) {}
35+
2236
} // namespace kafka
2337

2438
USERVER_NAMESPACE_END

kafka/src/kafka/impl/consumer.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ Consumer::Consumer(
5353
const std::string& name,
5454
const std::vector<std::string>& topics,
5555
engine::TaskProcessor& consumer_task_processor,
56-
engine::TaskProcessor& consumer_operation_task_processor,
56+
engine::TaskProcessor& consumer_blocking_task_processor,
5757
engine::TaskProcessor& main_task_processor,
5858
const ConsumerConfiguration& configuration,
5959
const Secret& secrets,
@@ -63,7 +63,7 @@ Consumer::Consumer(
6363
topics_(topics),
6464
execution_params(params),
6565
consumer_task_processor_(consumer_task_processor),
66-
consumer_operation_task_processor_(consumer_operation_task_processor),
66+
consumer_operation_task_processor_(consumer_blocking_task_processor),
6767
main_task_processor_(main_task_processor),
6868
conf_(Configuration{name, configuration, secrets}.Release()) {
6969
/// To check configuration validity

kafka/src/kafka/impl/consumer_impl.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,8 @@ OffsetRange ConsumerImpl::GetOffsetRange(
300300
std::uint32_t partition,
301301
std::optional<std::chrono::milliseconds> timeout
302302
) const {
303-
int64_t low_offset{0};
304-
int64_t high_offset{0};
303+
std::int64_t low_offset{0};
304+
std::int64_t high_offset{0};
305305

306306
auto err = rd_kafka_query_watermark_offsets(
307307
consumer_.GetHandle(),
@@ -311,14 +311,15 @@ OffsetRange ConsumerImpl::GetOffsetRange(
311311
&high_offset,
312312
static_cast<int>(timeout.value_or(std::chrono::milliseconds(-1)).count())
313313
);
314+
315+
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
316+
throw OffsetRangeTimeoutException{topic, partition};
317+
}
314318
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
315-
throw GetOffsetRangeException{fmt::format("Failed to get offsets: {}", rd_kafka_err2str(err))};
319+
throw OffsetRangeException{fmt::format("Failed to get offsets: {}", rd_kafka_err2str(err)), topic, partition};
316320
}
317-
318321
if (low_offset == RD_KAFKA_OFFSET_INVALID || high_offset == RD_KAFKA_OFFSET_INVALID) {
319-
throw GetOffsetRangeException{
320-
fmt::format("Failed to get offsets: invalid offset for topic {} partition {}", topic, partition)
321-
};
322+
throw OffsetRangeException{fmt::format("Failed to get offsets: invalid offset."), topic, partition};
322323
}
323324

324325
return {static_cast<std::uint32_t>(low_offset), static_cast<std::uint32_t>(high_offset)};
@@ -336,8 +337,11 @@ ConsumerImpl::GetPartitionIds(const std::string& topic, std::optional<std::chron
336337
&raw_metadata,
337338
static_cast<int>(timeout.value_or(std::chrono::milliseconds(-1)).count())
338339
);
340+
if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
341+
throw GetMetadataTimeoutException{topic};
342+
}
339343
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
340-
throw GetMetadataException{fmt::format("Failed to fetch metadata: {}", rd_kafka_err2str(err))};
344+
throw GetMetadataException{fmt::format("Failed to fetch metadata: {}.", rd_kafka_err2str(err)), topic};
341345
}
342346
return raw_metadata;
343347
}()};

0 commit comments

Comments
 (0)