From c47bd9bb4b5d277bd294983f4f4d6362b1700536 Mon Sep 17 00:00:00 2001 From: Robert Payne Date: Thu, 1 Dec 2016 20:37:40 +1300 Subject: [PATCH 1/6] Make PostgreSQL use libmill for non-blocking goodness --- Sources/PostgreSQL/Connection.swift | 142 +++++++++++++------- Tests/PostgreSQLTests/PostgreSQLTests.swift | 5 +- 2 files changed, 100 insertions(+), 47 deletions(-) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index 13e9d65..800e25a 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -1,5 +1,6 @@ @_exported import SQL import CLibpq +import CLibvenice import Axis public struct ConnectionError: Error, CustomStringConvertible { @@ -102,6 +103,7 @@ public final class Connection: ConnectionProtocol { public var logger: Logger? private var connection: OpaquePointer? = nil + private var fd: Int32 = -1 public let connectionInfo: ConnectionInfo @@ -128,8 +130,17 @@ public final class Connection: ConnectionProtocol { connectionInfo.password ?? "" ) - if let error = mostRecentError { - throw error + guard PQstatus(connection) == CONNECTION_OK else { + throw mostRecentError ?? ConnectionError(description: "Could not connect to Postgres Server.") + } + + guard PQsetnonblocking(connection, 1) == 0 else { + throw mostRecentError ?? ConnectionError(description: "Could not set to non-blocking mode.") + } + + fd = PQsocket(connection) + guard fd >= 0 else { + throw mostRecentError ?? ConnectionError(description: "Could not get file descriptor.") } } @@ -147,82 +158,123 @@ public final class Connection: ConnectionProtocol { } public func createSavePointNamed(_ name: String) throws { - try execute("SAVEPOINT \(name)", parameters: nil) + try execute("SAVEPOINT ?", parameters: [.string(name)]) } public func rollbackToSavePointNamed(_ name: String) throws { - try execute("ROLLBACK TO SAVEPOINT \(name)", parameters: nil) + try execute("ROLLBACK TO SAVEPOINT ?", parameters: [.string(name)]) } public func releaseSavePointNamed(_ name: String) throws { - try execute("RELEASE SAVEPOINT \(name)", parameters: nil) + try execute("RELEASE SAVEPOINT ?", parameters: [.string(name)]) } @discardableResult public func execute(_ statement: String, parameters: [Value?]?) throws -> Result { - var statement = statement.sqlStringWithEscapedPlaceholdersUsingPrefix("$") { return String($0 + 1) } defer { logger?.debug(statement) } - guard let parameters = parameters else { - guard let resultPointer = PQexec(connection, statement) else { - throw mostRecentError ?? ConnectionError(description: "Empty result") - } - - return try Result(resultPointer) - } - var parameterData = [UnsafePointer?]() var deallocators = [() -> ()]() defer { deallocators.forEach { $0() } } - for parameter in parameters { + if let parameters = parameters { + for parameter in parameters { - guard let value = parameter else { - parameterData.append(nil) - continue + guard let value = parameter else { + parameterData.append(nil) + continue + } + + let data: AnyCollection + switch value { + case .buffer(let value): + data = AnyCollection(value.map { Int8($0) }) + + case .string(let string): + data = AnyCollection(string.utf8CString) + } + + let pointer = UnsafeMutablePointer.allocate(capacity: Int(data.count)) + deallocators.append { + pointer.deallocate(capacity: Int(data.count)) + } + + for (index, byte) in data.enumerated() { + pointer[index] = byte + } + + parameterData.append(pointer) } + } - let data: AnyCollection - switch value { - case .buffer(let value): - data = AnyCollection(value.map { Int8($0) }) + let sendResult: Int32 = parameterData.withUnsafeBufferPointer { buffer in + if buffer.isEmpty { + return PQsendQuery(self.connection, statement) + } else { + return PQsendQueryParams(self.connection, + statement, + Int32(parameterData.count), + nil, + buffer.baseAddress!, + nil, + nil, + 0) + } + } + + guard sendResult == 1 else { + throw mostRecentError ?? ConnectionError(description: "Could not send query.") + } - case .string(let string): - data = AnyCollection(string.utf8CString) + // write query + while true { + mill_fdwait_(fd, FDW_OUT, -1, nil) + let status = PQflush(connection) + guard status >= 0 else { + throw mostRecentError ?? ConnectionError(description: "Could not send query.") } + guard status == 0 else { + continue + } + break + } - let pointer = UnsafeMutablePointer.allocate(capacity: Int(data.count)) - deallocators.append { - pointer.deallocate(capacity: Int(data.count)) + // read response + var lastResult: OpaquePointer? = nil + while true { + guard PQconsumeInput(connection) == 1 else { + throw mostRecentError ?? ConnectionError(description: "Could not send query.") } - for (index, byte) in data.enumerated() { - pointer[index] = byte + guard PQisBusy(connection) == 0 else { + mill_fdwait_(fd, FDW_IN, -1, nil) + continue } - parameterData.append(pointer) - } + guard let result = PQgetResult(connection) else { + break + } - let result: OpaquePointer = try parameterData.withUnsafeBufferPointer { buffer in - guard let result = PQexecParams( - self.connection, - statement, - Int32(parameters.count), - nil, - buffer.isEmpty ? nil : buffer.baseAddress, - nil, - nil, - 0 - ) else { - throw mostRecentError ?? ConnectionError(description: "Empty result") + if lastResult != nil { + PQclear(lastResult!) + lastResult = nil } - return result + + let status = PQresultStatus(result) + guard status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK else { + throw mostRecentError ?? ConnectionError(description: "Query failed.") + } + + lastResult = result } - return try Result(result) + guard lastResult != nil else { + throw mostRecentError ?? ConnectionError(description: "Query failed.") + } + return try Result(lastResult!) } } diff --git a/Tests/PostgreSQLTests/PostgreSQLTests.swift b/Tests/PostgreSQLTests/PostgreSQLTests.swift index 4226124..000c4e6 100644 --- a/Tests/PostgreSQLTests/PostgreSQLTests.swift +++ b/Tests/PostgreSQLTests/PostgreSQLTests.swift @@ -103,14 +103,15 @@ extension Album: ModelProtocol { // MARK: - Tests public class PostgreSQLTests: XCTestCase { - let connection = try! PostgreSQL.Connection(info: .init(URL(string: "postgres://localhost:5432/swift_test")!)) + var connection: Connection! let logger = Logger(name: "SQL Logger", appenders: [StandardOutputAppender()]) override public func setUp() { super.setUp() - + do { + connection = try! PostgreSQL.Connection(info: .init(URL(string: "postgres://localhost:5432/swift_test")!)) try connection.open() try connection.execute("DROP TABLE IF EXISTS albums") try connection.execute("DROP TABLE IF EXISTS artists") From 0eb0a9b630daf778e3df23890633ce8f8feb129e Mon Sep 17 00:00:00 2001 From: Robert Payne Date: Thu, 1 Dec 2016 20:39:05 +1300 Subject: [PATCH 2/6] Add safety around open/close --- Sources/PostgreSQL/Connection.swift | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index 800e25a..4526919 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -120,6 +120,10 @@ public final class Connection: ConnectionProtocol { } public func open() throws { + guard connection == nil else { + throw ConnectionError(description: "Connection already opened.") + } + connection = PQsetdbLogin( connectionInfo.host, String(connectionInfo.port), @@ -153,8 +157,10 @@ public final class Connection: ConnectionProtocol { } public func close() { - PQfinish(connection) - connection = nil + if connection != nil { + PQfinish(connection!) + connection = nil + } } public func createSavePointNamed(_ name: String) throws { From d053c790fba19596c093cf98d476dd5325cbe1aa Mon Sep 17 00:00:00 2001 From: Robert Payne Date: Thu, 1 Dec 2016 22:30:28 +1300 Subject: [PATCH 3/6] Tidy up Postgres Connect to be async --- Sources/PostgreSQL/Connection.swift | 105 +++++++++++++++++++++++----- 1 file changed, 86 insertions(+), 19 deletions(-) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index 4526919..6073579 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -1,4 +1,5 @@ @_exported import SQL +import Foundation import CLibpq import CLibvenice import Axis @@ -17,7 +18,6 @@ public final class Connection: ConnectionProtocol { public var username: String? public var password: String? public var options: String? - public var tty: String? public init?(uri: URL) { do { @@ -41,14 +41,13 @@ public final class Connection: ConnectionProtocol { self.password = uri.password } - public init(host: String, port: Int = 5432, databaseName: String, username: String? = nil, password: String? = nil, options: String? = nil, tty: String? = nil) { + public init(host: String, port: Int = 5432, databaseName: String, username: String? = nil, password: String? = nil, options: String? = nil) { self.host = host self.port = port self.databaseName = databaseName self.username = username self.password = password self.options = options - self.tty = tty } } @@ -124,28 +123,62 @@ public final class Connection: ConnectionProtocol { throw ConnectionError(description: "Connection already opened.") } - connection = PQsetdbLogin( - connectionInfo.host, - String(connectionInfo.port), - connectionInfo.options ?? "", - connectionInfo.tty ?? "", - connectionInfo.databaseName, - connectionInfo.username ?? "", - connectionInfo.password ?? "" - ) - - guard PQstatus(connection) == CONNECTION_OK else { - throw mostRecentError ?? ConnectionError(description: "Could not connect to Postgres Server.") + print("==============> OPEN") + + var components = URLComponents() + components.scheme = "postgres" + components.host = connectionInfo.host + components.port = connectionInfo.port + components.user = connectionInfo.username + components.password = connectionInfo.password + components.path = "/\(connectionInfo.databaseName)" + if let options = connectionInfo.options { + components.queryItems = [URLQueryItem(name: "options", value: options)] } - - guard PQsetnonblocking(connection, 1) == 0 else { - throw mostRecentError ?? ConnectionError(description: "Could not set to non-blocking mode.") + let url = components.url!.absoluteString + + connection = PQconnectStart(url) + + guard connection != nil else { + throw ConnectionError(description: "Could not allocate connection.") } - + + guard PQstatus(connection) != CONNECTION_BAD else { + throw ConnectionError(description: "Could not start connection.") + } + fd = PQsocket(connection) guard fd >= 0 else { throw mostRecentError ?? ConnectionError(description: "Could not get file descriptor.") } + + loop: while true { + let status = PQconnectPoll(connection) + switch status { + case PGRES_POLLING_OK: + break loop + case PGRES_POLLING_READING: + mill_fdwait_(fd, FDW_IN, 15.seconds.fromNow().int64milliseconds, nil) + mill_fdclean_(fd) + case PGRES_POLLING_WRITING: + mill_fdwait_(fd, FDW_OUT, 15.seconds.fromNow().int64milliseconds, nil) + mill_fdclean_(fd) + case PGRES_POLLING_ACTIVE: + break + case PGRES_POLLING_FAILED: + throw mostRecentError ?? ConnectionError(description: "Could not connect to Postgres Server.") + default: + break + } + } + + guard PQsetnonblocking(connection, 1) == 0 else { + throw mostRecentError ?? ConnectionError(description: "Could not set to non-blocking mode.") + } + + guard PQstatus(connection) == CONNECTION_OK else { + throw mostRecentError ?? ConnectionError(description: "Could not connect to Postgres Server.") + } } public var mostRecentError: ConnectionError? { @@ -239,6 +272,7 @@ public final class Connection: ConnectionProtocol { // write query while true { mill_fdwait_(fd, FDW_OUT, -1, nil) + mill_fdclean_(fd) let status = PQflush(connection) guard status >= 0 else { throw mostRecentError ?? ConnectionError(description: "Could not send query.") @@ -258,6 +292,7 @@ public final class Connection: ConnectionProtocol { guard PQisBusy(connection) == 0 else { mill_fdwait_(fd, FDW_IN, -1, nil) + mill_fdclean_(fd) continue } @@ -284,3 +319,35 @@ public final class Connection: ConnectionProtocol { return try Result(lastResult!) } } + +extension Collection where Iterator.Element == String { + + func withUnsafeCStringArray(_ body: (UnsafePointer?>) throws -> T) rethrows -> T { + var pointers: [UnsafePointer?] = [] + var deallocators: [() -> ()] = [] + defer { + for deallocator in deallocators { + deallocator() + } + } + + for string in self { + string.utf8CString.withUnsafeBufferPointer { + let count = $0.count + if count > 0 { + let copy = UnsafeMutablePointer.allocate(capacity: count) + deallocators.append { copy.deallocate(capacity: count) } + memcpy(copy, $0.baseAddress!, count) + pointers.append(copy) + } else { + pointers.append(nil) + } + } + } + + return try pointers.withUnsafeBufferPointer { + try body($0.baseAddress!) + } + } + +} From eda562f896c510fb03d34845dbee323a081d1330 Mon Sep 17 00:00:00 2001 From: Robert Payne Date: Thu, 1 Dec 2016 22:46:06 +1300 Subject: [PATCH 4/6] Remove unecessary print --- Sources/PostgreSQL/Connection.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index 6073579..00d7622 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -123,8 +123,6 @@ public final class Connection: ConnectionProtocol { throw ConnectionError(description: "Connection already opened.") } - print("==============> OPEN") - var components = URLComponents() components.scheme = "postgres" components.host = connectionInfo.host From 912fe0d26871b6bf30ec4bc67f0ad2c40cba6b78 Mon Sep 17 00:00:00 2001 From: Dan Appel Date: Thu, 22 Dec 2016 15:52:18 -0800 Subject: [PATCH 5/6] Make it build with swift 3.0.2 --- .swift-version | 2 +- Package.swift | 3 ++- Sources/PostgreSQL/Connection.swift | 16 ++++++++-------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/.swift-version b/.swift-version index 9f55b2c..b502146 100644 --- a/.swift-version +++ b/.swift-version @@ -1 +1 @@ -3.0 +3.0.2 diff --git a/Package.swift b/Package.swift index d64feb4..7d91fb2 100644 --- a/Package.swift +++ b/Package.swift @@ -4,6 +4,7 @@ let package = Package( name: "PostgreSQL", dependencies: [ .Package(url: "https://github.com/Zewo/CLibpq.git", majorVersion: 0, minor: 13), - .Package(url: "https://github.com/Zewo/SQL.git", majorVersion: 0, minor: 14) + .Package(url: "https://github.com/Zewo/SQL.git", majorVersion: 0, minor: 14), + .Package(url: "https://github.com/Zewo/CLibvenice.git", majorVersion: 0, minor: 14), ] ) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index 00d7622..df8e9b3 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -156,11 +156,11 @@ public final class Connection: ConnectionProtocol { case PGRES_POLLING_OK: break loop case PGRES_POLLING_READING: - mill_fdwait_(fd, FDW_IN, 15.seconds.fromNow().int64milliseconds, nil) - mill_fdclean_(fd) + mill_fdwait(fd, FDW_IN, 15.seconds.fromNow().int64milliseconds, nil) + fdclean(fd) case PGRES_POLLING_WRITING: - mill_fdwait_(fd, FDW_OUT, 15.seconds.fromNow().int64milliseconds, nil) - mill_fdclean_(fd) + mill_fdwait(fd, FDW_OUT, 15.seconds.fromNow().int64milliseconds, nil) + fdclean(fd) case PGRES_POLLING_ACTIVE: break case PGRES_POLLING_FAILED: @@ -269,8 +269,8 @@ public final class Connection: ConnectionProtocol { // write query while true { - mill_fdwait_(fd, FDW_OUT, -1, nil) - mill_fdclean_(fd) + mill_fdwait(fd, FDW_OUT, -1, nil) + fdclean(fd) let status = PQflush(connection) guard status >= 0 else { throw mostRecentError ?? ConnectionError(description: "Could not send query.") @@ -289,8 +289,8 @@ public final class Connection: ConnectionProtocol { } guard PQisBusy(connection) == 0 else { - mill_fdwait_(fd, FDW_IN, -1, nil) - mill_fdclean_(fd) + mill_fdwait(fd, FDW_IN, -1, nil) + fdclean(fd) continue } From 7f6b3cf620b62ca011bff9e1291a647e2350622b Mon Sep 17 00:00:00 2001 From: Dan Appel Date: Thu, 22 Dec 2016 16:34:05 -0800 Subject: [PATCH 6/6] Remove unused code --- Sources/PostgreSQL/Connection.swift | 32 ----------------------------- 1 file changed, 32 deletions(-) diff --git a/Sources/PostgreSQL/Connection.swift b/Sources/PostgreSQL/Connection.swift index df8e9b3..1c76b3d 100644 --- a/Sources/PostgreSQL/Connection.swift +++ b/Sources/PostgreSQL/Connection.swift @@ -317,35 +317,3 @@ public final class Connection: ConnectionProtocol { return try Result(lastResult!) } } - -extension Collection where Iterator.Element == String { - - func withUnsafeCStringArray(_ body: (UnsafePointer?>) throws -> T) rethrows -> T { - var pointers: [UnsafePointer?] = [] - var deallocators: [() -> ()] = [] - defer { - for deallocator in deallocators { - deallocator() - } - } - - for string in self { - string.utf8CString.withUnsafeBufferPointer { - let count = $0.count - if count > 0 { - let copy = UnsafeMutablePointer.allocate(capacity: count) - deallocators.append { copy.deallocate(capacity: count) } - memcpy(copy, $0.baseAddress!, count) - pointers.append(copy) - } else { - pointers.append(nil) - } - } - } - - return try pointers.withUnsafeBufferPointer { - try body($0.baseAddress!) - } - } - -}