diff --git a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java index 50b7ca120c..c1bcb79730 100644 --- a/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java +++ b/modules/hivemq-edge-module-opcua/src/main/java/com/hivemq/edge/adapters/opcua/listeners/OpcUaSubscriptionLifecycleHandler.java @@ -54,20 +54,19 @@ public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.SubscriptionListener { - private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class); - private static final long KEEP_ALIVE_TIMEOUT_MS = 30_000; // 30 seconds + private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class); private static final int MAX_MONITORED_ITEM_COUNT = 5; private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService; private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService; private final @NotNull EventService eventService; private final @NotNull String adapterId; - final Map tagToFirstSeen = new ConcurrentHashMap<>(); + private final @NotNull Map tagToFirstSeen; private final @NotNull Map nodeIdToTag; private final @NotNull List tags; private final @NotNull OpcUaClient client; private final @NotNull DataPointFactory dataPointFactory; - final @NotNull OpcUaSpecificAdapterConfig config; + private final @NotNull OpcUaSpecificAdapterConfig config; // Track last keep-alive timestamp for health monitoring private volatile long lastKeepAliveTimestamp; @@ -89,8 +88,9 @@ public OpcUaSubscriptionLifecycleHandler( this.client = client; this.dataPointFactory = dataPointFactory; this.tags = tags; + this.tagToFirstSeen = new ConcurrentHashMap<>(); this.lastKeepAliveTimestamp = System.currentTimeMillis(); - nodeIdToTag = tags.stream() + this.nodeIdToTag = tags.stream() .collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity())); } @@ -226,20 +226,33 @@ public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) { lastKeepAliveTimestamp = System.currentTimeMillis(); protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT); - subscription.getSubscriptionId().ifPresent(subscriptionId -> { - log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId); - }); + subscription.getSubscriptionId().ifPresent(subscriptionId -> + log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId)); } /** * Checks if keep-alive messages are being received within the expected timeout. + * The timeout is computed dynamically from ConnectionOptions. * Can be used for health monitoring to detect subscription issues. * - * @return true if last keep-alive was received within KEEP_ALIVE_TIMEOUT_MS, false otherwise + * @return true if last keep-alive was received within the computed timeout, false otherwise */ public boolean isKeepAliveHealthy() { final long timeSinceLastKeepAlive = System.currentTimeMillis() - lastKeepAliveTimestamp; - return timeSinceLastKeepAlive < KEEP_ALIVE_TIMEOUT_MS; + return timeSinceLastKeepAlive < getKeepAliveTimeoutMs(); + } + + /** + * Computes the keep-alive timeout based on ConnectionOptions. + * The timeout allows for the configured number of missed keep-alives plus one + * before considering the connection unhealthy, plus a safety margin. + * Formula: keepAliveIntervalMs × (keepAliveFailuresAllowed + 1) + SAFETY_MARGIN_MS + * + * @return the computed keep-alive timeout in milliseconds + */ + public long getKeepAliveTimeoutMs() { + final var connOpts = config.getConnectionOptions(); + return connOpts.keepAliveIntervalMs() * (connOpts.keepAliveFailuresAllowed() + 1) + 5_000L; } @Override @@ -259,9 +272,7 @@ public void onTransferFailed( replacementSubscription.setSubscriptionListener(this); syncTagsAndMonitoredItems(replacementSubscription, tags, config); }, - () -> { - log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId); - } + () -> log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId) ); } diff --git a/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaSubscriptionLifecycleHandlerTest.java b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaSubscriptionLifecycleHandlerTest.java new file mode 100644 index 0000000000..d3963d1d96 --- /dev/null +++ b/modules/hivemq-edge-module-opcua/src/test/java/com/hivemq/edge/adapters/opcua/OpcUaSubscriptionLifecycleHandlerTest.java @@ -0,0 +1,383 @@ +/* + * Copyright 2023-present HiveMQ GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hivemq.edge.adapters.opcua; + +import com.hivemq.adapter.sdk.api.factories.DataPointFactory; +import com.hivemq.adapter.sdk.api.services.ProtocolAdapterMetricsService; +import com.hivemq.adapter.sdk.api.streaming.ProtocolAdapterTagStreamingService; +import com.hivemq.edge.adapters.opcua.config.ConnectionOptions; +import com.hivemq.edge.adapters.opcua.config.OpcUaSpecificAdapterConfig; +import com.hivemq.edge.adapters.opcua.config.opcua2mqtt.OpcUaToMqttConfig; +import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTag; +import com.hivemq.edge.adapters.opcua.config.tag.OpcuaTagDefinition; +import com.hivemq.edge.adapters.opcua.listeners.OpcUaSubscriptionLifecycleHandler; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class OpcUaSubscriptionLifecycleHandlerTest { + + private static final @NotNull String ADAPTER_ID = "test-adapter"; + private static final @NotNull String NODE_ID = "ns=2;i=1001"; + @Mock + private @NotNull ProtocolAdapterMetricsService metricsService; + @Mock + private @NotNull ProtocolAdapterTagStreamingService tagStreamingService; + @Mock + private @NotNull FakeEventService eventService; + @Mock + private @NotNull OpcUaClient opcUaClient; + @Mock + private @NotNull DataPointFactory dataPointFactory; + + private static @NotNull OpcUaSpecificAdapterConfig createConfig(final @NotNull ConnectionOptions connectionOptions) { + return new OpcUaSpecificAdapterConfig("opc.tcp://localhost:4840", // uri + false, // overrideUri + null, // applicationUri + null, // auth + null, // tls + OpcUaToMqttConfig.defaultOpcUaToMqttConfig(), // opcuaToMqtt + null, // security + connectionOptions // connectionOptions + ); + } + + private static @NotNull OpcuaTag createTestTag() { + return new OpcuaTag("test-tag", "Test tag for keep-alive testing", new OpcuaTagDefinition(NODE_ID)); + } + + /** + * Test that keep-alive timeout is correctly calculated from configuration with default values. + * Default: keepAliveInterval=10s, failuresAllowed=3, safetyMargin=5s + * Expected: 10 × (3 + 1) + 5 = 45 seconds + */ + @Test + void testKeepAliveTimeout_withDefaultConfiguration() { + // Given: Default configuration + final ConnectionOptions connectionOptions = ConnectionOptions.defaultConnectionOptions(); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + + // When: Handler is created + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Then: Timeout should be calculated correctly + final long expectedTimeout = 10_000 * (3 + 1) + 5_000; // 45 seconds + assertEquals(expectedTimeout, + handler.getKeepAliveTimeoutMs(), + "Keep-alive timeout should be keepAliveInterval × (failuresAllowed + 1) + safetyMargin"); + } + + /** + * Test keep-alive timeout calculation with custom short intervals. + * Custom: keepAliveInterval=5s, failuresAllowed=2, safetyMargin=5s + * Expected: 5 × (2 + 1) + 5 = 20 seconds + */ + @Test + void testKeepAliveTimeout_withCustomShortInterval() { + // Given: Custom short interval configuration + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 5_000L, // keepAliveInterval: 5s + 2, // keepAliveFailuresAllowed: 2 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + + // When: Handler is created + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Then: Timeout should be calculated correctly + final long expectedTimeout = 5_000 * (2 + 1) + 5_000; // 20 seconds + assertEquals(expectedTimeout, handler.getKeepAliveTimeoutMs()); + } + + /** + * Test keep-alive timeout calculation with custom long intervals. + * Custom: keepAliveInterval=20s, failuresAllowed=5, safetyMargin=5s + * Expected: 20 × (5 + 1) + 5 = 125 seconds + */ + @Test + void testKeepAliveTimeout_withCustomLongInterval() { + // Given: Custom long interval configuration + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 20_000L, // keepAliveInterval: 20s + 5, // keepAliveFailuresAllowed: 5 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + + // When: Handler is created + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Then: Timeout should be calculated correctly + final long expectedTimeout = 125_000L; // 125 seconds + assertEquals(expectedTimeout, handler.getKeepAliveTimeoutMs()); + } + + /** + * Test that health check returns true immediately after handler creation. + * The lastKeepAliveTimestamp is initialized to current time in constructor. + */ + @Test + void testKeepAliveHealthy_immediatelyAfterCreation() { + // Given: Newly created handler + final OpcUaSpecificAdapterConfig config = createConfig(ConnectionOptions.defaultConnectionOptions()); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // When: Health check is called immediately + // Then: Should be healthy + assertTrue(handler.isKeepAliveHealthy(), "Handler should be healthy immediately after creation"); + } + + /** + * Test that health check returns true within timeout period. + */ + @Test + void testKeepAliveHealthy_withinTimeoutPeriod() throws InterruptedException { + // Given: Handler with default configuration (45s timeout) + final OpcUaSpecificAdapterConfig config = createConfig(ConnectionOptions.defaultConnectionOptions()); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // When: Small amount of time passes (well within timeout) + Thread.sleep(100); + + // Then: Should still be healthy + assertTrue(handler.isKeepAliveHealthy(), "Handler should be healthy well within timeout period"); + } + + /** + * Test that health check returns true just before timeout. + * This tests the boundary condition. + */ + @Test + void testKeepAliveHealthy_justBeforeTimeout() { + // Given: Handler with very short timeout for testing + // keepAliveInterval=50ms, failuresAllowed=1 -> timeout = 50 × (1+1) + 5000 = 5100ms + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 50L, // keepAliveInterval: 50ms + 1, // keepAliveFailuresAllowed: 1 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + + // Create handler and simulate passage of time by manipulating system time conceptually + // Since we can't manipulate System.currentTimeMillis(), we'll use a wait that's slightly less than timeout + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Expected timeout: 5100ms + // We'll check immediately (should be healthy) + assertTrue(handler.isKeepAliveHealthy(), "Handler should be healthy at time 0"); + } + + /** + * Test that onKeepAliveReceived updates the timestamp and resets health. + */ + @Test + void testOnKeepAliveReceived_resetsHealthTimer() throws InterruptedException { + // Given: Handler with short timeout + // keepAliveInterval=100ms, failuresAllowed=1 -> timeout = 100 × (1+1) + 5000 = 5200ms + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 100L, // keepAliveInterval: 100ms + 1, // keepAliveFailuresAllowed: 1 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // When: Some time passes + Thread.sleep(50); + + // Then: Still healthy + assertTrue(handler.isKeepAliveHealthy(), "Should be healthy before keep-alive"); + + // When: Keep-alive is received (simulated via subscription callback) + final OpcUaSubscription mockSubscription = org.mockito.Mockito.mock(OpcUaSubscription.class); + when(mockSubscription.getSubscriptionId()).thenReturn(java.util.Optional.of(uint(12345))); + handler.onKeepAliveReceived(mockSubscription); + + // Then: Health should be reset and remain healthy + assertTrue(handler.isKeepAliveHealthy(), "Should be healthy after keep-alive received"); + } + + /** + * Test the problematic scenario from production (CRASH_ZEISS.md): + * - Default keep-alive interval: 10s + * - Default failures allowed: 3 + * - Old hardcoded timeout: 30s (causing false positives) + * - New calculated timeout: 45s (should prevent false positives) + */ + @Test + void testKeepAliveTimeout_productionScenario() { + // Given: Default production configuration + final ConnectionOptions connectionOptions = ConnectionOptions.defaultConnectionOptions(); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Then: Timeout should be greater than worst-case keep-alive failure time + final long worstCaseFailureTime = 10_000 * 3; // 30 seconds (3 failures at 10s interval) + final long actualTimeout = handler.getKeepAliveTimeoutMs(); + + assertTrue(actualTimeout > worstCaseFailureTime, + "Timeout (" + + actualTimeout + + "ms) should be greater than worst-case failure time (" + + worstCaseFailureTime + + "ms) to prevent false positives"); + + // Verify it has appropriate buffer (should be 45s = 30s + 15s buffer) + assertEquals(45_000L, actualTimeout, "Should have 15s buffer (10s extra interval + 5s safety margin)"); + } + + /** + * Test that health check returns false after timeout period expires. + * This simulates the scenario where OPC UA server stops responding. + */ + @Test + void testKeepAliveHealthy_afterTimeoutExpires() throws InterruptedException { + // Given: Handler with very short timeout for testing + // keepAliveInterval=20ms, failuresAllowed=1 -> timeout = 20 × (1+1) + 5000 = 5040ms + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 20L, // keepAliveInterval: 20ms + 1, // keepAliveFailuresAllowed: 1 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // When: Wait longer than timeout (5040ms, wait 5100ms to be sure) + Thread.sleep(5100); + + // Then: Should be unhealthy + assertFalse(handler.isKeepAliveHealthy(), "Handler should be unhealthy after timeout period expires"); + } + + /** + * Test multiple keep-alive cycles to ensure timestamp is properly updated. + */ + @Test + void testMultipleKeepAliveCycles() throws InterruptedException { + // Given: Handler with moderate timeout + // keepAliveInterval=100ms, failuresAllowed=2 -> timeout = 100 × (2+1) + 5000 = 5300ms + final ConnectionOptions connectionOptions = new ConnectionOptions( + 120_000L, // sessionTimeout + 30_000L, // requestTimeout + 100L, // keepAliveInterval: 100ms + 2, // keepAliveFailuresAllowed: 2 + 30_000L, // connectionTimeout + 30_000L, // healthCheckInterval + 30_000L, // retryInterval + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + final OpcUaSubscription mockSubscription = org.mockito.Mockito.mock(OpcUaSubscription.class); + when(mockSubscription.getSubscriptionId()).thenReturn(java.util.Optional.of(uint(12345))); + + // When: Multiple keep-alive cycles occur + for (int i = 0; i < 5; i++) { + assertTrue(handler.isKeepAliveHealthy(), "Should be healthy in cycle " + i); + handler.onKeepAliveReceived(mockSubscription); + Thread.sleep(50); // Wait half the interval + } + + // Then: Should still be healthy after all cycles + assertTrue(handler.isKeepAliveHealthy(), "Should remain healthy through multiple cycles"); + } + + /** + * Test edge case with minimum configuration values. + */ + @Test + void testKeepAliveTimeout_minimumConfiguration() { + // Given: Minimum allowed configuration values + // keepAliveInterval=1s, failuresAllowed=1 -> timeout = 1000 × (1+1) + 5000 = 7000ms + final ConnectionOptions connectionOptions = new ConnectionOptions( + 10_000L, // sessionTimeout: 10s (minimum) + 5_000L, // requestTimeout: 5s (minimum) + 1_000L, // keepAliveInterval: 1s (minimum) + 1, // keepAliveFailuresAllowed: 1 (minimum) + 2_000L, // connectionTimeout: 2s (minimum) + 10_000L, // healthCheckInterval: 10s (minimum) + 5_000L, // retryInterval: 5s (minimum) + true, // autoReconnect + true // reconnectOnServiceFault + ); + final OpcUaSpecificAdapterConfig config = createConfig(connectionOptions); + + // When: Handler is created + final OpcUaSubscriptionLifecycleHandler handler = createHandler(config); + + // Then: Timeout should be calculated correctly even with minimum values + final long expectedTimeout = 7_000L; // 7 seconds + assertEquals(expectedTimeout, handler.getKeepAliveTimeoutMs()); + assertTrue(handler.isKeepAliveHealthy(), "Should be healthy with minimum configuration"); + } + + private OpcUaSubscriptionLifecycleHandler createHandler(final @NotNull OpcUaSpecificAdapterConfig config) { + return new OpcUaSubscriptionLifecycleHandler(metricsService, + tagStreamingService, + eventService, + ADAPTER_ID, + List.of(createTestTag()), + opcUaClient, + dataPointFactory, + config); + } +}