diff --git a/.gitignore b/.gitignore index 2c09b57c..fc58b183 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store -/.build +.build +.swiftpm /Packages xcuserdata/ DerivedData/ @@ -10,4 +11,4 @@ DerivedData/ Package.resolved .benchmarkBaselines/ .swift-version -.docc-build \ No newline at end of file +.docc-build diff --git a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift index 63df7890..4f664244 100644 --- a/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift +++ b/Benchmarks/ValkeyBenchmarks/ValkeyConnectionBenchmark.swift @@ -14,10 +14,17 @@ import NIOPosix import Synchronization import Valkey +#if DistributedTracingSupport +import Tracing +#endif + @available(valkeySwift 1.0, *) func connectionBenchmarks() { makeConnectionCreateAndDropBenchmark() makeConnectionGETBenchmark() + #if DistributedTracingSupport + makeConnectionGETNoOpTracerBenchmark() + #endif makeConnectionPipelineBenchmark() } @@ -58,9 +65,47 @@ func makeConnectionGETBenchmark() -> Benchmark? { return Benchmark("Connection: GET benchmark", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in let port = serverMutex.withLock { $0 }!.localAddress!.port! let logger = Logger(label: "test") + #if DistributedTracingSupport + // explicitly set tracer to nil, if trait is enabled + var configuration = ValkeyConnectionConfiguration() + configuration.tracing.tracer = nil + #else + let configuration = ValkeyConnectionConfiguration() + #endif try await ValkeyConnection.withConnection( address: .hostname("127.0.0.1", port: port), - configuration: .init(), + configuration: configuration, + logger: logger + ) { connection in + benchmark.startMeasurement() + for _ in benchmark.scaledIterations { + let foo = try await connection.get("foo") + precondition(foo.map { String(buffer: $0) } == "Bar") + } + benchmark.stopMeasurement() + } + } setup: { + let server = try await makeLocalServer() + serverMutex.withLock { $0 = server } + } teardown: { + try await serverMutex.withLock { $0 }?.close().get() + } +} + +#if DistributedTracingSupport +@available(valkeySwift 1.0, *) +@discardableResult +func makeConnectionGETNoOpTracerBenchmark() -> Benchmark? { + let serverMutex = Mutex<(any Channel)?>(nil) + + return Benchmark("Connection: GET benchmark – NoOpTracer", configuration: .init(metrics: defaultMetrics, scalingFactor: .kilo)) { benchmark in + let port = serverMutex.withLock { $0 }!.localAddress!.port! + let logger = Logger(label: "test") + var configuration = ValkeyConnectionConfiguration() + configuration.tracing.tracer = NoOpTracer() + try await ValkeyConnection.withConnection( + address: .hostname("127.0.0.1", port: port), + configuration: configuration, logger: logger ) { connection in benchmark.startMeasurement() @@ -77,6 +122,7 @@ func makeConnectionGETBenchmark() -> Benchmark? { try await serverMutex.withLock { $0 }?.close().get() } } +#endif @available(valkeySwift 1.0, *) @discardableResult diff --git a/Package.swift b/Package.swift index f487d4b1..f8322676 100644 --- a/Package.swift +++ b/Package.swift @@ -18,12 +18,14 @@ let package = Package( ], traits: [ .trait(name: "ServiceLifecycleSupport"), - .default(enabledTraits: ["ServiceLifecycleSupport"]), + .trait(name: "DistributedTracingSupport"), + .default(enabledTraits: ["ServiceLifecycleSupport", "DistributedTracingSupport"]), ], dependencies: [ .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-collections.git", from: "1.1.4"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.29.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.23.0"), @@ -36,6 +38,7 @@ let package = Package( .byName(name: "_ValkeyConnectionPool"), .product(name: "DequeModule", package: "swift-collections"), .product(name: "Logging", package: "swift-log"), + .product(name: "Tracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), .product(name: "NIOSSL", package: "swift-nio-ssl"), @@ -90,6 +93,7 @@ let package = Package( .product(name: "NIOTestUtils", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "NIOEmbedded", package: "swift-nio"), + .product(name: "InMemoryTracing", package: "swift-distributed-tracing", condition: .when(traits: ["DistributedTracingSupport"])), ], swiftSettings: defaultSwiftSettings ), diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index ca472f3e..d8b500e8 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -17,6 +17,10 @@ import Network import NIOTransportServices #endif +#if DistributedTracingSupport +import Tracing +#endif + /// A single connection to a Valkey database. @available(valkeySwift 1.0, *) public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @@ -29,6 +33,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public let id: ID /// Logger used by Server let logger: Logger + #if DistributedTracingSupport + @usableFromInline + let tracer: (any Tracer)? + @usableFromInline + let address: (hostOrSocketPath: String, port: Int?)? + #endif @usableFromInline let channel: any Channel @usableFromInline @@ -42,6 +52,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: ID, channelHandler: ValkeyChannelHandler, configuration: ValkeyConnectionConfiguration, + address: ValkeyServerAddress?, logger: Logger ) { self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor() @@ -50,6 +61,17 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.configuration = configuration self.id = connectionID self.logger = logger + #if DistributedTracingSupport + self.tracer = configuration.tracing.tracer + switch address?.value { + case let .hostname(host, port): + self.address = (host, port) + case let .unixDomainSocket(path): + self.address = (path, nil) + case nil: + self.address = nil + } + #endif self.isClosed = .init(false) } @@ -153,16 +175,47 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @inlinable func _execute(command: Command) async throws -> RESPToken { + #if DistributedTracingSupport + let span = self.tracer?.startSpan(Command.name, ofKind: .client) + defer { span?.end() } + + span?.updateAttributes { attributes in + self.applyCommonAttributes(to: &attributes, commandName: Command.name) + } + #endif + let requestID = Self.requestIDGenerator.next() - return try await withTaskCancellationHandler { - if Task.isCancelled { - throw ValkeyClientError(.cancelled) + + do { + return try await withTaskCancellationHandler { + if Task.isCancelled { + throw ValkeyClientError(.cancelled) + } + return try await withCheckedThrowingContinuation { continuation in + self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) + } + } onCancel: { + self.cancel(requestID: requestID) + } + } catch let error as ValkeyClientError { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) + if let prefix = error.simpleErrorPrefix { + span.attributes["db.response.status_code"] = "\(prefix)" + } } - return try await withCheckedThrowingContinuation { continuation in - self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) + #endif + throw error + } catch { + #if DistributedTracingSupport + if let span { + span.recordError(error) + span.setStatus(SpanStatus(code: .error)) } - } onCancel: { - self.cancel(requestID: requestID) + #endif + throw error } } @@ -213,6 +266,18 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + #if DistributedTracingSupport + @usableFromInline + func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { + attributes[self.configuration.tracing.attributeNames.databaseOperationName] = commandName + attributes[self.configuration.tracing.attributeNames.databaseSystemName] = self.configuration.tracing.attributeValues.databaseSystem + attributes[self.configuration.tracing.attributeNames.networkPeerAddress] = channel.remoteAddress?.ipAddress + attributes[self.configuration.tracing.attributeNames.networkPeerPort] = channel.remoteAddress?.port + attributes[self.configuration.tracing.attributeNames.serverAddress] = address?.hostOrSocketPath + attributes[self.configuration.tracing.attributeNames.serverPort] = address?.port == 6379 ? nil : address?.port + } + #endif + @usableFromInline nonisolated func cancel(requestID: Int) { self.channel.eventLoop.execute { @@ -278,6 +343,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: connectionID, channelHandler: handler, configuration: configuration, + address: address, logger: logger ) } @@ -313,6 +379,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { connectionID: 0, channelHandler: handler, configuration: configuration, + address: .hostname("127.0.0.1", port: 6379), logger: logger ) return channel.connect(to: try SocketAddress(ipAddress: "127.0.0.1", port: 6379)).map { @@ -390,3 +457,20 @@ struct AutoIncrementingInteger { return value - 1 } } + +#if DistributedTracingSupport +extension ValkeyClientError { + /// Extract the simple error prefix from this error. + /// + /// - SeeAlso: [](https://valkey.io/topics/protocol/#simple-errors) + @usableFromInline + var simpleErrorPrefix: Substring? { + guard let message else { return nil } + var prefixEndIndex = message.startIndex + while prefixEndIndex < message.endIndex, message[prefixEndIndex] != " " { + message.formIndex(after: &prefixEndIndex) + } + return message[message.startIndex..