Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Sources/NIOCore/AddressedEnvelope.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,24 @@ public struct AddressedEnvelope<DataType> {
/// Details of any congestion state.
public var ecnState: NIOExplicitCongestionNotificationState
public var packetInfo: NIOPacketInfo?
public var fd: Int?

public init(ecnState: NIOExplicitCongestionNotificationState) {
self.ecnState = ecnState
self.packetInfo = nil
self.fd = nil
}

public init(ecnState: NIOExplicitCongestionNotificationState, packetInfo: NIOPacketInfo?) {
self.ecnState = ecnState
self.packetInfo = packetInfo
self.fd = nil
}

public init(ecnState: NIOExplicitCongestionNotificationState, fd: Int?) {
self.ecnState = ecnState
self.packetInfo = nil
self.fd = fd
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,24 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
}

final override func writeToSocket() throws -> OverallWriteResult {
print(#fileID, #function)
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
writeMessage: { ptr, destinationPtr, destinationSize, controlBytes in
fatalError("writeMessage")
guard let ctrlBytes = controlBytes else {
return .processed(0)
}
var s = UnsafeControlMessageStorage.allocate(msghdrCount: 1)
var cb = UnsafeOutboundControlBytes(controlBytes: s[0])
cb.appendExplicitCongestionState(metadata: ctrlBytes, protocolFamily: .unix)
let controlMessageBytePointer = cb.validControlBytes
return try self.socket.sendmsg(
pointer: ptr,
destinationPtr: destinationPtr,
destinationSize: destinationSize,
controlBytes: controlMessageBytePointer
)
},
scalarBufferWriteOperation: { ptr in
guard ptr.count > 0 else {
// No need to call write if the buffer is empty.
Expand Down Expand Up @@ -294,6 +311,13 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
return
}

if let envelope = self.tryUnwrapData(data, as: AddressedEnvelope<ByteBuffer>.self) {
if self.pendingWrites.add(envelope: envelope, promise: promise) {
self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
return
}

let data = self.unwrapData(data, as: IOData.self)

if !self.pendingWrites.add(data: data, promise: promise) {
Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private struct PendingDatagramWritesState {
case .some(let e):
// The compiler can't prove this, but it must be so.
assert(self.pendingWrites.distance(from: e, to: self.pendingWrites.startIndex) == 0)
return .scalarBufferWrite
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata == nil)
default:
return .nothingToBeWritten
}
Expand Down
42 changes: 36 additions & 6 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import NIOCore
private struct PendingStreamWrite {
var data: IOData
var promise: Optional<EventLoopPromise<Void>>
var metadata: AddressedEnvelope<ByteBuffer>.Metadata?
}

/// Write result is `.couldNotWriteEverything` but we have no more writes to perform.
Expand Down Expand Up @@ -301,7 +302,7 @@ private struct PendingStreamWritesState {
case 1:
switch self.pendingWrites.first!.data {
case .byteBuffer:
return .scalarBufferWrite
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil)
case .fileRegion:
return .scalarFileWrite
}
Expand All @@ -314,15 +315,15 @@ private struct PendingStreamWritesState {
case (.byteBuffer, .byteBuffer):
return .vectorBufferWrite
case (.byteBuffer, .fileRegion):
return .scalarBufferWrite
return .scalarBufferWrite(withMetaData: self.pendingWrites.first!.metadata != nil)
case (.fileRegion, _):
return .scalarFileWrite
}
}
}
}

/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingWritesState`
/// This class manages the writing of pending writes to stream sockets. The state is held in a `PendingStreamWritesState`
/// value. The most important purpose of this object is to call `write`, `writev` or `sendfile` depending on the
/// currently pending writes.
final class PendingStreamWritesManager: PendingWritesManager {
Expand Down Expand Up @@ -370,7 +371,10 @@ final class PendingStreamWritesManager: PendingWritesManager {
func add(data: IOData, promise: EventLoopPromise<Void>?) -> Bool {
assert(self.isOpen)
self.state.append(PendingStreamWrite(data: data, promise: promise))
return _add()
}

private func _add() -> Bool {
if self.state.bytes > waterMark.high
&& channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged
{
Expand All @@ -381,6 +385,15 @@ final class PendingStreamWritesManager: PendingWritesManager {
return true
}

func add(envelope: AddressedEnvelope<ByteBuffer>, promise: EventLoopPromise<Void>?) -> Bool {
assert(self.isOpen)
print(#function, envelope.metadata)
self.state.append(
PendingStreamWrite(data: .byteBuffer(envelope.data), promise: promise, metadata: envelope.metadata)
)
return _add()
}

/// Returns the best mechanism to write pending data at the current point in time.
var currentBestWriteMechanism: WriteMechanism {
self.state.currentBestWriteMechanism
Expand All @@ -395,14 +408,22 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// - scalarFileWriteOperation: An operation that writes a region of a file descriptor (usually `sendfile`).
/// - Returns: The `OneWriteOperationResult` and whether the `Channel` is now writable.
func triggerAppropriateWriteOperations(
writeMessage: (
UnsafeRawBufferPointer, UnsafePointer<sockaddr>?, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?
) throws -> IOResult<Int>,
scalarBufferWriteOperation: (UnsafeRawBufferPointer) throws -> IOResult<Int>,
vectorBufferWriteOperation: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>,
scalarFileWriteOperation: (CInt, Int, Int) throws -> IOResult<Int>
) throws -> OverallWriteResult {
try self.triggerWriteOperations { writeMechanism in
print(#function, writeMechanism)
switch writeMechanism {
case .scalarBufferWrite:
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
case .scalarBufferWrite(let metaData):
if metaData {
return try _triggerScalarBufferWrite(scalarWriteOperation: { try writeMessage($0, $1, $2, $3) })
} else {
return try triggerScalarBufferWrite({ try scalarBufferWriteOperation($0) })
}
case .vectorBufferWrite:
return try triggerVectorBufferWrite({ try vectorBufferWriteOperation($0) })
case .scalarFileWrite:
Expand Down Expand Up @@ -451,6 +472,15 @@ final class PendingStreamWritesManager: PendingWritesManager {
}
}

private func _triggerScalarBufferWrite(
scalarWriteOperation: (
UnsafeRawBufferPointer, UnsafePointer<sockaddr>?, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?
) throws -> IOResult<Int>
) rethrows -> OneWriteOperationResult {
fatalError("\(#function) We made it")
self.didWrite(itemCount: 1, result: .processed(1))
}

/// Trigger a write of a single `FileRegion` (usually using `sendfile(2)`).
///
/// - Parameters:
Expand Down Expand Up @@ -607,7 +637,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
}

internal enum WriteMechanism {
case scalarBufferWrite
case scalarBufferWrite(withMetaData: Bool)
case vectorBufferWrite
case scalarFileWrite
case nothingToBeWritten
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOPosixTests/ChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ final class ChannelTests: XCTestCase {
var multiState = 0
var fileState = 0
let result = try pwm.triggerAppropriateWriteOperations(
writeMessage: { (_, _, _, _) in
return .processed(0)
},
scalarBufferWriteOperation: { buf in
defer {
singleState += 1
Expand Down