Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIOCore/IO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public struct IOError: Swift.Error {
.reason(self.failureDescription)
}

private enum Error {
package enum Error {
#if os(Windows)
case windows(DWORD)
case winsock(CInt)
#endif
case errno(CInt)
}

private let error: Error
package let error: Error

/// The `errno` that was set for the operation.
public var errnoCode: CInt {
Expand Down
4 changes: 4 additions & 0 deletions Sources/NIOCore/SystemCallHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ internal func syscall<T: FixedWidthInteger>(
case (EWOULDBLOCK, true):
return .wouldBlock(0)
#endif
#if os(Windows)
case (WSAEWOULDBLOCK, true):
return .wouldBlock(0)
#endif
default:
preconditionIsNotUnacceptableErrno(err: err, where: function)
throw IOError(errnoCode: err, reason: function)
Expand Down
6 changes: 5 additions & 1 deletion Sources/NIOPosix/BSDSocketAPIWindows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ extension NIOBSDSocket {
) throws -> NIOBSDSocket.Handle? {
let socket: NIOBSDSocket.Handle = WinSDK.accept(s, addr, addrlen)
if socket == WinSDK.INVALID_SOCKET {
throw IOError(winsock: WSAGetLastError(), reason: "accept")
let lastError = WSAGetLastError()
if lastError == WSAEWOULDBLOCK {
return nil
}
throw IOError(winsock: lastError, reason: "accept")
}
return socket
}
Expand Down
5 changes: 4 additions & 1 deletion Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import Atomics
import NIOConcurrencyHelpers
import NIOCore
#if os(Windows)
import WinSDK
#endif

private struct SocketChannelLifecycleManager {
// MARK: Types
Expand Down Expand Up @@ -1215,7 +1218,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
/// - err: The `Error` which was thrown by `readFromSocket`.
/// - Returns: `true` if the `Channel` should be closed, `false` otherwise.
func shouldCloseOnReadError(_ err: Error) -> Bool {
true
return true
}

/// Handles an error reported by the selector.
Expand Down
4 changes: 3 additions & 1 deletion Sources/NIOPosix/SelectorGeneric.swift
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ internal class Selector<R: Registration> {
@usableFromInline
typealias EventType = WinSDK.pollfd
@usableFromInline
var pollFDs = [WinSDK.pollfd]()
var pollFDs = [pollfd]()
@usableFromInline
var deregisteredFDs = [Bool]()
#else
#error("Unsupported platform, no suitable selector backend (we need kqueue or epoll support)")
#endif
Expand Down
31 changes: 26 additions & 5 deletions Sources/NIOPosix/SelectorWSAPoll.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ extension Selector: _SelectorBackendProtocol {

func initialiseState0() throws {
self.pollFDs.reserveCapacity(16)
self.deregisteredFDs.reserveCapacity(16)
self.lifecycleState = .open
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lifecycle never became open yet

}

func deinitAssertions0() {
Expand Down Expand Up @@ -102,7 +104,7 @@ extension Selector: _SelectorBackendProtocol {
}
} else {
let result = self.pollFDs.withUnsafeMutableBufferPointer { ptr in
WSAPoll(ptr.baseAddress!, UInt32(ptr.count), time)
WSAPoll(ptr.baseAddress!, UInt32(ptr.count), 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely following why this change needed to be made. Can you elaborate on this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. When you initially create the eventloop, there is no FD to poll. So it goes into a SleepEx(INFINITE, true).
  2. When the server FD is added, it wakes up the selector using wakeup0()
  3. This implementation calls QueueUserAPC(..) on the EventLoop's thread
  4. QueueUserAPC wakes up WSAPoll and observes the socket

At this point there are FDs, so instead of SleepEx - the EventLoop calls into WSAPoll for those FDs.

  1. A client connects to the server
  2. Server accepts the socket, registering it to WSAPoll with a minimal set of events (.reset and .error IIRC)
  3. The eventloop is done, and goes into WSAPoll with this minimal set of events
  4. After some back & forth, the client reregister0s itself with more events
  5. WSAPoll is not aware of the change, WSAPoll is still invoked with the previous events subset
  6. Socket I/O is ignored, because .read and .write were not part of the events

Then..

  1. A new (second) client attaches to the server
  2. It goes through the same flow, except now WSAPoll correctly polls the old socket's new events (read/write)
  3. New socket goes through the same trouble

To remedy this temporarily, I'm waking up WSAPoll every 1ms so we don't completely block I/O for new sockets. But this is 100% not something we'll want to stick with, of course. I don't yet know how to best tackle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is WSAPoll not being made aware of the change? That seems like the obvious bug.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WSAPoll cannot be woken up early

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious what the long term solution here would be. I'd love to hear @fabianfett 's thoughts - not sure if he's available again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so assuming that statement is true (I haven't checked), the solution here is to have a self-pipe. This allows us to mostly replicate the eventfd pattern on Linux, so that in order to wake up the loop we write to a pipe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh that's clever! I'll take a look at that

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the eventFD pattern here. Sadly. QueueUserAPC should be thrown out.

}

if result > 0 {
Expand Down Expand Up @@ -131,6 +133,19 @@ extension Selector: _SelectorBackendProtocol {

try body((SelectorEvent(io: selectorEvent, registration: registration)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line often calls deregister0 indirectly, so we effectively can't mutate pollFDs in deregister0

}

// now clean up any deregistered fds
// In reverse order so we don't have to copy elements out of the array
// If we do it in normal order, we'll have to shift all elements after the removed one
for i in self.deregisteredFDs.indices.reversed() {
if self.deregisteredFDs[i] {
// remove this one
let fd = self.pollFDs[i].fd
self.pollFDs.remove(at: i)
self.deregisteredFDs.remove(at: i)
self.registrations.removeValue(forKey: Int(fd))
}
}
Comment on lines +137 to +148
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the deregister0 work of cleaning up after the polling is done

} else if result == 0 {
// nothing has happened
} else if result == WinSDK.SOCKET_ERROR {
Expand All @@ -149,6 +164,7 @@ extension Selector: _SelectorBackendProtocol {
// that will allow O(1) access here.
let poll = pollfd(fd: UInt64(fileDescriptor), events: interested.wsaPollEvent, revents: 0)
self.pollFDs.append(poll)
self.deregisteredFDs.append(false)
}

func reregister0(
Expand All @@ -158,7 +174,9 @@ extension Selector: _SelectorBackendProtocol {
newInterested: SelectorEventSet,
registrationID: SelectorRegistrationID
) throws {
fatalError("TODO: Unimplemented")
if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) {
self.pollFDs[index].events = newInterested.wsaPollEvent
}
}

func deregister0(
Expand All @@ -167,13 +185,15 @@ extension Selector: _SelectorBackendProtocol {
oldInterested: SelectorEventSet,
registrationID: SelectorRegistrationID
) throws {
fatalError("TODO: Unimplemented")
if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deregistering is just removing the FD. However, we can't just remove it here as we're iterating over the same pollFDs at the same time. deregister0 is called down the stack of try body((SelectorEvent(io: selectorEvent, registration: registration))). So the available indices of pollFDs change, causing a crash.

self.deregisteredFDs[index] = true
}
}

func wakeup0() throws {
// will be called from a different thread
let result = try self.myThread.withHandleUnderLock { handle in
QueueUserAPC(wakeupTarget, handle, 0)
let result = try self.myThread.withHandleUnderLock { threadHandle in
return QueueUserAPC(wakeupTarget, threadHandle.handle, 0)
}
if result == 0 {
let errorCode = GetLastError()
Expand All @@ -185,6 +205,7 @@ extension Selector: _SelectorBackendProtocol {

func close0() throws {
self.pollFDs.removeAll()
self.deregisteredFDs.removeAll()
}
}

Expand Down
30 changes: 19 additions & 11 deletions Sources/NIOPosix/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,12 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket>, @unchecked Sen
}
guard let err = err as? IOError else { return true }

switch err.errnoCode {
case ECONNABORTED,
EMFILE,
ENFILE,
ENOBUFS,
ENOMEM:
switch err.error {
case .errno(ECONNABORTED),
.errno(EMFILE),
.errno(ENFILE),
.errno(ENOBUFS),
.errno(ENOMEM):
// These are errors we may be able to recover from. The user may just want to stop accepting connections for example
// or provide some other means of back-pressure. This could be achieved by a custom ChannelDuplexHandler.
return false
Expand Down Expand Up @@ -856,22 +856,30 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {

private func shouldCloseOnErrnoCode(_ errnoCode: CInt) -> Bool {
switch errnoCode {
case ECONNREFUSED, ENOMEM:
// These are errors we may be able to recover from.
return false
default:
return true
}
}

private func shouldCloseOnError(_ error: IOError.Error) -> Bool {
switch error {
// ECONNREFUSED can happen on linux if the previous sendto(...) failed.
// See also:
// - https://bugzilla.redhat.com/show_bug.cgi?id=1375
// - https://lists.gt.net/linux/kernel/39575
case ECONNREFUSED,
ENOMEM:
// These are errors we may be able to recover from.
return false
case .errno(let code):
return self.shouldCloseOnErrnoCode(code)
default:
return true
}
}

override func shouldCloseOnReadError(_ err: Error) -> Bool {
guard let err = err as? IOError else { return true }
return self.shouldCloseOnErrnoCode(err.errnoCode)
return self.shouldCloseOnError(err.error)
}

override func error() -> ErrorResult {
Expand Down
6 changes: 1 addition & 5 deletions Sources/NIOPosix/Thread.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,14 @@ final class NIOThread: Sendable {

static var currentThreadName: String? {
#if os(Windows)
ThreadOpsSystem.threadName(.init(GetCurrentThread()))
ThreadOpsSystem.threadName(.init(handle: GetCurrentThread()))
#else
ThreadOpsSystem.threadName(.init(handle: pthread_self()))
#endif
}

static var currentThreadID: UInt {
#if os(Windows)
UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread))
#else
UInt(bitPattern: .init(bitPattern: ThreadOpsSystem.currentThread.handle))
#endif
}

@discardableResult
Expand Down
47 changes: 39 additions & 8 deletions Sources/NIOPosix/ThreadWindows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import WinSDK

typealias ThreadOpsSystem = ThreadOpsWindows
enum ThreadOpsWindows: ThreadOps {
typealias ThreadHandle = HANDLE
struct ThreadHandle: @unchecked Sendable {
let handle: HANDLE
}
typealias ThreadSpecificKey = DWORD
typealias ThreadSpecificKeyDestructor = @convention(c) (UnsafeMutableRawPointer?) -> Void

static func threadName(_ thread: ThreadOpsSystem.ThreadHandle) -> String? {
var pszBuffer: PWSTR?
GetThreadDescription(thread, &pszBuffer)
GetThreadDescription(thread.handle, &pszBuffer)
guard let buffer = pszBuffer else { return nil }
let string: String = String(decodingCString: buffer, as: UTF16.self)
LocalFree(buffer)
Expand All @@ -41,11 +43,27 @@ enum ThreadOpsWindows: ThreadOps {
let routine: @convention(c) (UnsafeMutableRawPointer?) -> CUnsignedInt = {
let boxed = Unmanaged<NIOThread.ThreadBox>.fromOpaque($0!).takeRetainedValue()
let (body, name) = (boxed.value.body, boxed.value.name)
let hThread: ThreadOpsSystem.ThreadHandle = GetCurrentThread()

// Get a real thread handle instead of pseudo-handle
var realHandle: HANDLE? = nil
let success = DuplicateHandle(
GetCurrentProcess(), // Source process
GetCurrentThread(), // Source handle (pseudo-handle)
GetCurrentProcess(), // Target process
&realHandle, // Target handle (real handle)
0, // Desired access (0 = same as source)
false, // Inherit handle
DWORD(DUPLICATE_SAME_ACCESS) // Options
)
Comment on lines +48 to +57
Copy link
Author

@Joannis Joannis Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Windows gives pseudo handles by default, so they're always correct for the current thread. However, when spawning work on another thread, this handle results in getting the incorrect ThreadID, causing QueueUserAPC to notify the wrong (or non-existing) thread. This in turn prevents SleepEx from waking up the eventloop


guard success, let realHandle else {
fatalError("DuplicateHandle failed: \(GetLastError())")
}
let hThread = ThreadOpsSystem.ThreadHandle(handle: realHandle)

if let name = name {
_ = name.withCString(encodedAs: UTF16.self) {
SetThreadDescription(hThread, $0)
SetThreadDescription(hThread.handle, $0)
}
}

Expand All @@ -58,15 +76,28 @@ enum ThreadOpsWindows: ThreadOps {
}

static func isCurrentThread(_ thread: ThreadOpsSystem.ThreadHandle) -> Bool {
CompareObjectHandles(thread, GetCurrentThread())
CompareObjectHandles(thread.handle, GetCurrentThread())
}

static var currentThread: ThreadOpsSystem.ThreadHandle {
GetCurrentThread()
var realHandle: HANDLE? = nil
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here

let success = DuplicateHandle(
GetCurrentProcess(),
GetCurrentThread(),
GetCurrentProcess(),
&realHandle,
0,
false,
DWORD(DUPLICATE_SAME_ACCESS)
)
guard success, let realHandle else {
fatalError("DuplicateHandle failed: \(GetLastError())")
}
return ThreadHandle(handle: realHandle)
}

static func joinThread(_ thread: ThreadOpsSystem.ThreadHandle) {
let dwResult: DWORD = WaitForSingleObject(thread, INFINITE)
let dwResult: DWORD = WaitForSingleObject(thread.handle, INFINITE)
assert(dwResult == WAIT_OBJECT_0, "WaitForSingleObject: \(GetLastError())")
}

Expand All @@ -88,7 +119,7 @@ enum ThreadOpsWindows: ThreadOps {
}

static func compareThreads(_ lhs: ThreadOpsSystem.ThreadHandle, _ rhs: ThreadOpsSystem.ThreadHandle) -> Bool {
CompareObjectHandles(lhs, rhs)
CompareObjectHandles(lhs.handle, rhs.handle)
}
}

Expand Down
Loading