diff --git a/CHANGELOG.md b/CHANGELOG.md
index d697493c..7e562a58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
# Changelog
+- 4.5.0
+ - Expose `create_partitions` from the underlying kafka_protocol
+ - Allow producers to be started automatically for newly discovered partitions (opt-in via the `sync_partitions` client config).
- 4.4.6
- Upgrade to `kafka_protocol-4.2.8` for dependency (`crc32cer-1.0.4`) to fix its link error.
diff --git a/src/brod.erl b/src/brod.erl
index 415ad28c..d49dba5f 100644
--- a/src/brod.erl
+++ b/src/brod.erl
@@ -96,6 +96,8 @@
%% Topic APIs
-export([ create_topics/3
, create_topics/4
+ , create_partitions/3
+ , create_partitions/4
, delete_topics/3
, delete_topics/4
]).
@@ -190,6 +192,7 @@
-type endpoint() :: {hostname(), portnum()}.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
+-type topic_partition_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
@@ -1049,6 +1052,44 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).
+%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
+-spec create_partitions([endpoint()], [topic_partition_config()],
+                        #{timeout => kpro:int32(), validate_only => boolean()}) ->
+        ok | {error, any()}.
+create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
+  brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs).
+
+%% @doc Create partition(s) in kafka.
+%%
+%% `TopicPartitionConfigs' is a list of structs (e.g. maps) with the keys:
+%%
+%% - `topic'
+%% The topic name.
+%%
+%% - `new_partitions'
+%% A struct with two keys. `count' is the total number of partitions the
+%% topic will have afterwards (current + newly added). `assignment' should
+%% hold one list per newly added partition; each list specifies the
+%% preferred broker ids to assign.
+%%
+%% Example:
+%% ```
+%% > TopicPartitionConfigs = [
+%%     #{
+%%       topic => <<"my_topic">>,
+%%       new_partitions => #{
+%%         count => 6,
+%%         assignment => [[1,2], [2,3], [3,1]]
+%%       }
+%%     }
+%%   ].
+%% > brod:create_partitions([{"localhost", 9092}], TopicPartitionConfigs, #{timeout => 1000}, []).
+%% ok
+%% '''
+-spec create_partitions([endpoint()], [topic_partition_config()],
+                        #{timeout => kpro:int32(), validate_only => boolean()},
+                        conn_config()) ->
+        ok | {error, any()}.
+create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options) ->
+  brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options).
+
%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
diff --git a/src/brod_client.erl b/src/brod_client.erl
index a03c24d0..06d5bc9e 100644
--- a/src/brod_client.erl
+++ b/src/brod_client.erl
@@ -73,6 +73,7 @@
-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).
+%% NOTE(review): this macro is not referenced anywhere in the visible changes;
+%% brod_partitions_sync uses its own ?DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS
+%% for the sync interval default. Confirm it is needed -- TODO.
+-define(DEFAULT_SYNC_METADATA_INTERVAL_SECONDS, 60).
%% ClientId as ets table name.
-define(ETS(ClientId), ClientId).
@@ -131,6 +132,7 @@
, consumers_sup :: ?undef | pid()
, config :: ?undef | config()
, workers_tab :: ?undef | ets:tab()
+ , partitions_sync :: ?undef | pid()
}).
-type state() :: #state{}.
@@ -363,13 +365,20 @@ handle_continue(init, State0) ->
State1 = ensure_metadata_connection(State0),
{ok, ProducersSupPid} = brod_producers_sup:start_link(),
{ok, ConsumersSupPid} = brod_consumers_sup:start_link(),
- State = State1#state{ bootstrap_endpoints = Endpoints
+ State2 = State1#state{ bootstrap_endpoints = Endpoints
, producers_sup = ProducersSupPid
, consumers_sup = ConsumersSupPid
},
+ State = State2#state{partitions_sync = maybe_start_partitions_sync(State2)},
{noreply, State}.
%% @private
+handle_info({'EXIT', Pid, Reason}, #state{ client_id       = ClientId
+                                         , partitions_sync = Pid
+                                         } = State) ->
+  %% The partitions sync worker (started with start_link) has exited: log it
+  %% and start a replacement if `sync_partitions' is enabled in the config.
+  %% The format string needs one control sequence per argument, otherwise the
+  %% log call itself ends up as a format error.
+  ?BROD_LOG_ERROR("client ~p partitions sync ~p down~nReason: ~p",
+                  [ClientId, Pid, Reason]),
+  NewState = State#state{partitions_sync = maybe_start_partitions_sync(State)},
+  {noreply, NewState};
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, producers_sup = Pid
} = State) ->
@@ -796,6 +805,17 @@ do_connect(Endpoint, State) ->
ConnConfig = conn_config(State),
kpro:connect(Endpoint, ConnConfig).
+%% Start the partitions sync worker when the client config enables
+%% `sync_partitions' (off by default). Returns the worker pid, or
+%% `undefined' when the feature is disabled.
+maybe_start_partitions_sync(#state{ client_id     = ClientId
+                                  , producers_sup = ProducersSupPid
+                                  , config        = ClientConfig
+                                  }) ->
+  case config(sync_partitions, ClientConfig, false) of
+    false ->
+      undefined;
+    true ->
+      %% Assert success: a failed start crashes the caller.
+      {ok, SyncPid} =
+        brod_partitions_sync:start_link(ClientId, ProducersSupPid, ClientConfig),
+      SyncPid
+  end.
+
%% Handle connection pid EXIT event, for payload sockets keep the timestamp,
%% but do not restart yet. Payload connection will be re-established when a
%% per-partition worker restarts and requests for a connection after
@@ -1082,4 +1102,4 @@ now_ts() -> erlang:monotonic_time(millisecond).
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
-%%% End:
+%%% End:
\ No newline at end of file
diff --git a/src/brod_kafka_apis.erl b/src/brod_kafka_apis.erl
index 78a529fc..5e647dfb 100644
--- a/src/brod_kafka_apis.erl
+++ b/src/brod_kafka_apis.erl
@@ -152,6 +152,7 @@ supported_versions() ->
, describe_groups => {0, 5}
, list_groups => {0, 3}
, create_topics => {0, 4}
+ , create_partitions => {0, 1}
, delete_topics => {0, 4}
}.
diff --git a/src/brod_kafka_request.erl b/src/brod_kafka_request.erl
index b2403a01..1adfb6fb 100644
--- a/src/brod_kafka_request.erl
+++ b/src/brod_kafka_request.erl
@@ -18,6 +18,7 @@
-module(brod_kafka_request).
-export([ create_topics/3
+ , create_partitions/3
, delete_topics/3
, fetch/8
, list_groups/1
@@ -36,6 +37,7 @@
-type vsn() :: brod_kafka_apis:vsn().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
+-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type conn() :: kpro:connection().
@@ -65,6 +67,16 @@ create_topics(Connection, TopicConfigs, RequestConfigs)
create_topics(Vsn, TopicConfigs, RequestConfigs) ->
kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs).
+%% @doc Build a `create_partitions' request.
+%% The first argument is either an API version or a connection pid; for a
+%% connection pid the version is picked with `brod_kafka_apis:pick_version/2'.
+-spec create_partitions(vsn() | conn(), [topic_partition_config()],
+                        #{timeout => kpro:int32(),
+                          validate_only => boolean()}) -> kpro:req().
+create_partitions(Conn, PartitionConfigs, ReqConfigs) when is_pid(Conn) ->
+  PickedVsn = brod_kafka_apis:pick_version(Conn, create_partitions),
+  create_partitions(PickedVsn, PartitionConfigs, ReqConfigs);
+create_partitions(ApiVsn, PartitionConfigs, ReqConfigs) ->
+  kpro_req_lib:create_partitions(ApiVsn, PartitionConfigs, ReqConfigs).
+
%% @doc Make a delete_topics request.
-spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req().
delete_topics(Connection, Topics, Timeout) when is_pid(Connection) ->
diff --git a/src/brod_partitions_sync.erl b/src/brod_partitions_sync.erl
new file mode 100644
index 00000000..61a359fd
--- /dev/null
+++ b/src/brod_partitions_sync.erl
@@ -0,0 +1,166 @@
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+
+%% @doc A `brod_partitions_sync' is a `gen_server' that periodically fetches
+%% the latest partition metadata for a client, so that external changes to partitions
+%% are propagated to the client, and starts a producer for each partition that has none.
+-module(brod_partitions_sync).
+-behaviour(gen_server).
+
+-export([
+ start_link/3
+]).
+
+-export([
+ code_change/3,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ init/1,
+ terminate/2
+]).
+
+-include("brod_int.hrl").
+
+-define(DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS, 60).
+
+%% Server state:
+%%   client_id     - id of the client this worker syncs for (the atom given
+%%                   to start_link/3)
+%%   producers_sup - supervisor whose children are inspected on each sync
+%%   interval      - time between sync rounds, in milliseconds
+%%   config        - client config, also passed on when starting producers
+-record(state, {
+  client_id :: atom(),
+  producers_sup :: pid(),
+  interval :: non_neg_integer(),
+  config :: brod_client:config()
+}).
+
+-type state() :: #state{}.
+
+%% @doc Start a partitions sync worker linked to the calling process.
+-spec start_link(atom(), pid(), brod_client:config()) ->
+        {ok, pid()} | {error, any()}.
+start_link(ClientId, ProducersSup, Config) ->
+  InitArgs = {ClientId, ProducersSup, Config},
+  gen_server:start_link(?MODULE, InitArgs, []).
+
+%% Schedule the first sync round and build the initial state. No sync is
+%% performed here; the first round runs once the interval has elapsed.
+%% The first tuple element is the client id atom handed to start_link/3.
+%% @private
+-spec init({atom(), pid(), brod_client:config()}) ->
+        {ok, state()}.
+init({ClientId, ProducersSup, Config}) ->
+  Interval = sync_partition_interval(Config),
+  schedule_sync(Interval),
+  State = #state{
+    client_id = ClientId,
+    producers_sup = ProducersSup,
+    interval = Interval,
+    config = Config
+  },
+  {ok, State}.
+
+%% `sync' runs one sync round and re-arms the timer; any other message is
+%% dropped.
+%% @private
+handle_info(sync, #state{client_id = ClientId, producers_sup = ProducersSup,
+                         interval = Interval, config = Config} = State) ->
+  %% Start missing producers first, then refresh the client's metadata.
+  sync_partitions(ClientId, ProducersSup, Config),
+  sync_client(ClientId),
+  schedule_sync(Interval),
+  {noreply, State};
+handle_info(_Other, #state{} = State) ->
+  {noreply, State}.
+
+%% No synchronous calls are supported; every request gets an error reply.
+%% @private
+handle_call(Request, _From, #state{} = State) ->
+  Reply = {error, {unsupported_call, Request}},
+  {reply, Reply, State}.
+
+%% Casts are ignored.
+%% @private
+handle_cast(_Msg, #state{} = State) ->
+  {noreply, State}.
+
+%% The state is carried over unchanged on code upgrade.
+%% @private
+code_change(_FromVsn, #state{} = State, _Extra) ->
+  {ok, State}.
+
+%% Nothing to clean up on termination.
+%% @private
+terminate(_Why, _FinalState) ->
+  ok.
+
+%% Ask the client for metadata via `brod_client:get_metadata_safe/2'.
+%% A failure is logged and otherwise ignored; the result is always `ok'.
+%% @private
+sync_client(Client) ->
+  case brod_client:get_metadata_safe(Client, []) of
+    {error, Reason} ->
+      ?BROD_LOG_ERROR("Partitions Sync Client MetaData Error: ~p", [Reason]);
+    {ok, _Metadata} ->
+      ok
+  end,
+  ok.
+
+%% For every child of `Sup' (children are keyed by topic name), look up the
+%% topic's partitions in the metadata and make sure each one has a producer.
+%% Always returns `ok'.
+%% @private
+sync_partitions(Client, Sup, Config) ->
+  %% As with OTP `supervisor:which_children/1', a child that is not running
+  %% is presumably reported as `undefined' or `restarting' instead of a pid.
+  %% Skip such entries instead of crashing the whole round; they are picked
+  %% up again on a later round.
+  TopicSups = maps:from_list(
+    [{Topic, TopicSup}
+     || {Topic, TopicSup, _Type, _Mods} <- brod_supervisor3:which_children(Sup),
+        is_pid(TopicSup)]),
+  TopicsMetadata = get_metadata_for_topics(Client, maps:keys(TopicSups)),
+  lists:foreach(
+    fun(#{name := Topic, partitions := Partitions}) ->
+      TopicSup = maps:get(Topic, TopicSups),
+      lists:foreach(
+        fun(#{partition_index := Partition}) ->
+          sync_partition(TopicSup, Client, Topic, Partition, Config)
+        end,
+        Partitions)
+    end,
+    TopicsMetadata),
+  ok.
+
+%% Arm a one-shot timer that sends `sync' to this process after `Interval'
+%% milliseconds. Returns the timer reference.
+%% @private
+schedule_sync(Interval) ->
+  Self = self(),
+  erlang:send_after(Interval, Self, sync).
+
+%% Make sure a producer exists for `Topic'/`Partition': return the one found
+%% through `Sup', otherwise ask `brod_producers_sup' to start one.
+%% @private
+sync_partition(Sup, Client, Topic, Partition, Config) ->
+  Lookup = brod_producers_sup:find_producer(Sup, Topic, Partition),
+  case Lookup of
+    {error, _NotFound} ->
+      brod_producers_sup:start_producer(Sup, Client, Topic, Partition, Config);
+    {ok, _ProducerPid} ->
+      Lookup
+  end.
+
+%% Sync period in milliseconds, read from `sync_partitions_interval_seconds'
+%% in the client config (a proplist); falls back to the module default.
+%% @private
+sync_partition_interval(Config) ->
+  Seconds = proplists:get_value(sync_partitions_interval_seconds,
+                                Config,
+                                ?DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS),
+  timer:seconds(Seconds).
+
+%% Fetch the metadata entries for `Topics' using the client's bootstrap
+%% hosts and connection config. Any failure is logged and yields `[]'; an
+%% empty topic list yields `[]' without contacting the client.
+%% @private
+get_metadata_for_topics(_Client, []) ->
+  [];
+get_metadata_for_topics(Client, Topics) ->
+  case brod_client:get_bootstrap(Client) of
+    {error, Error} ->
+      ?BROD_LOG_ERROR("Partitions Sync Bootstrap Error: ~p", [Error]),
+      [];
+    {ok, {Hosts, Conn}} ->
+      topics_metadata(Hosts, Topics, Conn)
+  end.
+
+%% Request metadata for `Topics' and return its `topics' entries, or `[]'
+%% (after logging) when the request fails.
+%% @private
+topics_metadata(Hosts, Topics, Conn) ->
+  case brod_utils:get_metadata(Hosts, Topics, Conn) of
+    {error, Error} ->
+      ?BROD_LOG_ERROR("Partitions Sync MetaData Error: ~p", [Error]),
+      [];
+    {ok, #{topics := MetaData}} ->
+      MetaData
+  end.
\ No newline at end of file
diff --git a/src/brod_producers_sup.erl b/src/brod_producers_sup.erl
index 3f70519d..ccd17a9a 100644
--- a/src/brod_producers_sup.erl
+++ b/src/brod_producers_sup.erl
@@ -217,4 +217,4 @@ take_delay_secs(Config, Name, DefaultValue) ->
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
-%%% End:
+%%% End:
\ No newline at end of file
diff --git a/src/brod_utils.erl b/src/brod_utils.erl
index e6525f01..e3c54b9a 100644
--- a/src/brod_utils.erl
+++ b/src/brod_utils.erl
@@ -26,6 +26,8 @@
, describe_groups/3
, create_topics/3
, create_topics/4
+ , create_partitions/3
+ , create_partitions/4
, delete_topics/3
, delete_topics/4
, epoch_ms/0
@@ -73,6 +75,7 @@
-type conn_config() :: brod:conn_config().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
+-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type endpoint() :: brod:endpoint().
@@ -103,6 +106,27 @@ create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
Pid, TopicConfigs, RequestConfigs),
request_sync(Pid, Request)
end).
+
+%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
+-spec create_partitions([endpoint()], [topic_partition_config()],
+                        #{timeout => kpro:int32(), validate_only => boolean()}) ->
+        ok | {error, any()}.
+create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
+  create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, _ConnCfg = []).
+
+%% @doc Try to connect to the controller node using the given
+%% connection options and create the given partitions with configs.
+%% The connect result is handed to `with_conn/2' together with a fun that
+%% builds the request for that connection and sends it synchronously.
+-spec create_partitions([endpoint()], [topic_partition_config()],
+                        #{timeout => kpro:int32(), validate_only => boolean()},
+                        conn_config()) ->
+        ok | {error, any()}.
+create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, ConnCfg) ->
+  KproOpts = kpro_connection_options(ConnCfg),
+  with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
+            fun(Pid) ->
+                Request = brod_kafka_request:create_partitions(
+                            Pid, TopicPartitionConfigs, RequestConfigs),
+                request_sync(Pid, Request)
+            end).
+
%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
diff --git a/test/brod_SUITE.erl b/test/brod_SUITE.erl
index 806d97da..ea34c668 100644
--- a/test/brod_SUITE.erl
+++ b/test/brod_SUITE.erl
@@ -25,7 +25,7 @@
]).
%% Test cases
--export([ t_create_delete_topics/1
+-export([ t_create_update_delete_topics/1
, t_delete_topics_not_found/1
]).
@@ -60,7 +60,7 @@ all() -> [F || {F, _A} <- module_info(exports),
%%%_* Test functions ===========================================================
-t_create_delete_topics(Config) when is_list(Config) ->
+t_create_update_delete_topics(Config) when is_list(Config) ->
Topic = iolist_to_binary(["test-topic-", integer_to_list(erlang:system_time())]),
TopicConfig = [
#{
@@ -71,9 +71,22 @@ t_create_delete_topics(Config) when is_list(Config) ->
name => Topic
}
],
+ TopicPartitionConfig = [
+ #{
+ topic => Topic,
+ new_partitions => #{
+ count => 2,
+ assignment => [[1]]
+ }
+ }
+ ],
try
?assertEqual(ok,
brod:create_topics(?HOSTS, TopicConfig, #{timeout => ?TIMEOUT},
+ #{connect_timeout => ?TIMEOUT})),
+
+ ?assertEqual(ok,
+ brod:create_partitions(?HOSTS, TopicPartitionConfig, #{timeout => ?TIMEOUT},
#{connect_timeout => ?TIMEOUT}))
after
?assertEqual(ok, brod:delete_topics(?HOSTS, [Topic], ?TIMEOUT,
diff --git a/test/brod_client_SUITE.erl b/test/brod_client_SUITE.erl
index af3ef6f3..130ad85b 100644
--- a/test/brod_client_SUITE.erl
+++ b/test/brod_client_SUITE.erl
@@ -29,7 +29,8 @@
-export([ auth/6 ]).
%% Test cases
--export([ t_skip_unreachable_endpoint/1
+-export([ t_optional_partitions_syncing/1
+ , t_skip_unreachable_endpoint/1
, t_no_reachable_endpoint/1
, t_call_bad_client_id/1
, t_metadata_connection_restart/1
@@ -69,6 +70,18 @@
end
end()).
+%% Local copy of a `state' record, used to pick fields out of the term
+%% returned by `sys:get_state/1' for a client. NOTE(review): presumably this
+%% has to mirror the record in src/brod_client.erl field for field -- keep
+%% the two in sync.
+-record(state,
+        { client_id            :: pid()
+        , bootstrap_endpoints  :: [brod:endpoint()]
+        , meta_conn            :: kpro:connection()
+        , payload_conns = []   :: list()
+        , producers_sup        :: pid()
+        , consumers_sup        :: pid()
+        , config               :: list()
+        , workers_tab          :: ets:tab()
+          %% worker pid when `sync_partitions' is enabled, else `undefined'
+        , partitions_sync      :: undefined | pid()
+        }).
+
%%%_* ct callbacks =============================================================
suite() -> [{timetrap, {seconds, 30}}].
@@ -114,6 +127,20 @@ all() ->
%%%_* Test functions ===========================================================
+%% The partitions sync worker is started only when `sync_partitions' is
+%% enabled in the client config.
+t_optional_partitions_syncing(Config) when is_list(Config) ->
+  %% enabled: the client state holds a live worker pid
+  SyncClient = has_sync,
+  ok = start_client(?HOSTS, SyncClient, [{sync_partitions, true}]),
+  #state{partitions_sync = Pid0} = sys:get_state(SyncClient),
+  ?assert(is_process_alive(Pid0)),
+  ok = brod:stop_client(SyncClient),
+  %% disabled: no worker is started
+  NoSyncClient = no_sync,
+  ok = start_client(?HOSTS, NoSyncClient, [{sync_partitions, false}]),
+  #state{partitions_sync = Pid1} = sys:get_state(NoSyncClient),
+  ?assertMatch(undefined, Pid1),
+  ok = brod:stop_client(NoSyncClient).
t_get_partitions_count_safe(Config) when is_list(Config) ->
Client = ?FUNCTION_NAME,
@@ -126,7 +153,6 @@ t_get_partitions_count_safe(Config) when is_list(Config) ->
?assertMatch({error, unknown_topic_or_partition}, Res2),
ok = brod:stop_client(Client).
-
t_get_partitions_count_configure_cache_ttl(Config) when is_list(Config) ->
Client = ?FUNCTION_NAME,
ClientConfig = [{unknown_topic_cache_ttl, 100}],
@@ -141,7 +167,6 @@ t_get_partitions_count_configure_cache_ttl(Config) when is_list(Config) ->
?assertMatch(false, Res3),
ok = brod:stop_client(Client).
-
t_skip_unreachable_endpoint(Config) when is_list(Config) ->
Client = t_skip_unreachable_endpoint,
ok = start_client([{"localhost", 8192} | ?HOSTS], Client),