Skip to content

Commit 31a0e8c

Browse files
authored
BE: Improved speed of consumer groups requests (#1308)
1 parent 40e6a6e commit 31a0e8c

File tree

5 files changed

+347
-33
lines changed

5 files changed

+347
-33
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,19 @@ public class ClustersProperties {
4343
CacheProperties cache = new CacheProperties();
4444
ClusterFtsProperties fts = new ClusterFtsProperties();
4545

46+
AdminClient adminClient = new AdminClient();
47+
48+
/**
 * Tunable settings for Kafka admin-client requests, exposed through the
 * {@code adminClient} field of {@code ClustersProperties}.
 *
 * <p>The partition-size and concurrency values are handed to
 * {@code ReactiveAdminClient} and passed by it to {@code partitionCalls(...)}
 * in place of previously hard-coded literals. Presumably "partition size" is the
 * maximum number of ids sent in one request and "concurrency" the number of such
 * requests in flight at once - confirm against {@code partitionCalls}.
 */
@Data
49+
public static class AdminClient {
50+
// NOTE(review): no reader of this field is visible here; AdminClientServiceImpl
// still resolves its timeout via clustersProperties.getAdminClientTimeout() - confirm usage.
Integer timeout;
51+
// Used by ReactiveAdminClient.describeConsumerGroups.
int describeConsumerGroupsPartitionSize = 50;
52+
int describeConsumerGroupsConcurrency = 4;
53+
// Used by ReactiveAdminClient.listConsumerGroupOffsets.
int listConsumerGroupOffsetsPartitionSize = 50;
54+
int listConsumerGroupOffsetsConcurrency = 4;
55+
// Used by ReactiveAdminClient.getTopicsConfig.
int getTopicsConfigPartitionSize = 200;
56+
// Used by ReactiveAdminClient.describeTopics.
int describeTopicsPartitionSize = 200;
57+
}
58+
4659
@Data
4760
public static class Cluster {
4861
@NotBlank(message = "field name for for cluster could not be blank")

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
2727

2828
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
2929
private final int clientTimeout;
30+
private final ClustersProperties clustersProperties;
3031

3132
/**
 * Creates the service and captures the configuration it needs later.
 *
 * <p>The full {@code ClustersProperties} object is retained so that its
 * {@code adminClient} settings can be passed to {@code ReactiveAdminClient.create}
 * when a client is built. The client timeout is taken from
 * {@code getAdminClientTimeout()} and falls back to
 * {@code DEFAULT_CLIENT_TIMEOUT_MS} when that value is {@code null}.
 *
 * @param clustersProperties application-level cluster configuration
 */
public AdminClientServiceImpl(ClustersProperties clustersProperties) {
33+
this.clustersProperties = clustersProperties;
3234
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
3335
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
3436
}
@@ -53,7 +55,9 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
5355
);
5456
return AdminClient.create(properties);
5557
}).subscribeOn(Schedulers.boundedElastic())
56-
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
58+
.flatMap(ac -> ReactiveAdminClient.create(ac, clustersProperties.getAdminClient())
59+
.doOnError(th -> ac.close())
60+
)
5761
.onErrorMap(th -> new IllegalStateException(
5862
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
5963
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 105 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
import io.kafbat.ui.model.InternalConsumerGroup;
99
import io.kafbat.ui.model.InternalTopicConsumerGroup;
1010
import io.kafbat.ui.model.KafkaCluster;
11+
import io.kafbat.ui.model.ServerStatusDTO;
1112
import io.kafbat.ui.model.SortOrderDTO;
13+
import io.kafbat.ui.model.Statistics;
1214
import io.kafbat.ui.service.index.ConsumerGroupFilter;
15+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
1316
import io.kafbat.ui.service.rbac.AccessControlService;
1417
import io.kafbat.ui.util.ApplicationMetrics;
1518
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -19,7 +22,9 @@
1922
import java.util.HashMap;
2023
import java.util.List;
2124
import java.util.Map;
25+
import java.util.Optional;
2226
import java.util.Properties;
27+
import java.util.Set;
2328
import java.util.function.ToIntFunction;
2429
import java.util.stream.Collectors;
2530
import java.util.stream.Stream;
@@ -41,6 +46,7 @@ public class ConsumerGroupService {
4146
private final AdminClientService adminClientService;
4247
private final AccessControlService accessControlService;
4348
private final ClustersProperties clustersProperties;
49+
private final StatisticsCache statisticsCache;
4450

4551
private Mono<List<InternalConsumerGroup>> getConsumerGroups(
4652
ReactiveAdminClient ac,
@@ -67,27 +73,63 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
6773
public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
6874
String topic) {
6975
return adminClientService.get(cluster)
70-
// 1. getting topic's end offsets
7176
.flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
72-
.flatMap(endOffsets -> {
73-
var tps = new ArrayList<>(endOffsets.keySet());
74-
// 2. getting all consumer groups
75-
return describeConsumerGroups(ac)
76-
.flatMap((List<ConsumerGroupDescription> groups) -> {
77-
// 3. trying to find committed offsets for topic
78-
var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
79-
return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
80-
groups.stream()
81-
// 4. keeping only groups that relates to topic
82-
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
83-
.map(g ->
84-
// 5. constructing results
85-
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
86-
.toList()
87-
);
88-
}
89-
);
90-
}));
77+
.flatMap(endOffsets ->
78+
describeConsumerGroups(cluster, ac, true).flatMap(groups ->
79+
filterConsumerGroups(cluster, ac, groups, topic, endOffsets)
80+
)
81+
)
82+
);
83+
}
84+
85+
/**
 * Narrows the given consumer groups to those related to {@code topic} and builds
 * an {@code InternalTopicConsumerGroup} for each of them.
 *
 * <p>Groups are pre-filtered before committed offsets are requested, so that
 * {@code listConsumerGroupOffsets} is only called for candidate groups instead of
 * every group in the cluster.
 *
 * @param cluster    cluster whose cached statistics are consulted for DEAD/EMPTY groups
 * @param ac         admin client used to list committed offsets
 * @param groups     descriptions of all candidate consumer groups
 * @param topic      topic name to match
 * @param endOffsets end offsets of the topic's partitions; its keys define the
 *                   partitions for which committed offsets are requested
 * @return the groups that pass the final relation check, paired with their offsets
 */
private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
86+
KafkaCluster cluster,
87+
ReactiveAdminClient ac,
88+
List<ConsumerGroupDescription> groups,
89+
String topic,
90+
Map<TopicPartition, Long> endOffsets) {
91+
92+
// Groups in these states are handled separately below.
Set<ConsumerGroupState> inactiveStates = Set.of(
93+
ConsumerGroupState.DEAD,
94+
ConsumerGroupState.EMPTY
95+
);
96+
97+
// Key true  -> state is anything other than DEAD/EMPTY.
// Key false -> state is DEAD or EMPTY.
Map<Boolean, List<ConsumerGroupDescription>> partitioned = groups.stream().collect(
98+
Collectors.partitioningBy((g) -> !inactiveStates.contains(g.state()))
99+
);
100+
101+
// Despite the name, this holds every non-DEAD/EMPTY group (not only STABLE ones)
// that passes the relation check with the third argument forced to false.
// NOTE(review): that argument is presumably the "has committed offsets" flag, which
// would make this a member-assignment-only check - confirm in isConsumerGroupRelatesToTopic.
List<ConsumerGroupDescription> stable = partitioned.get(true).stream()
102+
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, false))
103+
.toList();
104+
105+
// Despite the name, this holds both DEAD and EMPTY groups.
List<ConsumerGroupDescription> dead = partitioned.get(false);
106+
if (!dead.isEmpty()) {
107+
Statistics statistics = statisticsCache.get(cluster);
108+
// Only when the cached statistics report ONLINE are they used to pre-filter;
// otherwise every DEAD/EMPTY group is kept and decided by the offsets lookup below.
if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
109+
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
110+
statistics.getClusterState().getConsumerGroupsStates();
111+
// Keep a group only if its cached committed offsets mention this topic;
// groups missing from the cached state are dropped (orElse(false)).
dead = dead.stream().filter(g ->
112+
Optional.ofNullable(consumerGroupsStates.get(g.groupId()))
113+
.map(s ->
114+
s.committedOffsets().keySet().stream().anyMatch(tp -> tp.topic().equals(topic))
115+
).orElse(false)
116+
).toList();
117+
}
118+
}
119+
120+
// Candidate list: non-DEAD/EMPTY groups first, then the retained DEAD/EMPTY ones.
List<ConsumerGroupDescription> filtered = new ArrayList<>(stable.size() + dead.size());
121+
filtered.addAll(stable);
122+
filtered.addAll(dead);
123+
124+
List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());
125+
126+
List<String> groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList();
127+
// Final decision: re-run the relation check, this time passing whether the group
// actually has a row in the returned offsets table.
return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets ->
128+
filtered.stream().filter(g ->
129+
isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId()))
130+
).map(g ->
131+
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets)
132+
).toList());
91133
}
92134

93135
private boolean isConsumerGroupRelatesToTopic(String topic,
@@ -208,13 +250,53 @@ private <T> Stream<T> sortAndPaginate(Collection<T> collection,
208250
.limit(perPage);
209251
}
210252

211-
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
253+
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
254+
KafkaCluster cluster,
255+
ReactiveAdminClient ac,
256+
boolean cache) {
212257
return ac.listConsumerGroupNames()
213-
.flatMap(ac::describeConsumerGroups)
214-
.map(cgs -> new ArrayList<>(cgs.values()));
258+
.flatMap(names -> describeConsumerGroups(names, cluster, ac, cache));
259+
}
260+
261+
/**
 * Resolves descriptions for the given consumer group names, optionally serving
 * them from the cached cluster statistics.
 *
 * <p>When {@code cache} is {@code true} and the cached statistics report
 * {@code ONLINE}, descriptions found in the cached cluster state are used as-is
 * and only the names missing from it are requested through the admin client.
 * In every other case all names are requested through the admin client.
 *
 * @param groupNames names of the consumer groups to describe
 * @param cluster    cluster whose statistics cache is consulted
 * @param ac         admin client used for names that cannot be served from cache
 * @param cache      whether cached descriptions may be used
 * @return the descriptions; on the cached path, cached entries come first
 *         (in {@code groupNames} order) followed by the freshly fetched ones
 */
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
262+
List<String> groupNames,
263+
KafkaCluster cluster,
264+
ReactiveAdminClient ac,
265+
boolean cache) {
266+
267+
// NOTE(review): the cache lookup runs even when cache == false, although its
// result is only used on the cached path.
Statistics statistics = statisticsCache.get(cluster);
268+
269+
if (cache && statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
270+
List<ConsumerGroupDescription> result = new ArrayList<>();
271+
List<String> notFound = new ArrayList<>();
272+
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
273+
statistics.getClusterState().getConsumerGroupsStates();
274+
// Split the requested names into cache hits (description taken directly)
// and misses (collected for a remote describe call).
for (String groupName : groupNames) {
275+
ScrapedClusterState.ConsumerGroupState consumerGroupState = consumerGroupsStates.get(groupName);
276+
if (consumerGroupState != null) {
277+
result.add(consumerGroupState.description());
278+
} else {
279+
notFound.add(groupName);
280+
}
281+
}
282+
if (!notFound.isEmpty()) {
283+
// Fetch only the misses and append them to the cached hits.
return ac.describeConsumerGroups(notFound)
284+
.map(descriptions -> {
285+
result.addAll(descriptions.values());
286+
return result;
287+
});
288+
} else {
289+
return Mono.just(result);
290+
}
291+
} else {
292+
// NOTE(review): this branch returns an unmodifiable list (List.copyOf), whereas
// the cached branch returns a mutable ArrayList.
return ac.describeConsumerGroups(groupNames)
293+
.map(descriptions -> List.copyOf(descriptions.values()));
294+
}
215295
}
216296

217297

298+
299+
218300
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
219301
ReactiveAdminClient ac,
220302
List<ConsumerGroupListing> groups,

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.google.common.collect.ImmutableTable;
1010
import com.google.common.collect.Iterables;
1111
import com.google.common.collect.Table;
12+
import io.kafbat.ui.config.ClustersProperties;
1213
import io.kafbat.ui.exception.IllegalEntityStateException;
1314
import io.kafbat.ui.exception.NotFoundException;
1415
import io.kafbat.ui.exception.ValidationException;
@@ -88,7 +89,6 @@
8889
import org.apache.kafka.common.quota.ClientQuotaAlteration;
8990
import org.apache.kafka.common.quota.ClientQuotaEntity;
9091
import org.apache.kafka.common.quota.ClientQuotaFilter;
91-
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
9292
import org.apache.kafka.common.resource.ResourcePatternFilter;
9393
import reactor.core.publisher.Flux;
9494
import reactor.core.publisher.Mono;
@@ -190,9 +190,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
190190
}
191191
}
192192

193-
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
193+
public static Mono<ReactiveAdminClient> create(AdminClient adminClient, ClustersProperties.AdminClient properties) {
194194
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
195-
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
195+
return configRelatedInfoMono.map(info ->
196+
new ReactiveAdminClient(adminClient, configRelatedInfoMono, properties, info)
197+
);
196198
}
197199

198200

@@ -235,6 +237,7 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
235237
@Getter(AccessLevel.PACKAGE) // visible for testing
236238
private final AdminClient client;
237239
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
240+
private final ClustersProperties.AdminClient properties;
238241

239242
private volatile ConfigRelatedInfo configRelatedInfo;
240243

@@ -280,7 +283,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
280283
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
281284
return partitionCalls(
282285
topicNames,
283-
200,
286+
properties.getGetTopicsConfigPartitionSize(),
284287
part -> getTopicsConfigImpl(part, includeDocFixed),
285288
mapMerger()
286289
);
@@ -348,7 +351,7 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
348351
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
349352
return partitionCalls(
350353
topics,
351-
200,
354+
properties.getDescribeTopicsPartitionSize(),
352355
this::describeTopicsImpl,
353356
mapMerger()
354357
);
@@ -517,8 +520,8 @@ public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
517520
public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
518521
return partitionCalls(
519522
groupIds,
520-
25,
521-
4,
523+
properties.getDescribeConsumerGroupsPartitionSize(),
524+
properties.getDescribeConsumerGroupsConcurrency(),
522525
ids -> toMono(client.describeConsumerGroups(ids).all()),
523526
mapMerger()
524527
);
@@ -541,8 +544,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S
541544

542545
Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
543546
consumerGroups,
544-
25,
545-
4,
547+
properties.getListConsumerGroupOffsetsPartitionSize(),
548+
properties.getListConsumerGroupOffsetsConcurrency(),
546549
call,
547550
mapMerger()
548551
);

0 commit comments

Comments
 (0)