From 1cc6093bdd8c0b052949f3dd94af97e27e5fdaa9 Mon Sep 17 00:00:00 2001 From: Chris Kempton Date: Thu, 20 Mar 2025 18:23:39 +0000 Subject: [PATCH 01/15] stash --- src/brod.erl | 32 +++++++++++++++++++++++++++++ src/brod_client.erl | 41 ++++++++++++++++++++++++++++++++++++++ src/brod_kafka_apis.erl | 33 +++++++++++++++--------------- src/brod_kafka_request.erl | 12 +++++++++++ src/brod_utils.erl | 24 ++++++++++++++++++++++ 5 files changed, 126 insertions(+), 16 deletions(-) diff --git a/src/brod.erl b/src/brod.erl index 415ad28c..a285b769 100644 --- a/src/brod.erl +++ b/src/brod.erl @@ -96,6 +96,8 @@ %% Topic APIs -export([ create_topics/3 , create_topics/4 + , create_partitions/3 + , create_partitions/4 , delete_topics/3 , delete_topics/4 ]). @@ -190,6 +192,7 @@ -type endpoint() :: {hostname(), portnum()}. -type topic() :: kpro:topic(). -type topic_config() :: kpro:struct(). +-type topic_partition_config() :: kpro:struct(). -type partition() :: kpro:partition(). -type topic_partition() :: {topic(), partition()}. -type offset() :: kpro:offset(). %% Physical offset (an integer) @@ -1049,6 +1052,35 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) -> create_topics(Hosts, TopicConfigs, RequestConfigs, Options) -> brod_utils:create_topics(Hosts, TopicConfigs, RequestConfigs, Options). +%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, []) +-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) -> + ok | {error, any()}. +create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) -> + brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs). + +%% @doc Create partitions(s) in kafka. +%% See {@link create_topics/4} for more information. +%% +%% Example: +%% ``` +%% > TopicPartitionConfigs = [ +%% #{ +%% topic: <<"my_topic">>, +%% new_partitions: #{ +%% count: 6, +%% assignment: [[1,2], [2,3], [3,1]] +%% } +%% } +%% ]. +%% > brod:create_partitions([{"localhost", 9092}], TopicPartitionConfigs, #{timeout => 1000}, []). +%% ok +%% ''' +-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}, + conn_config()) -> + ok | {error, any()}. +create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options) -> + brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, Options). + %% @equiv delete_topics(Hosts, Topics, Timeout, []) -spec delete_topics([endpoint()], [topic()], pos_integer()) -> ok | {error, any()}. diff --git a/src/brod_client.erl b/src/brod_client.erl index f624da6c..6536df4b 100644 --- a/src/brod_client.erl +++ b/src/brod_client.erl @@ -72,6 +72,7 @@ -define(DEFAULT_RECONNECT_COOL_DOWN_SECONDS, 1). -define(DEFAULT_GET_METADATA_TIMEOUT_SECONDS, 5). -define(DEFAULT_UNKNOWN_TOPIC_CACHE_TTL, 120000). +-define(DEFAULT_SYNC_METADATA_INTERVAL_SECONDS, 60). %% ClientId as ets table name. -define(ETS(ClientId), ClientId). @@ -345,6 +346,7 @@ init({BootstrapEndpoints, ClientId, Config}) -> Tab = ets:new(?ETS(ClientId), [named_table, protected, {read_concurrency, true}]), self() ! init, + maybe_start_metadata_sync(Config), {ok, #state{ client_id = ClientId , bootstrap_endpoints = BootstrapEndpoints , config = Config @@ -362,6 +364,11 @@ handle_info(init, State0) -> , consumers_sup = ConsumersSupPid }, {noreply, State}; +handle_info(sync, #state{config = Config, producers_sup = Sup} = State) -> + Children = brod_supervisor3:which_children(Sup), + NewState = sync_topics_metadata(Children, State), + erlang:send_after(sync_interval(Config), self(), sync), + {noreply, NewState}; handle_info({'EXIT', Pid, Reason}, #state{ client_id = ClientId , producers_sup = Pid } = State) -> @@ -695,6 +702,11 @@ timeout(Config) -> ?DEFAULT_GET_METADATA_TIMEOUT_SECONDS), timer:seconds(T). +sync_interval(Config) -> + T = config(sync_metadata_interval_seconds, Config, + ?DEFAULT_SYNC_METADATA_INTERVAL_SECONDS), + timer:seconds(T). + config(Key, Config, Default) -> proplists:get_value(Key, Config, Default). @@ -719,6 +731,35 @@ ensure_binary(ClientId) when is_atom(ClientId) -> ensure_binary(ClientId) when is_binary(ClientId) -> ClientId. +maybe_start_metadata_sync(Config) -> + case config(sync_metadata, Config, false) of + true -> + erlang:send_after(sync_interval(Config), self(), sync), + ok; + false -> + ok + end. + +sync_topics_metadata(Children, State) -> + lists:foldl(fun({Topic, _, _, _}, StateAcc) -> sync_topic_metadata(Topic, StateAcc) end, State, Children). + +sync_topic_metadata(Topic, State) -> + case do_get_metadata(Topic, State) of + {{ok, #{ topics := [ #{ partitions := Partitions } ] }}, NewState} -> + lists:foreach(fun (Partition) -> sync_topic_partition(Partition, Topic, State) end, Partitions), + NewState; + {{error, _}, NewState} -> + NewState + end. + +sync_topic_partition(#{ partition_index := Partition }, Topic, #state{ producers_sup = Sup, client_id = Client, config = Config}) -> + case brod_producers_sup:find_producer(Sup, Topic, Partition) of + {ok, _} -> + ok; + {error, {producer_not_found, _, _}} -> + brod_producer:start_link(Client, Topic, Partition, Config) + end. + -spec maybe_connect(state(), endpoint()) -> {Result, state()} when Result :: {ok, pid()} | {error, any()}. maybe_connect(#state{} = State, Endpoint) -> diff --git a/src/brod_kafka_apis.erl b/src/brod_kafka_apis.erl index 75bbbe6b..282f8b02 100644 --- a/src/brod_kafka_apis.erl +++ b/src/brod_kafka_apis.erl @@ -139,22 +139,23 @@ lookup_vsn_range(Conn, API) -> %% Do not change range without verification. supported_versions(API) -> case API of - produce -> {0, 7}; - fetch -> {0, 10}; - list_offsets -> {0, 2}; - metadata -> {0, 2}; - offset_commit -> {2, 2}; - offset_fetch -> {1, 2}; - find_coordinator -> {0, 0}; - join_group -> {0, 1}; - heartbeat -> {0, 0}; - leave_group -> {0, 0}; - sync_group -> {0, 0}; - describe_groups -> {0, 0}; - list_groups -> {0, 0}; - create_topics -> {0, 0}; - delete_topics -> {0, 0}; - _ -> erlang:error({unsupported_api, API}) + produce -> {0, 7}; + fetch -> {0, 10}; + list_offsets -> {0, 2}; + metadata -> {0, 2}; + offset_commit -> {2, 2}; + offset_fetch -> {1, 2}; + find_coordinator -> {0, 0}; + join_group -> {0, 1}; + heartbeat -> {0, 0}; + leave_group -> {0, 0}; + sync_group -> {0, 0}; + describe_groups -> {0, 0}; + list_groups -> {0, 0}; + create_topics -> {0, 0}; + create_partitions -> {0, 1}; + delete_topics -> {0, 0}; + _ -> erlang:error({unsupported_api, API}) end. monitor_connection(Conn) -> diff --git a/src/brod_kafka_request.erl b/src/brod_kafka_request.erl index c6046aab..00261278 100644 --- a/src/brod_kafka_request.erl +++ b/src/brod_kafka_request.erl @@ -18,6 +18,7 @@ -module(brod_kafka_request). -export([ create_topics/3 + , create_partitions/3 , delete_topics/3 , fetch/8 , list_groups/1 @@ -36,6 +37,7 @@ -type vsn() :: brod_kafka_apis:vsn(). -type topic() :: brod:topic(). -type topic_config() :: kpro:struct(). +-type topic_partition_config() :: kpro:struct(). -type partition() :: brod:partition(). -type offset() :: brod:offset(). -type conn() :: kpro:connection(). @@ -65,6 +67,16 @@ create_topics(Connection, TopicConfigs, RequestConfigs) create_topics(Vsn, TopicConfigs, RequestConfigs) -> kpro_req_lib:create_topics(Vsn, TopicConfigs, RequestConfigs). +%% @doc Make a create_partitions request. +-spec create_partitions(vsn() | conn(), [topic_partition_config()], #{timeout => kpro:int32(), + validate_only => boolean()}) -> kpro:req(). +create_partitions(Connection, TopicPartitionConfigs, RequestConfigs) + when is_pid(Connection) -> + Vsn = brod_kafka_apis:pick_version(Connection, create_partitions), + create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs); +create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs) -> + kpro_req_lib:create_partitions(Vsn, TopicPartitionConfigs, RequestConfigs). + %% @doc Make a delete_topics request. -spec delete_topics(vsn() | conn(), [topic()], pos_integer()) -> kpro:req(). delete_topics(Connection, Topics, Timeout) when is_pid(Connection) -> diff --git a/src/brod_utils.erl b/src/brod_utils.erl index e6525f01..e3c54b9a 100644 --- a/src/brod_utils.erl +++ b/src/brod_utils.erl @@ -26,6 +26,8 @@ , describe_groups/3 , create_topics/3 , create_topics/4 + , create_partitions/3 + , create_partitions/4 , delete_topics/3 , delete_topics/4 , epoch_ms/0 @@ -73,6 +75,7 @@ -type conn_config() :: brod:conn_config(). -type topic() :: brod:topic(). -type topic_config() :: kpro:struct(). +-type topic_partition_config() :: kpro:struct(). -type partition() :: brod:partition(). -type offset() :: brod:offset(). -type endpoint() :: brod:endpoint(). @@ -103,6 +106,27 @@ create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) -> Pid, TopicConfigs, RequestConfigs), request_sync(Pid, Request) end). + +%% @equiv create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, []) +-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}) -> + ok | {error, any()}. +create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) -> + create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, _ConnCfg = []). + +%% @doc Try to connect to the controller node using the given +%% connection options and create the given partitions with configs +-spec create_partitions([endpoint()], [topic_partition_config()], #{timeout => kpro:int32()}, + conn_config()) -> + ok | {error, any()}. +create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs, ConnCfg) -> + KproOpts = kpro_connection_options(ConnCfg), + with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts), + fun(Pid) -> + Request = brod_kafka_request:create_partitions( + Pid, TopicPartitionConfigs, RequestConfigs), + request_sync(Pid, Request) + end). + %% @equiv delete_topics(Hosts, Topics, Timeout, []) -spec delete_topics([endpoint()], [topic()], pos_integer()) -> ok | {error, any()}. From 2e91370c380a75ddfc4a7b5587f160f984616f41 Mon Sep 17 00:00:00 2001 From: Chris Kempton Date: Thu, 20 Mar 2025 19:06:02 +0000 Subject: [PATCH 02/15] update inline docs --- src/brod.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/brod.erl b/src/brod.erl index a285b769..d49dba5f 100644 --- a/src/brod.erl +++ b/src/brod.erl @@ -1059,7 +1059,16 @@ create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs) -> brod_utils:create_partitions(Hosts, TopicPartitionConfigs, RequestConfigs). %% @doc Create partitions(s) in kafka. -%% See {@link create_topics/4} for more information. +%%