diff --git a/Sources/Valkey/Connection/ValkeyConnection+transactions.swift b/Sources/Valkey/Connection/ValkeyConnection+transactions.swift deleted file mode 100644 index 95d89dd9..00000000 --- a/Sources/Valkey/Connection/ValkeyConnection+transactions.swift +++ /dev/null @@ -1,288 +0,0 @@ -// -// This source file is part of the valkey-swift project -// Copyright (c) 2025 the valkey-swift project authors -// -// See LICENSE.txt for license information -// SPDX-License-Identifier: Apache-2.0 -// -// NOTE: THIS FILE IS AUTO-GENERATED BY dev/generate-transaction-commands.sh - -import NIOCore - -@available(valkeySwift 1.0, *) -extension ValkeyConnection { - - @inlinable - public func transaction(_ c0: C0) async throws -> (Result) { - guard let responses = try await self.execute(MULTI(), ValkeyRawResponseCommand(c0), EXEC()).2.get() else { - throw ValkeyClientError(.transactionAborted) - } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1 - ) async throws -> (Result, Result) { - guard let responses = try await self.execute(MULTI(), ValkeyRawResponseCommand(c0), ValkeyRawResponseCommand(c1), EXEC()).3.get() else { - throw ValkeyClientError(.transactionAborted) - } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2 - ) async throws -> (Result, Result, Result) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - EXEC() - ).4.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3 - ) async throws -> (Result, Result, Result, Result) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - EXEC() - ).5.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4 - ) async throws -> ( - Result, Result, Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - EXEC() - ).6.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5 - ) async throws -> ( - Result, Result, Result, Result, Result, - Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - EXEC() - ).7.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6 - ) async throws -> ( - Result, Result, Result, Result, Result, - Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - EXEC() - ).8.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7 - ) async throws -> ( - Result, Result, Result, Result, Result, - Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - EXEC() - ).9.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand, - C8: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7, - _ c8: C8 - ) async throws -> ( - Result, Result, Result, Result, Result, - Result, Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - ValkeyRawResponseCommand(c8), - EXEC() - ).10.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand, - C8: ValkeyCommand, - C9: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7, - _ c8: C8, - _ c9: C9 - ) async throws -> ( - Result, Result, Result, Result, Result, - Result, Result, Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - ValkeyRawResponseCommand(c8), - ValkeyRawResponseCommand(c9), - EXEC() - ).11.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } -} diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 19645e1e..5e9aa949 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -225,90 +225,194 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { public func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, Error>) { - func convert(_ result: Result, to: Response.Type) -> Result { - result.flatMap { - do { - return try .success(Response(fromRESP: $0)) - } catch { - return .failure(error) - } - } - } - let requestID = Self.requestIDGenerator.next() // this currently allocates a promise for every command. We could collapse this down to one promise - var mpromises: [EventLoopPromise] = [] + var promises: [EventLoopPromise] = [] var encoder = ValkeyCommandEncoder() for command in repeat each commands { command.encode(into: &encoder) - mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) } - let outBuffer = encoder.buffer - let promises = mpromises - return await withTaskCancellationHandler { - if Task.isCancelled { - for promise in mpromises { - promise.fail(ValkeyClientError(.cancelled)) - } - } else { - // write directly to channel handler - self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID)) - } + return await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises in // get response from channel handler var index = AutoIncrementingInteger() - return await (repeat convert(promises[index.next()].futureResult._result(), to: (each Command).Response.self)) - } onCancel: { - self.cancel(requestID: requestID) + return await (repeat promises[index.next()].futureResult._result().convertFromRESP(to: (each Command).Response.self)) } } - /// Pipeline a series of commands to Valkey connection + /// Pipeline a series of commands as a transaction to Valkey connection /// - /// Once all the responses for the commands have been received the function returns - /// an array of RESPToken Results, one for each command. + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. /// - /// This is an alternative version of the pipelining function ``ValkeyConnection/execute(_:)->(_,_)`` - /// that allows for a collection of ValkeyCommands. It provides more flexibility but is - /// slightly more expensive to run and the command responses are returned as ``RESPToken`` - /// instead of the response type for the command. + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + func replaceSuccessWithError( + response: Response.Type, + result: Result, + error: any Error + ) -> Result { + switch result { + case .failure(let error): + return .failure(error) + case .success: + return .failure(error) + } + } + var encoder = ValkeyCommandEncoder() + var promises: [EventLoopPromise] = [] + MULTI().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + for command in repeat each commands { + command.encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + } + EXEC().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), Error> in + // get response from channel handler + do { + guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else { + return .failure(ValkeyClientError(.transactionAborted)) + } + return .success(responses.decodeElementResults()) + } catch { + // we received an error while running the EXEC command. We return an + // array of results all with errors. If the queuing of a command already + // generated an error then we use that error, otherwise we use the error + // that we have just caught + var index = AutoIncrementingInteger(1) + return await .success( + (repeat + replaceSuccessWithError( + response: (each Command).Response.self, + result: promises[index.next()].futureResult._result(), + error: error + )) + ) + } + }.get() + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. /// /// - Parameter commands: Collection of ValkeyCommands /// - Returns: Array holding the RESPToken responses of all the commands @inlinable public func execute( _ commands: some Collection - ) async -> sending [Result] { - let requestID = Self.requestIDGenerator.next() + ) async -> [Result] { // this currently allocates a promise for every command. We could collapse this down to one promise - var mpromises: [EventLoopPromise] = [] - mpromises.reserveCapacity(commands.count) + var promises: [EventLoopPromise] = [] + promises.reserveCapacity(commands.count) var encoder = ValkeyCommandEncoder() for command in commands { command.encode(into: &encoder) - mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) } - let outBuffer = encoder.buffer - let promises = mpromises - return await withTaskCancellationHandler { - if Task.isCancelled { - for promise in mpromises { - promise.fail(ValkeyClientError(.cancelled)) - } - } else { - // write directly to channel handler - self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }, id: requestID)) - } + let count = commands.count + return await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises in // get response from channel handler var results: [Result] = .init() - results.reserveCapacity(commands.count) + results.reserveCapacity(count) for promise in promises { await results.append(promise.futureResult._result()) } return results - } onCancel: { - self.cancel(requestID: requestID) } } + /// Pipeline a series of commands to Valkey connection + /// + /// Once all the responses for the commands have been received the function returns + /// a parameter pack of Results, one for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: some Collection + ) async throws -> [Result] { + var encoder = ValkeyCommandEncoder() + var promises: [EventLoopPromise] = [] + MULTI().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + for command in commands { + command.encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + } + EXEC().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<[Result], Error> in + do { + guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else { + return .failure(ValkeyClientError(.transactionAborted)) + } + return .success( + responses.map { + switch $0.identifier { + case .simpleError, .bulkError: + .failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) })) + default: + .success($0) + } + } + ) + } catch { + // we received an error while running the EXEC command. We return an + // array of results all with errors. If the queuing of a command already + // generated an error then we use that error, otherwise we use the error + // that we have just caught + var results: [Result] = .init() + results.reserveCapacity(promises.count - 2) + for promise in promises[1..<(promises.count - 1)] { + let result = await promise.futureResult._result() + switch result { + case .failure: + results.append(result) + case .success: + results.append(.failure(error)) + } + } + return .success(results) + } + }.get() + } + /// Pipeline a series of commands to Valkey connection and precede each command with an ASKING /// command /// @@ -322,37 +426,62 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { @usableFromInline func executeWithAsk( _ commands: some Collection - ) async -> sending [Result] { - let requestID = Self.requestIDGenerator.next() + ) async -> [Result] { // this currently allocates a promise for every command. We could collapse this down to one promise - var mpromises: [EventLoopPromise] = [] - mpromises.reserveCapacity(commands.count) + var promises: [EventLoopPromise] = [] + promises.reserveCapacity(commands.count) + var valkeyPromises: [ValkeyPromise] = [] + valkeyPromises.reserveCapacity(commands.count * 2) var encoder = ValkeyCommandEncoder() for command in commands { ASKING().encode(into: &encoder) command.encode(into: &encoder) - mpromises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + valkeyPromises.append(.forget) + valkeyPromises.append(.nio(promises.last!)) } - let outBuffer = encoder.buffer - let promises = mpromises + + let count = commands.count + return await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: valkeyPromises + ) { promises in + // get response from channel handler + var results: [Result] = .init() + results.reserveCapacity(count) + for promise in promises { + await results.append(promise.futureResult._result()) + } + return results + } + } + + /// Execute stream of commands written into buffer + /// + /// The function is provided with an array of EventLoopPromises for the responses of commands + /// we care about and an array of valkey promises one for each command + @inlinable + func _execute( + buffer: ByteBuffer, + promises: [EventLoopPromise], + valkeyPromises: [ValkeyPromise], + processResults: sending ([EventLoopPromise]) async -> sending Value + ) async -> Value { + let requestID = Self.requestIDGenerator.next() return await withTaskCancellationHandler { if Task.isCancelled { - for promise in mpromises { + for promise in promises { promise.fail(ValkeyClientError(.cancelled)) } } else { // write directly to channel handler self.channelHandler.write( - request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.flatMap { [.forget, .nio($0)] }, id: requestID) + request: ValkeyRequest.multiple(buffer: buffer, promises: valkeyPromises, id: requestID) ) } - // get response from channel handler - var results: [Result] = .init() - results.reserveCapacity(commands.count) - for promise in promises { - await results.append(promise.futureResult._result()) - } - return results + + return await processResults(promises) } onCancel: { self.cancel(requestID: requestID) } @@ -539,8 +668,8 @@ struct AutoIncrementingInteger { var value: Int = 0 @inlinable - init() { - self.value = 0 + init(_ value: Int = 0) { + self.value = value } @inlinable @@ -566,3 +695,16 @@ extension ValkeyClientError { } } #endif + +extension Result where Success == RESPToken, Failure == any Error { + @usableFromInline + func convertFromRESP(to: Response.Type) -> Result { + self.flatMap { + do { + return try .success(Response(fromRESP: $0)) + } catch { + return .failure(error) + } + } + } +} diff --git a/Sources/Valkey/Node/ValkeyNodeClient.swift b/Sources/Valkey/Node/ValkeyNodeClient.swift index 83d9edfc..5e673125 100644 --- a/Sources/Valkey/Node/ValkeyNodeClient.swift +++ b/Sources/Valkey/Node/ValkeyNodeClient.swift @@ -146,7 +146,7 @@ extension ValkeyNodeClient { /// - Parameter commands: Parameter pack of ValkeyCommands /// - Returns: Parameter pack holding the results of all the commands @inlinable - public func execute( + func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, Error>) { do { @@ -171,7 +171,7 @@ extension ValkeyNodeClient { /// - Parameter commands: Collection of ValkeyCommands /// - Returns: Array holding the RESPToken responses of all the commands @inlinable - public func execute( + func execute( _ commands: Commands ) async -> sending [Result] where Commands.Element == any ValkeyCommand { do { @@ -183,6 +183,50 @@ extension ValkeyNodeClient { } } + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + try await self.withConnection { connection in + try await connection.transaction(repeat (each commands)) + } + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyNodeClient/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + @inlinable + func transaction( + _ commands: Commands + ) async throws -> sending [Result] where Commands.Element == any ValkeyCommand { + try await self.withConnection { connection in + try await connection.transaction(commands) + } + } + /// Internal command used by cluster client, that precedes each command with a ASKING /// command func executeWithAsk( diff --git a/Sources/Valkey/RESP/RESPTokenDecodable.swift b/Sources/Valkey/RESP/RESPTokenDecodable.swift index 8e58c25a..2fa82b82 100644 --- a/Sources/Valkey/RESP/RESPTokenDecodable.swift +++ b/Sources/Valkey/RESP/RESPTokenDecodable.swift @@ -22,26 +22,6 @@ extension RESPToken: RESPTokenDecodable { try Value(fromRESP: self) } - /// Convert RESP3Token to a Result containing the type to convert to or any error found while converting - /// - /// This function also checks for RESP error types and returns them if found - /// - /// - Parameter type: Type to convert to - /// - Returns: Result containing either the Value or an error - @usableFromInline - func decodeResult(as type: Value.Type = Value.self) -> Result { - switch self.identifier { - case .simpleError, .bulkError: - return .failure(ValkeyClientError(.commandError, message: self.errorString.map { Swift.String(buffer: $0) })) - default: - do { - return try .success(Value(fromRESP: self)) - } catch { - return .failure(error) - } - } - } - @inlinable public init(fromRESP token: RESPToken) throws { self = token @@ -359,9 +339,12 @@ extension RESPToken.Array: RESPTokenDecodable { return try (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self)) } - /// Convert RESP3Token Array to a tuple of values + /// Convert RESPToken Array to a tuple of values. + /// + /// RESP error tokens are converted into Result.failure. This is used by the transaction + /// code to convert the array response from EXEC into a parameter pack of Results + /// /// - Parameter as: Tuple of types to convert to - /// - Throws: RESPDecodeError /// - Returns: Tuple of decoded values @inlinable public func decodeElementResults( @@ -370,7 +353,16 @@ extension RESPToken.Array: RESPTokenDecodable { func decodeOptionalRESPToken(_ token: RESPToken?, as: T.Type) -> Result { switch token { case .some(let value): - return value.decodeResult(as: T.self) + switch value.identifier { + case .simpleError, .bulkError: + return .failure(ValkeyClientError(.commandError, message: value.errorString.map { Swift.String(buffer: $0) })) + default: + do { + return try .success(T(fromRESP: value)) + } catch { + return .failure(error) + } + } case .none: // TODO: Fixup error when we have a decoding error return .failure(RESPParsingError(code: .unexpectedType, buffer: token?.base ?? .init())) diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index 28e648f0..3cd7afa4 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -217,6 +217,45 @@ extension ValkeyClient { ) async -> sending [Result] where Commands.Element == any ValkeyCommand { await node.execute(commands) } + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + try await node.transaction(repeat each commands) + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyClient/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + @inlinable + public func transaction( + _ commands: Commands + ) async throws -> sending [Result] where Commands.Element == any ValkeyCommand { + try await node.transaction(commands) + } } #if ServiceLifecycleSupport diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index dd85c74e..2c06abf8 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -258,9 +258,9 @@ struct ClientIntegratedTests { func testTransactionSetIncrGet() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 6379), logger: logger) { connection in - try await withKey(connection: connection) { key in - let responses = try await connection.transaction( + try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in + try await withKey(connection: client) { key in + let responses = try await client.transaction( SET(key, value: "100"), INCR(key), GET(key) @@ -270,6 +270,30 @@ struct ClientIntegratedTests { } } + @Test + @available(valkeySwift 1.0, *) + func testInvalidTransactionSetIncrGet() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in + try await withKey(connection: client) { key in + try await client.set(key, value: "100") + let responses = try await client.transaction( + LPUSH(key, elements: ["Hello"]), + INCR(key), + GET(key) + ) + let lpushError = #expect(throws: ValkeyClientError.self) { + _ = try responses.0.get() + } + #expect(lpushError?.errorCode == .commandError) + #expect(lpushError?.message?.hasPrefix("WRONGTYPE") == true) + let result = try responses.2.get().map { String(buffer: $0) } + #expect(result == "101") + } + } + } + @Test @available(valkeySwift 1.0, *) func testWatch() async throws { diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 939aa1a7..8edf299f 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -290,12 +290,13 @@ struct ConnectionTests { try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) try await channel.writeInbound(RESPToken(.simpleError("ERROR")).base) try await channel.writeInbound(RESPToken(.simpleError("EXECABORT")).base) - do { - _ = try await asyncResults - Issue.record("Transaction should throw error") - } catch let error as ValkeyClientError { - #expect(error == ValkeyClientError(.commandError, message: "EXECABORT")) - } + let results = try await asyncResults + var error = #expect(throws: ValkeyClientError.self) { try results.0.get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "EXECABORT") + error = #expect(throws: ValkeyClientError.self) { try results.1.get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "ERROR") } @Test @@ -325,6 +326,86 @@ struct ConnectionTests { #expect(throws: ValkeyClientError(.commandError, message: "error")) { try results.1.get() } } + @Test + @available(valkeySwift 1.0, *) + func testTransactionArray() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + async let results = connection.transaction(commands) + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + buffer.writeImmutableBuffer(RESPToken(.command(["MULTI"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "foo", "10"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["INCR", "foo"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["EXEC"])).base) + #expect(outbound == buffer) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .number(11)])).base) + + #expect(try await results[1].get().decode(as: Int.self) == 11) + } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionArrayError() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + + async let asyncResults = connection.transaction(commands) + + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleError("ERROR")).base) + try await channel.writeInbound(RESPToken(.simpleError("EXECABORT")).base) + let results = try await asyncResults + var error = #expect(throws: ValkeyClientError.self) { try results[0].get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "EXECABORT") + error = #expect(throws: ValkeyClientError.self) { try results[1].get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "ERROR") + } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionArrayCommandError() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + async let asyncResults = connection.transaction(commands) + + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .bulkError("error")])).base) + let results = try await asyncResults + #expect(throws: ValkeyClientError(.commandError, message: "error")) { try results[1].get() } + } + @Test @available(valkeySwift 1.0, *) func testCancellation() async throws { diff --git a/dev/generate-transaction-commands.sh b/dev/generate-transaction-commands.sh deleted file mode 100755 index ea84f884..00000000 --- a/dev/generate-transaction-commands.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/bin/bash -## -## This source file is part of the valkey-swift project -## Copyright (c) 2025 the valkey-swift project authors -## -## See LICENSE.txt for license information -## SPDX-License-Identifier: Apache-2.0 -## - -set -eu - -here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -function genWithoutContextParameter() { - how_many=$1 - - echo "" - - echo " @inlinable" - echo -n " public func transaction(_ c0: C0" - for ((n = 1; n (Result" - for ((n = 1; n" - done - echo ") {" - echo -n " guard let responses = try await self.execute(MULTI(), " - for ((n = 0; n "$here/../Sources/Valkey/Connection/ValkeyConnection+transactions.swift" - -swift format format -i "$here/../Sources/Valkey/Connection/ValkeyConnection+transactions.swift" \ No newline at end of file