diff --git a/src/main/java/org/qubership/integration/platform/engine/configuration/KubeOperatorConfiguration.java b/src/main/java/org/qubership/integration/platform/engine/configuration/KubeOperatorConfiguration.java index 78de5522..730aaf29 100644 --- a/src/main/java/org/qubership/integration/platform/engine/configuration/KubeOperatorConfiguration.java +++ b/src/main/java/org/qubership/integration/platform/engine/configuration/KubeOperatorConfiguration.java @@ -16,6 +16,7 @@ package org.qubership.integration.platform.engine.configuration; +import com.fasterxml.jackson.databind.ObjectMapper; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.util.ClientBuilder; import io.kubernetes.client.util.credentials.AccessTokenAuthentication; @@ -64,7 +65,7 @@ public KubeOperatorConfiguration( */ @Bean @ConditionalOnProperty(prefix = "kubernetes", name = "devmode", havingValue = "false", matchIfMissing = true) - public KubeOperator kubeOperator() { + public KubeOperator kubeOperator(ObjectMapper objectMapper) { try { log.info("Creating KubernetesOperator bean in PROD mode"); @@ -75,10 +76,10 @@ public KubeOperator kubeOperator() { .setAuthentication(new TokenFileAuthentication(tokenFilePath)) .build(); - return new KubeOperator(client, namespace, false); + return new KubeOperator(objectMapper, client, namespace, false); } catch (Exception e) { log.error("Invalid k8s cluster parameters, can't initialize k8s API. {}", e.getMessage()); - return new KubeOperator(); + return new KubeOperator(objectMapper); } } @@ -88,7 +89,7 @@ public KubeOperator kubeOperator() { */ @Bean @ConditionalOnProperty(prefix = "kubernetes", name = "devmode", havingValue = "true") - public KubeOperator kubeOperatorDev() { + public KubeOperator kubeOperatorDev(ObjectMapper objectMapper) { try { log.info("Creating KubernetesOperator bean in DEV mode"); @@ -98,10 +99,10 @@ public KubeOperator kubeOperatorDev() { .setAuthentication(new AccessTokenAuthentication(devToken)) .build(); - return new KubeOperator(client, namespace, true); + return new KubeOperator(objectMapper, client, namespace, true); } catch (Exception e) { log.error("Invalid k8s cluster parameters, can't initialize k8s API. {}", e.getMessage()); - return new KubeOperator(); + return new KubeOperator(objectMapper); } } } diff --git a/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchInitializer.java b/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchInitializer.java index 7216307f..bb9c88ea 100644 --- a/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchInitializer.java +++ b/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchInitializer.java @@ -74,9 +74,6 @@ public class OpenSearchInitializer { @Value("${qip.opensearch.rollover.min_rollover_age_to_delete:14d}") private TimeValue minRolloverAgeToDelete; - @Value("${qip.opensearch.kafka-client.enabled:false}") - private boolean kafkaClientEnabled; - private final Environment environment; private final ObjectMapper jsonMapper; private final OpenSearchClientSupplier openSearchClientSupplier; @@ -94,10 +91,6 @@ public OpenSearchInitializer( @PostConstruct public void initialize() { - if (kafkaClientEnabled) { - log.info("Update opensearch template and indexes was skipped due to enabled kafka client"); - return; - } log.info("Update opensearch template and indexes"); updateTemplateAndIndexes(openSearchClientSupplier.getClient()); } diff --git a/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchKafkaAutoConfiguration.java b/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchKafkaAutoConfiguration.java index 00680038..af82663b 100644 --- a/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchKafkaAutoConfiguration.java +++ b/src/main/java/org/qubership/integration/platform/engine/configuration/opensearch/OpenSearchKafkaAutoConfiguration.java @@ -4,7 +4,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.qubership.integration.platform.engine.kafka.DefaultOpenSearchKafkaProducer; import org.qubership.integration.platform.engine.kafka.OpenSearchKafkaProducer; -import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement; +import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -42,7 +42,7 @@ public OpenSearchKafkaProducer openSearchKafkaProducer() { ); } - private ProducerFactory openSearchProducerFactory() { + private ProducerFactory openSearchProducerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClientBootstrapServers); diff --git a/src/main/java/org/qubership/integration/platform/engine/kafka/DefaultOpenSearchKafkaProducer.java b/src/main/java/org/qubership/integration/platform/engine/kafka/DefaultOpenSearchKafkaProducer.java index fec2820d..27461902 100644 --- a/src/main/java/org/qubership/integration/platform/engine/kafka/DefaultOpenSearchKafkaProducer.java +++ b/src/main/java/org/qubership/integration/platform/engine/kafka/DefaultOpenSearchKafkaProducer.java @@ -1,24 +1,24 @@ package org.qubership.integration.platform.engine.kafka; import lombok.extern.slf4j.Slf4j; -import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement; +import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic; import org.springframework.kafka.core.KafkaTemplate; @Slf4j public class DefaultOpenSearchKafkaProducer implements OpenSearchKafkaProducer { - private final KafkaTemplate openSearchKafkaTemplate; + private final KafkaTemplate openSearchKafkaTemplate; private final String kafkaClientTopic; - public DefaultOpenSearchKafkaProducer(KafkaTemplate openSearchKafkaTemplate, String kafkaClientTopic) { + public DefaultOpenSearchKafkaProducer(KafkaTemplate openSearchKafkaTemplate, String kafkaClientTopic) { this.openSearchKafkaTemplate = openSearchKafkaTemplate; this.kafkaClientTopic = kafkaClientTopic; } @Override - public void send(String key, KafkaQueueElement kafkaQueueElement) { + public void send(String key, SessionElementElastic sessionElementElastic) { try { - openSearchKafkaTemplate.send(kafkaClientTopic, key, kafkaQueueElement); + openSearchKafkaTemplate.send(kafkaClientTopic, key, sessionElementElastic); } catch (Exception e) { log.error("Unable to send element to opensearch via kafka", e); } diff --git a/src/main/java/org/qubership/integration/platform/engine/kafka/OpenSearchKafkaProducer.java b/src/main/java/org/qubership/integration/platform/engine/kafka/OpenSearchKafkaProducer.java index bb1a7d03..6fbbba53 100644 --- a/src/main/java/org/qubership/integration/platform/engine/kafka/OpenSearchKafkaProducer.java +++ b/src/main/java/org/qubership/integration/platform/engine/kafka/OpenSearchKafkaProducer.java @@ -1,8 +1,8 @@ package org.qubership.integration.platform.engine.kafka; -import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement; +import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic; public interface OpenSearchKafkaProducer { - void send(String key, KafkaQueueElement kafkaQueueElement); + void send(String key, SessionElementElastic sessionElementElastic); } diff --git a/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObject.java b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObject.java new file mode 100644 index 00000000..0271ef71 --- /dev/null +++ b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObject.java @@ -0,0 +1,21 @@ +package org.qubership.integration.platform.engine.kubernetes; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.util.Map; + +@Getter +@Setter +@Builder +@AllArgsConstructor +public class KubeCustomObject { + + private String apiVersion; + private String kind; + private V1ObjectMeta metadata; + private Map spec; +} diff --git a/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObjectRequest.java b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObjectRequest.java new file mode 100644 index 00000000..6bc51a13 --- /dev/null +++ b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeCustomObjectRequest.java @@ -0,0 +1,16 @@ +package org.qubership.integration.platform.engine.kubernetes; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; + +@Getter +@Builder +@AllArgsConstructor +public class KubeCustomObjectRequest { + + private final String group; + private final String version; + private final String resourceNamePlural; + private final KubeCustomObject body; +} diff --git a/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java index f4abb4a4..e35fbc45 100644 --- a/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java +++ b/src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java @@ -16,10 +16,12 @@ package org.qubership.integration.platform.engine.kubernetes; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.apis.AppsV1Api; import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.apis.CustomObjectsApi; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Secret; import io.kubernetes.client.openapi.models.V1SecretList; @@ -38,28 +40,31 @@ public class KubeOperator { private static final String DEFAULT_ERR_MESSAGE = "Invalid k8s cluster parameters or API error. "; + private final ObjectMapper objectMapper; private final CoreV1Api coreApi; - private final AppsV1Api appsApi; + private final CustomObjectsApi customObjectsApi; private final String namespace; private final Boolean devmode; - public KubeOperator() { + public KubeOperator(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; coreApi = new CoreV1Api(); - appsApi = new AppsV1Api(); + customObjectsApi = new CustomObjectsApi(); namespace = null; devmode = null; } - public KubeOperator(ApiClient client, - String namespace, - Boolean devmode) { + public KubeOperator( + ObjectMapper objectMapper, + ApiClient client, + String namespace, + Boolean devmode + ) { + this.objectMapper = objectMapper; + coreApi = new CoreV1Api(client); - coreApi = new CoreV1Api(); - coreApi.setApiClient(client); - - appsApi = new AppsV1Api(); - appsApi.setApiClient(client); + customObjectsApi = new CustomObjectsApi(client); this.namespace = namespace; this.devmode = devmode; @@ -114,7 +119,79 @@ public Map> getAllSecretsWithLabel(Pair