Skip to content

Commit 0c11197

Browse files
author
Alexander Damian
committed
Replaced termination callback with throwing exception
1 parent e8c4397 commit 0c11197

File tree

6 files changed

+74
-35
lines changed

6 files changed

+74
-35
lines changed

CMakeLists.txt

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,17 @@ set(CPPKAFKA_VERSION_MINOR 2)
77
set(CPPKAFKA_VERSION "${CPPKAFKA_VERSION_MAJOR}.${CPPKAFKA_VERSION_MINOR}")
88
set(RDKAFKA_MIN_VERSION 0x00090400)
99

10-
if (NOT CMAKE_CXX_FLAGS)
11-
# Set default compile flags for the project
12-
if(MSVC)
13-
# Don't always use Wall, since VC's /Wall is ridiculously verbose.
14-
set(CMAKE_CXX_FLAGS "/W3")
15-
16-
# Disable VC secure checks, since these are not really issues
17-
add_definitions("-D_CRT_SECURE_NO_WARNINGS=1")
18-
add_definitions("-D_SCL_SECURE_NO_WARNINGS=1")
19-
add_definitions("-DNOGDI=1")
20-
add_definitions("-DNOMINMAX=1")
21-
else()
22-
set(CMAKE_CXX_FLAGS "-std=c++11 -Wall")
23-
endif()
10+
if(MSVC)
11+
# Don't always use Wall, since VC's /Wall is ridiculously verbose.
12+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W3")
13+
14+
# Disable VC secure checks, since these are not really issues
15+
add_definitions("-D_CRT_SECURE_NO_WARNINGS=1")
16+
add_definitions("-D_SCL_SECURE_NO_WARNINGS=1")
17+
add_definitions("-DNOGDI=1")
18+
add_definitions("-DNOMINMAX=1")
19+
else()
20+
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
2421
endif()
2522
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/")
2623

include/cppkafka/error.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ namespace cppkafka {
4242
*/
4343
class CPPKAFKA_API Error {
4444
public:
45+
/**
46+
* @brief Constructs an error object with RD_KAFKA_RESP_ERR_NO_ERROR
47+
*/
48+
Error() = default;
4549
/**
4650
* Constructs an error object
4751
*/
@@ -77,7 +81,7 @@ class CPPKAFKA_API Error {
7781
*/
7882
CPPKAFKA_API friend std::ostream& operator<<(std::ostream& output, const Error& rhs);
7983
private:
80-
rd_kafka_resp_err_t error_;
84+
rd_kafka_resp_err_t error_{RD_KAFKA_RESP_ERR_NO_ERROR};
8185
};
8286

8387
} // cppkafka

include/cppkafka/utils/backoff_committer.h

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,18 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
100100
*/
101101
void set_error_callback(ErrorCallback callback);
102102

103+
/**
104+
* \brief Commits the current partition assignment synchronously
105+
*
106+
* This will call Consumer::commit() until either the message is successfully
107+
* committed or the error callback returns false (if any is set).
108+
*/
109+
void commit();
110+
103111
/**
104112
* \brief Commits the given message synchronously
105113
*
106-
* This will call Consumer::commit until either the message is successfully
114+
* This will call Consumer::commit(msg) until either the message is successfully
107115
* committed or the error callback returns false (if any is set).
108116
*
109117
* \param msg The message to be committed
@@ -113,7 +121,7 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
113121
/**
114122
* \brief Commits the offsets on the given topic/partitions synchronously
115123
*
116-
* This will call Consumer::commit until either the offsets are successfully
124+
* This will call Consumer::commit(topic_partitions) until either the offsets are successfully
117125
* committed or the error callback returns false (if any is set).
118126
*
119127
* \param topic_partitions The topic/partition list to be committed
@@ -127,25 +135,34 @@ class CPPKAFKA_API BackoffCommitter : public BackoffPerformer {
127135
*/
128136
Consumer& get_consumer();
129137
private:
130-
// Return true to abort and false to continue committing
138+
// If the ReturnType contains 'true', we abort committing. Otherwise we continue.
139+
// The second member of the ReturnType contains the RdKafka error if any.
131140
template <typename T>
132-
bool do_commit(const T& object) {
141+
ReturnType do_commit(const T* object) {
142+
ReturnType rt;
133143
try {
134-
consumer_.commit(object);
135-
// If the commit succeeds, we're done
136-
return true;
144+
if (!object) {
145+
consumer_.commit();
146+
}
147+
else {
148+
consumer_.commit(*object);
149+
}
150+
// If the commit succeeds, we're done.
137151
}
138152
catch (const HandleException& ex) {
153+
rt.error_ = ex.get_error();
139154
// If there were actually no offsets to commit, return. Retrying won't solve
140155
// anything here
141-
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
142-
return true;
156+
if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) {
157+
// If there's no callback or if returns true for this message, keep committing.
158+
// Otherwise abort.
159+
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
160+
if (!callback || callback(rt.error_)) {
161+
rt.abort_ = false; //continue retrying
162+
}
143163
}
144-
// If there's a callback and it returns false for this message, abort.
145-
// Otherwise keep committing.
146-
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
147-
return callback && !callback(ex.get_error());
148164
}
165+
return rt;
149166
}
150167

151168
Consumer& consumer_;

include/cppkafka/utils/backoff_performer.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <functional>
3535
#include <thread>
3636
#include "../consumer.h"
37+
#include "../exceptions.h"
3738

3839
namespace cppkafka {
3940

@@ -47,6 +48,14 @@ class CPPKAFKA_API BackoffPerformer {
4748
static const TimeUnit DEFAULT_BACKOFF_STEP;
4849
static const TimeUnit DEFAULT_MAXIMUM_BACKOFF;
4950
static const size_t DEFAULT_MAXIMUM_RETRIES;
51+
52+
/**
53+
* @brief Type which any functor must return.
54+
*/
55+
struct ReturnType {
56+
bool abort_{true};
57+
Error error_;
58+
};
5059

5160
/**
5261
* The backoff policy to use
@@ -119,11 +128,16 @@ class CPPKAFKA_API BackoffPerformer {
119128
void perform(const Functor& callback) {
120129
TimeUnit backoff = initial_backoff_;
121130
size_t retries = maximum_retries_;
131+
ReturnType rt;
122132
while (retries--) {
123133
auto start = std::chrono::steady_clock::now();
124134
// If the callback returns true, we're done
125-
if (callback()) {
126-
return;
135+
rt = callback();
136+
if (rt.abort_) {
137+
if (rt.error_) {
138+
break; //terminal error
139+
}
140+
return; //success
127141
}
128142
auto end = std::chrono::steady_clock::now();
129143
auto time_elapsed = end - start;
@@ -134,6 +148,8 @@ class CPPKAFKA_API BackoffPerformer {
134148
// Increase out backoff depending on the policy being used
135149
backoff = increase_backoff(backoff);
136150
}
151+
// No more retries left or we have a terminal error.
152+
throw ConsumerException(rt.error_);
137153
}
138154
private:
139155
TimeUnit increase_backoff(TimeUnit backoff);

src/topic_partition_list.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ using std::vector;
3838
using std::set;
3939
using std::ostream;
4040
using std::string;
41-
using std::equal;
4241

4342
namespace cppkafka {
4443

src/utils/backoff_committer.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,21 @@ void BackoffCommitter::set_error_callback(ErrorCallback callback) {
4343
callback_ = move(callback);
4444
}
4545

46+
void BackoffCommitter::commit() {
47+
perform([&]()->ReturnType {
48+
return do_commit<TopicPartitionList>(nullptr);
49+
});
50+
}
51+
4652
void BackoffCommitter::commit(const Message& msg) {
47-
perform([&] {
48-
return do_commit(msg);
53+
perform([&]()->ReturnType {
54+
return do_commit(&msg);
4955
});
5056
}
5157

5258
void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) {
53-
perform([&] {
54-
return do_commit(topic_partitions);
59+
perform([&]()->ReturnType {
60+
return do_commit(&topic_partitions);
5561
});
5662
}
5763

0 commit comments

Comments
 (0)