From f582555a23e8e92e6f4846115d631601849ccfaa Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Thu, 25 Sep 2025 19:46:09 +0000 Subject: [PATCH 1/2] Rewrite transaction code, to make it easier to move to client Signed-off-by: Adam Fowler --- .../ValkeyConnection+transactions.swift | 292 ------------------ .../Valkey/Connection/ValkeyConnection.swift | 139 ++++++++- Sources/Valkey/Node/ValkeyNodeClient.swift | 48 ++- Sources/Valkey/RESP/RESPTokenDecodable.swift | 41 +-- Sources/Valkey/ValkeyClient.swift | 39 +++ .../ClientIntegrationTests.swift | 30 +- Tests/ValkeyTests/ValkeyConnectionTests.swift | 93 +++++- dev/generate-transaction-commands.sh | 69 ----- 8 files changed, 351 insertions(+), 400 deletions(-) delete mode 100644 Sources/Valkey/Connection/ValkeyConnection+transactions.swift delete mode 100755 dev/generate-transaction-commands.sh diff --git a/Sources/Valkey/Connection/ValkeyConnection+transactions.swift b/Sources/Valkey/Connection/ValkeyConnection+transactions.swift deleted file mode 100644 index 13c03db8..00000000 --- a/Sources/Valkey/Connection/ValkeyConnection+transactions.swift +++ /dev/null @@ -1,292 +0,0 @@ -// -// This source file is part of the valkey-swift project -// Copyright (c) 2025 the valkey-swift project authors -// -// See LICENSE.txt for license information -// SPDX-License-Identifier: Apache-2.0 -// -// NOTE: THIS FILE IS AUTO-GENERATED BY dev/generate-transaction-commands.sh - -import NIOCore - -@available(valkeySwift 1.0, *) -extension ValkeyConnection { - - @inlinable - public func transaction(_ c0: C0) async throws -> (Result) { - guard let responses = try await self.execute(MULTI(), ValkeyRawResponseCommand(c0), EXEC()).2.get() else { - throw ValkeyClientError(.transactionAborted) - } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1 - ) async throws -> (Result, Result) { - guard let responses = try await self.execute(MULTI(), ValkeyRawResponseCommand(c0), ValkeyRawResponseCommand(c1), EXEC()).3.get() else { - throw ValkeyClientError(.transactionAborted) - } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2 - ) async throws -> (Result, Result, Result) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - EXEC() - ).4.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3 - ) async throws -> (Result, Result, Result, Result) - { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - EXEC() - ).5.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4 - ) async throws -> ( - Result, Result, Result, Result, - Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - EXEC() - ).6.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5 - ) async throws -> ( - Result, Result, Result, Result, - Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - EXEC() - ).7.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6 - ) async throws -> ( - Result, Result, Result, Result, - Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - EXEC() - ).8.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7 - ) async throws -> ( - Result, Result, Result, Result, - Result, Result, Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - EXEC() - ).9.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand, - C8: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7, - _ c8: C8 - ) async throws -> ( - Result, Result, Result, Result, - Result, Result, Result, Result, - Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - ValkeyRawResponseCommand(c8), - EXEC() - ).10.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } - - @inlinable - public func transaction< - C0: ValkeyCommand, - C1: ValkeyCommand, - C2: ValkeyCommand, - C3: ValkeyCommand, - C4: ValkeyCommand, - C5: ValkeyCommand, - C6: ValkeyCommand, - C7: ValkeyCommand, - C8: ValkeyCommand, - C9: ValkeyCommand - >( - _ c0: C0, - _ c1: C1, - _ c2: C2, - _ c3: C3, - _ c4: C4, - _ c5: C5, - _ c6: C6, - _ c7: C7, - _ c8: C8, - _ c9: C9 - ) async throws -> ( - Result, Result, Result, Result, - Result, Result, Result, Result, - Result, Result - ) { - guard - let responses = try await self.execute( - MULTI(), - ValkeyRawResponseCommand(c0), - ValkeyRawResponseCommand(c1), - ValkeyRawResponseCommand(c2), - ValkeyRawResponseCommand(c3), - ValkeyRawResponseCommand(c4), - ValkeyRawResponseCommand(c5), - ValkeyRawResponseCommand(c6), - ValkeyRawResponseCommand(c7), - ValkeyRawResponseCommand(c8), - ValkeyRawResponseCommand(c9), - EXEC() - ).11.get() - else { throw ValkeyClientError(.transactionAborted) } - return responses.decodeElementResults() - } -} diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 9e0627a2..452933ff 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -243,12 +243,82 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } - /// Pipeline a series of commands to Valkey connection + /// Pipeline a series of commands as a transaction to Valkey connection /// - /// Once all the responses for the commands have been received the function returns - /// a parameter pack of Results, one for each command. + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + func replaceSuccessWithError( + response: Response.Type, + result: Result, + error: any Error + ) -> Result { + switch result { + case .failure(let error): + return .failure(error) + case .success: + return .failure(error) + } + } + var encoder = ValkeyCommandEncoder() + var promises: [EventLoopPromise] = [] + MULTI().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + for command in repeat each commands { + command.encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + } + EXEC().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<(repeat Result<(each Command).Response, Error>), Error> in + // get response from channel handler + do { + guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else { + return .failure(ValkeyClientError(.transactionAborted)) + } + return .success(responses.decodeElementResults()) + } catch { + // we received an error while running the EXEC command. We return an + // array of results all with errors. If the queuing of a command already + // generated an error then we use that error, otherwise we use the error + // that we have just caught + var index = AutoIncrementingInteger(1) + return await .success( + (repeat + replaceSuccessWithError( + response: (each Command).Response.self, + result: promises[index.next()].futureResult._result(), + error: error + )) + ) + } + }.get() + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. /// - /// This is an alternative version of the pipeline function ``ValkeyConnection/execute(_:)->(_,_)`` + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)`` /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command /// responses are returned as ``RESPToken`` instead of the response type for the command. /// @@ -282,6 +352,67 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + /// Pipeline a series of commands to Valkey connection + /// + /// Once all the responses for the commands have been received the function returns + /// a parameter pack of Results, one for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: some Collection + ) async throws -> [Result] { + var encoder = ValkeyCommandEncoder() + var promises: [EventLoopPromise] = [] + MULTI().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + for command in commands { + command.encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + } + EXEC().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: promises.map { .nio($0) } + ) { promises -> sending Result<[Result], Error> in + do { + guard let responses = try await promises.last!.futureResult._result().convertFromRESP(to: EXEC.Response.self).get() else { + return .failure(ValkeyClientError(.transactionAborted)) + } + return .success( + responses.map { + switch $0.identifier { + case .simpleError, .bulkError: + .failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) })) + default: + .success($0) + } + } + ) + } catch { + // we received an error while running the EXEC command. We return an + // array of results all with errors. If the queuing of a command already + // generated an error then we use that error, otherwise we use the error + // that we have just caught + var results: [Result] = .init() + results.reserveCapacity(promises.count - 2) + for promise in promises[1..<(promises.count - 1)] { + let result = await promise.futureResult._result() + switch result { + case .failure: + results.append(result) + case .success: + results.append(.failure(error)) + } + } + return .success(results) + } + }.get() + } + /// Pipeline a series of commands to Valkey connection and precede each command with an ASKING /// command /// diff --git a/Sources/Valkey/Node/ValkeyNodeClient.swift b/Sources/Valkey/Node/ValkeyNodeClient.swift index 256d240c..cc436056 100644 --- a/Sources/Valkey/Node/ValkeyNodeClient.swift +++ b/Sources/Valkey/Node/ValkeyNodeClient.swift @@ -178,7 +178,7 @@ extension ValkeyNodeClient { /// - Parameter commands: Parameter pack of ValkeyCommands /// - Returns: Parameter pack holding the results of all the commands @inlinable - public func execute( + func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, any Error>) { do { @@ -203,7 +203,7 @@ extension ValkeyNodeClient { /// - Parameter commands: Collection of ValkeyCommands /// - Returns: Array holding the RESPToken responses of all the commands @inlinable - public func execute( + func execute( _ commands: Commands ) async -> sending [Result] where Commands.Element == any ValkeyCommand { do { @@ -215,6 +215,50 @@ extension ValkeyNodeClient { } } + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + try await self.withConnection { connection in + try await connection.transaction(repeat (each commands)) + } + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyNodeClient/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + @inlinable + func transaction( + _ commands: Commands + ) async throws -> sending [Result] where Commands.Element == any ValkeyCommand { + try await self.withConnection { connection in + try await connection.transaction(commands) + } + } + /// Internal command used by cluster client, that precedes each command with a ASKING /// command func executeWithAsk( diff --git a/Sources/Valkey/RESP/RESPTokenDecodable.swift b/Sources/Valkey/RESP/RESPTokenDecodable.swift index f2341616..4c5b8f13 100644 --- a/Sources/Valkey/RESP/RESPTokenDecodable.swift +++ b/Sources/Valkey/RESP/RESPTokenDecodable.swift @@ -22,26 +22,6 @@ extension RESPToken: RESPTokenDecodable { try Value(fromRESP: self) } - /// Convert RESP3Token to a Result containing the type to convert to or any error found while converting - /// - /// This function also checks for RESP error types and returns them if found - /// - /// - Parameter type: Type to convert to - /// - Returns: Result containing either the Value or an error - @usableFromInline - func decodeResult(as type: Value.Type = Value.self) -> Result { - switch self.identifier { - case .simpleError, .bulkError: - return .failure(ValkeyClientError(.commandError, message: self.errorString.map { Swift.String(buffer: $0) })) - default: - do { - return try .success(Value(fromRESP: self)) - } catch { - return .failure(error) - } - } - } - @inlinable public init(fromRESP token: RESPToken) throws { self = token @@ -358,10 +338,14 @@ extension RESPToken.Array: RESPTokenDecodable { return try (repeat decodeOptionalRESPToken(iterator.next(), as: (each Value).self)) } - /// Convert RESP3Token Array to a tuple of values - /// - Parameter type: Tuple of types to convert to - /// - Throws: RESPDecodeError + /// Convert RESPToken Array to a tuple of values. + /// + /// RESP error tokens are converted into Result.failure. This is used by the transaction + /// code to convert the array response from EXEC into a parameter pack of Results + /// + /// - Parameter as: Tuple of types to convert to /// - Returns: Tuple of decoded values + /// - Throws: RESPDecodeError @inlinable public func decodeElementResults( as type: (repeat (each Value)).Type = (repeat (each Value)).self @@ -369,7 +353,16 @@ extension RESPToken.Array: RESPTokenDecodable { func decodeOptionalRESPToken(_ token: RESPToken?, as: T.Type) -> Result { switch token { case .some(let value): - return value.decodeResult(as: T.self) + switch value.identifier { + case .simpleError, .bulkError: + return .failure(ValkeyClientError(.commandError, message: value.errorString.map { Swift.String(buffer: $0) })) + default: + do { + return try .success(T(fromRESP: value)) + } catch { + return .failure(error) + } + } case .none: return .failure(RESPDecodeError.invalidArraySize(self, expectedSize: self._parameterPackTypeSize(type))) } diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index 6bbc4abf..afd0732b 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -191,6 +191,45 @@ extension ValkeyClient { ) async -> sending [Result] where Commands.Element == any ValkeyCommand { await node.execute(commands) } + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into a parameter pack of Results, one + /// for each command. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + @inlinable + public func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, Error>) { + try await node.transaction(repeat each commands) + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyClient/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + @inlinable + public func transaction( + _ commands: Commands + ) async throws -> sending [Result] where Commands.Element == any ValkeyCommand { + try await node.transaction(commands) + } } #if ServiceLifecycleSupport diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 36791d56..b62ba8c5 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -277,9 +277,9 @@ struct ClientIntegratedTests { func testTransactionSetIncrGet() async throws { var logger = Logger(label: "Valkey") logger.logLevel = .debug - try await withValkeyConnection(.hostname(valkeyHostname, port: 6379), logger: logger) { connection in - try await withKey(connection: connection) { key in - let responses = try await connection.transaction( + try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in + try await withKey(connection: client) { key in + let responses = try await client.transaction( SET(key, value: "100"), INCR(key), GET(key) @@ -289,6 +289,30 @@ struct ClientIntegratedTests { } } + @Test + @available(valkeySwift 1.0, *) + func testInvalidTransactionSetIncrGet() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyClient(.hostname(valkeyHostname, port: 6379), logger: logger) { client in + try await withKey(connection: client) { key in + try await client.set(key, value: "100") + let responses = try await client.transaction( + LPUSH(key, elements: ["Hello"]), + INCR(key), + GET(key) + ) + let lpushError = #expect(throws: ValkeyClientError.self) { + _ = try responses.0.get() + } + #expect(lpushError?.errorCode == .commandError) + #expect(lpushError?.message?.hasPrefix("WRONGTYPE") == true) + let result = try responses.2.get().map { String(buffer: $0) } + #expect(result == "101") + } + } + } + @Test @available(valkeySwift 1.0, *) func testWatch() async throws { diff --git a/Tests/ValkeyTests/ValkeyConnectionTests.swift b/Tests/ValkeyTests/ValkeyConnectionTests.swift index 375eaafc..9afb5128 100644 --- a/Tests/ValkeyTests/ValkeyConnectionTests.swift +++ b/Tests/ValkeyTests/ValkeyConnectionTests.swift @@ -291,12 +291,13 @@ struct ConnectionTests { try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) try await channel.writeInbound(RESPToken(.simpleError("ERROR")).base) try await channel.writeInbound(RESPToken(.simpleError("EXECABORT")).base) - do { - _ = try await asyncResults - Issue.record("Transaction should throw error") - } catch let error as ValkeyClientError { - #expect(error == ValkeyClientError(.commandError, message: "EXECABORT")) - } + let results = try await asyncResults + var error = #expect(throws: ValkeyClientError.self) { try results.0.get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "EXECABORT") + error = #expect(throws: ValkeyClientError.self) { try results.1.get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "ERROR") } @Test @@ -326,6 +327,86 @@ struct ConnectionTests { #expect(throws: ValkeyClientError(.commandError, message: "error")) { try results.1.get() } } + @Test + @available(valkeySwift 1.0, *) + func testTransactionArray() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + async let results = connection.transaction(commands) + + let outbound = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + var buffer = ByteBuffer() + buffer.writeImmutableBuffer(RESPToken(.command(["MULTI"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["SET", "foo", "10"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["INCR", "foo"])).base) + buffer.writeImmutableBuffer(RESPToken(.command(["EXEC"])).base) + #expect(outbound == buffer) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .number(11)])).base) + + #expect(try await results[1].get().decode(as: Int.self) == 11) + } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionArrayError() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + + async let asyncResults = connection.transaction(commands) + + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleError("ERROR")).base) + try await channel.writeInbound(RESPToken(.simpleError("EXECABORT")).base) + let results = try await asyncResults + var error = #expect(throws: ValkeyClientError.self) { try results[0].get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "EXECABORT") + error = #expect(throws: ValkeyClientError.self) { try results[1].get() } + #expect(error?.errorCode == .commandError) + #expect(error?.message == "ERROR") + } + + @Test + @available(valkeySwift 1.0, *) + func testTransactionArrayCommandError() async throws { + let channel = NIOAsyncTestingChannel() + let logger = Logger(label: "test") + let connection = try await ValkeyConnection.setupChannelAndConnect(channel, logger: logger) + try await channel.processHello() + + var commands: [any ValkeyCommand] = [] + commands.append(SET("foo", value: "10")) + commands.append(INCR("foo")) + async let asyncResults = connection.transaction(commands) + + _ = try await channel.waitForOutboundWrite(as: ByteBuffer.self) + + try await channel.writeInbound(RESPToken(.simpleString("OK")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.simpleString("QUEUED")).base) + try await channel.writeInbound(RESPToken(.array([.simpleString("OK"), .bulkError("error")])).base) + let results = try await asyncResults + #expect(throws: ValkeyClientError(.commandError, message: "error")) { try results[1].get() } + } + @Test @available(valkeySwift 1.0, *) func testCancellation() async throws { diff --git a/dev/generate-transaction-commands.sh b/dev/generate-transaction-commands.sh deleted file mode 100755 index 2f1b1314..00000000 --- a/dev/generate-transaction-commands.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/bin/bash -## -## This source file is part of the valkey-swift project -## Copyright (c) 2025 the valkey-swift project authors -## -## See LICENSE.txt for license information -## SPDX-License-Identifier: Apache-2.0 -## - -set -eu - -here="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -function genWithoutContextParameter() { - how_many=$1 - - echo "" - - echo " @inlinable" - echo -n " public func transaction(_ c0: C0" - for ((n = 1; n (Result" - for ((n = 1; n" - done - echo ") {" - echo -n " guard let responses = try await self.execute(MULTI(), " - for ((n = 0; n "$here/../Sources/Valkey/Connection/ValkeyConnection+transactions.swift" - -swift format format -i "$here/../Sources/Valkey/Connection/ValkeyConnection+transactions.swift" \ No newline at end of file From 5aac66bfac756d535184363f04bf0321ffa80903 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Sat, 11 Oct 2025 17:21:04 +0100 Subject: [PATCH 2/2] Fix comments after merge Signed-off-by: Adam Fowler --- .../Valkey/Connection/ValkeyConnection.swift | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 452933ff..fead1513 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -309,16 +309,12 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { }.get() } - /// Pipeline a series of commands as a transaction to Valkey connection - /// - /// Another client will never be served in the middle of the execution of these - /// commands. See https://valkey.io/topics/transactions/ for more information. + /// Pipeline a series of commands to Valkey connection /// - /// EXEC and MULTI commands are added to the pipelined commands and the output - /// of the EXEC command is transformed into an array of RESPToken Results, one for - /// each command. + /// Once all the responses for the commands have been received the function returns + /// an array of Results, one for each command. /// - /// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)`` + /// This is an alternative version of the pipeline function ``ValkeyConnection/execute(_:)->(_,_)`` /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command /// responses are returned as ``RESPToken`` instead of the response type for the command. /// @@ -352,13 +348,21 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } - /// Pipeline a series of commands to Valkey connection + /// Pipeline a series of commands as a transaction to Valkey connection /// - /// Once all the responses for the commands have been received the function returns - /// a parameter pack of Results, one for each command. + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. /// - /// - Parameter commands: Parameter pack of ValkeyCommands - /// - Returns: Parameter pack holding the responses of all the commands + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands @inlinable public func transaction( _ commands: some Collection