Skip to content
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Changelog
- 4.5.0
- Expose `create_partitions` from the underlying kafka_protocol
- Allow partitions to automatically have producers started for newly discovered partitions.

- 4.4.6
- Upgrade to `kafka_protocol-4.2.8` for dependency (`crc32cer-1.0.4`) to fix its link error.
Expand Down
41 changes: 41 additions & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
%% Topic APIs
-export([ create_topics/3
, create_topics/4
, create_partitions/3
, create_partitions/4
, delete_topics/3
, delete_topics/4
]).
Expand Down Expand Up @@ -190,6 +192,7 @@
-type endpoint() :: {hostname(), portnum()}.
-type topic() :: kpro:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: kpro:partition().
-type topic_partition() :: {topic(), partition()}.
-type offset() :: kpro:offset(). %% Physical offset (an integer)
Expand Down Expand Up @@ -1049,6 +1052,44 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).

%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs).

%% @doc Create partitions(s) in kafka.
%% <ul>
%% <li>`topic'
%% The topic name.
%% </li>
%%
%% <li>`new_partitions'
%% The `count` of how many partitions will exist for the topic. current + desired
%% The number of `assignment` should be equal to the number of new partitions.
%% Each list for assignment specify the prefferred broker ids to assign
%% </li>
%%
%% Example:
%% ```
%% > TopicPartitionConfigs = [
%% #{
%% topic: <<"my_topic">>,
%% new_partitions: #{
%% count: 6,
%% assignment: [[1,2], [2,3], [3,1]]
%% }
%% }
%% ].
%% > brod:create_partitions([{"localhost", 9092}], TopicPartitionConfigs, #{timeout => 1000}, []).
%% ok
%% '''
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
conn_config()) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options) ->
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options).

%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
Expand Down
24 changes: 22 additions & 2 deletions src/brod_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).
-define(DEFAULT_SYNC_METADATA_INTERVAL_SECONDS, 60).

%% ClientId as ets table name.
-define(ETS(ClientId), ClientId).
Expand Down Expand Up @@ -131,6 +132,7 @@
, consumers_sup :: ?undef | pid()
, config :: ?undef | config()
, workers_tab :: ?undef | ets:tab()
, partitions_sync :: ?undef | pid()
}).

-type state() :: #state{}.
Expand Down Expand Up @@ -363,13 +365,20 @@ handle_continue(init, State0) ->
State1 = ensure_metadata_connection(State0),
{ok, ProducersSupPid} = brod_producers_sup:start_link(),
{ok, ConsumersSupPid} = brod_consumers_sup:start_link(),
State = State1#state{ bootstrap_endpoints = Endpoints
State2 = State1#state{ bootstrap_endpoints = Endpoints
, producers_sup = ProducersSupPid
, consumers_sup = ConsumersSupPid
},
State = State2#state{partitions_sync = maybe_start_partitions_sync(State2)},
{noreply, State}.

%% @private
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, partitions_sync = Pid
} = State) ->
?BROD_LOG_ERROR("client ~p partitions sync down~nReason: ~p", [ClientId, Pid, Reason]),
NewState = State#state{partitions_sync = maybe_start_partitions_sync(State)},
{noreply, NewState};
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
, producers_sup = Pid
} = State) ->
Expand Down Expand Up @@ -796,6 +805,17 @@ do_connect(Endpoint, State) ->
ConnConfig = conn_config(State),
kpro:connect(Endpoint, ConnConfig).

maybe_start_partitions_sync(#state{ client_id = Client
, producers_sup = ProducersSup
, config = Config }) ->
case config(sync_partitions, Config, false) of
true ->
{ok, Pid} = brod_partitions_sync:start_link(Client, ProducersSup, Config),
Pid;
false ->
undefined
end.

%% Handle connection pid EXIT event, for payload sockets keep the timestamp,
%% but do not restart yet. Payload connection will be re-established when a
%% per-partition worker restarts and requests for a connection after
Expand Down Expand Up @@ -1082,4 +1102,4 @@ now_ts() -> erlang:monotonic_time(millisecond).
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:
%%% End:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please revert this.

1 change: 1 addition & 0 deletions src/brod_kafka_apis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ supported_versions() ->
, describe_groups => {0, 5}
, list_groups => {0, 3}
, create_topics => {0, 4}
, create_partitions => {0, 1}
, delete_topics => {0, 4}
}.

Expand Down
12 changes: 12 additions & 0 deletions src/brod_kafka_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-module(brod_kafka_request).

-export([ create_topics/3
, create_partitions/3
, delete_topics/3
, fetch/8
, list_groups/1
Expand All @@ -36,6 +37,7 @@
-type vsn() :: brod_kafka_apis:vsn().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type conn() :: kpro:connection().
Expand Down Expand Up @@ -65,6 +67,16 @@ create_topics(Connection, TopicConfigs, RequestConfigs)
create_topics(Vsn, TopicConfigs, RequestConfigs) ->
kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs).

%% @doc Make a create_partitions request.
-spec create_partitions(vsn() | conn(), [topic_partition_config()], #{timeout => kpro:int32(),
validate_only => boolean()}) -> kpro:req().
create_partitions(Connection, TopicPartitionConfigs, RequestConfigs)
when is_pid(Connection) ->
Vsn = brod_kafka_apis:pick_version(Connection, create_partitions),
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs);
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs) ->
kpro_req_lib:create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs).

%% @doc Make a delete_topics request.
-spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req().
delete_topics(Connection, Topics, Timeout) when is_pid(Connection) ->
Expand Down
166 changes: 166 additions & 0 deletions src/brod_partitions_sync.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%

%% @doc A `brod_partitions_sync' is a `gen_server' that is responsible for fetching
%% the latest partition counts for a client and ensuring external changes to partitions
%% are propogated to the clients and starts a producer for the partition if not present
-module(brod_partitions_sync).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is maybe no need for this module.
we can start a timer (ticker) in brod_client process to trigger a periodic metadata refresh.

brod_client will automatically start the producers for newly discovered partitions.

brod/src/brod_client.erl

Lines 638 to 643 in 674ae4d

ok = maybe_start_partition_producer(
filter_topics(TopicsMetadata),
brod_producers_sup:count_started_children(ProducersSup),
Topics,
ProducersSup
),

-behaviour(gen_server).

-export([
start_link/3
]).

-export([
code_change/3,
handle_call/3,
handle_cast/2,
handle_info/2,
init/1,
terminate/2
]).

-include("brod_int.hrl").

-define(DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS, 60).

-record(state, {
client_id :: pid(),
producers_sup :: pid(),
interval :: non_neg_integer(),
config :: brod_client:config()
}).

-type state() :: #state{}.

-spec start_link(atom(), pid(), brod_client:config()) ->
{ok, pid()} | {error, any()}.
start_link(ClientId, ProducersSup, Config) ->
gen_server:start_link(?MODULE, {ClientId, ProducersSup, Config}, []).

-spec init({pid(), pid(), brod_client:config()}) ->
{ok, state()}.
init({ClientId, ProducersSup, Config}) ->
Interval = sync_partition_interval(Config),
schedule_sync(Interval),
State = #state{
client_id = ClientId,
producers_sup = ProducersSup,
interval = Interval,
config = Config
},

{ok, State}.

%% @private
handle_info(
sync,
#state{
client_id = Client,
config = Config,
producers_sup = Sup,
interval = Interval
} = State
) ->
sync_partitions(Client, Sup, Config),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensure the new partitions get a producer but then we need to tell the client to update the metadata so it can use the new partition.

sync_client(Client),
schedule_sync(Interval),
{noreply, State};
handle_info(_Info, #state{} = State) ->
{noreply, State}.

%% @private
handle_call(Call, _From, #state{} = State) ->
{reply, {error, {unsupported_call, Call}}, State}.

%% @private
handle_cast(_Cast, #state{} = State) ->
{noreply, State}.

%% @private
code_change(_OldVsn, #state{} = State, _Extra) ->
{ok, State}.

%% @private
terminate(_Reason, _State) ->
ok.

%% @private
sync_client(Client) ->
case brod_client:get_metadata_safe(Client, []) of
{ok, _} ->
ok;
{error, Error} ->
?BROD_LOG_ERROR("Partitions Sync Client MetaData Error: ~p", [Error]),
ok
end.

%% @private
sync_partitions(Client, Sup, Config) ->
Producers = brod_supervisor3:which_children(Sup),
TopicsList = lists:map(fun({Topic, ProducerPid, _, _}) -> {Topic, ProducerPid} end, Producers),
TopicsMap = maps:from_list(TopicsList),
MetaData = get_metadata_for_topics(Client, maps:keys(TopicsMap)),
lists:foreach(
fun(#{name := Topic, partitions := Partitions}) ->
ProducerPid = maps:get(Topic, TopicsMap),

lists:foreach(
fun(#{partition_index := Partition}) ->
sync_partition(ProducerPid, Client, Topic, Partition, Config)
end,
Partitions
)
end,
MetaData
),
ok.

%% @private
schedule_sync(Interval) ->
erlang:send_after(Interval, self(), sync).

%% @private
sync_partition(Sup, Client, Topic, Partition, Config) ->
case brod_producers_sup:find_producer(Sup, Topic, Partition) of
{ok, Pid} ->
{ok, Pid};
{error, _} ->
brod_producers_sup:start_producer(Sup, Client, Topic, Partition, Config)
end.

%% @private
sync_partition_interval(Config) ->
T = proplists:get_value(
sync_partitions_interval_seconds, Config, ?DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS
),
timer:seconds(T).

%% @private
get_metadata_for_topics(_Client, []) ->
[];
%% @private
get_metadata_for_topics(Client, Topics) ->
case brod_client:get_bootstrap(Client) of
{ok, {Hosts, Conn}} ->
case brod_utils:get_metadata(Hosts, Topics, Conn) of
{ok, #{topics := MetaData}} ->
MetaData;
{error, Error} ->
?BROD_LOG_ERROR("Partitions Sync MetaData Error: ~p", [Error]),
[]
end;
{error, Error} ->
?BROD_LOG_ERROR("Partitions Sync Bootstrap Error: ~p", [Error]),
[]
end.
2 changes: 1 addition & 1 deletion src/brod_producers_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,4 @@ take_delay_secs(Config, Name, DefaultValue) ->
%%% Local Variables:
%%% allout-layout: t
%%% erlang-indent-level: 2
%%% End:
%%% End:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert this change

24 changes: 24 additions & 0 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
, describe_groups/3
, create_topics/3
, create_topics/4
, create_partitions/3
, create_partitions/4
, delete_topics/3
, delete_topics/4
, epoch_ms/0
Expand Down Expand Up @@ -73,6 +75,7 @@
-type conn_config() :: brod:conn_config().
-type topic() :: brod:topic().
-type topic_config() :: kpro:struct().
-type topic_partition_config() :: kpro:struct().
-type partition() :: brod:partition().
-type offset() :: brod:offset().
-type endpoint() :: brod:endpoint().
Expand Down Expand Up @@ -103,6 +106,27 @@ create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
Pid, TopicConfigs, RequestConfigs),
request_sync(Pid, Request)
end).

%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, _ConnCfg = []).

%% @doc Try to connect to the controller node using the given
%% connection options and create the given partitions with configs
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
conn_config()) ->
ok | {error, any()}.
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, ConnCfg) ->
KproOpts = kpro_connection_options(ConnCfg),
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
fun(Pid) ->
Request = brod_kafka_request:create_partitions(
Pid, TopicPartitionConfigs, RequestConfigs),
request_sync(Pid, Request)
end).

%% @equiv delete_topics(Hosts, Topics, Timeout, [])
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
ok | {error, any()}.
Expand Down
Loading
Loading