Skip to content

Commit bac16cc

Browse files
authored
BE: Fixes #445 Added connect cluster info (#1247)
1 parent 451ecf5 commit bac16cc

File tree

6 files changed

+205
-23
lines changed

6 files changed

+205
-23
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ public enum LogLevel {
192192
public static class CacheProperties {
193193
boolean enabled = true;
194194
Duration connectCacheExpiry = Duration.ofMinutes(1);
195+
Duration connectClusterCacheExpiry = Duration.ofHours(24);
195196
}
196197

197198
@PostConstruct

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kafbat.ui.mapper;
22

33
import io.kafbat.ui.config.ClustersProperties;
4+
import io.kafbat.ui.connect.model.ClusterInfo;
45
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
56
import io.kafbat.ui.connect.model.ConnectorTask;
67
import io.kafbat.ui.connect.model.NewConnector;
@@ -45,6 +46,7 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
4546
default ConnectDTO toKafkaConnect(
4647
ClustersProperties.ConnectCluster connect,
4748
List<InternalConnectorInfo> connectors,
49+
ClusterInfo clusterInfo,
4850
boolean withStats) {
4951
Integer connectorCount = null;
5052
Integer failedConnectors = null;
@@ -66,12 +68,17 @@ default ConnectDTO toKafkaConnect(
6668
.filter(ConnectorStateDTO.FAILED::equals)
6769
.map(s -> 1).orElse(0);
6870

69-
tasksCount += internalConnector.map(c -> c.getTasks().size()).orElse(0);
70-
71-
for (TaskDTO task : connector.getTasks()) {
72-
if (task.getStatus() != null && ConnectorTaskStatusDTO.FAILED.equals(task.getStatus().getState())) {
73-
failedTasksCount += tasksCount;
74-
}
71+
tasksCount += internalConnector.map(ConnectorDTO::getTasks).map(List::size).orElse(0);
72+
73+
if (connector.getTasks() != null) {
74+
failedTasksCount += (int) connector.getTasks().stream()
75+
.filter(t ->
76+
Optional.ofNullable(t)
77+
.map(TaskDTO::getStatus)
78+
.map(TaskStatusDTO::getState)
79+
.map(ConnectorTaskStatusDTO.FAILED::equals)
80+
.orElse(false)
81+
).count();
7582
}
7683
}
7784

@@ -83,7 +90,10 @@ default ConnectDTO toKafkaConnect(
8390
.connectorsCount(connectorCount)
8491
.failedConnectorsCount(failedConnectors)
8592
.tasksCount(tasksCount)
86-
.failedTasksCount(failedTasksCount);
93+
.failedTasksCount(failedTasksCount)
94+
.version(clusterInfo.getVersion())
95+
.commit(clusterInfo.getCommit())
96+
.clusterId(clusterInfo.getKafkaClusterId());
8797
}
8898

8999
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import com.github.benmanes.caffeine.cache.Caffeine;
66
import io.kafbat.ui.config.ClustersProperties;
77
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
8+
import io.kafbat.ui.connect.model.ClusterInfo;
89
import io.kafbat.ui.connect.model.ConnectorStatus;
910
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
1011
import io.kafbat.ui.connect.model.ConnectorTopics;
1112
import io.kafbat.ui.connect.model.TaskStatus;
1213
import io.kafbat.ui.exception.ConnectorOffsetsResetException;
1314
import io.kafbat.ui.exception.NotFoundException;
1415
import io.kafbat.ui.exception.ValidationException;
15-
import io.kafbat.ui.mapper.ClusterMapper;
1616
import io.kafbat.ui.mapper.KafkaConnectMapper;
1717
import io.kafbat.ui.model.ConnectDTO;
1818
import io.kafbat.ui.model.ConnectorActionDTO;
@@ -40,49 +40,59 @@
4040
import org.springframework.web.reactive.function.client.WebClientResponseException;
4141
import reactor.core.publisher.Flux;
4242
import reactor.core.publisher.Mono;
43+
import reactor.util.function.Tuples;
4344

4445
@Service
4546
@Slf4j
4647
public class KafkaConnectService {
47-
private final ClusterMapper clusterMapper;
4848
private final KafkaConnectMapper kafkaConnectMapper;
4949
private final KafkaConfigSanitizer kafkaConfigSanitizer;
5050
private final ClustersProperties clustersProperties;
5151

5252
private final AsyncCache<ConnectCacheKey, List<InternalConnectorInfo>> cachedConnectors;
53+
private final AsyncCache<String, ClusterInfo> cacheClusterInfo;
5354

54-
public KafkaConnectService(ClusterMapper clusterMapper, KafkaConnectMapper kafkaConnectMapper,
55+
public KafkaConnectService(KafkaConnectMapper kafkaConnectMapper,
5556
KafkaConfigSanitizer kafkaConfigSanitizer,
5657
ClustersProperties clustersProperties) {
57-
this.clusterMapper = clusterMapper;
5858
this.kafkaConnectMapper = kafkaConnectMapper;
5959
this.kafkaConfigSanitizer = kafkaConfigSanitizer;
6060
this.clustersProperties = clustersProperties;
6161
this.cachedConnectors = Caffeine.newBuilder()
6262
.expireAfterWrite(clustersProperties.getCache().getConnectCacheExpiry())
6363
.buildAsync();
64+
this.cacheClusterInfo = Caffeine.newBuilder()
65+
.expireAfterWrite(clustersProperties.getCache().getConnectClusterCacheExpiry())
66+
.buildAsync();
6467
}
6568

6669
public Flux<ConnectDTO> getConnects(KafkaCluster cluster, boolean withStats) {
6770
Optional<List<ClustersProperties.@Valid ConnectCluster>> connectClusters =
6871
Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect());
72+
6973
if (withStats) {
7074
return connectClusters.map(connects ->
71-
Flux.fromIterable(connects).flatMap(connect -> (
72-
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, connect), withStats).map(
73-
connectors -> kafkaConnectMapper.toKafkaConnect(connect, connectors, withStats)
74-
)
75+
Flux.fromIterable(connects).flatMap(c ->
76+
getClusterInfo(cluster, c.getName()).map(ci -> Tuples.of(c, ci))
77+
).flatMap(tuple -> (
78+
getConnectConnectorsFromCache(new ConnectCacheKey(cluster, tuple.getT1()))
79+
.map(connectors ->
80+
kafkaConnectMapper.toKafkaConnect(tuple.getT1(), connectors, tuple.getT2(), withStats)
81+
)
7582
)
7683
)
7784
).orElse(Flux.fromIterable(List.of()));
7885
} else {
79-
return Flux.fromIterable(connectClusters.map(connects ->
80-
connects.stream().map(c -> kafkaConnectMapper.toKafkaConnect(c, List.of(), withStats)).toList()
81-
).orElse(List.of()));
86+
return Flux.fromIterable(connectClusters.orElse(List.of()))
87+
.flatMap(c ->
88+
getClusterInfo(cluster, c.getName()).map(info ->
89+
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, withStats)
90+
)
91+
);
8292
}
8393
}
8494

85-
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key, boolean withStats) {
95+
private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectCacheKey key) {
8696
if (clustersProperties.getCache().isEnabled()) {
8797
return Mono.fromFuture(
8898
cachedConnectors.get(key, (t, e) ->
@@ -94,6 +104,16 @@ private Mono<List<InternalConnectorInfo>> getConnectConnectorsFromCache(ConnectC
94104
}
95105
}
96106

107+
private Mono<ClusterInfo> getClusterInfo(KafkaCluster cluster, String connectName) {
108+
return Mono.fromFuture(cacheClusterInfo.get(connectName, (t, e) ->
109+
api(cluster, connectName).mono(KafkaConnectClientApi::getClusterInfo)
110+
.onErrorResume(th -> {
111+
log.error("Error on collecting cluster info" + th.getMessage(), th);
112+
return Mono.just(new ClusterInfo());
113+
}).toFuture()
114+
));
115+
}
116+
97117
private Flux<InternalConnectorInfo> getConnectConnectors(
98118
KafkaCluster cluster,
99119
ClustersProperties.ConnectCluster connect) {
@@ -177,12 +197,13 @@ public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectNa
177197
.mono(client ->
178198
connector
179199
.flatMap(c -> connectorExists(cluster, connectName, c.getName())
180-
.map(exists -> {
200+
.flatMap(exists -> {
181201
if (Boolean.TRUE.equals(exists)) {
182-
throw new ValidationException(
183-
String.format("Connector with name %s already exists", c.getName()));
202+
return Mono.error(new ValidationException(
203+
String.format("Connector with name %s already exists", c.getName())));
204+
} else {
205+
return Mono.just(c);
184206
}
185-
return c;
186207
}))
187208
.map(kafkaConnectMapper::toClient)
188209
.flatMap(client::createConnector)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package io.kafbat.ui.mapper;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import io.kafbat.ui.config.ClustersProperties;
6+
import io.kafbat.ui.connect.model.ClusterInfo;
7+
import io.kafbat.ui.model.ConnectDTO;
8+
import io.kafbat.ui.model.ConnectorDTO;
9+
import io.kafbat.ui.model.ConnectorStateDTO;
10+
import io.kafbat.ui.model.ConnectorStatusDTO;
11+
import io.kafbat.ui.model.ConnectorTaskStatusDTO;
12+
import io.kafbat.ui.model.TaskDTO;
13+
import io.kafbat.ui.model.TaskIdDTO;
14+
import io.kafbat.ui.model.TaskStatusDTO;
15+
import io.kafbat.ui.model.connect.InternalConnectorInfo;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
import java.util.UUID;
19+
import java.util.concurrent.ThreadLocalRandom;
20+
import org.junit.jupiter.api.Test;
21+
import org.openapitools.jackson.nullable.JsonNullable;
22+
23+
class KafkaConnectMapperTest {
24+
25+
@Test
26+
void toKafkaConnect() {
27+
ThreadLocalRandom random = ThreadLocalRandom.current();
28+
29+
List<InternalConnectorInfo> connectors = new ArrayList<>();
30+
int failedConnectors = 0;
31+
int failedTasks = 0;
32+
int tasksPerConnector = random.nextInt(1, 10);
33+
34+
for (int i = 0; i < 10; i++) {
35+
ConnectorStateDTO connectorState;
36+
if (random.nextBoolean()) {
37+
connectorState = ConnectorStateDTO.FAILED;
38+
failedConnectors++;
39+
} else {
40+
connectorState = ConnectorStateDTO.RUNNING;
41+
}
42+
43+
ConnectorDTO connectorDto = new ConnectorDTO();
44+
connectorDto.setName(UUID.randomUUID().toString());
45+
connectorDto.setStatus(
46+
new ConnectorStatusDTO(connectorState, UUID.randomUUID().toString())
47+
);
48+
49+
List<TaskDTO> tasks = new ArrayList<>();
50+
List<TaskIdDTO> taskIds = new ArrayList<>();
51+
52+
for (int j = 0; j < tasksPerConnector; j++) {
53+
TaskDTO task = new TaskDTO();
54+
TaskIdDTO taskId = new TaskIdDTO(UUID.randomUUID().toString(), j);
55+
task.setId(taskId);
56+
57+
ConnectorTaskStatusDTO state;
58+
if (random.nextBoolean()) {
59+
state = ConnectorTaskStatusDTO.FAILED;
60+
failedTasks++;
61+
} else {
62+
state = ConnectorTaskStatusDTO.RUNNING;
63+
}
64+
65+
TaskStatusDTO status = new TaskStatusDTO();
66+
status.setState(state);
67+
task.setStatus(status);
68+
tasks.add(task);
69+
taskIds.add(taskId);
70+
}
71+
72+
connectorDto.setTasks(taskIds);
73+
InternalConnectorInfo connector = InternalConnectorInfo.builder()
74+
.connector(connectorDto)
75+
.tasks(tasks)
76+
.build();
77+
78+
connectors.add(connector);
79+
}
80+
81+
ClusterInfo clusterInfo = new ClusterInfo();
82+
clusterInfo.setVersion(UUID.randomUUID().toString());
83+
clusterInfo.setCommit(UUID.randomUUID().toString());
84+
clusterInfo.setKafkaClusterId(UUID.randomUUID().toString());
85+
86+
ClustersProperties.ConnectCluster connectCluster = ClustersProperties.ConnectCluster.builder()
87+
.name(UUID.randomUUID().toString())
88+
.address("http://localhost:" + random.nextInt(1000, 5000))
89+
.username(UUID.randomUUID().toString())
90+
.password(UUID.randomUUID().toString()).build();
91+
92+
ConnectDTO connectDto = new ConnectDTO();
93+
connectDto.setName(connectCluster.getName());
94+
connectDto.setAddress(connectCluster.getAddress());
95+
connectDto.setVersion(JsonNullable.of(clusterInfo.getVersion()));
96+
connectDto.setCommit(JsonNullable.of(clusterInfo.getCommit()));
97+
connectDto.setClusterId(JsonNullable.of(clusterInfo.getKafkaClusterId()));
98+
connectDto.setConnectorsCount(JsonNullable.of(connectors.size()));
99+
connectDto.setFailedConnectorsCount(JsonNullable.of(failedConnectors));
100+
connectDto.setTasksCount(JsonNullable.of(connectors.size() * tasksPerConnector));
101+
connectDto.setFailedTasksCount(JsonNullable.of(failedTasks));
102+
103+
KafkaConnectMapper mapper = new KafkaConnectMapperImpl();
104+
ConnectDTO kafkaConnect = mapper.toKafkaConnect(
105+
connectCluster,
106+
connectors,
107+
clusterInfo,
108+
true
109+
);
110+
111+
assertThat(kafkaConnect).isNotNull();
112+
assertThat(kafkaConnect).isEqualTo(connectDto);
113+
114+
}
115+
}

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3449,6 +3449,16 @@ components:
34493449
failedTasksCount:
34503450
type: integer
34513451
nullable: true
3452+
version:
3453+
type: string
3454+
nullable: true
3455+
commit:
3456+
type: string
3457+
nullable: true
3458+
clusterId:
3459+
type: string
3460+
nullable: true
3461+
34523462
required:
34533463
- name
34543464

contract/src/main/resources/swagger/kafka-connect-api.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,20 @@ servers:
1414
- url: /localhost
1515

1616
paths:
17+
/:
18+
get:
19+
tags:
20+
- KafkaConnectClient
21+
summary: get kafka connect info
22+
operationId: getClusterInfo
23+
responses:
24+
200:
25+
description: OK
26+
content:
27+
application/json:
28+
schema:
29+
$ref: '#/components/schemas/ClusterInfo'
30+
1731
/connectors:
1832
get:
1933
tags:
@@ -419,6 +433,17 @@ components:
419433
type: http
420434
scheme: basic
421435
schemas:
436+
437+
ClusterInfo:
438+
type: object
439+
properties:
440+
version:
441+
type: string
442+
commit:
443+
type: string
444+
kafka_cluster_id:
445+
type: string
446+
422447
ConnectorConfig:
423448
type: object
424449
additionalProperties:

0 commit comments

Comments (0)