From 14d5172a26bcabd6c688dc0fa79f6e4dc6dd3db8 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Mon, 4 Aug 2025 23:52:02 +0200 Subject: [PATCH 01/23] Add static name to Valkey commands Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- .../Valkey/Connection/ValkeyConnection.swift | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index ca472f3e..ff336a53 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -17,6 +17,10 @@ import Network import NIOTransportServices #endif +#if DistributedTracingSupport +import Tracing +#endif + /// A single connection to a Valkey database. @available(valkeySwift 1.0, *) public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @@ -153,16 +157,20 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @inlinable func _execute(command: Command) async throws -> RESPToken { - let requestID = Self.requestIDGenerator.next() - return try await withTaskCancellationHandler { - if Task.isCancelled { - throw ValkeyClientError(.cancelled) - } - return try await withCheckedThrowingContinuation { continuation in - self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) + try await withSpan(Command.name, ofKind: .client) { span in + span.attributes["db.system.name"] = "valkey" + + let requestID = Self.requestIDGenerator.next() + return try await withTaskCancellationHandler { + if Task.isCancelled { + throw ValkeyClientError(.cancelled) + } + return try await withCheckedThrowingContinuation { continuation in + self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) + } + } onCancel: { + self.cancel(requestID: requestID) } - } onCancel: { - self.cancel(requestID: requestID) } } From 8560eca3bfbcbe524960c2c13e1edff8b704b4fe Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Tue, 5 Aug 2025 21:41:28 +0200 Subject: [PATCH 02/23] Add Distributed Tracing support behind new trait Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Package.swift | 5 +- .../Valkey/Connection/ValkeyConnection.swift | 93 +++++++- Sources/Valkey/ValkeyConnectionFactory.swift | 1 + .../ClientIntegrationTests.swift | 1 + Tests/ValkeyTests/Utils/TestTracer.swift | 209 +++++++++++++++++ Tests/ValkeyTests/ValkeyConnectionTests.swift | 216 ++++++++++++++++++ 6 files changed, 520 insertions(+), 5 deletions(-) create mode 100644 Tests/ValkeyTests/Utils/TestTracer.swift diff --git a/Package.swift b/Package.swift index f487d4b1..4f497d9c 100644 --- a/Package.swift +++ b/Package.swift @@ -18,12 +18,14 @@ let package = Package( ], traits: [ .trait(name: "ServiceLifecycleSupport"), - .default(enabledTraits: ["ServiceLifecycleSupport"]), + .trait(name: "DistributedTracingSupport"), + .default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"), @@ -36,6 +38,7 @@ let package = Package( .byName(name: "_ValkeyConnectionPool"), .product(name: "DequeModule", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), .product(name: "NIOSSL", package: "swift-nio-ssl"), diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index ff336a53..748850bd 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -38,6 +38,8 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @usableFromInline let channelHandler: ValkeyChannelHandler let configuration: ValkeyConnectionConfiguration + @usableFromInline + let address: (hostOrSocketPath: String, port: Int?)? let isClosed: Atomic /// Initialize connection @@ -46,6 +48,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: ID, channelHandler: ValkeyChannelHandler, configuration: ValkeyConnectionConfiguration, + address: ValkeyServerAddress?, logger: Logger ) { self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor() @@ -54,6 +57,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.configuration = configuration self.id = connectionID self.logger = logger + switch address?.value { + case let .hostname(host, port): + self.address = (host, port) + case let .unixDomainSocket(path): + self.address = (path, nil) + case nil: + self.address = nil + } self.isClosed = .init(false) } @@ -157,10 +168,19 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @inlinable func _execute(command: Command) async throws -> RESPToken { - try await withSpan(Command.name, ofKind: .client) { span in - span.attributes["db.system.name"] = "valkey" + #if DistributedTracingSupport + let span = startSpan(Command.name, ofKind: .client) + defer { span.end() } + + span.updateAttributes { attributes in + attributes["db.operation.name"] = Command.name + applyCommonAttributes(to: &attributes) + } + #endif + + let requestID = Self.requestIDGenerator.next() - let requestID = Self.requestIDGenerator.next() + do { return try await withTaskCancellationHandler { if Task.isCancelled { throw ValkeyClientError(.cancelled) @@ -171,6 +191,25 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } onCancel: { self.cancel(requestID: requestID) } + } catch let error as ValkeyClientError { + #if DistributedTracingSupport + span.recordError(error) + if let message = error.message { + var prefixEndIndex = message.startIndex + while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " { + message.formIndex(after: &prefixEndIndex) + } + let prefix = message[message.startIndex ..< prefixEndIndex] + span.attributes["db.response.status_code"] = "\(prefix)" + span.setStatus(SpanStatus(code: .error)) + } + #endif + throw error + } catch { + #if DistributedTracingSupport + span.recordError(error) + #endif + throw error } } @@ -185,8 +224,42 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, Error>) { + #if DistributedTracingSupport + let span = startSpan("MULTI", ofKind: .client) + defer { span.end() } + + // We want to suffix the `db.operation.name` if all pipelined commands are of the same type. + var commandName: String? + var operationNameSuffix: String? + var commandCount = 0 + + for command in repeat each commands { + commandCount += 1 + if commandName == nil { + commandName = Swift.type(of: command).name + operationNameSuffix = commandName + } else if commandName != Swift.type(of: command).name { + // We should only add a suffix if all commands in the transaction are the same. + operationNameSuffix = nil + } + } + let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI" + + span.updateAttributes { attributes in + attributes["db.operation.name"] = operationName + attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil + applyCommonAttributes(to: &attributes) + } + #endif + func convert(_ result: Result, to: Response.Type) -> Result { - result.flatMap { + #if DistributedTracingSupport + if case .failure(let error) = result { + span.recordError(error) + } + #endif + + return result.flatMap { do { return try .success(Response(fromRESP: $0)) } catch { @@ -221,6 +294,16 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + @usableFromInline + func applyCommonAttributes(to attributes: inout SpanAttributes) { + // TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go? + attributes["db.system.name"] = "valkey" + attributes["network.peer.address"] = channel.remoteAddress?.ipAddress + attributes["network.peer.port"] = channel.remoteAddress?.port + attributes["server.address"] = address?.hostOrSocketPath + attributes["server.port"] = address?.port == 6379 ? nil : address?.port + } + @usableFromInline nonisolated func cancel(requestID: Int) { self.channel.eventLoop.execute { @@ -286,6 +369,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: connectionID, channelHandler: handler, configuration: configuration, + address: address, logger: logger ) } @@ -321,6 +405,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: 0, channelHandler: handler, configuration: configuration, + address: .hostname("127.0.0.1", port: 6379), logger: logger ) return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map { diff --git a/Sources/Valkey/ValkeyConnectionFactory.swift b/Sources/Valkey/ValkeyConnectionFactory.swift index 8a039113..2a64b3d8 100644 --- a/Sources/Valkey/ValkeyConnectionFactory.swift +++ b/Sources/Valkey/ValkeyConnectionFactory.swift @@ -85,6 +85,7 @@ package final class ValkeyConnectionFactory: Sendable { connectionID: connectionID, channelHandler: channelHandler, configuration: connectionConfig, + address: nil, logger: logger ) }.get() diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 9a788fe2..5c007f8a 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -70,6 +70,7 @@ struct ClientIntegratedTests { func testValkeyCommand() async throws { struct GET: ValkeyCommand { typealias Response = String? + static let name = "GET" static let name = "GET" diff --git a/Tests/ValkeyTests/Utils/TestTracer.swift b/Tests/ValkeyTests/Utils/TestTracer.swift new file mode 100644 index 00000000..1b0f7273 --- /dev/null +++ b/Tests/ValkeyTests/Utils/TestTracer.swift @@ -0,0 +1,209 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the valkey-swift open source project +// +// Copyright (c) 2025 the valkey-swift project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of valkey-swift project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Distributed Tracing open source project +// +// Copyright (c) 2020-2023 Apple Inc. and the Swift Distributed Tracing project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Distributed Tracing project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if DistributedTracingSupport +import Dispatch +import Foundation +import Instrumentation +import ServiceContextModule +import Tracing + +/// Only intended to be used in single-threaded testing. +final class TestTracer { + private(set) var spans = [TestSpan]() + var onEndSpan: (TestSpan) -> Void = { _ in } + + func startAnySpan( + _ operationName: String, + context: @autoclosure () -> ServiceContext, + ofKind kind: SpanKind, + at instant: @autoclosure () -> Instant, + function: String, + file fileID: String, + line: UInt + ) -> any Tracing.Span { + let span = TestSpan( + operationName: operationName, + startTime: instant(), + context: context(), + kind: kind, + onEnd: self.onEndSpan + ) + self.spans.append(span) + return span + } + + func forceFlush() {} + + func extract(_ carrier: Carrier, into context: inout ServiceContext, using extractor: Extract) + where + Extract: Extractor, + Carrier == Extract.Carrier + { + let traceID = extractor.extract(key: "trace-id", from: carrier) ?? UUID().uuidString + context.traceID = traceID + } + + func inject(_ context: ServiceContext, into carrier: inout Carrier, using injector: Inject) + where + Inject: Injector, + Carrier == Inject.Carrier + { + guard let traceID = context.traceID else { return } + injector.inject(traceID, forKey: "trace-id", into: &carrier) + } +} + +extension TestTracer: Tracer { + func startSpan( + _ operationName: String, + context: @autoclosure () -> ServiceContext, + ofKind kind: SpanKind, + at instant: @autoclosure () -> Instant, + function: String, + file fileID: String, + line: UInt + ) -> TestSpan { + let span = TestSpan( + operationName: operationName, + startTime: instant(), + context: context(), + kind: kind, + onEnd: self.onEndSpan + ) + self.spans.append(span) + return span + } +} + +extension TestTracer { + enum TraceIDKey: ServiceContextKey { + typealias Value = String + } + + enum SpanIDKey: ServiceContextKey { + typealias Value = String + } +} + +extension ServiceContext { + var traceID: String? { + get { + self[TestTracer.TraceIDKey.self] + } + set { + self[TestTracer.TraceIDKey.self] = newValue + } + } + + var spanID: String? { + get { + self[TestTracer.SpanIDKey.self] + } + set { + self[TestTracer.SpanIDKey.self] = newValue + } + } +} + +/// Only intended to be used in single-threaded testing. +final class TestSpan: Span { + let kind: SpanKind + + private(set) var status: SpanStatus? + + let startTimestampNanosSinceEpoch: UInt64 + private(set) var endTimestampNanosSinceEpoch: UInt64? + + private(set) var recordedErrors: [(Error, SpanAttributes)] = [] + + var operationName: String + let context: ServiceContext + + private(set) var events = [SpanEvent]() { + didSet { + self.isRecording = !self.events.isEmpty + } + } + + private(set) var links = [SpanLink]() + + var attributes: SpanAttributes = [:] { + didSet { + self.isRecording = !self.attributes.isEmpty + } + } + + private(set) var isRecording = false + + let onEnd: (TestSpan) -> Void + + init( + operationName: String, + startTime: Instant, + context: ServiceContext, + kind: SpanKind, + onEnd: @escaping (TestSpan) -> Void + ) { + self.operationName = operationName + self.startTimestampNanosSinceEpoch = startTime.nanosecondsSinceEpoch + self.context = context + self.onEnd = onEnd + self.kind = kind + } + + func setStatus(_ status: SpanStatus) { + self.status = status + self.isRecording = true + } + + func addLink(_ link: SpanLink) { + self.links.append(link) + } + + func addEvent(_ event: SpanEvent) { + self.events.append(event) + } + + func recordError( + _ error: Error, + attributes: SpanAttributes, + at instant: @autoclosure () -> Instant + ) { + self.recordedErrors.append((error, attributes)) + } + + func end(at instant: @autoclosure () -> Instant) { + self.endTimestampNanosSinceEpoch = instant().nanosecondsSinceEpoch + self.onEnd(self) + } +} + +extension TestTracer: @unchecked Sendable {} // only intended for single threaded testing +extension TestSpan: @unchecked Sendable {} // only intended for single threaded testing +#endif diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 3bcf47e7..1ca280c0 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -12,6 +12,10 @@ import NIOEmbedded import NIOPosix import Testing +#if DistributedTracingSupport +@testable import Instrumentation +#endif + @testable import Valkey @Suite @@ -486,4 +490,216 @@ struct ConnectionTests { } try await channel.close() } + + #if DistributedTracingSupport && compiler(>=6.2) // Swift Testing exit tests only added in 6.2 + @Suite(.serialized) + struct DistributedTracingTests { + @Test + @available(valkeySwift 1.0, *) + func testSingleCommandSpan() async throws { + await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { + let tracer = TestTracer() + InstrumentationSystem.bootstrapInternal(tracer) + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + async let fooResult = connection.get("foo").map { String(buffer: $0) } + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + #expect(outbound == RESPToken(.command(["GET", "foo"])).base) + + try await channel.writeInbound(RESPToken(.bulkString("Bar")).base) + #expect(try await fooResult == "Bar") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "GET") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect(span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379 + ]) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) + } + } + + @Test + @available(valkeySwift 1.0, *) + func testSingleCommandFailureSpan() async throws { + await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { + let tracer = TestTracer() + InstrumentationSystem.bootstrapInternal(tracer) + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + async let fooResult = connection.get("foo") + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + + try await channel.writeInbound(RESPToken(.simpleError("ERR Error!")).base) + do { + _ = try await fooResult + Issue.record() + } catch let error as ValkeyClientError { + #expect(error.errorCode == .commandError) + #expect(error.message == "ERR Error!") + } + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "GET") + #expect(span.kind == .client) + #expect(span.recordedErrors.count == 1) + let error = try #require(span.recordedErrors.first) + #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) + #expect(span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "db.response.status_code": "ERR", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379 + ]) + #expect(span.status?.code == .error) + } + } + + @Test + @available(valkeySwift 1.0, *) + func testPipelinedSameCommandsSpan() async throws { + await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { + let tracer = TestTracer() + InstrumentationSystem.bootstrapInternal(tracer) + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + SET("bar", value: "foo") + ) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let set1 = RESPToken(.command(["SET", "foo", "bar"])).base + #expect(outbound.readSlice(length: set1.readableBytes) == set1) + #expect(outbound == RESPToken(.command(["SET", "bar", "foo"])).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + + #expect(try await results.1.get().map { String(buffer: $0) } == "OK") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect(span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI SET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379 + ]) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) + } + } + + @Test + @available(valkeySwift 1.0, *) + func testPipelinedDifferentCommandsSpan() async throws { + await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { + let tracer = TestTracer() + InstrumentationSystem.bootstrapInternal(tracer) + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + GET("foo"), + ) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let set = RESPToken(.command(["SET", "foo", "bar"])).base + #expect(outbound.readSlice(length: set.readableBytes) == set) + #expect(outbound == RESPToken(.command(["GET", "foo"])).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.bulkString("bar")).base) + + #expect(try await results.1.get().map { String(buffer: $0) } == "bar") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect(span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379 + ]) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) + } + } + + @Test + @available(valkeySwift 1.0, *) + func testPipelinedCommandFailureSpan() async throws { + await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { + let tracer = TestTracer() + InstrumentationSystem.bootstrapInternal(tracer) + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + GET("foo"), + ) + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleError("WRONGTYPE Error!")).base) + _ = await results + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.count == 1) + let error = try #require(span.recordedErrors.first) + #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) + #expect(span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379 + ]) + #expect(span.status == nil) + } + } + } + #endif } From 061f031af016ea13410be62ba0f70eb6628993a2 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Tue, 5 Aug 2025 21:43:39 +0200 Subject: [PATCH 03/23] Add open-telemetry example showcasing Distributed Tracing support Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- .gitignore | 5 +- Examples/open-telemetry/Package.swift | 29 ++++++ .../Sources/Example/Example.swift | 96 +++++++++++++++++++ Examples/open-telemetry/docker-compose.yml | 18 ++++ 4 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 Examples/open-telemetry/Package.swift create mode 100644 Examples/open-telemetry/Sources/Example/Example.swift create mode 100644 Examples/open-telemetry/docker-compose.yml diff --git a/.gitignore b/.gitignore index 2c09b57c..fc58b183 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store -/.build +.build +.swiftpm /Packages xcuserdata/ DerivedData/ @@ -10,4 +11,4 @@ DerivedData/ Package.resolved .benchmarkBaselines/ .swift-version -.docc-build \ No newline at end of file +.docc-build diff --git a/Examples/open-telemetry/Package.swift b/Examples/open-telemetry/Package.swift new file mode 100644 index 00000000..688a6524 --- /dev/null +++ b/Examples/open-telemetry/Package.swift @@ -0,0 +1,29 @@ +// swift-tools-version:6.1 +import PackageDescription + +let package = Package( + name: "open-telemetry", + platforms: [.macOS(.v15)], + products: [ + .executable(name: "example", targets: ["Example"]) + ], + dependencies: [ + // TODO: Change to remote once Distributed Tracing support was merged into main and/or tagged + .package(path: "../../"), + .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), + .package(url: "https://github.com/swift-otel/swift-otel.git", exact: "1.0.0-alpha.1"), + ], + targets: [ + .executableTarget( + name: "Example", + dependencies: [ + .product(name: "Valkey", package: "valkey-swift"), + .product(name: "Hummingbird", package: "hummingbird"), + .product(name: "Tracing", package: "swift-distributed-tracing"), + .product(name: "OTel", package: "swift-otel"), + ] + ) + ], + swiftLanguageModes: [.v6] +) diff --git a/Examples/open-telemetry/Sources/Example/Example.swift b/Examples/open-telemetry/Sources/Example/Example.swift new file mode 100644 index 00000000..2cfc2e37 --- /dev/null +++ b/Examples/open-telemetry/Sources/Example/Example.swift @@ -0,0 +1,96 @@ +import Hummingbird +import Logging +import OTel +import ServiceLifecycle +import Tracing +import Valkey + +@main +struct Example { + static func main() async throws { + let observability = try bootstrapObservability() + let logger = Logger(label: "example") + + let valkeyClient = ValkeyClient( + .hostname("localhost"), + logger: logger + ) + + let router = Router() + router.add(middleware: TracingMiddleware()) + router.add(middleware: LogRequestsMiddleware(.info)) + + router.get("/:x") { _, context in + /* + This demonstrates the span created for pipelined commands where all commands are of the same type. + The `db.operation.name` indicates that it's multiple `EVAL` commands, + and `db.operation.batch.size` indicates the number of commands. + */ + _ = await valkeyClient.execute( + EVAL(script: "return '1'"), + EVAL(script: "return '2'"), + EVAL(script: "return '3'") + ) + + /* + This demonstrates the span created for pipelined commands where the commands are of different types. + The `db.operation.name` resorts to `MULTI`, and `db.operation.batch.size` indicates the number of commands. + */ + _ = await valkeyClient.execute( + EVAL(script: "return '1'"), + ACL.WHOAMI() + ) + + // This demonstrates the span created for a failed command. + _ = try? await valkeyClient.execute(EVAL(script: "💩")) + + // This demonstrates the span created for a failed pipelined command. + _ = await valkeyClient.execute( + EVAL(script: "return 'ok'"), + EVAL(script: "💩") + ) + + let x = try context.parameters.require("x", as: Int.self) + + func expensiveAlgorithm(_ x: Int) async throws -> Int { + try await withSpan("compute") { span in + span.attributes["input"] = x + try await Task.sleep(for: .seconds(3)) + return x * 2 + } + } + + if let cachedResult = try await valkeyClient.hget("values", field: "\(x)") { + return cachedResult + } + + let result = try await expensiveAlgorithm(x) + + try await valkeyClient.hset("values", data: [.init(field: "\(x)", value: "\(result)")]) + + return ByteBuffer(string: "\(result)") + } + + var app = Application(router: router) + app.addServices(observability) + app.addServices(valkeyClient) + + try await app.runService() + } + + private static func bootstrapObservability() throws -> some Service { + LoggingSystem.bootstrap( + StreamLogHandler.standardOutput(label:metadataProvider:), + metadataProvider: OTel.makeLoggingMetadataProvider() + ) + + var configuration = OTel.Configuration.default + configuration.serviceName = "example" + + // For now, valkey-swift only supports Distributed Tracing so we disable the other signals. + configuration.logs.enabled = false + configuration.metrics.enabled = false + + return try OTel.bootstrap(configuration: configuration) + } +} diff --git a/Examples/open-telemetry/docker-compose.yml b/Examples/open-telemetry/docker-compose.yml new file mode 100644 index 00000000..015d92e9 --- /dev/null +++ b/Examples/open-telemetry/docker-compose.yml @@ -0,0 +1,18 @@ +services: + valkey: + image: valkey/valkey:8.0 + ports: + - 6379:6379 + healthcheck: + test: ["CMD", "valkey-cli", "--raw", "incr", "ping"] + volumes: + - valkey_data:/data + + jaeger: + image: jaegertracing/all-in-one:latest + ports: + - 4318:4318 # OTLP/HTTP receiver + - 16686:16686 # Jaeger Web UI + +volumes: + valkey_data: From c95c599de9144055ae53f59f0ae3318200d80976 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Tue, 5 Aug 2025 21:45:21 +0200 Subject: [PATCH 04/23] Run swift-format Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- .../Valkey/Connection/ValkeyConnection.swift | 2 +- Tests/ValkeyTests/ValkeyConnectionTests.swift | 94 ++++++++++--------- 2 files changed, 53 insertions(+), 43 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 748850bd..21b79d6a 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -199,7 +199,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " { message.formIndex(after: &prefixEndIndex) } - let prefix = message[message.startIndex ..< prefixEndIndex] + let prefix = message[message.startIndex..=6.2) // Swift Testing exit tests only added in 6.2 + #if DistributedTracingSupport && compiler(>=6.2) // Swift Testing exit tests only added in 6.2 @Suite(.serialized) struct DistributedTracingTests { @Test @@ -519,13 +519,15 @@ struct ConnectionTests { #expect(span.operationName == "GET") #expect(span.kind == .client) #expect(span.recordedErrors.isEmpty) - #expect(span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "GET", - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379 - ]) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -562,14 +564,16 @@ struct ConnectionTests { #expect(span.recordedErrors.count == 1) let error = try #require(span.recordedErrors.first) #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) - #expect(span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "GET", - "db.response.status_code": "ERR", - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379 - ]) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "db.response.status_code": "ERR", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) #expect(span.status?.code == .error) } } @@ -604,14 +608,16 @@ struct ConnectionTests { #expect(span.operationName == "MULTI") #expect(span.kind == .client) #expect(span.recordedErrors.isEmpty) - #expect(span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI SET", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379 - ]) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI SET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -647,14 +653,16 @@ struct ConnectionTests { #expect(span.operationName == "MULTI") #expect(span.kind == .client) #expect(span.recordedErrors.isEmpty) - #expect(span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379 - ]) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -689,14 +697,16 @@ struct ConnectionTests { #expect(span.recordedErrors.count == 1) let error = try #require(span.recordedErrors.first) #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) - #expect(span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379 - ]) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) #expect(span.status == nil) } } From ef47244ac46690179a3bb29331d4bb9ae82a5095 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Tue, 5 Aug 2025 22:37:28 +0200 Subject: [PATCH 05/23] Document open-telemetry example Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Examples/open-telemetry/README.md | 128 ++++++++++++++++++ .../Sources/Example/Example.swift | 45 ++---- 2 files changed, 143 insertions(+), 30 deletions(-) create mode 100644 Examples/open-telemetry/README.md diff --git a/Examples/open-telemetry/README.md b/Examples/open-telemetry/README.md new file mode 100644 index 00000000..c583128b --- /dev/null +++ b/Examples/open-telemetry/README.md @@ -0,0 +1,128 @@ +# OpenTelemetry example + +An example HTTP server that uses a Valkey client, both of which emit Distributed Tracing spans +via [Swift OTel](https://github.com/swift-otel/swift-otel). + +## Overview + +This example bootstraps Swift OTel to export Distributed Tracing spans to Jaeger. + +It then starts a Hummingbird HTTP server along with its associated middleware for instrumentation. + +Finally, the server uses a Valkey client in its request handler to demonstrate the spans +created by executing various Valkey commands. + +## Testing + +The example uses [Docker Compose](https://docs.docker.com/compose) to run a Valkey server alongside Jaeger to collect +and visualize the spans from the HTTP server and Valkey client, which is running on your local machine. + +### Running Valkey and Jaeger + +In one terminal window, run the following command: + +```console +% docker compose up +[+] Running 4/4 + ✔ Network open-telemetry_default Created 0.0s + ✔ Volume "open-telemetry_valkey_data" Created 0.0s + ✔ Container open-telemetry-jaeger-1 Created 0.0s + ✔ Container open-telemetry-valkey-1 Created 0.0s +... +``` + +### Running the server + +Now, in another terminal, run the server locally using the following command: + +```console +% swift run +``` + +### Making some requests + +Finally, in a third terminal, make a request to the server: + +```console +% curl http://localhost:8080/compute/42 +``` + +The example server fakes an expensive algorithm which is hard-coded to take a couple of seconds to complete. +That's why the first request will take a decent amount of time. + +Now, make the same request again: + +```console +% curl http://localhost:8080/compute/42 +``` + +You should see that it returns instantaniously. We successfully cached the previously computed value in Valkey +and can now read it from the cache instead of re-computing it each time. + +### Visualizing the traces using Jaeger UI + +Visit Jaeger UI in your browser at [localhost:16686](http://localhost:16686). + +Select `example` from the dropdown and click `Find Traces`. + +You should see a handful of traces, including: + +#### `/compute/{x}` with an execution time of ~ 3.2 seconds + +This corresponds to the first request to `/42` where we had to compute the value. Click on this trace to reveal +its spans. The root span represents our entire Hummingbird request handling. Nested inside are three child spans: + +1. `HGET`: Shows the `HGET` Valkey command used to look up the cached value for `42`. +2. `compute`: Represents our expensive algorithm. We can see that this takes up the majority of the entire trace. +3. `HSET`: Shows the `HSET` Valkey command sent to store the computed value for future retrieval. + +#### `/compute/{x}` with an execution time of a few milliseconds + +This span corresponds to a subsequent request to `/42` where we could utelize our cache to avoid the +expensive computation. Click on this trace to reveal its spans. Like before, the root span represents +the Hummingbird request handling. We can also see a single child span: + +1. `HGET`: Shows the `HGET` Valkey command used to look up the cached value for `42`. + +### Making some more requests + +The example also comes with a few more API endpoints to demonstrate other Valkey commands: + +#### Pipelined commands + +Send the following request to kick off multiple pipelined commands: + +```console +% curl http://localhost:8080/multi +``` + +This will run three pipelined `EVAL` commands and produces a trace made up of the following spans: + +1. `/multi`: The Hummingbird request handling. +2. `MULTI`: The Valkey client span representing the execution of the pipelined commands. + +Click on the `MULTI` span to reveal its attributes. New here are the following two attributes: + +- `db.operation.batch.size`: This is set to `3` and represents the number of pipelined commands. +- `db.operation.name`: This is set to `MULTI EVAL`, showing that the pipeline consists only of `EVAL` commands. + +#### Failing commands + +Send the following request to send some gibberish to Valkey resulting in an error: + +```console +% curl http://localhost:8080/error +``` + +This will send an `EVAL` command with invalid script contents (`EVAL not a script`) resulting in a trace +made up of the following spans: + +1. `/error`: The Hummingbird request handling. +2. `EVAL`: The Valkey client span representing the failed `EVAL` command. + +Click on the `EVAL` span to reveal its attributes. New here are the following two attributes: + +- `db.response.status_code`: This is set to `ERR` and represents the prefix of the simple error returned +by Valkey. +- `error`: This is set to `true` indicating that the operation failed. In Jaeger, this is additionally displayed +via a red exclamation mark next to the span name. diff --git a/Examples/open-telemetry/Sources/Example/Example.swift b/Examples/open-telemetry/Sources/Example/Example.swift index 2cfc2e37..6e96a42c 100644 --- a/Examples/open-telemetry/Sources/Example/Example.swift +++ b/Examples/open-telemetry/Sources/Example/Example.swift @@ -20,36 +20,7 @@ struct Example { router.add(middleware: TracingMiddleware()) router.add(middleware: LogRequestsMiddleware(.info)) - router.get("/:x") { _, context in - /* - This demonstrates the span created for pipelined commands where all commands are of the same type. - The `db.operation.name` indicates that it's multiple `EVAL` commands, - and `db.operation.batch.size` indicates the number of commands. - */ - _ = await valkeyClient.execute( - EVAL(script: "return '1'"), - EVAL(script: "return '2'"), - EVAL(script: "return '3'") - ) - - /* - This demonstrates the span created for pipelined commands where the commands are of different types. - The `db.operation.name` resorts to `MULTI`, and `db.operation.batch.size` indicates the number of commands. - */ - _ = await valkeyClient.execute( - EVAL(script: "return '1'"), - ACL.WHOAMI() - ) - - // This demonstrates the span created for a failed command. - _ = try? await valkeyClient.execute(EVAL(script: "💩")) - - // This demonstrates the span created for a failed pipelined command. - _ = await valkeyClient.execute( - EVAL(script: "return 'ok'"), - EVAL(script: "💩") - ) - + router.get("/compute/:x") { _, context in let x = try context.parameters.require("x", as: Int.self) func expensiveAlgorithm(_ x: Int) async throws -> Int { @@ -71,6 +42,20 @@ struct Example { return ByteBuffer(string: "\(result)") } + router.get("/multi") { _, _ in + _ = await valkeyClient.execute( + EVAL(script: "return '1'"), + EVAL(script: "return '2'"), + EVAL(script: "return '3'") + ) + return HTTPResponse.Status.ok + } + + router.get("/error") { _, _ in + _ = try? await valkeyClient.eval(script: "not a script") + return HTTPResponse.Status.ok + } + var app = Application(router: router) app.addServices(observability) app.addServices(valkeyClient) From a4e877242c2b47aeb6495b8781fc84f18cac2701 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Tue, 5 Aug 2025 23:02:40 +0200 Subject: [PATCH 06/23] Remove left-over TODO Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Sources/Valkey/Connection/ValkeyConnection.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 21b79d6a..021d911d 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -296,7 +296,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @usableFromInline func applyCommonAttributes(to attributes: inout SpanAttributes) { - // TODO: Should this be redis as recommended by OTel semconv or valkey as seen in valkey-go? attributes["db.system.name"] = "valkey" attributes["network.peer.address"] = channel.remoteAddress?.ipAddress attributes["network.peer.port"] = channel.remoteAddress?.port From db86f71b1d0dd13d67547ca1e1d631850f46770b Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Wed, 6 Aug 2025 10:23:42 +0200 Subject: [PATCH 07/23] Re-format docker-compose.yml Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Examples/open-telemetry/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Examples/open-telemetry/docker-compose.yml b/Examples/open-telemetry/docker-compose.yml index 015d92e9..e585447d 100644 --- a/Examples/open-telemetry/docker-compose.yml +++ b/Examples/open-telemetry/docker-compose.yml @@ -11,8 +11,8 @@ services: jaeger: image: jaegertracing/all-in-one:latest ports: - - 4318:4318 # OTLP/HTTP receiver - - 16686:16686 # Jaeger Web UI + - 4318:4318 # OTLP/HTTP receiver + - 16686:16686 # Jaeger Web UI volumes: valkey_data: From a6e27b7887565fd4b28d51891fd93a24d6c7bfb3 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Wed, 6 Aug 2025 10:25:32 +0200 Subject: [PATCH 08/23] Re-format ValkeyConnectionTests.swift Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Tests/ValkeyTests/ValkeyConnectionTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 06c97f28..6d1f755e 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -637,7 +637,7 @@ struct ConnectionTests { async let results = connection.execute( SET("foo", value: "bar"), - GET("foo"), + GET("foo") ) var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) let set = RESPToken(.command(["SET", "foo", "bar"])).base @@ -682,7 +682,7 @@ struct ConnectionTests { async let results = connection.execute( SET("foo", value: "bar"), - GET("foo"), + GET("foo") ) _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) From 29d5ba031d288d67a755d3a318e598be0de04708 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Wed, 6 Aug 2025 11:21:22 +0200 Subject: [PATCH 09/23] Clean up service bootstrap in open-telemetry example Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Examples/open-telemetry/Sources/Example/Example.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Examples/open-telemetry/Sources/Example/Example.swift b/Examples/open-telemetry/Sources/Example/Example.swift index 6e96a42c..961c00c8 100644 --- a/Examples/open-telemetry/Sources/Example/Example.swift +++ b/Examples/open-telemetry/Sources/Example/Example.swift @@ -57,8 +57,7 @@ struct Example { } var app = Application(router: router) - app.addServices(observability) - app.addServices(valkeyClient) + app.addServices(observability, valkeyClient) try await app.runService() } From a63dc6b4f13af1c7c77b0901fc63c491812b6bb5 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Wed, 6 Aug 2025 18:44:31 +0200 Subject: [PATCH 10/23] Remove open-telemetry example Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Examples/open-telemetry/Package.swift | 29 ---- Examples/open-telemetry/README.md | 128 ------------------ .../Sources/Example/Example.swift | 80 ----------- Examples/open-telemetry/docker-compose.yml | 18 --- 4 files changed, 255 deletions(-) delete mode 100644 Examples/open-telemetry/Package.swift delete mode 100644 Examples/open-telemetry/README.md delete mode 100644 Examples/open-telemetry/Sources/Example/Example.swift delete mode 100644 Examples/open-telemetry/docker-compose.yml diff --git a/Examples/open-telemetry/Package.swift b/Examples/open-telemetry/Package.swift deleted file mode 100644 index 688a6524..00000000 --- a/Examples/open-telemetry/Package.swift +++ /dev/null @@ -1,29 +0,0 @@ -// swift-tools-version:6.1 -import PackageDescription - -let package = Package( - name: "open-telemetry", - platforms: [.macOS(.v15)], - products: [ - .executable(name: "example", targets: ["Example"]) - ], - dependencies: [ - // TODO: Change to remote once Distributed Tracing support was merged into main and/or tagged - .package(path: "../../"), - .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0"), - .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), - .package(url: "https://github.com/swift-otel/swift-otel.git", exact: "1.0.0-alpha.1"), - ], - targets: [ - .executableTarget( - name: "Example", - dependencies: [ - .product(name: "Valkey", package: "valkey-swift"), - .product(name: "Hummingbird", package: "hummingbird"), - .product(name: "Tracing", package: "swift-distributed-tracing"), - .product(name: "OTel", package: "swift-otel"), - ] - ) - ], - swiftLanguageModes: [.v6] -) diff --git a/Examples/open-telemetry/README.md b/Examples/open-telemetry/README.md deleted file mode 100644 index c583128b..00000000 --- a/Examples/open-telemetry/README.md +++ /dev/null @@ -1,128 +0,0 @@ -# OpenTelemetry example - -An example HTTP server that uses a Valkey client, both of which emit Distributed Tracing spans -via [Swift OTel](https://github.com/swift-otel/swift-otel). - -## Overview - -This example bootstraps Swift OTel to export Distributed Tracing spans to Jaeger. - -It then starts a Hummingbird HTTP server along with its associated middleware for instrumentation. - -Finally, the server uses a Valkey client in its request handler to demonstrate the spans -created by executing various Valkey commands. - -## Testing - -The example uses [Docker Compose](https://docs.docker.com/compose) to run a Valkey server alongside Jaeger to collect -and visualize the spans from the HTTP server and Valkey client, which is running on your local machine. - -### Running Valkey and Jaeger - -In one terminal window, run the following command: - -```console -% docker compose up -[+] Running 4/4 - ✔ Network open-telemetry_default Created 0.0s - ✔ Volume "open-telemetry_valkey_data" Created 0.0s - ✔ Container open-telemetry-jaeger-1 Created 0.0s - ✔ Container open-telemetry-valkey-1 Created 0.0s -... -``` - -### Running the server - -Now, in another terminal, run the server locally using the following command: - -```console -% swift run -``` - -### Making some requests - -Finally, in a third terminal, make a request to the server: - -```console -% curl http://localhost:8080/compute/42 -``` - -The example server fakes an expensive algorithm which is hard-coded to take a couple of seconds to complete. -That's why the first request will take a decent amount of time. - -Now, make the same request again: - -```console -% curl http://localhost:8080/compute/42 -``` - -You should see that it returns instantaniously. We successfully cached the previously computed value in Valkey -and can now read it from the cache instead of re-computing it each time. - -### Visualizing the traces using Jaeger UI - -Visit Jaeger UI in your browser at [localhost:16686](http://localhost:16686). - -Select `example` from the dropdown and click `Find Traces`. - -You should see a handful of traces, including: - -#### `/compute/{x}` with an execution time of ~ 3.2 seconds - -This corresponds to the first request to `/42` where we had to compute the value. Click on this trace to reveal -its spans. The root span represents our entire Hummingbird request handling. Nested inside are three child spans: - -1. `HGET`: Shows the `HGET` Valkey command used to look up the cached value for `42`. -2. `compute`: Represents our expensive algorithm. We can see that this takes up the majority of the entire trace. -3. `HSET`: Shows the `HSET` Valkey command sent to store the computed value for future retrieval. - -#### `/compute/{x}` with an execution time of a few milliseconds - -This span corresponds to a subsequent request to `/42` where we could utelize our cache to avoid the -expensive computation. Click on this trace to reveal its spans. Like before, the root span represents -the Hummingbird request handling. We can also see a single child span: - -1. `HGET`: Shows the `HGET` Valkey command used to look up the cached value for `42`. - -### Making some more requests - -The example also comes with a few more API endpoints to demonstrate other Valkey commands: - -#### Pipelined commands - -Send the following request to kick off multiple pipelined commands: - -```console -% curl http://localhost:8080/multi -``` - -This will run three pipelined `EVAL` commands and produces a trace made up of the following spans: - -1. `/multi`: The Hummingbird request handling. -2. `MULTI`: The Valkey client span representing the execution of the pipelined commands. - -Click on the `MULTI` span to reveal its attributes. New here are the following two attributes: - -- `db.operation.batch.size`: This is set to `3` and represents the number of pipelined commands. -- `db.operation.name`: This is set to `MULTI EVAL`, showing that the pipeline consists only of `EVAL` commands. - -#### Failing commands - -Send the following request to send some gibberish to Valkey resulting in an error: - -```console -% curl http://localhost:8080/error -``` - -This will send an `EVAL` command with invalid script contents (`EVAL not a script`) resulting in a trace -made up of the following spans: - -1. `/error`: The Hummingbird request handling. -2. `EVAL`: The Valkey client span representing the failed `EVAL` command. - -Click on the `EVAL` span to reveal its attributes. New here are the following two attributes: - -- `db.response.status_code`: This is set to `ERR` and represents the prefix of the simple error returned -by Valkey. -- `error`: This is set to `true` indicating that the operation failed. In Jaeger, this is additionally displayed -via a red exclamation mark next to the span name. diff --git a/Examples/open-telemetry/Sources/Example/Example.swift b/Examples/open-telemetry/Sources/Example/Example.swift deleted file mode 100644 index 961c00c8..00000000 --- a/Examples/open-telemetry/Sources/Example/Example.swift +++ /dev/null @@ -1,80 +0,0 @@ -import Hummingbird -import Logging -import OTel -import ServiceLifecycle -import Tracing -import Valkey - -@main -struct Example { - static func main() async throws { - let observability = try bootstrapObservability() - let logger = Logger(label: "example") - - let valkeyClient = ValkeyClient( - .hostname("localhost"), - logger: logger - ) - - let router = Router() - router.add(middleware: TracingMiddleware()) - router.add(middleware: LogRequestsMiddleware(.info)) - - router.get("/compute/:x") { _, context in - let x = try context.parameters.require("x", as: Int.self) - - func expensiveAlgorithm(_ x: Int) async throws -> Int { - try await withSpan("compute") { span in - span.attributes["input"] = x - try await Task.sleep(for: .seconds(3)) - return x * 2 - } - } - - if let cachedResult = try await valkeyClient.hget("values", field: "\(x)") { - return cachedResult - } - - let result = try await expensiveAlgorithm(x) - - try await valkeyClient.hset("values", data: [.init(field: "\(x)", value: "\(result)")]) - - return ByteBuffer(string: "\(result)") - } - - router.get("/multi") { _, _ in - _ = await valkeyClient.execute( - EVAL(script: "return '1'"), - EVAL(script: "return '2'"), - EVAL(script: "return '3'") - ) - return HTTPResponse.Status.ok - } - - router.get("/error") { _, _ in - _ = try? await valkeyClient.eval(script: "not a script") - return HTTPResponse.Status.ok - } - - var app = Application(router: router) - app.addServices(observability, valkeyClient) - - try await app.runService() - } - - private static func bootstrapObservability() throws -> some Service { - LoggingSystem.bootstrap( - StreamLogHandler.standardOutput(label:metadataProvider:), - metadataProvider: OTel.makeLoggingMetadataProvider() - ) - - var configuration = OTel.Configuration.default - configuration.serviceName = "example" - - // For now, valkey-swift only supports Distributed Tracing so we disable the other signals. - configuration.logs.enabled = false - configuration.metrics.enabled = false - - return try OTel.bootstrap(configuration: configuration) - } -} diff --git a/Examples/open-telemetry/docker-compose.yml b/Examples/open-telemetry/docker-compose.yml deleted file mode 100644 index e585447d..00000000 --- a/Examples/open-telemetry/docker-compose.yml +++ /dev/null @@ -1,18 +0,0 @@ -services: - valkey: - image: valkey/valkey:8.0 - ports: - - 6379:6379 - healthcheck: - test: ["CMD", "valkey-cli", "--raw", "incr", "ping"] - volumes: - - valkey_data:/data - - jaeger: - image: jaegertracing/all-in-one:latest - ports: - - 4318:4318 # OTLP/HTTP receiver - - 16686:16686 # Jaeger Web UI - -volumes: - valkey_data: From baff60ce71d04a0975b09d667657ce1e3b662322 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 27 Aug 2025 14:38:46 +0200 Subject: [PATCH 11/23] Improve tracing support Signed-off-by: Fabian Fett --- .../ValkeyConnectionBenchmark.swift | 45 +++++++++++- .../Valkey/Connection/ValkeyConnection.swift | 71 ++++++------------- .../ValkeyConnectionConfiguration.swift | 30 ++++++++ .../Valkey/ValkeyClientConfiguration.swift | 4 ++ Sources/Valkey/ValkeyConnectionFactory.swift | 10 ++- 5 files changed, 109 insertions(+), 51 deletions(-) diff --git a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift index 63df7890..e3a4de77 100644 --- a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift +++ b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift @@ -12,12 +12,18 @@ import Logging import NIOCore import NIOPosix import Synchronization +#if DistributedTracingSupport +import Tracing +#endif import Valkey @available(valkeySwift 1.0, *) func connectionBenchmarks() { makeConnectionCreateAndDropBenchmark() makeConnectionGETBenchmark() + #if DistributedTracingSupport + makeConnectionGETNoOpTracerBenchmark() + #endif makeConnectionPipelineBenchmark() } @@ -58,9 +64,45 @@ func makeConnectionGETBenchmark() -> Benchmark? { return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in let port = serverMutex.withLock { $0 }!.localAddress!.port! let logger = Logger(label: "test") + #if DistributedTracingSupport + // explicitly set tracer to nil, if trait is enabled + var configuration = ValkeyConnectionConfiguration() + configuration.tracing.tracer = nil + #endif try await ValkeyConnection.withConnection( address: .hostname("127.0.0.1", port: port), - configuration: .init(), + configuration: configuration, + logger: logger + ) { connection in + benchmark.startMeasurement() + for _ in benchmark.scaledIterations { + let foo = try await connection.get("foo") + precondition(foo.map { String(buffer: $0) } == "Bar") + } + benchmark.stopMeasurement() + } + } setup: { + let server = try await makeLocalServer() + serverMutex.withLock { $0 = server } + } teardown: { + try await serverMutex.withLock { $0 }?.close().get() + } +} + +#if DistributedTracingSupport +@available(valkeySwift 1.0, *) +@discardableResult +func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? { + let serverMutex = Mutex<(any Channel)?>(nil) + + return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in + let port = serverMutex.withLock { $0 }!.localAddress!.port! + let logger = Logger(label: "test") + var configuration = ValkeyConnectionConfiguration() + configuration.tracing.tracer = NoOpTracer() + try await ValkeyConnection.withConnection( + address: .hostname("127.0.0.1", port: port), + configuration: configuration, logger: logger ) { connection in benchmark.startMeasurement() @@ -77,6 +119,7 @@ func makeConnectionGETBenchmark() -> Benchmark? { try await serverMutex.withLock { $0 }?.close().get() } } +#endif @available(valkeySwift 1.0, *) @discardableResult diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 021d911d..260547a2 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -33,6 +33,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public let id: ID /// Logger used by Server let logger: Logger + #if DistributedTracingSupport + @usableFromInline + let tracer: (any Tracer)? + #endif @usableFromInline let channel: any Channel @usableFromInline @@ -57,6 +61,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.configuration = configuration self.id = connectionID self.logger = logger + #if DistributedTracingSupport + self.tracer = configuration.tracing.tracer + #endif switch address?.value { case let .hostname(host, port): self.address = (host, port) @@ -169,12 +176,11 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @inlinable func _execute(command: Command) async throws -> RESPToken { #if DistributedTracingSupport - let span = startSpan(Command.name, ofKind: .client) - defer { span.end() } + let span = self.tracer?.startSpan(Command.name, ofKind: .client) + defer { span?.end() } - span.updateAttributes { attributes in - attributes["db.operation.name"] = Command.name - applyCommonAttributes(to: &attributes) + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes, commandName: Command.name) } #endif @@ -193,21 +199,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } catch let error as ValkeyClientError { #if DistributedTracingSupport - span.recordError(error) + span?.recordError(error) if let message = error.message { var prefixEndIndex = message.startIndex while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " { message.formIndex(after: &prefixEndIndex) } let prefix = message[message.startIndex..( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, Error>) { - #if DistributedTracingSupport - let span = startSpan("MULTI", ofKind: .client) - defer { span.end() } - - // We want to suffix the `db.operation.name` if all pipelined commands are of the same type. - var commandName: String? - var operationNameSuffix: String? - var commandCount = 0 - - for command in repeat each commands { - commandCount += 1 - if commandName == nil { - commandName = Swift.type(of: command).name - operationNameSuffix = commandName - } else if commandName != Swift.type(of: command).name { - // We should only add a suffix if all commands in the transaction are the same. - operationNameSuffix = nil - } - } - let operationName = operationNameSuffix.map { "MULTI \($0)" } ?? "MULTI" - - span.updateAttributes { attributes in - attributes["db.operation.name"] = operationName - attributes["db.operation.batch.size"] = commandCount > 1 ? commandCount : nil - applyCommonAttributes(to: &attributes) - } - #endif - func convert(_ result: Result, to: Response.Type) -> Result { - #if DistributedTracingSupport - if case .failure(let error) = result { - span.recordError(error) - } - #endif - return result.flatMap { do { return try .success(Response(fromRESP: $0)) @@ -295,12 +267,13 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } @usableFromInline - func applyCommonAttributes(to attributes: inout SpanAttributes) { - attributes["db.system.name"] = "valkey" - attributes["network.peer.address"] = channel.remoteAddress?.ipAddress - attributes["network.peer.port"] = channel.remoteAddress?.port - attributes["server.address"] = address?.hostOrSocketPath - attributes["server.port"] = address?.port == 6379 ? nil : address?.port + func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName + attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValue.databaseSystem + attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress + attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port + attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath + attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port } @usableFromInline diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index b4869439..81c30197 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -7,6 +7,9 @@ // import NIOSSL +#if DistributedTracingSupport +import Tracing +#endif /// A configuration object that defines how to connect to a Valkey server. /// @@ -112,6 +115,10 @@ public struct ValkeyConnectionConfiguration: Sendable { /// Default value is `nil` (no client name is set). public var clientName: String? + #if DistributedTracingSupport + public var tracing: ValkeyTracingConfiguration = .init() + #endif + /// Creates a new Valkey connection configuration. /// /// Use this initializer to create a configuration object that can be used to establish @@ -137,3 +144,26 @@ public struct ValkeyConnectionConfiguration: Sendable { self.clientName = clientName } } + +#if DistributedTracingSupport +public struct ValkeyTracingConfiguration: Sendable { + + public var tracer: (any Tracer)? = InstrumentationSystem.tracer + + public var attributeNames: AttributeNames = .init() + public var attributeValue: AttributeValues = .init() + + public struct AttributeNames: Sendable { + public var databaseOperationName: String = "db.operation.name" + public var databaseSystemName: String = "db.system.name" + public var networkPeerAddress: String = "network.peer.address" + public var networkPeerPort: String = "network.peer.port" + public var serverAddress: String = "server.address" + public var serverPort: String = "server.port" + } + + public struct AttributeValues: Sendable { + public var databaseSystem: String = "valkey" + } +} +#endif diff --git a/Sources/Valkey/ValkeyClientConfiguration.swift b/Sources/Valkey/ValkeyClientConfiguration.swift index 1b009f2e..f435d84f 100644 --- a/Sources/Valkey/ValkeyClientConfiguration.swift +++ b/Sources/Valkey/ValkeyClientConfiguration.swift @@ -118,6 +118,10 @@ public struct ValkeyClientConfiguration: Sendable { /// The TLS to use for the Valkey connection. public var tls: TLS + #if DistributedTracingSupport + public var tracing: ValkeyTracingConfiguration = .init() + #endif + /// Creates a Valkey client connection configuration. /// /// - Parameters: diff --git a/Sources/Valkey/ValkeyConnectionFactory.swift b/Sources/Valkey/ValkeyConnectionFactory.swift index 2a64b3d8..01594abe 100644 --- a/Sources/Valkey/ValkeyConnectionFactory.swift +++ b/Sources/Valkey/ValkeyConnectionFactory.swift @@ -103,7 +103,7 @@ package final class ValkeyConnectionFactory: Sendable { try await .enable(self.cache!.getSSLContext(), tlsServerName: clientName) } - return ValkeyConnectionConfiguration( + let newConfig = ValkeyConnectionConfiguration( authentication: self.configuration.authentication.flatMap { .init(username: $0.username, password: $0.password) }, @@ -112,5 +112,13 @@ package final class ValkeyConnectionFactory: Sendable { tls: tls, clientName: nil ) + + #if DistributedTracingSupport + var mConfig = newConfig + mConfig.tracing = self.configuration.tracing + return mConfig + #else + return newConfig + #endif } } From d1b16e7a81aad89febb4fb06d5fe376545d551ad Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 27 Aug 2025 14:56:07 +0200 Subject: [PATCH 12/23] Fix tests Signed-off-by: Fabian Fett --- .../ClientIntegrationTests.swift | 2 - Tests/ValkeyTests/ValkeyConnectionTests.swift | 375 +++++++++--------- 2 files changed, 185 insertions(+), 192 deletions(-) diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 5c007f8a..0929edc1 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -72,8 +72,6 @@ struct ClientIntegratedTests { typealias Response = String? static let name = "GET" - static let name = "GET" - var key: ValkeyKey init(key: ValkeyKey) { diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 6d1f755e..1c5591c2 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -492,223 +492,218 @@ struct ConnectionTests { } #if DistributedTracingSupport && compiler(>=6.2) // Swift Testing exit tests only added in 6.2 - @Suite(.serialized) + @Suite struct DistributedTracingTests { @Test @available(valkeySwift 1.0, *) func testSingleCommandSpan() async throws { - await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { - let tracer = TestTracer() - InstrumentationSystem.bootstrapInternal(tracer) - - let channel = NIOAsyncTestingChannel() - let logger = Logger(label: "test") - let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) - try await channel.processHello() - - async let fooResult = connection.get("foo").map { String(buffer: $0) } - - let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(outbound == RESPToken(.command(["GET", "foo"])).base) - - try await channel.writeInbound(RESPToken(.bulkString("Bar")).base) - #expect(try await fooResult == "Bar") - - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) - #expect(span.operationName == "GET") - #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) - #expect( - span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "GET", - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379, - ] - ) - #expect(span.recordedErrors.isEmpty) - #expect(span.status == nil) - } + let tracer = TestTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let fooResult = connection.get("foo").map { String(buffer: $0) } + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + #expect(outbound == RESPToken(.command(["GET", "foo"])).base) + + try await channel.writeInbound(RESPToken(.bulkString("Bar")).base) + #expect(try await fooResult == "Bar") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "GET") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) } @Test @available(valkeySwift 1.0, *) func testSingleCommandFailureSpan() async throws { - await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { - let tracer = TestTracer() - InstrumentationSystem.bootstrapInternal(tracer) - - let channel = NIOAsyncTestingChannel() - let logger = Logger(label: "test") - let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) - try await channel.processHello() + let tracer = TestTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer - async let fooResult = connection.get("foo") - _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() - try await channel.writeInbound(RESPToken(.simpleError("ERR Error!")).base) - do { - _ = try await fooResult - Issue.record() - } catch let error as ValkeyClientError { - #expect(error.errorCode == .commandError) - #expect(error.message == "ERR Error!") - } + async let fooResult = connection.get("foo") + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) - #expect(span.operationName == "GET") - #expect(span.kind == .client) - #expect(span.recordedErrors.count == 1) - let error = try #require(span.recordedErrors.first) - #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) - #expect( - span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "GET", - "db.response.status_code": "ERR", - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379, - ] - ) - #expect(span.status?.code == .error) + try await channel.writeInbound(RESPToken(.simpleError("ERR Error!")).base) + do { + _ = try await fooResult + Issue.record() + } catch let error as ValkeyClientError { + #expect(error.errorCode == .commandError) + #expect(error.message == "ERR Error!") } + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "GET") + #expect(span.kind == .client) + #expect(span.recordedErrors.count == 1) + let error = try #require(span.recordedErrors.first) + #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "db.response.status_code": "ERR", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status?.code == .error) } - @Test + @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedSameCommandsSpan() async throws { - await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { - let tracer = TestTracer() - InstrumentationSystem.bootstrapInternal(tracer) - - let channel = NIOAsyncTestingChannel() - let logger = Logger(label: "test") - let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) - try await channel.processHello() + let tracer = TestTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + SET("bar", value: "foo") + ) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let set1 = RESPToken(.command(["SET", "foo", "bar"])).base + #expect(outbound.readSlice(length: set1.readableBytes) == set1) + #expect(outbound == RESPToken(.command(["SET", "bar", "foo"])).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - async let results = connection.execute( - SET("foo", value: "bar"), - SET("bar", value: "foo") - ) - var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - let set1 = RESPToken(.command(["SET", "foo", "bar"])).base - #expect(outbound.readSlice(length: set1.readableBytes) == set1) - #expect(outbound == RESPToken(.command(["SET", "bar", "foo"])).base) - try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - - #expect(try await results.1.get().map { String(buffer: $0) } == "OK") - - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) - #expect(span.operationName == "MULTI") - #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) - #expect( - span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI SET", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379, - ] - ) - #expect(span.recordedErrors.isEmpty) - #expect(span.status == nil) - } + #expect(try await results.1.get().map { String(buffer: $0) } == "OK") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI SET", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) } - @Test + @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedDifferentCommandsSpan() async throws { - await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { - let tracer = TestTracer() - InstrumentationSystem.bootstrapInternal(tracer) - - let channel = NIOAsyncTestingChannel() - let logger = Logger(label: "test") - let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) - try await channel.processHello() - - async let results = connection.execute( - SET("foo", value: "bar"), - GET("foo") - ) - var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - let set = RESPToken(.command(["SET", "foo", "bar"])).base - #expect(outbound.readSlice(length: set.readableBytes) == set) - #expect(outbound == RESPToken(.command(["GET", "foo"])).base) - try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - try await channel.writeInbound(RESPToken(.bulkString("bar")).base) - - #expect(try await results.1.get().map { String(buffer: $0) } == "bar") - - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) - #expect(span.operationName == "MULTI") - #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) - #expect( - span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379, - ] - ) - #expect(span.recordedErrors.isEmpty) - #expect(span.status == nil) - } + let tracer = TestTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + GET("foo") + ) + var outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let set = RESPToken(.command(["SET", "foo", "bar"])).base + #expect(outbound.readSlice(length: set.readableBytes) == set) + #expect(outbound == RESPToken(.command(["GET", "foo"])).base) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.bulkString("bar")).base) + + #expect(try await results.1.get().map { String(buffer: $0) } == "bar") + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.isEmpty) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.recordedErrors.isEmpty) + #expect(span.status == nil) } - @Test + @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedCommandFailureSpan() async throws { - await #expect(processExitsWith: .success, "Running in a separate process because test uses bootstrap") { - let tracer = TestTracer() - InstrumentationSystem.bootstrapInternal(tracer) - - let channel = NIOAsyncTestingChannel() - let logger = Logger(label: "test") - let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) - try await channel.processHello() - - async let results = connection.execute( - SET("foo", value: "bar"), - GET("foo") - ) - _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + let tracer = TestTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let results = connection.execute( + SET("foo", value: "bar"), + GET("foo") + ) + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) - try await channel.writeInbound(RESPToken(.simpleString("OK")).base) - try await channel.writeInbound(RESPToken(.simpleError("WRONGTYPE Error!")).base) - _ = await results - - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) - #expect(span.operationName == "MULTI") - #expect(span.kind == .client) - #expect(span.recordedErrors.count == 1) - let error = try #require(span.recordedErrors.first) - #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) - #expect( - span.attributes == [ - "db.system.name": "valkey", - "db.operation.name": "MULTI", - "db.operation.batch.size": 2, - "server.address": "127.0.0.1", - "network.peer.address": "127.0.0.1", - "network.peer.port": 6379, - ] - ) - #expect(span.status == nil) - } + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleError("WRONGTYPE Error!")).base) + _ = await results + + #expect(tracer.spans.count == 1) + let span = try #require(tracer.spans.first) + #expect(span.operationName == "MULTI") + #expect(span.kind == .client) + #expect(span.recordedErrors.count == 1) + let error = try #require(span.recordedErrors.first) + #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "MULTI", + "db.operation.batch.size": 2, + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status == nil) } } #endif From ada0aa246a92491160998e595483fbfd5ff63945 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 27 Aug 2025 14:58:27 +0200 Subject: [PATCH 13/23] swift-format Signed-off-by: Fabian Fett --- Sources/Valkey/Connection/ValkeyConnection.swift | 2 +- Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 260547a2..6fce7ff1 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -231,7 +231,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, Error>) { func convert(_ result: Result, to: Response.Type) -> Result { - return result.flatMap { + result.flatMap { do { return try .success(Response(fromRESP: $0)) } catch { diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index 81c30197..aec37026 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -7,6 +7,7 @@ // import NIOSSL + #if DistributedTracingSupport import Tracing #endif From 784f53eaad5306118bb703fec4d017f11bb0348f Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Thu, 28 Aug 2025 12:09:05 +0200 Subject: [PATCH 14/23] swift-format Signed-off-by: Fabian Fett --- Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift index e3a4de77..f4a9bf85 100644 --- a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift +++ b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift @@ -12,10 +12,11 @@ import Logging import NIOCore import NIOPosix import Synchronization +import Valkey + #if DistributedTracingSupport import Tracing #endif -import Valkey @available(valkeySwift 1.0, *) func connectionBenchmarks() { From 5b59b0491c50d2187ab1651afc92197463b63783 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 30 Aug 2025 19:40:37 +0200 Subject: [PATCH 15/23] Use TestTracer from TracingTestKit Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Package.swift | 3 +- Tests/ValkeyTests/Utils/TestTracer.swift | 209 ------------------ Tests/ValkeyTests/ValkeyConnectionTests.swift | 45 ++-- 3 files changed, 23 insertions(+), 234 deletions(-) delete mode 100644 Tests/ValkeyTests/Utils/TestTracer.swift diff --git a/Package.swift b/Package.swift index 4f497d9c..9071540a 100644 --- a/Package.swift +++ b/Package.swift @@ -25,7 +25,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), - .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.0.0"), + .package(url: "https://github.com/slashmo/swift-distributed-tracing.git", branch: "feature/tracing-test-kit"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"), @@ -93,6 +93,7 @@ let package = Package( .product(name: "NIOTestUtils", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "NIOEmbedded", package: "swift-nio"), + .product(name: "TracingTestKit", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), ], swiftSettings: defaultSwiftSettings ), diff --git a/Tests/ValkeyTests/Utils/TestTracer.swift b/Tests/ValkeyTests/Utils/TestTracer.swift deleted file mode 100644 index 1b0f7273..00000000 --- a/Tests/ValkeyTests/Utils/TestTracer.swift +++ /dev/null @@ -1,209 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the valkey-swift open source project -// -// Copyright (c) 2025 the valkey-swift project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of valkey-swift project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Distributed Tracing open source project -// -// Copyright (c) 2020-2023 Apple Inc. and the Swift Distributed Tracing project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of Swift Distributed Tracing project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if DistributedTracingSupport -import Dispatch -import Foundation -import Instrumentation -import ServiceContextModule -import Tracing - -/// Only intended to be used in single-threaded testing. -final class TestTracer { - private(set) var spans = [TestSpan]() - var onEndSpan: (TestSpan) -> Void = { _ in } - - func startAnySpan( - _ operationName: String, - context: @autoclosure () -> ServiceContext, - ofKind kind: SpanKind, - at instant: @autoclosure () -> Instant, - function: String, - file fileID: String, - line: UInt - ) -> any Tracing.Span { - let span = TestSpan( - operationName: operationName, - startTime: instant(), - context: context(), - kind: kind, - onEnd: self.onEndSpan - ) - self.spans.append(span) - return span - } - - func forceFlush() {} - - func extract(_ carrier: Carrier, into context: inout ServiceContext, using extractor: Extract) - where - Extract: Extractor, - Carrier == Extract.Carrier - { - let traceID = extractor.extract(key: "trace-id", from: carrier) ?? UUID().uuidString - context.traceID = traceID - } - - func inject(_ context: ServiceContext, into carrier: inout Carrier, using injector: Inject) - where - Inject: Injector, - Carrier == Inject.Carrier - { - guard let traceID = context.traceID else { return } - injector.inject(traceID, forKey: "trace-id", into: &carrier) - } -} - -extension TestTracer: Tracer { - func startSpan( - _ operationName: String, - context: @autoclosure () -> ServiceContext, - ofKind kind: SpanKind, - at instant: @autoclosure () -> Instant, - function: String, - file fileID: String, - line: UInt - ) -> TestSpan { - let span = TestSpan( - operationName: operationName, - startTime: instant(), - context: context(), - kind: kind, - onEnd: self.onEndSpan - ) - self.spans.append(span) - return span - } -} - -extension TestTracer { - enum TraceIDKey: ServiceContextKey { - typealias Value = String - } - - enum SpanIDKey: ServiceContextKey { - typealias Value = String - } -} - -extension ServiceContext { - var traceID: String? { - get { - self[TestTracer.TraceIDKey.self] - } - set { - self[TestTracer.TraceIDKey.self] = newValue - } - } - - var spanID: String? { - get { - self[TestTracer.SpanIDKey.self] - } - set { - self[TestTracer.SpanIDKey.self] = newValue - } - } -} - -/// Only intended to be used in single-threaded testing. -final class TestSpan: Span { - let kind: SpanKind - - private(set) var status: SpanStatus? - - let startTimestampNanosSinceEpoch: UInt64 - private(set) var endTimestampNanosSinceEpoch: UInt64? - - private(set) var recordedErrors: [(Error, SpanAttributes)] = [] - - var operationName: String - let context: ServiceContext - - private(set) var events = [SpanEvent]() { - didSet { - self.isRecording = !self.events.isEmpty - } - } - - private(set) var links = [SpanLink]() - - var attributes: SpanAttributes = [:] { - didSet { - self.isRecording = !self.attributes.isEmpty - } - } - - private(set) var isRecording = false - - let onEnd: (TestSpan) -> Void - - init( - operationName: String, - startTime: Instant, - context: ServiceContext, - kind: SpanKind, - onEnd: @escaping (TestSpan) -> Void - ) { - self.operationName = operationName - self.startTimestampNanosSinceEpoch = startTime.nanosecondsSinceEpoch - self.context = context - self.onEnd = onEnd - self.kind = kind - } - - func setStatus(_ status: SpanStatus) { - self.status = status - self.isRecording = true - } - - func addLink(_ link: SpanLink) { - self.links.append(link) - } - - func addEvent(_ event: SpanEvent) { - self.events.append(event) - } - - func recordError( - _ error: Error, - attributes: SpanAttributes, - at instant: @autoclosure () -> Instant - ) { - self.recordedErrors.append((error, attributes)) - } - - func end(at instant: @autoclosure () -> Instant) { - self.endTimestampNanosSinceEpoch = instant().nanosecondsSinceEpoch - self.onEnd(self) - } -} - -extension TestTracer: @unchecked Sendable {} // only intended for single threaded testing -extension TestSpan: @unchecked Sendable {} // only intended for single threaded testing -#endif diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 1c5591c2..ea6ea8b1 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -15,7 +15,7 @@ import Testing @testable import Valkey #if DistributedTracingSupport -@testable import Instrumentation +import TracingTestKit #endif @Suite @@ -491,7 +491,7 @@ struct ConnectionTests { try await channel.close() } - #if DistributedTracingSupport && compiler(>=6.2) // Swift Testing exit tests only added in 6.2 + #if DistributedTracingSupport @Suite struct DistributedTracingTests { @Test @@ -514,11 +514,11 @@ struct ConnectionTests { try await channel.writeInbound(RESPToken(.bulkString("Bar")).base) #expect(try await fooResult == "Bar") - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "GET") #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) + #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", @@ -528,7 +528,6 @@ struct ConnectionTests { "network.peer.port": 6379, ] ) - #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -556,13 +555,13 @@ struct ConnectionTests { #expect(error.message == "ERR Error!") } - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "GET") #expect(span.kind == .client) - #expect(span.recordedErrors.count == 1) - let error = try #require(span.recordedErrors.first) - #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) + #expect(span.errors.count == 1) + let error = try #require(span.errors.first) + #expect(error.error as? ValkeyClientError == ValkeyClientError(.commandError, message: "ERR Error!")) #expect( span.attributes == [ "db.system.name": "valkey", @@ -601,11 +600,11 @@ struct ConnectionTests { #expect(try await results.1.get().map { String(buffer: $0) } == "OK") - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "MULTI") #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) + #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", @@ -616,7 +615,6 @@ struct ConnectionTests { "network.peer.port": 6379, ] ) - #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -645,11 +643,11 @@ struct ConnectionTests { #expect(try await results.1.get().map { String(buffer: $0) } == "bar") - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "MULTI") #expect(span.kind == .client) - #expect(span.recordedErrors.isEmpty) + #expect(span.errors.isEmpty) #expect( span.attributes == [ "db.system.name": "valkey", @@ -660,7 +658,6 @@ struct ConnectionTests { "network.peer.port": 6379, ] ) - #expect(span.recordedErrors.isEmpty) #expect(span.status == nil) } @@ -686,13 +683,13 @@ struct ConnectionTests { try await channel.writeInbound(RESPToken(.simpleError("WRONGTYPE Error!")).base) _ = await results - #expect(tracer.spans.count == 1) - let span = try #require(tracer.spans.first) + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) #expect(span.operationName == "MULTI") #expect(span.kind == .client) - #expect(span.recordedErrors.count == 1) - let error = try #require(span.recordedErrors.first) - #expect(error.0 as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) + #expect(span.errors.count == 1) + let error = try #require(span.errors.first) + #expect(error.error as? ValkeyClientError == ValkeyClientError(.commandError, message: "WRONGTYPE Error!")) #expect( span.attributes == [ "db.system.name": "valkey", From 4d0066ef0253f51266e8c8244248480e0d122170 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 30 Aug 2025 22:41:26 +0200 Subject: [PATCH 16/23] Put ValkeyConnection.address behind Tracing trait Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Sources/Valkey/Connection/ValkeyConnection.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 6fce7ff1..dd154018 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -36,14 +36,14 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { #if DistributedTracingSupport @usableFromInline let tracer: (any Tracer)? + @usableFromInline + let address: (hostOrSocketPath: String, port: Int?)? #endif @usableFromInline let channel: any Channel @usableFromInline let channelHandler: ValkeyChannelHandler let configuration: ValkeyConnectionConfiguration - @usableFromInline - let address: (hostOrSocketPath: String, port: Int?)? let isClosed: Atomic /// Initialize connection From f1667d6c109f6c473a280a73c2523cae74c97f6b Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 30 Aug 2025 22:59:07 +0200 Subject: [PATCH 17/23] Extract simple error prefix into computed variable Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- .../Valkey/Connection/ValkeyConnection.swift | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index dd154018..b8d49848 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -199,15 +199,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } catch let error as ValkeyClientError { #if DistributedTracingSupport - span?.recordError(error) - if let message = error.message { - var prefixEndIndex = message.startIndex - while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " { - message.formIndex(after: &prefixEndIndex) + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) + if let prefix = error.simpleErrorPrefix { + span.attributes["db.response.status_code"] = "\(prefix)" } - let prefix = message[message.startIndex.. Date: Sat, 6 Sep 2025 19:14:50 +0200 Subject: [PATCH 18/23] Add missing availability macro to ValkeyTracingConfiguration Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index aec37026..8bf05f54 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -147,6 +147,7 @@ public struct ValkeyConnectionConfiguration: Sendable { } #if DistributedTracingSupport +@available(valkeySwift 1.0, *) public struct ValkeyTracingConfiguration: Sendable { public var tracer: (any Tracer)? = InstrumentationSystem.tracer From 960c035640e07dcbb5f0d9a7ce44ba0d306019d9 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 13 Sep 2025 16:20:00 +0200 Subject: [PATCH 19/23] Switch to swift-distributed-tracing upstream Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Package.swift | 4 ++-- Tests/ValkeyTests/ValkeyConnectionTests.swift | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Package.swift b/Package.swift index 9071540a..f8322676 100644 --- a/Package.swift +++ b/Package.swift @@ -25,7 +25,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), - .package(url: "https://github.com/slashmo/swift-distributed-tracing.git", branch: "feature/tracing-test-kit"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"), @@ -93,7 +93,7 @@ let package = Package( .product(name: "NIOTestUtils", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "NIOEmbedded", package: "swift-nio"), - .product(name: "TracingTestKit", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), + .product(name: "InMemoryTracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), ], swiftSettings: defaultSwiftSettings ), diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index ea6ea8b1..068aad0e 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -15,7 +15,7 @@ import Testing @testable import Valkey #if DistributedTracingSupport -import TracingTestKit +import InMemoryTracing #endif @Suite @@ -497,7 +497,7 @@ struct ConnectionTests { @Test @available(valkeySwift 1.0, *) func testSingleCommandSpan() async throws { - let tracer = TestTracer() + let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer @@ -534,7 +534,7 @@ struct ConnectionTests { @Test @available(valkeySwift 1.0, *) func testSingleCommandFailureSpan() async throws { - let tracer = TestTracer() + let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer @@ -578,7 +578,7 @@ struct ConnectionTests { @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedSameCommandsSpan() async throws { - let tracer = TestTracer() + let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer @@ -621,7 +621,7 @@ struct ConnectionTests { @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedDifferentCommandsSpan() async throws { - let tracer = TestTracer() + let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer @@ -664,7 +664,7 @@ struct ConnectionTests { @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedCommandFailureSpan() async throws { - let tracer = TestTracer() + let tracer = InMemoryTracer() var config = ValkeyConnectionConfiguration() config.tracing.tracer = tracer From bb8839b58bb641b4daa71bd2234381eb63b84d6d Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 13 Sep 2025 16:20:24 +0200 Subject: [PATCH 20/23] Fix compilation without DistributedTracingSupport trait Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Sources/Valkey/Connection/ValkeyConnection.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index b8d49848..44e304ea 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -63,7 +63,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.logger = logger #if DistributedTracingSupport self.tracer = configuration.tracing.tracer - #endif switch address?.value { case let .hostname(host, port): self.address = (host, port) @@ -72,6 +71,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { case nil: self.address = nil } + #endif self.isClosed = .init(false) } @@ -263,6 +263,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + #if DistributedTracingSupport @usableFromInline func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName @@ -272,6 +273,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port } + #endif @usableFromInline nonisolated func cancel(requestID: Int) { From 34db44013f8cebfe2384d59875be48809929541a Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 13 Sep 2025 16:25:51 +0200 Subject: [PATCH 21/23] Fix benchmark compilation Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift index f4a9bf85..4f664244 100644 --- a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift +++ b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift @@ -69,6 +69,8 @@ func makeConnectionGETBenchmark() -> Benchmark? { // explicitly set tracer to nil, if trait is enabled var configuration = ValkeyConnectionConfiguration() configuration.tracing.tracer = nil + #else + let configuration = ValkeyConnectionConfiguration() #endif try await ValkeyConnection.withConnection( address: .hostname("127.0.0.1", port: port), From faf9dcc11c7cfbb431b93e739670e8233fb262e7 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 13 Sep 2025 19:39:59 +0200 Subject: [PATCH 22/23] Document tracing configuration Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- Sources/Valkey/Connection/ValkeyConnection.swift | 2 +- .../Connection/ValkeyConnectionConfiguration.swift | 13 +++++++++++-- Sources/Valkey/ValkeyClientConfiguration.swift | 2 ++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 44e304ea..a181326c 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -267,7 +267,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @usableFromInline func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName - attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValue.databaseSystem + attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValues.databaseSystem attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath diff --git a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift index 8bf05f54..82ecf2a2 100644 --- a/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift +++ b/Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift @@ -117,6 +117,8 @@ public struct ValkeyConnectionConfiguration: Sendable { public var clientName: String? #if DistributedTracingSupport + /// The distributed tracing configuration to use for this connection. + /// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions. public var tracing: ValkeyTracingConfiguration = .init() #endif @@ -148,13 +150,19 @@ public struct ValkeyConnectionConfiguration: Sendable { #if DistributedTracingSupport @available(valkeySwift 1.0, *) +/// A configuration object that defines distributed tracing behavior of a Valkey client. public struct ValkeyTracingConfiguration: Sendable { - + /// The tracer to use, or `nil` to disable tracing. + /// Defaults to the globally bootstrapped tracer. public var tracer: (any Tracer)? = InstrumentationSystem.tracer + /// The attribute names used in spans created by Valkey. Defaults to OpenTelemetry semantics. public var attributeNames: AttributeNames = .init() - public var attributeValue: AttributeValues = .init() + /// The static attribute values used in spans created by Valkey. + public var attributeValues: AttributeValues = .init() + + /// Attribute names used in spans created by Valkey. public struct AttributeNames: Sendable { public var databaseOperationName: String = "db.operation.name" public var databaseSystemName: String = "db.system.name" @@ -164,6 +172,7 @@ public struct ValkeyTracingConfiguration: Sendable { public var serverPort: String = "server.port" } + /// Static attribute values used in spans created by Valkey. public struct AttributeValues: Sendable { public var databaseSystem: String = "valkey" } diff --git a/Sources/Valkey/ValkeyClientConfiguration.swift b/Sources/Valkey/ValkeyClientConfiguration.swift index f435d84f..c2268f2d 100644 --- a/Sources/Valkey/ValkeyClientConfiguration.swift +++ b/Sources/Valkey/ValkeyClientConfiguration.swift @@ -119,6 +119,8 @@ public struct ValkeyClientConfiguration: Sendable { public var tls: TLS #if DistributedTracingSupport + /// The distributed tracing configuration to use for the Valkey connection. + /// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions. public var tracing: ValkeyTracingConfiguration = .init() #endif From 60c0a012af5e80a351d9c0006ef926a7e941f194 Mon Sep 17 00:00:00 2001 From: Moritz Lang <16192401+slashmo@users.noreply.github.com> Date: Sat, 13 Sep 2025 20:07:36 +0200 Subject: [PATCH 23/23] Set span status code for all errors Signed-off-by: Moritz Lang <16192401+slashmo@users.noreply.github.com> --- .../Valkey/Connection/ValkeyConnection.swift | 5 ++- Tests/ValkeyTests/ValkeyConnectionTests.swift | 42 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index a181326c..d8b500e8 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -210,7 +210,10 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { throw error } catch { #if DistributedTracingSupport - span?.recordError(error) + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) + } #endif throw error } diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 068aad0e..939aa1a7 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -575,6 +575,48 @@ struct ConnectionTests { #expect(span.status?.code == .error) } + @Test + @available(valkeySwift 1.0, *) + func testSingleCommandUnknownFailureSpan() async throws { + let tracer = InMemoryTracer() + var config = ValkeyConnectionConfiguration() + config.tracing.tracer = tracer + + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, configuration: config, logger: logger) + try await channel.processHello() + + async let fooResult = connection.get("foo") + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + + await #expect(throws: RESPParsingError.self) { + try await channel.writeInbound(ByteBuffer(string: "invalid resp token")) + } + do { + _ = try await fooResult + Issue.record() + } catch is RESPParsingError {} + + #expect(tracer.finishedSpans.count == 1) + let span = try #require(tracer.finishedSpans.first) + #expect(span.operationName == "GET") + #expect(span.kind == .client) + #expect(span.errors.count == 1) + let error = try #require(span.errors.first) + #expect((error.error as? RESPParsingError)?.code == .invalidLeadingByte) + #expect( + span.attributes == [ + "db.system.name": "valkey", + "db.operation.name": "GET", + "server.address": "127.0.0.1", + "network.peer.address": "127.0.0.1", + "network.peer.port": 6379, + ] + ) + #expect(span.status?.code == .error) + } + @Test(.disabled("Pipeline support not implemented yet")) @available(valkeySwift 1.0, *) func testPipelinedSameCommandsSpan() async throws {