Skip to content

Commit effdf7f

Browse files
author
Alexander Damian
committed
Removed ReturnType. Throw on error from inside do_commit() as well as from perform()
1 parent d84b75c commit effdf7f

File tree

3 files changed

+22
-39
lines changed

3 files changed

+22
-39
lines changed

include/cppkafka/utils/backoff_committer.h

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -137,32 +137,28 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
137137
private:
138138
// If the ReturnType contains 'true', we abort committing. Otherwise we continue.
139139
// The second member of the ReturnType contains the RdKafka error if any.
140-
template <typename T>
141-
ReturnType do_commit(const T* object) {
142-
ReturnType rt;
140+
template <typename...Args>
141+
bool do_commit(Args&&...args) {
143142
try {
144-
if (!object) {
145-
consumer_.commit();
146-
}
147-
else {
148-
consumer_.commit(*object);
149-
}
143+
consumer_.commit(std::forward<Args>(args)...);
150144
// If the commit succeeds, we're done.
145+
return true;
151146
}
152147
catch (const HandleException& ex) {
153-
rt.error_ = ex.get_error();
148+
Error error = ex.get_error();
154149
// If there were actually no offsets to commit, return. Retrying won't solve
155150
// anything here
156-
if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) {
157-
// If there's no callback or if returns true for this message, keep committing.
158-
// Otherwise abort.
159-
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
160-
if (!callback || callback(rt.error_)) {
161-
rt.abort_ = false; //continue retrying
162-
}
151+
if (error == RD_KAFKA_RESP_ERR__NO_OFFSET) {
152+
throw ex; //abort
153+
}
154+
// If there's a callback and it returns false for this message, abort.
155+
// Otherwise keep committing.
156+
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
157+
if (callback && !callback(error)) {
158+
throw ex; //abort
163159
}
164160
}
165-
return rt;
161+
return false; //continue
166162
}
167163

168164
Consumer& consumer_;

include/cppkafka/utils/backoff_performer.h

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,6 @@ class CPPKAFKA_API BackoffPerformer {
4848
static const TimeUnit DEFAULT_BACKOFF_STEP;
4949
static const TimeUnit DEFAULT_MAXIMUM_BACKOFF;
5050
static const size_t DEFAULT_MAXIMUM_RETRIES;
51-
52-
/**
53-
* @brief Type which any functor must return.
54-
*/
55-
struct ReturnType {
56-
bool abort_{true};
57-
Error error_;
58-
};
5951

6052
/**
6153
* The backoff policy to use
@@ -128,15 +120,10 @@ class CPPKAFKA_API BackoffPerformer {
128120
void perform(const Functor& callback) {
129121
TimeUnit backoff = initial_backoff_;
130122
size_t retries = maximum_retries_;
131-
ReturnType rt;
132123
while (retries--) {
133124
auto start = std::chrono::steady_clock::now();
134125
// If the callback returns true, we're done
135-
rt = callback();
136-
if (rt.abort_) {
137-
if (rt.error_) {
138-
break; //terminal error
139-
}
126+
if (callback()) {
140127
return; //success
141128
}
142129
auto end = std::chrono::steady_clock::now();
@@ -149,7 +136,7 @@ class CPPKAFKA_API BackoffPerformer {
149136
backoff = increase_backoff(backoff);
150137
}
151138
// No more retries left or we have a terminal error.
152-
throw ConsumerException(rt.error_);
139+
throw Exception("Commit failed: no more retries.");
153140
}
154141
private:
155142
TimeUnit increase_backoff(TimeUnit backoff);

src/utils/backoff_committer.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,20 @@ void BackoffCommitter::set_error_callback(ErrorCallback callback) {
4444
}
4545

4646
void BackoffCommitter::commit() {
47-
perform([&]()->ReturnType {
48-
return do_commit<TopicPartitionList>(nullptr);
47+
perform([&] {
48+
return do_commit();
4949
});
5050
}
5151

5252
void BackoffCommitter::commit(const Message& msg) {
53-
perform([&]()->ReturnType {
54-
return do_commit(&msg);
53+
perform([&] {
54+
return do_commit(msg);
5555
});
5656
}
5757

5858
void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) {
59-
perform([&]()->ReturnType {
60-
return do_commit(&topic_partitions);
59+
perform([&] {
60+
return do_commit(topic_partitions);
6161
});
6262
}
6363

0 commit comments

Comments
 (0)