Skip to content

Commit a9e1e9d

Browse files
author
Chris Kempton
committed
update with master
1 parent 25ce7b6 commit a9e1e9d

File tree

9 files changed

+326
-9
lines changed

9 files changed

+326
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Changelog
2+
- 4.5.0
3+
- Expose `create_partitions` from the underlying kafka_protocol
4+
- Allow partitions to automatically have producers started for newly discovered partitions.
25

36
- 4.4.6
47
- Upgrade to `kafka_protocol-4.2.8` for dependency (`crc32cer-1.0.4`) to fix its link error.

src/brod.erl

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@
9696
%% Topic APIs
9797
-export([ create_topics/3
9898
, create_topics/4
99+
, create_partitions/3
100+
, create_partitions/4
99101
, delete_topics/3
100102
, delete_topics/4
101103
]).
@@ -190,6 +192,7 @@
190192
-type endpoint() :: {hostname(), portnum()}.
191193
-type topic() :: kpro:topic().
192194
-type topic_config() :: kpro:struct().
195+
-type topic_partition_config() :: kpro:struct().
193196
-type partition() :: kpro:partition().
194197
-type topic_partition() :: {topic(), partition()}.
195198
-type offset() :: kpro:offset(). %% Physical offset (an integer)
@@ -1049,6 +1052,44 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
10491052
create_topics(Hosts, TopicConfigs, RequestConfigs, Options) ->
10501053
brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options).
10511054

1055+
%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
1056+
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
1057+
ok | {error, any()}.
1058+
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
1059+
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs).
1060+
1061+
%% @doc Create partitions(s) in kafka.
1062+
%% <ul>
1063+
%% <li>`topic'
1064+
%% The topic name.
1065+
%% </li>
1066+
%%
1067+
%% <li>`new_partitions'
1068+
%% The `count` of how many partitions will exist for the topic. current + desired
1069+
%% The number of `assignment` should be equal to the number of new partitions.
1070+
%% Each list for assignment specify the prefferred broker ids to assign
1071+
%% </li>
1072+
%%
1073+
%% Example:
1074+
%% ```
1075+
%% > TopicPartitionConfigs = [
1076+
%% #{
1077+
%% topic: <<"my_topic">>,
1078+
%% new_partitions: #{
1079+
%% count: 6,
1080+
%% assignment: [[1,2], [2,3], [3,1]]
1081+
%% }
1082+
%% }
1083+
%% ].
1084+
%% > brod:create_partitions([{"localhost", 9092}], TopicPartitionConfigs, #{timeout => 1000}, []).
1085+
%% ok
1086+
%% '''
1087+
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
1088+
conn_config()) ->
1089+
ok | {error, any()}.
1090+
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options) ->
1091+
brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options).
1092+
10521093
%% @equiv delete_topics(Hosts, Topics, Timeout, [])
10531094
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
10541095
ok | {error, any()}.

src/brod_client.erl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
-define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1).
7474
-define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5).
7575
-define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000).
76+
-define(DEFAULT_SYNC_METADATA_INTERVAL_SECONDS, 60).
7677

7778
%% ClientId as ets table name.
7879
-define(ETS(ClientId), ClientId).
@@ -131,6 +132,7 @@
131132
, consumers_sup :: ?undef | pid()
132133
, config :: ?undef | config()
133134
, workers_tab :: ?undef | ets:tab()
135+
, partitions_sync :: ?undef | pid()
134136
}).
135137

136138
-type state() :: #state{}.
@@ -363,13 +365,20 @@ handle_continue(init, State0) ->
363365
State1 = ensure_metadata_connection(State0),
364366
{ok, ProducersSupPid} = brod_producers_sup:start_link(),
365367
{ok, ConsumersSupPid} = brod_consumers_sup:start_link(),
366-
State = State1#state{ bootstrap_endpoints = Endpoints
368+
State2 = State1#state{ bootstrap_endpoints = Endpoints
367369
, producers_sup = ProducersSupPid
368370
, consumers_sup = ConsumersSupPid
369371
},
372+
State = State2#state{partitions_sync = maybe_start_partitions_sync(State2)},
370373
{noreply, State}.
371374

372375
%% @private
376+
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
377+
, partitions_sync = Pid
378+
} = State) ->
379+
?BROD_LOG_ERROR("client ~p partitions sync down~nReason: ~p", [ClientId, Pid, Reason]),
380+
NewState = State#state{partitions_sync = maybe_start_partitions_sync(State)},
381+
{noreply, NewState};
373382
handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId
374383
, producers_sup = Pid
375384
} = State) ->
@@ -737,6 +746,15 @@ ensure_binary(ClientId) when is_atom(ClientId) ->
737746
ensure_binary(ClientId) when is_binary(ClientId) ->
738747
ClientId.
739748

749+
maybe_start_partitions_sync(#state{client_id = Client, producers_sup = ProducersSup, config = Config}) ->
750+
case config(sync_partitions, Config, false) of
751+
true ->
752+
{ok, Pid} = brod_partitions_sync:start_link(Client, ProducersSup, Config),
753+
Pid;
754+
false ->
755+
undefined
756+
end.
757+
740758
-spec maybe_connect(state(), endpoint()) ->
741759
{Result, state()} when Result :: {ok, pid()} | {error, any()}.
742760
maybe_connect(#state{} = State, Endpoint) ->

src/brod_kafka_apis.erl

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,24 @@ supported_versions() ->
157157

158158
%% Do not change range without verification.
159159
supported_versions(API) ->
160-
try
161-
maps:get(API, supported_versions())
162-
catch
163-
_ : _ ->
164-
erlang:error({unsupported_api, API})
160+
case API of
161+
produce -> {0, 7};
162+
fetch -> {0, 10};
163+
list_offsets -> {0, 2};
164+
metadata -> {0, 2};
165+
offset_commit -> {2, 2};
166+
offset_fetch -> {1, 2};
167+
find_coordinator -> {0, 0};
168+
join_group -> {0, 1};
169+
heartbeat -> {0, 0};
170+
leave_group -> {0, 0};
171+
sync_group -> {0, 0};
172+
describe_groups -> {0, 0};
173+
list_groups -> {0, 0};
174+
create_topics -> {0, 0};
175+
create_partitions -> {0, 1};
176+
delete_topics -> {0, 0};
177+
_ -> erlang:error({unsupported_api, API})
165178
end.
166179

167180
monitor_connection(Conn) ->

src/brod_kafka_request.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
-module(brod_kafka_request).
1919

2020
-export([ create_topics/3
21+
, create_partitions/3
2122
, delete_topics/3
2223
, fetch/8
2324
, list_groups/1
@@ -36,6 +37,7 @@
3637
-type vsn() :: brod_kafka_apis:vsn().
3738
-type topic() :: brod:topic().
3839
-type topic_config() :: kpro:struct().
40+
-type topic_partition_config() :: kpro:struct().
3941
-type partition() :: brod:partition().
4042
-type offset() :: brod:offset().
4143
-type conn() :: kpro:connection().
@@ -65,6 +67,16 @@ create_topics(Connection, TopicConfigs, RequestConfigs)
6567
create_topics(Vsn, TopicConfigs, RequestConfigs) ->
6668
kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs).
6769

70+
%% @doc Make a create_partitions request.
71+
-spec create_partitions(vsn() | conn(), [topic_partition_config()], #{timeout => kpro:int32(),
72+
validate_only => boolean()}) -> kpro:req().
73+
create_partitions(Connection, TopicPartitionConfigs, RequestConfigs)
74+
when is_pid(Connection) ->
75+
Vsn = brod_kafka_apis:pick_version(Connection, create_partitions),
76+
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs);
77+
create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs) ->
78+
kpro_req_lib:create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs).
79+
6880
%% @doc Make a delete_topics request.
6981
-spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req().
7082
delete_topics(Connection, Topics, Timeout) when is_pid(Connection) ->

src/brod_partitions_sync.erl

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
%%% Licensed under the Apache License, Version 2.0 (the "License");
2+
%%% you may not use this file except in compliance with the License.
3+
%%% You may obtain a copy of the License at
4+
%%%
5+
%%% http://www.apache.org/licenses/LICENSE-2.0
6+
%%%
7+
%%% Unless required by applicable law or agreed to in writing, software
8+
%%% distributed under the License is distributed on an "AS IS" BASIS,
9+
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
%%% See the License for the specific language governing permissions and
11+
%%% limitations under the License.
12+
%%%
13+
14+
%% @doc A `brod_partitions_sync' is a `gen_server' that is responsible for fetching
15+
%% the latest partition counts for a client and ensuring external changes to partitions
16+
%% are propogated to the clients and starts a producer for the partition if not present
17+
-module(brod_partitions_sync).
18+
-behaviour(gen_server).
19+
20+
-export([
21+
start_link/3
22+
]).
23+
24+
-export([
25+
code_change/3,
26+
handle_call/3,
27+
handle_cast/2,
28+
handle_info/2,
29+
init/1,
30+
terminate/2
31+
]).
32+
33+
-include("brod_int.hrl").
34+
35+
-define(DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS, 60).
36+
37+
-record(state, {
38+
client_id :: pid(),
39+
producers_sup :: pid(),
40+
interval :: non_neg_integer(),
41+
config :: brod_client:config()
42+
}).
43+
44+
-type state() :: #state{}.
45+
46+
-spec start_link(pid(), pid(), brod_client:config()) ->
47+
{ok, pid()} | {error, any()}.
48+
start_link(ClientId, ProducersSup, Config) ->
49+
gen_server:start_link(?MODULE, {ClientId, ProducersSup, Config}, []).
50+
51+
-spec init({pid(), pid(), brod_client:config()}) ->
52+
{ok, state()}.
53+
init({ClientId, ProducersSup, Config}) ->
54+
Interval = sync_partition_interval(Config),
55+
schedule_sync(Interval),
56+
State = #state{
57+
client_id = ClientId,
58+
producers_sup = ProducersSup,
59+
interval = Interval,
60+
config = Config
61+
},
62+
63+
{ok, State}.
64+
65+
%% @private
66+
handle_info(
67+
sync,
68+
#state{
69+
client_id = Client,
70+
config = Config,
71+
producers_sup = Sup,
72+
interval = Interval
73+
} = State
74+
) ->
75+
sync_partitions(Client, Sup, Config),
76+
sync_client(Client),
77+
schedule_sync(Interval),
78+
{noreply, State};
79+
handle_info(_Info, #state{} = State) ->
80+
{noreply, State}.
81+
82+
%% @private
83+
handle_call(Call, _From, #state{} = State) ->
84+
{reply, {error, {unsupported_call, Call}}, State}.
85+
86+
%% @private
87+
handle_cast(_Cast, #state{} = State) ->
88+
{noreply, State}.
89+
90+
%% @private
91+
code_change(_OldVsn, #state{} = State, _Extra) ->
92+
{ok, State}.
93+
94+
%% @private
95+
terminate(_Reason, _State) ->
96+
ok.
97+
98+
%% @private
99+
sync_client(Client) ->
100+
case brod_client:get_metadata_safe(Client, []) of
101+
{ok, _} ->
102+
ok;
103+
{error, Error} ->
104+
?BROD_LOG_ERROR("Partitions Sync Client MetaData Error: ~p", [Error]),
105+
ok
106+
end.
107+
108+
%% @private
109+
sync_partitions(Client, Sup, Config) ->
110+
Producers = brod_supervisor3:which_children(Sup),
111+
TopicsList = lists:map(fun({Topic, ProducerPid, _, _}) -> {Topic, ProducerPid} end, Producers),
112+
TopicsMap = maps:from_list(TopicsList),
113+
MetaData = get_metadata_for_topics(Client, maps:keys(TopicsMap)),
114+
lists:foreach(
115+
fun(#{name := Topic, partitions := Partitions}) ->
116+
ProducerPid = maps:get(Topic, TopicsMap),
117+
118+
lists:foreach(
119+
fun(#{partition_index := Partition}) ->
120+
sync_partition(ProducerPid, Client, Topic, Partition, Config)
121+
end,
122+
Partitions
123+
)
124+
end,
125+
MetaData
126+
),
127+
ok.
128+
129+
%% @private
130+
schedule_sync(Interval) ->
131+
erlang:send_after(Interval, self(), sync).
132+
133+
%% @private
134+
sync_partition(Sup, Client, Topic, Partition, Config) ->
135+
case brod_producers_sup:find_producer(Sup, Topic, Partition) of
136+
{ok, Pid} ->
137+
{ok, Pid};
138+
{error, _} ->
139+
brod_producers_sup:start_producer(Sup, Client, Topic, Partition, Config)
140+
end.
141+
142+
%% @private
143+
sync_partition_interval(Config) ->
144+
T = proplists:get_value(
145+
sync_partitions_interval_seconds, Config, ?DEFAULT_SYNC_PARTITIONS_INTERVAL_SECONDS
146+
),
147+
timer:seconds(T).
148+
149+
%% @private
150+
get_metadata_for_topics(_Client, []) ->
151+
[];
152+
%% @private
153+
get_metadata_for_topics(Client, Topics) ->
154+
case brod_client:get_bootstrap(Client) of
155+
{ok, {Hosts, Conn}} ->
156+
case brod_utils:get_metadata(Hosts, Topics, Conn) of
157+
{ok, #{topics := MetaData}} ->
158+
MetaData;
159+
{error, Error} ->
160+
?BROD_LOG_ERROR("Partitions Sync MetaData Error: ~p", [Error]),
161+
[]
162+
end;
163+
{error, Error} ->
164+
?BROD_LOG_ERROR("Partitions Sync Bootstrap Error: ~p", [Error]),
165+
[]
166+
end.

src/brod_utils.erl

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
, describe_groups/3
2727
, create_topics/3
2828
, create_topics/4
29+
, create_partitions/3
30+
, create_partitions/4
2931
, delete_topics/3
3032
, delete_topics/4
3133
, epoch_ms/0
@@ -73,6 +75,7 @@
7375
-type conn_config() :: brod:conn_config().
7476
-type topic() :: brod:topic().
7577
-type topic_config() :: kpro:struct().
78+
-type topic_partition_config() :: kpro:struct().
7679
-type partition() :: brod:partition().
7780
-type offset() :: brod:offset().
7881
-type endpoint() :: brod:endpoint().
@@ -103,6 +106,27 @@ create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
103106
Pid, TopicConfigs, RequestConfigs),
104107
request_sync(Pid, Request)
105108
end).
109+
110+
%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, [])
111+
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) ->
112+
ok | {error, any()}.
113+
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) ->
114+
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, _ConnCfg = []).
115+
116+
%% @doc Try to connect to the controller node using the given
117+
%% connection options and create the given partitions with configs
118+
-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()},
119+
conn_config()) ->
120+
ok | {error, any()}.
121+
create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, ConnCfg) ->
122+
KproOpts = kpro_connection_options(ConnCfg),
123+
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
124+
fun(Pid) ->
125+
Request = brod_kafka_request:create_partitions(
126+
Pid, TopicPartitionConfigs, RequestConfigs),
127+
request_sync(Pid, Request)
128+
end).
129+
106130
%% @equiv delete_topics(Hosts, Topics, Timeout, [])
107131
-spec delete_topics([endpoint()], [topic()], pos_integer()) ->
108132
ok | {error, any()}.

0 commit comments

Comments
 (0)