Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,19 @@

public class OpcUaSubscriptionLifecycleHandler implements OpcUaSubscription.SubscriptionListener {

private static final Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class);
private static final long KEEP_ALIVE_TIMEOUT_MS = 30_000; // 30 seconds
private static final @NotNull Logger log = LoggerFactory.getLogger(OpcUaSubscriptionLifecycleHandler.class);
private static final int MAX_MONITORED_ITEM_COUNT = 5;

private final @NotNull ProtocolAdapterMetricsService protocolAdapterMetricsService;
private final @NotNull ProtocolAdapterTagStreamingService tagStreamingService;
private final @NotNull EventService eventService;
private final @NotNull String adapterId;
final Map<OpcuaTag, Boolean> tagToFirstSeen = new ConcurrentHashMap<>();
private final @NotNull Map<OpcuaTag, Boolean> tagToFirstSeen;
private final @NotNull Map<NodeId, OpcuaTag> nodeIdToTag;
private final @NotNull List<OpcuaTag> tags;
private final @NotNull OpcUaClient client;
private final @NotNull DataPointFactory dataPointFactory;
final @NotNull OpcUaSpecificAdapterConfig config;
private final @NotNull OpcUaSpecificAdapterConfig config;

// Track last keep-alive timestamp for health monitoring
private volatile long lastKeepAliveTimestamp;
Expand All @@ -89,8 +88,9 @@ public OpcUaSubscriptionLifecycleHandler(
this.client = client;
this.dataPointFactory = dataPointFactory;
this.tags = tags;
this.tagToFirstSeen = new ConcurrentHashMap<>();
this.lastKeepAliveTimestamp = System.currentTimeMillis();
nodeIdToTag = tags.stream()
this.nodeIdToTag = tags.stream()
.collect(Collectors.toMap(tag -> NodeId.parse(tag.getDefinition().getNode()), Function.identity()));
}

Expand Down Expand Up @@ -226,20 +226,33 @@ public void onKeepAliveReceived(final @NotNull OpcUaSubscription subscription) {
lastKeepAliveTimestamp = System.currentTimeMillis();
protocolAdapterMetricsService.increment(Constants.METRIC_SUBSCRIPTION_KEEPALIVE_COUNT);

subscription.getSubscriptionId().ifPresent(subscriptionId -> {
log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId);
});
subscription.getSubscriptionId().ifPresent(subscriptionId ->
log.debug("Keep-alive received for subscription {} of adapter '{}'", subscriptionId, adapterId));
}

/**
* Checks if keep-alive messages are being received within the expected timeout.
* The timeout is computed dynamically from ConnectionOptions.
* Can be used for health monitoring to detect subscription issues.
*
* @return true if last keep-alive was received within KEEP_ALIVE_TIMEOUT_MS, false otherwise
* @return true if last keep-alive was received within the computed timeout, false otherwise
*/
public boolean isKeepAliveHealthy() {
final long timeSinceLastKeepAlive = System.currentTimeMillis() - lastKeepAliveTimestamp;
return timeSinceLastKeepAlive < KEEP_ALIVE_TIMEOUT_MS;
return timeSinceLastKeepAlive < getKeepAliveTimeoutMs();
}

/**
* Computes the keep-alive timeout based on ConnectionOptions.
* The timeout allows for the configured number of missed keep-alives plus one
* before considering the connection unhealthy, plus a safety margin.
* Formula: keepAliveIntervalMs × (keepAliveFailuresAllowed + 1) + SAFETY_MARGIN_MS
*
* @return the computed keep-alive timeout in milliseconds
*/
public long getKeepAliveTimeoutMs() {
final var connOpts = config.getConnectionOptions();
return connOpts.keepAliveIntervalMs() * (connOpts.keepAliveFailuresAllowed() + 1) + 5_000L;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we give a name to that magic number?

}

@Override
Expand All @@ -259,9 +272,7 @@ public void onTransferFailed(
replacementSubscription.setSubscriptionListener(this);
syncTagsAndMonitoredItems(replacementSubscription, tags, config);
},
() -> {
log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId);
}
() -> log.error("Subscription Transfer failed, unable to create new subscription '{}'", adapterId)
);
}

Expand Down
Loading
Loading