From 28a2333da3fcf0348e74666f0ae797781bd7cb30 Mon Sep 17 00:00:00 2001 From: zane <39070793+zaneenders@users.noreply.github.com> Date: Mon, 21 Jul 2025 06:42:17 -0600 Subject: [PATCH 1/3] Getting started. --- Sources/NIOPosix/PendingWritesManager.swift | 37 ++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index c6adbc093e..fe3edc4900 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -16,9 +16,43 @@ import Atomics import CNIOLinux import NIOCore +public struct MetadataEnvelope { + public var data: ByteBuffer + public var controlData: MessageMetadata? = nil + + public init(data: ByteBuffer) { + self.data = data + } + + public init(data: ByteBuffer, controlData: MessageMetadata?) { + self.data = data + self.controlData = controlData + } + + /// Any metadata associated with an + public struct MessageMetadata: Hashable, Sendable { + init() { + var m: msghdr = .init() + // Create buffer for file descriptor + let fake_fd = 6666 + let fd_pointer = UnsafeMutableRawPointer(bitPattern: fake_fd) + m.msg_control = fd_pointer + m.msg_controllen = MemoryLayout.size(ofValue: fake_fd) + // TODO other platoforms + let cm: UnsafeMutablePointer = CNIOLinux_CMSG_FIRSTHDR(&m) + cm.pointee.cmsg_level = SOL_SOCKET + cm.pointee.cmsg_type = Int32(SCM_RIGHTS) + cm.pointee.cmsg_len = MemoryLayout.size(ofValue: fake_fd) + + // TODO send `m` + } + } +} + private struct PendingStreamWrite { var data: IOData var promise: Optional> + var metadata: MetadataEnvelope? } /// Write result is `.couldNotWriteEverything` but we have no more writes to perform. @@ -322,7 +356,7 @@ private struct PendingStreamWritesState { } } -/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingWritesState` +/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingStreamWritesState` /// value. The most important purpose of this object is to call `write`, `writev` or `sendfile` depending on the /// currently pending writes. final class PendingStreamWritesManager: PendingWritesManager { @@ -400,6 +434,7 @@ final class PendingStreamWritesManager: PendingWritesManager { scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult ) throws -> OverallWriteResult { try self.triggerWriteOperations { writeMechanism in + // TODO: add with metadata calls. switch writeMechanism { case .scalarBufferWrite: return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) From 9c080d112fc7dea7ab7e8ab58398772eac54cfb7 Mon Sep 17 00:00:00 2001 From: zane <39070793+zaneenders@users.noreply.github.com> Date: Sat, 26 Jul 2025 22:46:23 -0600 Subject: [PATCH 2/3] Add AddressedEnvelope to PendingWritesManager. --- Sources/NIOCore/AddressedEnvelope.swift | 9 ++++ .../NIOPosix/BaseStreamSocketChannel.swift | 10 ++++ Sources/NIOPosix/PendingWritesManager.swift | 47 +++++-------------- 3 files changed, 32 insertions(+), 34 deletions(-) diff --git a/Sources/NIOCore/AddressedEnvelope.swift b/Sources/NIOCore/AddressedEnvelope.swift index 47b167a766..9b6a9f7243 100644 --- a/Sources/NIOCore/AddressedEnvelope.swift +++ b/Sources/NIOCore/AddressedEnvelope.swift @@ -39,15 +39,24 @@ public struct AddressedEnvelope { /// Details of any congestion state. public var ecnState: NIOExplicitCongestionNotificationState public var packetInfo: NIOPacketInfo? + public var fd: Int? public init(ecnState: NIOExplicitCongestionNotificationState) { self.ecnState = ecnState self.packetInfo = nil + self.fd = nil } public init(ecnState: NIOExplicitCongestionNotificationState, packetInfo: NIOPacketInfo?) { self.ecnState = ecnState self.packetInfo = packetInfo + self.fd = nil + } + + public init(ecnState: NIOExplicitCongestionNotificationState, fd: Int?) { + self.ecnState = ecnState + self.packetInfo = nil + self.fd = fd } } } diff --git a/Sources/NIOPosix/BaseStreamSocketChannel.swift b/Sources/NIOPosix/BaseStreamSocketChannel.swift index 9c915b1a29..ff495eac45 100644 --- a/Sources/NIOPosix/BaseStreamSocketChannel.swift +++ b/Sources/NIOPosix/BaseStreamSocketChannel.swift @@ -161,6 +161,9 @@ class BaseStreamSocketChannel: BaseSocketChannel } final override func writeToSocket() throws -> OverallWriteResult { + print(#fileID, #function) + // TODO: self.socket.sendmmsg(msgs: ) + // TODO: self.socket.sendmsg(pointer: , destinationPtr: UnsafePointer?, destinationSize: socklen_t, controlBytes: UnsafeMutableRawBufferPointer) let result = try self.pendingWrites.triggerAppropriateWriteOperations( scalarBufferWriteOperation: { ptr in guard ptr.count > 0 else { @@ -294,6 +297,13 @@ class BaseStreamSocketChannel: BaseSocketChannel return } + if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope.self) { + if self.pendingWrites.add(envelope: envelope, promise: promise) { + self.pipeline.syncOperations.fireChannelWritabilityChanged() + } + return + } + let data = self.unwrapData(data, as: IOData.self) if !self.pendingWrites.add(data: data, promise: promise) { diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index fe3edc4900..766cb934e3 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -16,43 +16,10 @@ import Atomics import CNIOLinux import NIOCore -public struct MetadataEnvelope { - public var data: ByteBuffer - public var controlData: MessageMetadata? = nil - - public init(data: ByteBuffer) { - self.data = data - } - - public init(data: ByteBuffer, controlData: MessageMetadata?) { - self.data = data - self.controlData = controlData - } - - /// Any metadata associated with an - public struct MessageMetadata: Hashable, Sendable { - init() { - var m: msghdr = .init() - // Create buffer for file descriptor - let fake_fd = 6666 - let fd_pointer = UnsafeMutableRawPointer(bitPattern: fake_fd) - m.msg_control = fd_pointer - m.msg_controllen = MemoryLayout.size(ofValue: fake_fd) - // TODO other platoforms - let cm: UnsafeMutablePointer = CNIOLinux_CMSG_FIRSTHDR(&m) - cm.pointee.cmsg_level = SOL_SOCKET - cm.pointee.cmsg_type = Int32(SCM_RIGHTS) - cm.pointee.cmsg_len = MemoryLayout.size(ofValue: fake_fd) - - // TODO send `m` - } - } -} - private struct PendingStreamWrite { var data: IOData var promise: Optional> - var metadata: MetadataEnvelope? + var metadata: AddressedEnvelope.Metadata? } /// Write result is `.couldNotWriteEverything` but we have no more writes to perform. @@ -404,7 +371,10 @@ final class PendingStreamWritesManager: PendingWritesManager { func add(data: IOData, promise: EventLoopPromise?) -> Bool { assert(self.isOpen) self.state.append(PendingStreamWrite(data: data, promise: promise)) + return _add() + } + private func _add() -> Bool { if self.state.bytes > waterMark.high && channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged { @@ -415,6 +385,14 @@ final class PendingStreamWritesManager: PendingWritesManager { return true } + func add(envelope: AddressedEnvelope, promise: EventLoopPromise?) -> Bool { + assert(self.isOpen) + self.state.append( + PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata) + ) + return _add() + } + /// Returns the best mechanism to write pending data at the current point in time. var currentBestWriteMechanism: WriteMechanism { self.state.currentBestWriteMechanism @@ -434,6 +412,7 @@ final class PendingStreamWritesManager: PendingWritesManager { scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult ) throws -> OverallWriteResult { try self.triggerWriteOperations { writeMechanism in + print(#function, writeMechanism) // TODO: add with metadata calls. switch writeMechanism { case .scalarBufferWrite: From 2e50c688009cd88c3014adf4c72ae319e574d6cf Mon Sep 17 00:00:00 2001 From: zane <39070793+zaneenders@users.noreply.github.com> Date: Sun, 10 Aug 2025 05:29:29 -0600 Subject: [PATCH 3/3] progress --- .../NIOPosix/BaseStreamSocketChannel.swift | 18 ++++++++++-- .../PendingDatagramWritesManager.swift | 2 +- Sources/NIOPosix/PendingWritesManager.swift | 28 +++++++++++++++---- Tests/NIOPosixTests/ChannelTests.swift | 3 ++ 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/Sources/NIOPosix/BaseStreamSocketChannel.swift b/Sources/NIOPosix/BaseStreamSocketChannel.swift index ff495eac45..29c567ba2b 100644 --- a/Sources/NIOPosix/BaseStreamSocketChannel.swift +++ b/Sources/NIOPosix/BaseStreamSocketChannel.swift @@ -162,9 +162,23 @@ class BaseStreamSocketChannel: BaseSocketChannel final override func writeToSocket() throws -> OverallWriteResult { print(#fileID, #function) - // TODO: self.socket.sendmmsg(msgs: ) - // TODO: self.socket.sendmsg(pointer: , destinationPtr: UnsafePointer?, destinationSize: socklen_t, controlBytes: UnsafeMutableRawBufferPointer) let result = try self.pendingWrites.triggerAppropriateWriteOperations( + writeMessage: { ptr, destinationPtr, destinationSize, controlBytes in + fatalError("writeMessage") + guard let ctrlBytes = controlBytes else { + return .processed(0) + } + var s = UnsafeControlMessageStorage.allocate(msghdrCount: 1) + var cb = UnsafeOutboundControlBytes(controlBytes: s[0]) + cb.appendExplicitCongestionState(metadata: ctrlBytes, protocolFamily: .unix) + let controlMessageBytePointer = cb.validControlBytes + return try self.socket.sendmsg( + pointer: ptr, + destinationPtr: destinationPtr, + destinationSize: destinationSize, + controlBytes: controlMessageBytePointer + ) + }, scalarBufferWriteOperation: { ptr in guard ptr.count > 0 else { // No need to call write if the buffer is empty. diff --git a/Sources/NIOPosix/PendingDatagramWritesManager.swift b/Sources/NIOPosix/PendingDatagramWritesManager.swift index 3be3764409..4554dedbd0 100644 --- a/Sources/NIOPosix/PendingDatagramWritesManager.swift +++ b/Sources/NIOPosix/PendingDatagramWritesManager.swift @@ -365,7 +365,7 @@ private struct PendingDatagramWritesState { case .some(let e): // The compiler can't prove this, but it must be so. assert(self.pendingWrites.distance(from: e, to: self.pendingWrites.startIndex) == 0) - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata == nil) default: return .nothingToBeWritten } diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index 766cb934e3..e0396e7fee 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -302,7 +302,7 @@ private struct PendingStreamWritesState { case 1: switch self.pendingWrites.first!.data { case .byteBuffer: - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil) case .fileRegion: return .scalarFileWrite } @@ -315,7 +315,7 @@ private struct PendingStreamWritesState { case (.byteBuffer, .byteBuffer): return .vectorBufferWrite case (.byteBuffer, .fileRegion): - return .scalarBufferWrite + return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil) case (.fileRegion, _): return .scalarFileWrite } @@ -387,6 +387,7 @@ final class PendingStreamWritesManager: PendingWritesManager { func add(envelope: AddressedEnvelope, promise: EventLoopPromise?) -> Bool { assert(self.isOpen) + print(#function, envelope.metadata) self.state.append( PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata) ) @@ -407,16 +408,22 @@ final class PendingStreamWritesManager: PendingWritesManager { /// - scalarFileWriteOperation: An operation that writes a region of a file descriptor (usually `sendfile`). /// - Returns: The `OneWriteOperationResult` and whether the `Channel` is now writable. func triggerAppropriateWriteOperations( + writeMessage: ( + UnsafeRawBufferPointer, UnsafePointer?, socklen_t, AddressedEnvelope.Metadata? + ) throws -> IOResult, scalarBufferWriteOperation: (UnsafeRawBufferPointer) throws -> IOResult, vectorBufferWriteOperation: (UnsafeBufferPointer) throws -> IOResult, scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult ) throws -> OverallWriteResult { try self.triggerWriteOperations { writeMechanism in print(#function, writeMechanism) - // TODO: add with metadata calls. switch writeMechanism { - case .scalarBufferWrite: - return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) + case .scalarBufferWrite(let metaData): + if metaData { + return try _triggerScalarBufferWrite(scalarWriteOperation: { try writeMessage($0, $1, $2, $3) }) + } else { + return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) }) + } case .vectorBufferWrite: return try triggerVectorBufferWrite({ try vectorBufferWriteOperation($0) }) case .scalarFileWrite: @@ -465,6 +472,15 @@ final class PendingStreamWritesManager: PendingWritesManager { } } + private func _triggerScalarBufferWrite( + scalarWriteOperation: ( + UnsafeRawBufferPointer, UnsafePointer?, socklen_t, AddressedEnvelope.Metadata? + ) throws -> IOResult + ) rethrows -> OneWriteOperationResult { + fatalError("\(#function) We made it") + self.didWrite(itemCount: 1, result: .processed(1)) + } + /// Trigger a write of a single `FileRegion` (usually using `sendfile(2)`). /// /// - Parameters: @@ -621,7 +637,7 @@ final class PendingStreamWritesManager: PendingWritesManager { } internal enum WriteMechanism { - case scalarBufferWrite + case scalarBufferWrite(withMetaData: Bool) case vectorBufferWrite case scalarFileWrite case nothingToBeWritten diff --git a/Tests/NIOPosixTests/ChannelTests.swift b/Tests/NIOPosixTests/ChannelTests.swift index bc0edb086f..cc9422a24a 100644 --- a/Tests/NIOPosixTests/ChannelTests.swift +++ b/Tests/NIOPosixTests/ChannelTests.swift @@ -293,6 +293,9 @@ final class ChannelTests: XCTestCase { var multiState = 0 var fileState = 0 let result = try pwm.triggerAppropriateWriteOperations( + writeMessage: { (_, _, _, _) in + return .processed(0) + }, scalarBufferWriteOperation: { buf in defer { singleState += 1