Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.64.0] - 2025-01-31
- Allow subscribing to a single D2URI

## [29.63.2] - 2025-01-31
- Make XdsDirectory lazy to subscribe the names

Expand Down Expand Up @@ -5767,7 +5770,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.2...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.64.0...master
[29.64.0]: https://github.com/linkedin/rest.li/compare/v29.63.2...v29.64.0
[29.63.2]: https://github.com/linkedin/rest.li/compare/v29.63.1...v29.63.2
[29.63.1]: https://github.com/linkedin/rest.li/compare/v29.63.0...v29.63.1
[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0
Expand Down
43 changes: 42 additions & 1 deletion d2/src/main/java/com/linkedin/d2/xds/GlobCollectionUtils.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package com.linkedin.d2.xds;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GlobCollectionUtils
{
private static final String D2_URIS_PREFIX = "/d2/uris/";
private static final String D2_URI_NODE_GLOB_COLLECTION_PREFIX = "xdstp:///indis.D2URI/";
private static final String GLOB_COLLECTION_SUFFIX = "/*";
private static final String UTF_8 = StandardCharsets.UTF_8.name();
private static final Logger LOG = LoggerFactory.getLogger(GlobCollectionUtils.class);

private GlobCollectionUtils()
{
Expand Down Expand Up @@ -63,7 +71,25 @@ public static D2UriIdentifier parse(String resourceName)

String clusterName = resourceName.substring(D2_URI_NODE_GLOB_COLLECTION_PREFIX.length(), lastIndex);

return new D2UriIdentifier(D2_URIS_PREFIX + clusterName, resourceName.substring(lastIndex + 1));
String uri;
try
{
uri = URLDecoder.decode(resourceName.substring(lastIndex + 1), UTF_8);
}
catch (UnsupportedEncodingException e)
{
// Note that this is impossible. It is only thrown if the charset isn't recognized, and UTF-8 is known to be
// supported.

throw new RuntimeException(e);
}
catch (Exception e)
{
LOG.warn("Ignoring D2URI URN with invalid URL encoding {}", resourceName, e);
return null;
}

return new D2UriIdentifier(D2_URIS_PREFIX + clusterName, uri);
}
}

Expand All @@ -81,4 +107,19 @@ public static String globCollectionUrlForClusterResource(String clusterPath)
clusterPath.substring(clusterPath.lastIndexOf('/') + 1) +
GLOB_COLLECTION_SUFFIX;
}

public static String globCollectionUrn(String clusterName, String uri)
{
try
{
return D2_URI_NODE_GLOB_COLLECTION_PREFIX + clusterName + "/" + URLEncoder.encode(uri, UTF_8);
}
catch (UnsupportedEncodingException e)
{
// Note that this is impossible. It is only thrown if the charset isn't recognized, and UTF-8 is known to be
// supported.
throw new RuntimeException(e);
}

}
}
70 changes: 70 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ final void onChanged(ResourceUpdate update)
}
}

public static abstract class D2UriResourceWatcher extends ResourceWatcher
{
public D2UriResourceWatcher()
{
super(ResourceType.D2_URI);
}

public abstract void onChanged(D2URIUpdate update);

@Override
final void onChanged(ResourceUpdate update)
{
onChanged((D2URIUpdate) update);
}
}

public static abstract class WildcardResourceWatcher
{
private final ResourceType _type;
Expand Down Expand Up @@ -381,6 +397,60 @@ public String toString()
}
}

public static final class D2URIUpdate implements ResourceUpdate
{
private final XdsD2.D2URI _d2Uri;

D2URIUpdate(XdsD2.D2URI d2Uri)
{
_d2Uri = d2Uri;
}

/**
* Returns the {@link XdsD2.D2URI} that was received, or {@code null} if the URI was deleted.
*/
@Nullable
public XdsD2.D2URI getD2Uri()
{
return _d2Uri;
}

@Override
public boolean isValid()
{
// For this update type, the subscriber needs to be notified of deletions, so all D2URIUpdates are valid.
return true;
}


@Override
public boolean equals(Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
D2URIUpdate that = (D2URIUpdate) o;
return Objects.equals(_d2Uri, that._d2Uri);
}

@Override
public int hashCode()
{
return Objects.hash(_d2Uri);
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this).add("_d2Uri", _d2Uri).toString();
}
}

public static final NodeUpdate EMPTY_NODE_UPDATE = new NodeUpdate(null);
public static final D2URIMapUpdate EMPTY_D2_URI_MAP_UPDATE = new D2URIMapUpdate(null);
public static final D2ClusterOrServiceNameUpdate EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE =
Expand Down
69 changes: 53 additions & 16 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -464,26 +464,63 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
return;
}

ResourceSubscriber subscriber =
ResourceSubscriber clusterSubscriber =
getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName());
ResourceSubscriber uriSubscriber = getResourceSubscriberMap(ResourceType.D2_URI).get(resourceName);
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP);
if (subscriber == null && wildcardSubscriber == null)
if (clusterSubscriber == null && wildcardSubscriber == null && uriSubscriber == null)
{
String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName);
_log.warn(msg);
errors.add(msg);
return;
}

// uri will be null if the data was invalid, or if the resource is being deleted.
XdsD2.D2URI uri = null;
if (resource != null)
{
try
{
uri = resource.getResource().unpack(XdsD2.D2URI.class);
}
catch (Exception e)
{
String errMsg = String.format("Failed to unpack D2URI for resource: %s", resourceName);
_log.warn(errMsg, e);
errors.add(errMsg);
}
}

if (uriSubscriber != null)
{
// Special case for the D2URI subscriber: the URI could not be deserialized. If a previous version of the data
// is present, do nothing and drop the update on the floor. If no previous version is present however, notify
// the subscriber that the URI is deleted/doesn't exist. This behavior is slightly different from the other
// types, which do not support deletions.
if (uri != null // The URI is being updated
|| resource == null // The URI is being deleted
|| uriSubscriber.getData() == null // The URI was corrupted and there was no previous version of this URI
)
{
uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider);
}
}

if (clusterSubscriber == null && wildcardSubscriber == null)
{
return;
}

// Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster.
D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k ->
{
D2URIMapUpdate currentData;
// Use the existing data from whichever subscriber is present. If both are present, they will point to the same
// D2URIMapUpdate.
if (subscriber != null)
if (clusterSubscriber != null)
{
currentData = (D2URIMapUpdate) subscriber._data;
currentData = (D2URIMapUpdate) clusterSubscriber._data;
}
else
{
Expand Down Expand Up @@ -514,19 +551,11 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
update.removeUri(uriId.getUriName());
}
}
else
// Only put valid URIs in the map. Because the D2URIMapUpdate is still created by this loop, the subscriber will
// receive an update, unblocking any waiting futures, so there is no need to insert null/invalid URIs in the map.
else if (uri != null)
{
try
{
XdsD2.D2URI uri = resource.getResource().unpack(XdsD2.D2URI.class);
update.putUri(uriId.getUriName(), uri);
}
catch (Exception e)
{
String errMsg = String.format("Failed to unpack D2URI for resource: %s", resourceName);
_log.warn(errMsg, e);
errors.add(errMsg);
}
update.putUri(uriId.getUriName(), uri);
}
});
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);
Expand Down Expand Up @@ -804,6 +833,14 @@ else if (resourceUpdate instanceof D2URIMapUpdate)
trackServerLatencyForUris(updatedUris, metricsProvider, now);
trackServerLatencyForUris(rawDiff.entriesOnlyOnLeft(), metricsProvider, now); // newly added uris
}
else if (resourceUpdate instanceof D2URIUpdate)
{
XdsD2.D2URI uri = ((D2URIUpdate) resourceUpdate).getD2Uri();
if (uri != null)
{
metricsProvider.trackLatency(now - uri.getModifiedTime().getSeconds() * 1000);
}
}
}

private boolean shouldTrackServerLatency()
Expand Down
Loading