From beebc6d7c3ff2ca7ff3f0c429c3f0296054bf499 Mon Sep 17 00:00:00 2001 From: Sergei Skuratovich <900852+SSNikolaevich@users.noreply.github.com> Date: Thu, 26 Mar 2026 18:20:45 +0300 Subject: [PATCH] feat: implemented MCP trigger --- pom.xml | 35 +++++-- .../engine/camel/JsonMessageValidator.java | 50 +++------- .../engine/model/ChainElementType.java | 1 + .../StringToTimeValueConverter.java | 2 +- .../service/debugger/logging/ChainLogger.java | 2 +- .../context/create/McpToolRegistrar.java | 99 +++++++++++++++++++ .../context/stop/McpToolUnregisterAction.java | 44 +++++++++ src/main/resources/application.yml | 16 +++ 8 files changed, 202 insertions(+), 47 deletions(-) create mode 100644 src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/create/McpToolRegistrar.java create mode 100644 src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/stop/McpToolUnregisterAction.java diff --git a/pom.xml b/pom.xml index e7208714..f59e023d 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ 4.14.5 2025.0.1 + 1.1.3 1.1.10.8 3.7.4 2.5.2 @@ -57,7 +58,8 @@ 1.5.5.Final 0.2.0 1.18.42 - 1.0.87 + + 19.0.3 3.25.8 5.20.0 @@ -121,6 +123,14 @@ import + + org.springframework.ai + spring-ai-bom + ${spring-ai.version} + pom + import + + @@ -203,11 +213,11 @@ ${atlasmap.version} - - com.networknt - json-schema-validator - ${json-schema-validator.version} - + + + + + io.kubernetes @@ -577,10 +587,10 @@ org.springframework spring-webflux - - com.networknt - json-schema-validator - + + + + io.kubernetes @@ -656,6 +666,11 @@ at.yawk.lz4 lz4-java + + + org.springframework.ai + spring-ai-starter-mcp-server-webmvc + diff --git a/src/main/java/org/qubership/integration/platform/engine/camel/JsonMessageValidator.java b/src/main/java/org/qubership/integration/platform/engine/camel/JsonMessageValidator.java index 139d01eb..074f5ae7 100644 --- a/src/main/java/org/qubership/integration/platform/engine/camel/JsonMessageValidator.java +++ b/src/main/java/org/qubership/integration/platform/engine/camel/JsonMessageValidator.java @@ -16,55 +16,35 @@ package org.qubership.integration.platform.engine.camel; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.networknt.schema.JsonSchema; -import com.networknt.schema.JsonSchemaFactory; -import com.networknt.schema.SpecVersion; -import com.networknt.schema.ValidationMessage; +import com.networknt.schema.*; +import com.networknt.schema.Error; import org.apache.commons.lang3.StringUtils; import org.qubership.integration.platform.engine.errorhandling.ValidationException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; -import java.util.Set; +import java.util.List; import java.util.stream.Collectors; @Component public class JsonMessageValidator { public static final String MESSAGE_VALIDATION_ERROR = "Errors during message validation: "; - private static final String PARSE_MESSAGE_BODY_ERROR = "Unable to parse message body"; private static final String EMPTY_BODY_ERROR = "Message body is empty"; - private final ObjectMapper objectMapper; - - @Autowired - public JsonMessageValidator(@Qualifier("jsonMapper") ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } - public void validate(String jsonMessageAsString, String jsonSchemaAsString) { - try { - JsonSchemaFactory factory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7); - JsonSchema schemaNode = factory.getSchema(jsonSchemaAsString); + SchemaRegistry schemaRegistry = SchemaRegistry.withDefaultDialect(SpecificationVersion.DRAFT_7); + Schema schema = schemaRegistry.getSchema(jsonSchemaAsString); - if (StringUtils.isBlank(jsonMessageAsString)) { - throw new ValidationException(EMPTY_BODY_ERROR); - } + if (StringUtils.isBlank(jsonMessageAsString)) { + throw new ValidationException(EMPTY_BODY_ERROR); + } - JsonNode messageNode = objectMapper.readTree(jsonMessageAsString); - Set errors = schemaNode.validate(messageNode); - if (!errors.isEmpty()) { - String validationMessages = errors - .stream() - .map(ValidationMessage::getMessage) - .collect(Collectors.joining(", ")); - throw new ValidationException(MESSAGE_VALIDATION_ERROR.concat(validationMessages)); - } - } catch (JsonProcessingException e) { - throw new ValidationException(PARSE_MESSAGE_BODY_ERROR); + List errors = schema.validate(jsonMessageAsString, InputFormat.JSON); + if (!errors.isEmpty()) { + String validationMessages = errors + .stream() + .map(Error::getMessage) + .collect(Collectors.joining(", ")); + throw new ValidationException(MESSAGE_VALIDATION_ERROR.concat(validationMessages)); } } } diff --git a/src/main/java/org/qubership/integration/platform/engine/model/ChainElementType.java b/src/main/java/org/qubership/integration/platform/engine/model/ChainElementType.java index 50fdf453..aff443e4 100644 --- a/src/main/java/org/qubership/integration/platform/engine/model/ChainElementType.java +++ b/src/main/java/org/qubership/integration/platform/engine/model/ChainElementType.java @@ -67,6 +67,7 @@ public enum ChainElementType { SPLIT_ASYNC_2("split-async-2"), ASYNC_SPLIT_ELEMENT_2("async-split-element-2"), FINALLY_2("finally-2"), + MCP_TRIGGER("mcp-trigger"), UNKNOWN(""); // add more elements as needed diff --git a/src/main/java/org/qubership/integration/platform/engine/opensearch/ism/converters/StringToTimeValueConverter.java b/src/main/java/org/qubership/integration/platform/engine/opensearch/ism/converters/StringToTimeValueConverter.java index 5f79a5a6..458ffd6d 100644 --- a/src/main/java/org/qubership/integration/platform/engine/opensearch/ism/converters/StringToTimeValueConverter.java +++ b/src/main/java/org/qubership/integration/platform/engine/opensearch/ism/converters/StringToTimeValueConverter.java @@ -16,7 +16,7 @@ package org.qubership.integration.platform.engine.opensearch.ism.converters; -import com.networknt.schema.utils.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.qubership.integration.platform.engine.opensearch.ism.model.time.TimeValue; import org.springframework.boot.context.properties.ConfigurationPropertiesBinding; import org.springframework.core.convert.converter.Converter; diff --git a/src/main/java/org/qubership/integration/platform/engine/service/debugger/logging/ChainLogger.java b/src/main/java/org/qubership/integration/platform/engine/service/debugger/logging/ChainLogger.java index d3a3d146..22390623 100644 --- a/src/main/java/org/qubership/integration/platform/engine/service/debugger/logging/ChainLogger.java +++ b/src/main/java/org/qubership/integration/platform/engine/service/debugger/logging/ChainLogger.java @@ -16,7 +16,6 @@ package org.qubership.integration.platform.engine.service.debugger.logging; -import com.networknt.schema.utils.StringUtils; import lombok.extern.slf4j.Slf4j; import org.apache.camel.CamelException; import org.apache.camel.Exchange; @@ -25,6 +24,7 @@ import org.apache.camel.support.http.HttpUtil; import org.apache.camel.tracing.ActiveSpanManager; import org.apache.camel.tracing.SpanAdapter; +import org.apache.commons.lang3.StringUtils; import org.qubership.integration.platform.engine.errorhandling.errorcode.ErrorCode; import org.qubership.integration.platform.engine.model.ChainElementType; import org.qubership.integration.platform.engine.model.constants.CamelConstants; diff --git a/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/create/McpToolRegistrar.java b/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/create/McpToolRegistrar.java new file mode 100644 index 00000000..331e4128 --- /dev/null +++ b/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/create/McpToolRegistrar.java @@ -0,0 +1,99 @@ +package org.qubership.integration.platform.engine.service.deployment.processing.actions.context.create; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import lombok.extern.slf4j.Slf4j; +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.spring.SpringCamelContext; +import org.apache.commons.lang3.StringUtils; +import org.qubership.integration.platform.engine.model.ChainElementType; +import org.qubership.integration.platform.engine.model.constants.CamelConstants; +import org.qubership.integration.platform.engine.model.deployment.update.DeploymentInfo; +import org.qubership.integration.platform.engine.model.deployment.update.ElementProperties; +import org.qubership.integration.platform.engine.service.deployment.processing.ElementProcessingAction; +import org.qubership.integration.platform.engine.service.deployment.processing.qualifiers.OnAfterDeploymentContextCreated; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.Map; + +import static java.util.Objects.isNull; + +@Slf4j +@Component +@OnAfterDeploymentContextCreated +public class McpToolRegistrar extends ElementProcessingAction { + public static final String DEPLOYMENT_ID = "deploymentId"; + + private final McpSyncServer mcpSyncServer; + private final McpJsonMapper mcpJsonMapper; + + @Autowired + public McpToolRegistrar( + McpSyncServer mcpSyncServer, + @Qualifier("jsonMapper") ObjectMapper objectMapper + ) { + this.mcpSyncServer = mcpSyncServer; + this.mcpJsonMapper = new JacksonMcpJsonMapper(objectMapper); + } + + @Override + public boolean applicableTo(ElementProperties properties) { + String elementType = properties.getProperties().get(CamelConstants.ChainProperties.ELEMENT_TYPE); + ChainElementType chainElementType = ChainElementType.fromString(elementType); + return ChainElementType.MCP_TRIGGER.equals(chainElementType); + } + + @Override + public void apply(SpringCamelContext context, ElementProperties properties, DeploymentInfo deploymentInfo) { + McpSchema.Tool tool = buildMcpTool(properties, deploymentInfo); + McpServerFeatures.SyncToolSpecification toolSpecification = McpServerFeatures.SyncToolSpecification.builder() + .tool(tool) + .callHandler((mcpExchange, request) -> { + ProducerTemplate producerTemplate = context.createProducerTemplate(); + String endpointUri = "direct:" + properties.getElementId(); + Exchange result = producerTemplate.request(endpointUri, exchange -> + exchange.getIn().setBody(request.arguments())); + McpSchema.CallToolResult.Builder builder = McpSchema.CallToolResult.builder(); + if (isNull(tool.outputSchema())) { + builder.textContent(Collections.singletonList(result.getMessage().getBody(String.class))); + } else { + builder.structuredContent(result.getMessage().getBody()); + } + return builder.build(); + }) + .build(); + log.debug("Registering MCP tool: {}", toolSpecification.tool()); + mcpSyncServer.addTool(toolSpecification); + } + + private McpSchema.Tool buildMcpTool(ElementProperties properties, DeploymentInfo deploymentInfo) { + Map props = properties.getProperties(); + McpSchema.Tool.Builder toolBuilder = McpSchema.Tool.builder(); + toolBuilder + .name(props.get("name")) + .description(props.get("description")) + .title(props.get("title")) + .annotations(new McpSchema.ToolAnnotations( + props.get("title"), + Boolean.valueOf(props.get("readOnly")), + Boolean.valueOf(props.get("destructive")), + Boolean.valueOf(props.get("idempotent")), + Boolean.valueOf(props.get("openWorld")), + Boolean.valueOf(props.get("requiresLocal")))) + .meta(Map.of(DEPLOYMENT_ID, deploymentInfo.getDeploymentId())) + .inputSchema(mcpJsonMapper, props.get("inputSchema")); + String outputSchema = props.get("outputSchema"); + if (StringUtils.isNotBlank(outputSchema)) { + toolBuilder.outputSchema(mcpJsonMapper, outputSchema); + } + return toolBuilder.build(); + } +} diff --git a/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/stop/McpToolUnregisterAction.java b/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/stop/McpToolUnregisterAction.java new file mode 100644 index 00000000..4821dee8 --- /dev/null +++ b/src/main/java/org/qubership/integration/platform/engine/service/deployment/processing/actions/context/stop/McpToolUnregisterAction.java @@ -0,0 +1,44 @@ +package org.qubership.integration.platform.engine.service.deployment.processing.actions.context.stop; + +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import lombok.extern.slf4j.Slf4j; +import org.apache.camel.spring.SpringCamelContext; +import org.qubership.integration.platform.engine.model.deployment.update.DeploymentConfiguration; +import org.qubership.integration.platform.engine.model.deployment.update.DeploymentInfo; +import org.qubership.integration.platform.engine.service.deployment.processing.DeploymentProcessingAction; +import org.qubership.integration.platform.engine.service.deployment.processing.qualifiers.OnStopDeploymentContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +import static org.qubership.integration.platform.engine.service.deployment.processing.actions.context.create.McpToolRegistrar.DEPLOYMENT_ID; + +@Slf4j +@Component +@OnStopDeploymentContext +public class McpToolUnregisterAction implements DeploymentProcessingAction { + private final McpSyncServer mcpSyncServer; + + @Autowired + public McpToolUnregisterAction( + McpSyncServer mcpSyncServer + ) { + this.mcpSyncServer = mcpSyncServer; + } + + @Override + public void execute( + SpringCamelContext context, + DeploymentInfo deploymentInfo, + DeploymentConfiguration deploymentConfiguration + ) { + List toolsToRemove = mcpSyncServer.listTools() + .stream() + .filter(tool -> deploymentInfo.getDeploymentId() + .equals(tool.meta().get(DEPLOYMENT_ID))) + .toList(); + toolsToRemove.forEach(tool -> mcpSyncServer.removeTool(tool.name())); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8ef96f8b..c86f1015 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -67,6 +67,22 @@ spring: serialization: indent-output: true + ai: + mcp: + server: + enabled: true + type: SYNC + protocol: STREAMABLE + annotation-scanner: + enabled: true + streamable-http: + mcp-endpoint: /mcp + capabilities: + completion: false + tool: true + resource: false + prompt: false + app: prefix: qip