From f1c8629dce3a70fb8f208d9a7fafb8c96d5485de Mon Sep 17 00:00:00 2001 From: Roman Lavrov Date: Wed, 1 Oct 2025 16:06:47 -0400 Subject: [PATCH 1/4] Prototype pipe handling without dispatch IO --- .../JSONRPCConnection.swift | 99 ++++++++----------- 1 file changed, 43 insertions(+), 56 deletions(-) diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index c2c1c6965..cade54ca8 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -58,7 +58,7 @@ public final class JSONRPCConnection: Connection { /// The queue on which we send data. private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) - private let receiveIO: DispatchIO + private let inFD: FileHandle private let sendIO: DispatchIO private let messageRegistry: MessageRegistry @@ -86,7 +86,6 @@ public final class JSONRPCConnection: Connection { /// Buffer of received bytes that haven't been parsed. /// /// Access to this must be be guaranteed to be sequential to avoid data races. Currently, all access are - /// - The `receiveIO` handler: This is synchronized on `queue`. /// - `requestBufferIsEmpty`: Also synchronized on `queue`. private nonisolated(unsafe) var requestBuffer: [UInt8] = [] @@ -140,23 +139,12 @@ public final class JSONRPCConnection: Connection { #if os(Windows) let rawInFD = dispatch_fd_t(bitPattern: inFD._handle) + self.inFD = inFD #else let rawInFD = inFD.fileDescriptor #endif ioGroup.enter() - receiveIO = DispatchIO( - type: .stream, - fileDescriptor: rawInFD, - queue: queue, - cleanupHandler: { (error: Int32) in - if error != 0 { - logger.fault("IO error \(error)") - } - ioGroup.leave() - } - ) - #if os(Windows) let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle) #else @@ -188,10 +176,6 @@ public final class JSONRPCConnection: Connection { } } - // We cannot assume the client will send us bytes in packets of any particular size, so set the lower limit to 1. - receiveIO.setLimit(lowWater: 1) - receiveIO.setLimit(highWater: Int.max) - sendIO.setLimit(lowWater: 1) sendIO.setLimit(highWater: Int.max) } @@ -288,52 +272,56 @@ public final class JSONRPCConnection: Connection { receiveHandler: MessageHandler, closeHandler: @escaping @Sendable () async -> Void = {} ) { + var fd: FileHandle? queue.sync { precondition(state == .created) state = .running self.receiveHandler = receiveHandler self.closeHandler = closeHandler + fd = self.inFD + } - receiveIO.read(offset: 0, length: Int.max, queue: queue) { done, data, errorCode in - guard errorCode == 0 else { - #if !os(Windows) - if errorCode != POSIXError.ECANCELED.rawValue { - logger.fault("IO error reading \(errorCode)") - } - #endif - if done { self.closeAssumingOnQueue() } - return - } - - if done { - self.closeAssumingOnQueue() - return - } - - guard let data = data, !data.isEmpty else { - return - } - - orLog("Writing input mirror file") { - try self.inputMirrorFile?.write(contentsOf: data) - } - - // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`. - if self.requestBuffer.isEmpty { - data.withUnsafeBytes { (pointer: UnsafePointer) in - let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count)) - self.requestBuffer.append(contentsOf: rest) - } + func reader() async { + var done = false + while !done { + if let data1 = try? fd!.read(upToCount: 1) { + let ad = fd!.availableData + let data = data1 + ad + + queue.sync { + orLog("Writing input mirror file") { + try self.inputMirrorFile?.write(contentsOf: data) + } + + // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`. + if self.requestBuffer.isEmpty { + data.withUnsafeBytes { (pointer: UnsafePointer) in + let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count)) + self.requestBuffer.append(contentsOf: rest) + } + } else { + self.requestBuffer.append(contentsOf: data) + var unused = 0 + self.requestBuffer.withUnsafeBufferPointer { buffer in + let rest = self.parseAndHandleMessages(from: buffer) + unused = rest.count + } + self.requestBuffer.removeFirst(self.requestBuffer.count - unused) + } + } } else { - self.requestBuffer.append(contentsOf: data) - var unused = 0 - self.requestBuffer.withUnsafeBufferPointer { buffer in - let rest = self.parseAndHandleMessages(from: buffer) - unused = rest.count - } - self.requestBuffer.removeFirst(self.requestBuffer.count - unused) + done = true + return } } + + queue.sync { + self.closeAssumingOnQueue() + } + } + + Task.detached { + await reader() } } @@ -660,7 +648,6 @@ public final class JSONRPCConnection: Connection { logger.log("Closing JSONRPCConnection...") // Attempt to close the reader immediately; we do not need to accept remaining inputs. - receiveIO.close(flags: .stop) // Close the writer after it finishes outstanding work. sendIO.close() } From cef0d4ebc1a0dd924aac49ac0e08006f76cef745 Mon Sep 17 00:00:00 2001 From: Roman Lavrov Date: Tue, 7 Oct 2025 15:32:52 -0400 Subject: [PATCH 2/4] Switch to readabilityHandler --- .../JSONRPCConnection.swift | 66 ++++++++----------- 1 file changed, 26 insertions(+), 40 deletions(-) diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index cade54ca8..fd6e5aaf0 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -272,56 +272,42 @@ public final class JSONRPCConnection: Connection { receiveHandler: MessageHandler, closeHandler: @escaping @Sendable () async -> Void = {} ) { - var fd: FileHandle? queue.sync { precondition(state == .created) state = .running self.receiveHandler = receiveHandler self.closeHandler = closeHandler - fd = self.inFD } - func reader() async { - var done = false - while !done { - if let data1 = try? fd!.read(upToCount: 1) { - let ad = fd!.availableData - let data = data1 + ad - - queue.sync { - orLog("Writing input mirror file") { - try self.inputMirrorFile?.write(contentsOf: data) - } - - // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`. - if self.requestBuffer.isEmpty { - data.withUnsafeBytes { (pointer: UnsafePointer) in - let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count)) - self.requestBuffer.append(contentsOf: rest) - } - } else { - self.requestBuffer.append(contentsOf: data) - var unused = 0 - self.requestBuffer.withUnsafeBufferPointer { buffer in - let rest = self.parseAndHandleMessages(from: buffer) - unused = rest.count - } - self.requestBuffer.removeFirst(self.requestBuffer.count - unused) - } - } - } else { - done = true - return - } + self.inFD.readabilityHandler = { fileHandle in + let data = fileHandle.availableData + if data.isEmpty { + fileHandle.readabilityHandler = nil + self.close() + return } - queue.sync { - self.closeAssumingOnQueue() - } - } + self.queue.sync { + orLog("Writing input mirror file") { + try self.inputMirrorFile?.write(contentsOf: data) + } - Task.detached { - await reader() + // Parse and handle any messages in `buffer + data`, leaving any remaining unparsed bytes in `buffer`. + if self.requestBuffer.isEmpty { + data.withUnsafeBytes { (pointer: UnsafePointer) in + let rest = self.parseAndHandleMessages(from: UnsafeBufferPointer(start: pointer, count: data.count)) + self.requestBuffer.append(contentsOf: rest) + } + } else { + self.requestBuffer.append(contentsOf: data) + var unused = 0 + self.requestBuffer.withUnsafeBufferPointer { buffer in + let rest = self.parseAndHandleMessages(from: buffer) + unused = rest.count + } + self.requestBuffer.removeFirst(self.requestBuffer.count - unused) + } + } } } From a4dfcf16ce38f9357c3173ae016d6ea76ac9ecf1 Mon Sep 17 00:00:00 2001 From: Roman Lavrov Date: Tue, 7 Oct 2025 15:58:25 -0400 Subject: [PATCH 3/4] Use readData instead of availableData Similarly to: https://github.com/swiftlang/swift-package-manager/pull/8047 --- Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index fd6e5aaf0..bed2a6cc1 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -280,7 +280,7 @@ public final class JSONRPCConnection: Connection { } self.inFD.readabilityHandler = { fileHandle in - let data = fileHandle.availableData + let data = (try? fileHandle.read(upToCount: Int.max)) ?? Data() if data.isEmpty { fileHandle.readabilityHandler = nil self.close() From 877622866884958f7d3b31a50ac98391b21c7cb2 Mon Sep 17 00:00:00 2001 From: Roman Lavrov Date: Thu, 16 Oct 2025 13:34:34 -0400 Subject: [PATCH 4/4] Handle both sides as well as close() --- .../JSONRPCConnection.swift | 71 ++++++++----------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift index bed2a6cc1..aca61c595 100644 --- a/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift +++ b/Sources/LanguageServerProtocolJSONRPC/JSONRPCConnection.swift @@ -59,7 +59,8 @@ public final class JSONRPCConnection: Connection { private let sendQueue: DispatchQueue = DispatchQueue(label: "jsonrpc-send-queue", qos: .userInitiated) private let inFD: FileHandle - private let sendIO: DispatchIO + private let outFD: FileHandle + let ioGroup: DispatchGroup private let messageRegistry: MessageRegistry /// If non-nil, all input received by this `JSONRPCConnection` will be written to the file handle @@ -135,34 +136,12 @@ public final class JSONRPCConnection: Connection { state = .created self.messageRegistry = messageRegistry - let ioGroup = DispatchGroup() + self.ioGroup = DispatchGroup() - #if os(Windows) - let rawInFD = dispatch_fd_t(bitPattern: inFD._handle) self.inFD = inFD - #else - let rawInFD = inFD.fileDescriptor - #endif - - ioGroup.enter() - #if os(Windows) - let rawOutFD = dispatch_fd_t(bitPattern: outFD._handle) - #else - let rawOutFD = outFD.fileDescriptor - #endif - - ioGroup.enter() - sendIO = DispatchIO( - type: .stream, - fileDescriptor: rawOutFD, - queue: sendQueue, - cleanupHandler: { (error: Int32) in - if error != 0 { - logger.fault("IO error \(error)") - } - ioGroup.leave() - } - ) + self.outFD = outFD + + self.ioGroup.enter() ioGroup.notify(queue: queue) { [weak self] in guard let self else { return } @@ -175,9 +154,6 @@ public final class JSONRPCConnection: Connection { await self.closeHandler?() } } - - sendIO.setLimit(lowWater: 1) - sendIO.setLimit(highWater: Int.max) } /// Creates and starts a `JSONRPCConnection` that connects to a subprocess launched with the specified arguments. @@ -280,10 +256,12 @@ public final class JSONRPCConnection: Connection { } self.inFD.readabilityHandler = { fileHandle in - let data = (try? fileHandle.read(upToCount: Int.max)) ?? Data() + let data = fileHandle.availableData if data.isEmpty { fileHandle.readabilityHandler = nil - self.close() + self.queue.async { + self.closeAssumingOnQueue() + } return } @@ -528,16 +506,16 @@ public final class JSONRPCConnection: Connection { orLog("Writing output mirror file") { try outputMirrorFile?.write(contentsOf: dispatchData) } - sendIO.write(offset: 0, data: dispatchData, queue: sendQueue) { [weak self] done, _, errorCode in - if errorCode != 0 { - logger.fault("IO error sending message \(errorCode)") - if done, let self { - // An unrecoverable error occurs on the channel’s file descriptor. - // Close the connection. - self.queue.async { - self.closeAssumingOnQueue() - } + sendQueue.sync { + do { + try outFD.write(contentsOf: dispatchData) + } catch { + logger.fault("IO error sending message \(error.forLogging)") + self.queue.async { + self.ioGroup.leave() + self.closeAssumingOnQueue() } + return } } } @@ -620,7 +598,10 @@ public final class JSONRPCConnection: Connection { /// The user-provided close handler will be called *asynchronously* when all outstanding I/O /// operations have completed. No new I/O will be accepted after `close` returns. public func close() { - queue.sync { closeAssumingOnQueue() } + queue.sync { + closeAssumingOnQueue() + ioGroup.leave() + } } /// Close the connection, assuming that the code is already executing on `queue`. @@ -635,7 +616,11 @@ public final class JSONRPCConnection: Connection { logger.log("Closing JSONRPCConnection...") // Attempt to close the reader immediately; we do not need to accept remaining inputs. // Close the writer after it finishes outstanding work. - sendIO.close() + do { + try outFD.close() + } catch { + logger.error("Failed to close outFD: \(error.forLogging)") + } } }