Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.qubership.integration.platform.engine.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.credentials.AccessTokenAuthentication;
Expand Down Expand Up @@ -64,7 +65,7 @@ public KubeOperatorConfiguration(
*/
@Bean
@ConditionalOnProperty(prefix = "kubernetes", name = "devmode", havingValue = "false", matchIfMissing = true)
public KubeOperator kubeOperator() {
public KubeOperator kubeOperator(ObjectMapper objectMapper) {
try {
log.info("Creating KubernetesOperator bean in PROD mode");

Expand All @@ -75,10 +76,10 @@ public KubeOperator kubeOperator() {
.setAuthentication(new TokenFileAuthentication(tokenFilePath))
.build();

return new KubeOperator(client, namespace, false);
return new KubeOperator(objectMapper, client, namespace, false);
} catch (Exception e) {
log.error("Invalid k8s cluster parameters, can't initialize k8s API. {}", e.getMessage());
return new KubeOperator();
return new KubeOperator(objectMapper);
}
}

Expand All @@ -88,7 +89,7 @@ public KubeOperator kubeOperator() {
*/
@Bean
@ConditionalOnProperty(prefix = "kubernetes", name = "devmode", havingValue = "true")
public KubeOperator kubeOperatorDev() {
public KubeOperator kubeOperatorDev(ObjectMapper objectMapper) {
try {
log.info("Creating KubernetesOperator bean in DEV mode");

Expand All @@ -98,10 +99,10 @@ public KubeOperator kubeOperatorDev() {
.setAuthentication(new AccessTokenAuthentication(devToken))
.build();

return new KubeOperator(client, namespace, true);
return new KubeOperator(objectMapper, client, namespace, true);
} catch (Exception e) {
log.error("Invalid k8s cluster parameters, can't initialize k8s API. {}", e.getMessage());
return new KubeOperator();
return new KubeOperator(objectMapper);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ public class OpenSearchInitializer {
@Value("${qip.opensearch.rollover.min_rollover_age_to_delete:14d}")
private TimeValue minRolloverAgeToDelete;

@Value("${qip.opensearch.kafka-client.enabled:false}")
private boolean kafkaClientEnabled;

private final Environment environment;
private final ObjectMapper jsonMapper;
private final OpenSearchClientSupplier openSearchClientSupplier;
Expand All @@ -94,10 +91,6 @@ public OpenSearchInitializer(

@PostConstruct
public void initialize() {
if (kafkaClientEnabled) {
log.info("Update opensearch template and indexes was skipped due to enabled kafka client");
return;
}
log.info("Update opensearch template and indexes");
updateTemplateAndIndexes(openSearchClientSupplier.getClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.qubership.integration.platform.engine.kafka.DefaultOpenSearchKafkaProducer;
import org.qubership.integration.platform.engine.kafka.OpenSearchKafkaProducer;
import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand Down Expand Up @@ -42,7 +42,7 @@ public OpenSearchKafkaProducer openSearchKafkaProducer() {
);
}

private ProducerFactory<String, KafkaQueueElement> openSearchProducerFactory() {
private ProducerFactory<String, SessionElementElastic> openSearchProducerFactory() {
Map<String, Object> configProps = new HashMap<>();

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClientBootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package org.qubership.integration.platform.engine.kafka;

import lombok.extern.slf4j.Slf4j;
import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
public class DefaultOpenSearchKafkaProducer implements OpenSearchKafkaProducer {

private final KafkaTemplate<String, KafkaQueueElement> openSearchKafkaTemplate;
private final KafkaTemplate<String, SessionElementElastic> openSearchKafkaTemplate;
private final String kafkaClientTopic;

public DefaultOpenSearchKafkaProducer(KafkaTemplate<String, KafkaQueueElement> openSearchKafkaTemplate, String kafkaClientTopic) {
public DefaultOpenSearchKafkaProducer(KafkaTemplate<String, SessionElementElastic> openSearchKafkaTemplate, String kafkaClientTopic) {
this.openSearchKafkaTemplate = openSearchKafkaTemplate;
this.kafkaClientTopic = kafkaClientTopic;
}

@Override
public void send(String key, KafkaQueueElement kafkaQueueElement) {
public void send(String key, SessionElementElastic sessionElementElastic) {
try {
openSearchKafkaTemplate.send(kafkaClientTopic, key, kafkaQueueElement);
openSearchKafkaTemplate.send(kafkaClientTopic, key, sessionElementElastic);
} catch (Exception e) {
log.error("Unable to send element to opensearch via kafka", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.qubership.integration.platform.engine.kafka;

import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;

public interface OpenSearchKafkaProducer {

void send(String key, KafkaQueueElement kafkaQueueElement);
void send(String key, SessionElementElastic sessionElementElastic);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.qubership.integration.platform.engine.kubernetes;

import io.kubernetes.client.openapi.models.V1ObjectMeta;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.util.Map;

@Getter
@Setter
@Builder
@AllArgsConstructor
public class KubeCustomObject {

private String apiVersion;
private String kind;
private V1ObjectMeta metadata;
private Map<String, Object> spec;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.qubership.integration.platform.engine.kubernetes;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
@AllArgsConstructor
public class KubeCustomObjectRequest {

private final String group;
private final String version;
private final String resourceNamePlural;
private final KubeCustomObject body;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package org.qubership.integration.platform.engine.kubernetes;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1Secret;
import io.kubernetes.client.openapi.models.V1SecretList;
Expand All @@ -38,28 +40,31 @@

private static final String DEFAULT_ERR_MESSAGE = "Invalid k8s cluster parameters or API error. ";

private final ObjectMapper objectMapper;
private final CoreV1Api coreApi;
private final AppsV1Api appsApi;
private final CustomObjectsApi customObjectsApi;

private final String namespace;
private final Boolean devmode;

public KubeOperator() {
public KubeOperator(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
coreApi = new CoreV1Api();
appsApi = new AppsV1Api();
customObjectsApi = new CustomObjectsApi();
namespace = null;
devmode = null;
}

public KubeOperator(ApiClient client,
String namespace,
Boolean devmode) {
public KubeOperator(
ObjectMapper objectMapper,
ApiClient client,
String namespace,
Boolean devmode
) {
this.objectMapper = objectMapper;
coreApi = new CoreV1Api(client);

coreApi = new CoreV1Api();
coreApi.setApiClient(client);

appsApi = new AppsV1Api();
appsApi.setApiClient(client);
customObjectsApi = new CustomObjectsApi(client);

this.namespace = namespace;
this.devmode = devmode;
Expand Down Expand Up @@ -114,7 +119,79 @@
return secrets;
}

public void createOrReplaceCustomObject(KubeCustomObjectRequest request) {
String resourceVersion = getCustomObjectResourceVersion(request);
try {
if (resourceVersion != null) {
request.getBody().getMetadata().setResourceVersion(resourceVersion);
customObjectsApi.replaceNamespacedCustomObject(
request.getGroup(),
request.getVersion(),
namespace,
request.getResourceNamePlural(),
request.getBody().getMetadata().getName(),
request.getBody(),
null,
null
);
} else {
customObjectsApi.createNamespacedCustomObject(
request.getGroup(),
request.getVersion(),
namespace,
request.getResourceNamePlural(),
request.getBody(),
null,
null,
null
);
}
} catch (ApiException e) {
if (e.getCode() != 404) {
if (!isDevmode()) {

Check warning on line 151 in src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=Netcracker_qubership-integration-engine&issues=AZ0-bvWbcQvc8vmi4LDS&open=AZ0-bvWbcQvc8vmi4LDS&pullRequest=486
log.error(DEFAULT_ERR_MESSAGE + e.getResponseBody());
}
throw new KubeApiException(DEFAULT_ERR_MESSAGE + e.getResponseBody(), e);
}
} catch (Exception e) {
if (!isDevmode()) {

Check warning on line 157 in src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=Netcracker_qubership-integration-engine&issues=AZ0-bvWbcQvc8vmi4LDT&open=AZ0-bvWbcQvc8vmi4LDT&pullRequest=486
log.error(DEFAULT_ERR_MESSAGE + e.getMessage());
}
throw new KubeApiException(DEFAULT_ERR_MESSAGE + e.getMessage(), e);
}
}

public Boolean isDevmode() {
return devmode;
}

private String getCustomObjectResourceVersion(KubeCustomObjectRequest request) {
try {
Object response = customObjectsApi.getNamespacedCustomObject(
request.getGroup(),
request.getVersion(),
namespace,
request.getResourceNamePlural(),
request.getBody().getMetadata().getName()
);

JsonNode responseNode = objectMapper.convertValue(response, JsonNode.class);
JsonNode resourceVersion = responseNode.path("metadata").path("resourceVersion");
return resourceVersion.isMissingNode() || resourceVersion.isNull() ? null : resourceVersion.asText();
} catch (ApiException e) {
if (e.getCode() == 404) {
return null;
} else {
if (!isDevmode()) {

Check warning on line 185 in src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=Netcracker_qubership-integration-engine&issues=AZ0-bvWbcQvc8vmi4LDU&open=AZ0-bvWbcQvc8vmi4LDU&pullRequest=486
log.error(DEFAULT_ERR_MESSAGE + e.getResponseBody());
}
throw new KubeApiException(DEFAULT_ERR_MESSAGE + e.getResponseBody(), e);
}
} catch (Exception e) {
if (!isDevmode()) {

Check warning on line 191 in src/main/java/org/qubership/integration/platform/engine/kubernetes/KubeOperator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use a primitive boolean expression here.

See more on https://sonarcloud.io/project/issues?id=Netcracker_qubership-integration-engine&issues=AZ0-bvWbcQvc8vmi4LDV&open=AZ0-bvWbcQvc8vmi4LDV&pullRequest=486
log.error(DEFAULT_ERR_MESSAGE + e.getMessage());
}
throw new KubeApiException(DEFAULT_ERR_MESSAGE + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.extern.slf4j.Slf4j;
import org.qubership.integration.platform.engine.kafka.OpenSearchKafkaProducer;
import org.qubership.integration.platform.engine.model.opensearch.KafkaQueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -38,11 +37,7 @@ public OpenSearchWriterKafka(OpenSearchKafkaProducer openSearchKafkaProducer) {
}

private void sendToKafka(SessionElementElastic element) {
KafkaQueueElement kafkaQueueElement = KafkaQueueElement.builder()
.id(element.getId())
.source(element)
.build();
openSearchKafkaProducer.send(element.getId(), kafkaQueueElement);
openSearchKafkaProducer.send(element.getId(), element);
}

@Override
Expand Down
Loading