diff --git a/Sources/NIOCore/AddressedEnvelope.swift b/Sources/NIOCore/AddressedEnvelope.swift index 47b167a766..9b6a9f7243 100644 --- a/Sources/NIOCore/AddressedEnvelope.swift +++ b/Sources/NIOCore/AddressedEnvelope.swift @@ -39,15 +39,24 @@ public struct AddressedEnvelope { /// Details of any congestion state. public var ecnState: NIOExplicitCongestionNotificationState public var packetInfo: NIOPacketInfo? + public var fd: Int? public init(ecnState: NIOExplicitCongestionNotificationState) { self.ecnState = ecnState self.packetInfo = nil + self.fd = nil } public init(ecnState: NIOExplicitCongestionNotificationState, packetInfo: NIOPacketInfo?) { self.ecnState = ecnState self.packetInfo = packetInfo + self.fd = nil + } + + public init(ecnState: NIOExplicitCongestionNotificationState, fd: Int?) { + self.ecnState = ecnState + self.packetInfo = nil + self.fd = fd } } } diff --git a/Sources/NIOPosix/BaseStreamSocketChannel.swift b/Sources/NIOPosix/BaseStreamSocketChannel.swift index 9c915b1a29..29c567ba2b 100644 --- a/Sources/NIOPosix/BaseStreamSocketChannel.swift +++ b/Sources/NIOPosix/BaseStreamSocketChannel.swift @@ -161,7 +161,24 @@ class BaseStreamSocketChannel: BaseSocketChannel } final override func writeToSocket() throws -> OverallWriteResult { + print(#fileID, #function) let result = try self.pendingWrites.triggerAppropriateWriteOperations( + writeMessage: { ptr, destinationPtr, destinationSize, controlBytes in + fatalError("writeMessage") + guard let ctrlBytes = controlBytes else { + return .processed(0) + } + var s = UnsafeControlMessageStorage.allocate(msghdrCount: 1) + var cb = UnsafeOutboundControlBytes(controlBytes: s[0]) + cb.appendExplicitCongestionState(metadata: ctrlBytes, protocolFamily: .unix) + let controlMessageBytePointer = cb.validControlBytes + return try self.socket.sendmsg( + pointer: ptr, + destinationPtr: destinationPtr, + destinationSize: destinationSize, + controlBytes: controlMessageBytePointer + ) + }, scalarBufferWriteOperation: { ptr in guard ptr.count > 0 else { // No need to call write if the buffer is empty. @@ -294,6 +311,13 @@ class BaseStreamSocketChannel: BaseSocketChannel return } + if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope.self) { + if self.pendingWrites.add(envelope: envelope, promise: promise) { + self.pipeline.syncOperations.fireChannelWritabilityChanged() + } + return + } + let data = self.unwrapData(data, as: IOData.self) if !self.pendingWrites.add(data: data, promise: promise) { diff --git a/Sources/NIOPosix/PendingDatagramWritesManager.swift b/Sources/NIOPosix/PendingDatagramWritesManager.swift index 3be3764409..4554dedbd0 100644 --- a/Sources/NIOPosix/PendingDatagramWritesManager.swift +++ b/Sources/NIOPosix/PendingDatagramWritesManager.swift @@ -365,7 +365,7 @@ private struct PendingDatagramWritesState { case .some(let e): // The compiler can't prove this, but it must be so. assert(self.pendingWrites.distance(from: e, to: self.pendingWrites.startIndex) == 0) - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata == nil) default: return .nothingToBeWritten } diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index c6adbc093e..e0396e7fee 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -19,6 +19,7 @@ import NIOCore private struct PendingStreamWrite { var data: IOData var promise: Optional> + var metadata: AddressedEnvelope.Metadata? } /// Write result is `.couldNotWriteEverything` but we have no more writes to perform. @@ -301,7 +302,7 @@ private struct PendingStreamWritesState { case 1: switch self.pendingWrites.first!.data { case .byteBuffer: - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil) case .fileRegion: return .scalarFileWrite } @@ -314,7 +315,7 @@ private struct PendingStreamWritesState { case (.byteBuffer, .byteBuffer): return .vectorBufferWrite case (.byteBuffer, .fileRegion): - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil) case (.fileRegion, _): return .scalarFileWrite } @@ -322,7 +323,7 @@ private struct PendingStreamWritesState { } } -/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingWritesState` +/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingStreamWritesState` /// value. The most important purpose of this object is to call `write`, `writev` or `sendfile` depending on the /// currently pending writes. final class PendingStreamWritesManager: PendingWritesManager { @@ -370,7 +371,10 @@ final class PendingStreamWritesManager: PendingWritesManager { func add(data: IOData, promise: EventLoopPromise?) -> Bool { assert(self.isOpen) self.state.append(PendingStreamWrite(data: data, promise: promise)) + return _add() + } + private func _add() -> Bool { if self.state.bytes > waterMark.high && channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged { @@ -381,6 +385,15 @@ final class PendingStreamWritesManager: PendingWritesManager { return true } + func add(envelope: AddressedEnvelope, promise: EventLoopPromise?) -> Bool { + assert(self.isOpen) + print(#function, envelope.metadata) + self.state.append( + PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata) + ) + return _add() + } + /// Returns the best mechanism to write pending data at the current point in time. var currentBestWriteMechanism: WriteMechanism { self.state.currentBestWriteMechanism @@ -395,14 +408,22 @@ final class PendingStreamWritesManager: PendingWritesManager { /// - scalarFileWriteOperation: An operation that writes a region of a file descriptor (usually `sendfile`). /// - Returns: The `OneWriteOperationResult` and whether the `Channel` is now writable. func triggerAppropriateWriteOperations( + writeMessage: ( + UnsafeRawBufferPointer, UnsafePointer?, socklen_t, AddressedEnvelope.Metadata? + ) throws -> IOResult, scalarBufferWriteOperation: (UnsafeRawBufferPointer) throws -> IOResult, vectorBufferWriteOperation: (UnsafeBufferPointer) throws -> IOResult, scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult ) throws -> OverallWriteResult { try self.triggerWriteOperations { writeMechanism in + print(#function, writeMechanism) switch writeMechanism { - case .scalarBufferWrite: - return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) + case .scalarBufferWrite(let metaData): + if metaData { + return try _triggerScalarBufferWrite(scalarWriteOperation: { try writeMessage($0, $1, $2, $3) }) + } else { + return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) + } case .vectorBufferWrite: return try triggerVectorBufferWrite({ try vectorBufferWriteOperation($0) }) case .scalarFileWrite: @@ -451,6 +472,15 @@ final class PendingStreamWritesManager: PendingWritesManager { } } + private func _triggerScalarBufferWrite( + scalarWriteOperation: ( + UnsafeRawBufferPointer, UnsafePointer?, socklen_t, AddressedEnvelope.Metadata? + ) throws -> IOResult + ) rethrows -> OneWriteOperationResult { + fatalError("\(#function) We made it") + self.didWrite(itemCount: 1, result: .processed(1)) + } + /// Trigger a write of a single `FileRegion` (usually using `sendfile(2)`). /// /// - Parameters: @@ -607,7 +637,7 @@ final class PendingStreamWritesManager: PendingWritesManager { } internal enum WriteMechanism { - case scalarBufferWrite + case scalarBufferWrite(withMetaData: Bool) case vectorBufferWrite case scalarFileWrite case nothingToBeWritten diff --git a/Tests/NIOPosixTests/ChannelTests.swift b/Tests/NIOPosixTests/ChannelTests.swift index bc0edb086f..cc9422a24a 100644 --- a/Tests/NIOPosixTests/ChannelTests.swift +++ b/Tests/NIOPosixTests/ChannelTests.swift @@ -293,6 +293,9 @@ final class ChannelTests: XCTestCase { var multiState = 0 var fileState = 0 let result = try pwm.triggerAppropriateWriteOperations( + writeMessage: { (_, _, _, _) in + return .processed(0) + }, scalarBufferWriteOperation: { buf in defer { singleState += 1