66import os
77import socketserver
88import threading
9+ import ujson as json
910
1011from pylsp_jsonrpc .dispatchers import MethodDispatcher
1112from pylsp_jsonrpc .endpoint import Endpoint
@@ -91,32 +92,102 @@ def start_io_lang_server(rfile, wfile, check_parent_process, handler_class):
9192 server .start ()
9293
9394
95+ def start_ws_lang_server (port , check_parent_process , handler_class ):
96+ if not issubclass (handler_class , PythonLSPServer ):
97+ raise ValueError ('Handler class must be an instance of PythonLSPServer' )
98+
99+ # pylint: disable=import-outside-toplevel
100+
101+ # imports needed only for websockets based server
102+ try :
103+ import asyncio
104+ from concurrent .futures import ThreadPoolExecutor
105+ import websockets
106+ except ImportError as e :
107+ raise ImportError ("websocket modules missing. Please run pip install 'python-lsp-server[websockets]" ) from e
108+
109+ with ThreadPoolExecutor (max_workers = 10 ) as tpool :
110+ async def pylsp_ws (websocket ):
111+ log .debug ("Creating LSP object" )
112+
113+ # creating a partial function and suppling the websocket connection
114+ response_handler = partial (send_message , websocket = websocket )
115+
116+ # Not using default stream reader and writer.
117+ # Instead using a consumer based approach to handle processed requests
118+ pylsp_handler = handler_class (rx = None , tx = None , consumer = response_handler ,
119+ check_parent_process = check_parent_process )
120+
121+ async for message in websocket :
122+ try :
123+ log .debug ("consuming payload and feeding it to LSP handler" )
124+ request = json .loads (message )
125+ loop = asyncio .get_running_loop ()
126+ await loop .run_in_executor (tpool , pylsp_handler .consume , request )
127+ except Exception as e : # pylint: disable=broad-except
128+ log .exception ("Failed to process request %s, %s" , message , str (e ))
129+
130+ def send_message (message , websocket ):
131+ """Handler to send responses of processed requests to respective web socket clients"""
132+ try :
133+ payload = json .dumps (message , ensure_ascii = False )
134+ asyncio .run (websocket .send (payload ))
135+ except Exception as e : # pylint: disable=broad-except
136+ log .exception ("Failed to write message %s, %s" , message , str (e ))
137+
138+ async def run_server ():
139+ async with websockets .serve (pylsp_ws , port = port ):
140+ # runs forever
141+ await asyncio .Future ()
142+
143+ asyncio .run (run_server ())
144+
145+
94146class PythonLSPServer (MethodDispatcher ):
95147 """ Implementation of the Microsoft VSCode Language Server Protocol
96148 https://github.com/Microsoft/language-server-protocol/blob/master/versions/protocol-1-x.md
97149 """
98150
99151 # pylint: disable=too-many-public-methods,redefined-builtin
100152
101- def __init__ (self , rx , tx , check_parent_process = False ):
153+ def __init__ (self , rx , tx , check_parent_process = False , consumer = None ):
102154 self .workspace = None
103155 self .config = None
104156 self .root_uri = None
105157 self .watching_thread = None
106158 self .workspaces = {}
107159 self .uri_workspace_mapper = {}
108160
109- self ._jsonrpc_stream_reader = JsonRpcStreamReader (rx )
110- self ._jsonrpc_stream_writer = JsonRpcStreamWriter (tx )
111161 self ._check_parent_process = check_parent_process
112- self ._endpoint = Endpoint (self , self ._jsonrpc_stream_writer .write , max_workers = MAX_WORKERS )
162+
163+ if rx is not None :
164+ self ._jsonrpc_stream_reader = JsonRpcStreamReader (rx )
165+ else :
166+ self ._jsonrpc_stream_reader = None
167+
168+ if tx is not None :
169+ self ._jsonrpc_stream_writer = JsonRpcStreamWriter (tx )
170+ else :
171+ self ._jsonrpc_stream_writer = None
172+
173+ # if consumer is None, it is assumed that the default streams-based approach is being used
174+ if consumer is None :
175+ self ._endpoint = Endpoint (self , self ._jsonrpc_stream_writer .write , max_workers = MAX_WORKERS )
176+ else :
177+ self ._endpoint = Endpoint (self , consumer , max_workers = MAX_WORKERS )
178+
113179 self ._dispatchers = []
114180 self ._shutdown = False
115181
116182 def start (self ):
117183 """Entry point for the server."""
118184 self ._jsonrpc_stream_reader .listen (self ._endpoint .consume )
119185
186+ def consume (self , message ):
187+ """Entry point for consumer based server. Alternative to stream listeners."""
188+ # assuming message will be JSON
189+ self ._endpoint .consume (message )
190+
120191 def __getitem__ (self , item ):
121192 """Override getitem to fallback through multiple dispatchers."""
122193 if self ._shutdown and item != 'exit' :
@@ -141,8 +212,10 @@ def m_shutdown(self, **_kwargs):
141212
142213 def m_exit (self , ** _kwargs ):
143214 self ._endpoint .shutdown ()
144- self ._jsonrpc_stream_reader .close ()
145- self ._jsonrpc_stream_writer .close ()
215+ if self ._jsonrpc_stream_reader is not None :
216+ self ._jsonrpc_stream_reader .close ()
217+ if self ._jsonrpc_stream_writer is not None :
218+ self ._jsonrpc_stream_writer .close ()
146219
147220 def _match_uri_to_workspace (self , uri ):
148221 workspace_uri = _utils .match_uri_to_workspace (uri , self .workspaces )
0 commit comments