1616 import msvcrt
1717 from ctypes import windll
1818
19- from ctypes import Array , pointer
19+ from ctypes import Array , byref , pointer
2020from ctypes .wintypes import DWORD , HANDLE
2121from typing import Callable , ContextManager , Iterable , Iterator , TextIO
2222
3535
3636from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
3737from .base import Input
38+ from .vt100_parser import Vt100Parser
3839
3940__all__ = [
4041 "Win32Input" ,
5253MOUSE_MOVED = 0x0001
5354MOUSE_WHEELED = 0x0004
5455
56+ # See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
57+ ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
58+
5559
5660class _Win32InputBase (Input ):
5761 """
@@ -74,7 +78,12 @@ class Win32Input(_Win32InputBase):
7478
7579 def __init__ (self , stdin : TextIO | None = None ) -> None :
7680 super ().__init__ ()
77- self .console_input_reader = ConsoleInputReader ()
81+ self ._use_virtual_terminal_input = _is_win_vt100_input_enabled ()
82+
83+ if self ._use_virtual_terminal_input :
84+ self .console_input_reader = Vt100InputReader ()
85+ else :
86+ self .console_input_reader = ConsoleInputReader ()
7887
7988 def attach (self , input_ready_callback : Callable [[], None ]) -> ContextManager [None ]:
8089 """
@@ -101,7 +110,9 @@ def closed(self) -> bool:
101110 return False
102111
103112 def raw_mode (self ) -> ContextManager [None ]:
104- return raw_mode ()
113+ return raw_mode (
114+ use_win10_virtual_terminal_input = self ._use_virtual_terminal_input
115+ )
105116
106117 def cooked_mode (self ) -> ContextManager [None ]:
107118 return cooked_mode ()
@@ -124,6 +135,88 @@ def handle(self) -> HANDLE:
124135 return self .console_input_reader .handle
125136
126137
138+ class Vt100InputReader :
139+ def __init__ (self ) -> None :
140+ self ._fdcon = None
141+
142+ self ._buffer : list [KeyPress ] = [] # Buffer to collect the Key objects.
143+ self ._vt100_parser = Vt100Parser (
144+ lambda key_press : self ._buffer .append (key_press )
145+ )
146+
147+ # When stdin is a tty, use that handle, otherwise, create a handle from
148+ # CONIN$.
149+ self .handle : HANDLE
150+ if sys .stdin .isatty ():
151+ self .handle = HANDLE (windll .kernel32 .GetStdHandle (STD_INPUT_HANDLE ))
152+ else :
153+ self ._fdcon = os .open ("CONIN$" , os .O_RDWR | os .O_BINARY )
154+ self .handle = HANDLE (msvcrt .get_osfhandle (self ._fdcon ))
155+
156+ def close (self ) -> None :
157+ "Close fdcon."
158+ if self ._fdcon is not None :
159+ os .close (self ._fdcon )
160+
161+ def read (self ) -> Iterable [KeyPress ]:
162+ """
163+ Return a list of `KeyPress` instances. It won't return anything when
164+ there was nothing to read. (This function doesn't block.)
165+
166+ http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
167+ """
168+ max_count = 2048 # Max events to read at the same time.
169+
170+ read = DWORD (0 )
171+ arrtype = INPUT_RECORD * max_count
172+ input_records = arrtype ()
173+
174+ # Check whether there is some input to read. `ReadConsoleInputW` would
175+ # block otherwise.
176+ # (Actually, the event loop is responsible to make sure that this
177+ # function is only called when there is something to read, but for some
178+ # reason this happened in the asyncio_win32 loop, and it's better to be
179+ # safe anyway.)
180+ if not wait_for_handles ([self .handle ], timeout = 0 ):
181+ return
182+
183+ # Get next batch of input event.
184+ windll .kernel32 .ReadConsoleInputW (
185+ self .handle , pointer (input_records ), max_count , pointer (read )
186+ )
187+
188+ # First, get all the keys from the input buffer, in order to determine
189+ # whether we should consider this a paste event or not.
190+ for key_data in self ._get_keys (read , input_records ):
191+ self ._vt100_parser .feed (key_data )
192+
193+ # Return result.
194+ result = self ._buffer
195+ self ._buffer = []
196+ return result
197+
198+ def _get_keys (
199+ self , read : DWORD , input_records : Array [INPUT_RECORD ]
200+ ) -> Iterator [str ]:
201+ """
202+ Generator that yields `KeyPress` objects from the input records.
203+ """
204+ for i in range (read .value ):
205+ ir = input_records [i ]
206+
207+ # Get the right EventType from the EVENT_RECORD.
208+ # (For some reason the Windows console application 'cmder'
209+ # [http://gooseberrycreative.com/cmder/] can return '0' for
210+ # ir.EventType. -- Just ignore that.)
211+ if ir .EventType in EventTypes :
212+ ev = getattr (ir .Event , EventTypes [ir .EventType ])
213+
214+ # Process if this is a key event. (We also have mouse, menu and
215+ # focus events.)
216+ if isinstance (ev , KEY_EVENT_RECORD ) and ev .KeyDown :
217+ yield ev .uChar .UnicodeChar
218+
219+
127220class ConsoleInputReader :
128221 """
129222 :param recognize_paste: When True, try to discover paste actions and turn
@@ -700,8 +793,11 @@ class raw_mode:
700793 `raw_input` method of `.vt100_input`.
701794 """
702795
703- def __init__ (self , fileno : int | None = None ) -> None :
796+ def __init__ (
797+ self , fileno : int | None = None , use_win10_virtual_terminal_input : bool = False
798+ ) -> None :
704799 self .handle = HANDLE (windll .kernel32 .GetStdHandle (STD_INPUT_HANDLE ))
800+ self .use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
705801
706802 def __enter__ (self ) -> None :
707803 # Remember original mode.
@@ -717,12 +813,15 @@ def _patch(self) -> None:
717813 ENABLE_LINE_INPUT = 0x0002
718814 ENABLE_PROCESSED_INPUT = 0x0001
719815
720- windll .kernel32 .SetConsoleMode (
721- self .handle ,
722- self .original_mode .value
723- & ~ (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT ),
816+ new_mode = self .original_mode .value & ~ (
817+ ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
724818 )
725819
820+ if self .use_win10_virtual_terminal_input :
821+ new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
822+
823+ windll .kernel32 .SetConsoleMode (self .handle , new_mode )
824+
726825 def __exit__ (self , * a : object ) -> None :
727826 # Restore original mode
728827 windll .kernel32 .SetConsoleMode (self .handle , self .original_mode )
@@ -747,3 +846,25 @@ def _patch(self) -> None:
747846 self .original_mode .value
748847 | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT ),
749848 )
849+
850+
851+ def _is_win_vt100_input_enabled () -> bool :
852+ """
853+ Returns True when we're running Windows and VT100 escape sequences are
854+ supported.
855+ """
856+ hconsole = HANDLE (windll .kernel32 .GetStdHandle (STD_INPUT_HANDLE ))
857+
858+ # Get original console mode.
859+ original_mode = DWORD (0 )
860+ windll .kernel32 .GetConsoleMode (hconsole , byref (original_mode ))
861+
862+ try :
863+ # Try to enable VT100 sequences.
864+ result : int = windll .kernel32 .SetConsoleMode (
865+ hconsole , DWORD (ENABLE_VIRTUAL_TERMINAL_INPUT )
866+ )
867+
868+ return result == 1
869+ finally :
870+ windll .kernel32 .SetConsoleMode (hconsole , original_mode )
0 commit comments