|
7 | 7 | from code import InteractiveConsole
|
8 | 8 | from functools import partial
|
9 | 9 | from unittest import TestCase
|
10 |
| -from unittest.mock import MagicMock, patch |
| 10 | +from unittest.mock import MagicMock, call, patch, ANY |
11 | 11 |
|
12 | 12 | from test.support import requires
|
13 | 13 | from test.support.import_helper import import_module
|
14 | 14 |
|
15 | 15 | # Optionally test pyrepl. This currently requires that the
|
16 | 16 | # 'curses' resource be given on the regrtest command line using the -u
|
17 | 17 | # option. Additionally, we need to attempt to import curses and readline.
|
18 |
| -requires('curses') |
19 |
| -curses = import_module('curses') |
20 |
| -readline = import_module('readline') |
| 18 | +requires("curses") |
| 19 | +curses = import_module("curses") |
| 20 | +readline = import_module("readline") |
21 | 21 |
|
22 | 22 | from _pyrepl.console import Console, Event
|
23 | 23 | from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig
|
24 | 24 | from _pyrepl.simple_interact import _strip_final_indent
|
| 25 | +from _pyrepl.unix_console import UnixConsole |
25 | 26 | from _pyrepl.unix_eventqueue import EventQueue
|
26 | 27 |
|
27 | 28 |
|
@@ -932,5 +933,246 @@ def test_setpos_fromxy_in_wrapped_line(self):
|
932 | 933 | self.assertEqual(reader.pos, 9)
|
933 | 934 |
|
934 | 935 |
|
| 936 | +def unix_console(events, **kwargs): |
| 937 | + console = UnixConsole() |
| 938 | + console.get_event = MagicMock(side_effect=events) |
| 939 | + console.prepare() |
| 940 | + for key, val in kwargs.items(): |
| 941 | + setattr(console, key, val) |
| 942 | + return console |
| 943 | + |
| 944 | + |
| 945 | +handle_events_unix_console = partial(handle_all_events, prepare_console=unix_console) |
| 946 | +handle_events_narrow_unix_console = partial( |
| 947 | + handle_all_events, prepare_console=partial(unix_console, width=5) |
| 948 | +) |
| 949 | +handle_events_short_unix_console = partial( |
| 950 | + handle_all_events, prepare_console=partial(unix_console, height=1) |
| 951 | +) |
| 952 | +handle_events_unix_console_height_3 = partial( |
| 953 | + handle_all_events, prepare_console=partial(unix_console, height=2) |
| 954 | +) |
| 955 | + |
| 956 | + |
| 957 | +TERM_CAPABILITIES = { |
| 958 | + "bel": b"\x07", |
| 959 | + "civis": b"\x1b[?25l", |
| 960 | + "clear": b"\x1b[H\x1b[2J", |
| 961 | + "cnorm": b"\x1b[?12l\x1b[?25h", |
| 962 | + "cub": b"\x1b[%p1%dD", |
| 963 | + "cub1": b"\x08", |
| 964 | + "cud": b"\x1b[%p1%dB", |
| 965 | + "cud1": b"\n", |
| 966 | + "cuf": b"\x1b[%p1%dC", |
| 967 | + "cuf1": b"\x1b[C", |
| 968 | + "cup": b"\x1b[%i%p1%d;%p2%dH", |
| 969 | + "cuu": b"\x1b[%p1%dA", |
| 970 | + "cuu1": b"\x1b[A", |
| 971 | + "dch1": b"\x1b[P", |
| 972 | + "dch": b"\x1b[%p1%dP", |
| 973 | + "el": b"\x1b[K", |
| 974 | + "hpa": b"\x1b[%i%p1%dG", |
| 975 | + "ich": b"\x1b[%p1%d@", |
| 976 | + "ich1": None, |
| 977 | + "ind": b"\n", |
| 978 | + "pad": None, |
| 979 | + "ri": b"\x1bM", |
| 980 | + "rmkx": b"\x1b[?1l\x1b>", |
| 981 | + "smkx": b"\x1b[?1h\x1b=", |
| 982 | +} |
| 983 | + |
| 984 | + |
| 985 | +@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s)) |
| 986 | +@patch( |
| 987 | + "_pyrepl.curses.tparm", |
| 988 | + lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args), |
| 989 | +) |
| 990 | +@patch("_pyrepl.curses.setupterm", lambda a, b: None) |
| 991 | +@patch("termios.tcsetattr", lambda a, b, c: None) |
| 992 | +@patch("os.write") |
| 993 | +class TestConsole(TestCase): |
| 994 | + def test_simple_addition(self, _os_write): |
| 995 | + code = "12+34" |
| 996 | + events = code_to_events(code) |
| 997 | + _, _ = handle_events_unix_console(events) |
| 998 | + _os_write.assert_any_call(ANY, b"1") |
| 999 | + _os_write.assert_any_call(ANY, b"2") |
| 1000 | + _os_write.assert_any_call(ANY, b"+") |
| 1001 | + _os_write.assert_any_call(ANY, b"3") |
| 1002 | + _os_write.assert_any_call(ANY, b"4") |
| 1003 | + |
| 1004 | + def test_wrap(self, _os_write): |
| 1005 | + code = "12+34" |
| 1006 | + events = code_to_events(code) |
| 1007 | + _, _ = handle_events_narrow_unix_console(events) |
| 1008 | + _os_write.assert_any_call(ANY, b"1") |
| 1009 | + _os_write.assert_any_call(ANY, b"2") |
| 1010 | + _os_write.assert_any_call(ANY, b"+") |
| 1011 | + _os_write.assert_any_call(ANY, b"3") |
| 1012 | + _os_write.assert_any_call(ANY, b"\\") |
| 1013 | + _os_write.assert_any_call(ANY, b"\n") |
| 1014 | + _os_write.assert_any_call(ANY, b"4") |
| 1015 | + |
| 1016 | + def test_cursor_left(self, _os_write): |
| 1017 | + code = "1" |
| 1018 | + events = itertools.chain( |
| 1019 | + code_to_events(code), |
| 1020 | + [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], |
| 1021 | + ) |
| 1022 | + _, _ = handle_events_unix_console(events) |
| 1023 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| 1024 | + |
| 1025 | + def test_cursor_left_right(self, _os_write): |
| 1026 | + code = "1" |
| 1027 | + events = itertools.chain( |
| 1028 | + code_to_events(code), |
| 1029 | + [ |
| 1030 | + Event(evt="key", data="left", raw=bytearray(b"\x1bOD")), |
| 1031 | + Event(evt="key", data="right", raw=bytearray(b"\x1bOC")), |
| 1032 | + ], |
| 1033 | + ) |
| 1034 | + _, _ = handle_events_unix_console(events) |
| 1035 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| 1036 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1") |
| 1037 | + |
| 1038 | + def test_cursor_up(self, _os_write): |
| 1039 | + code = "1\n2+3" |
| 1040 | + events = itertools.chain( |
| 1041 | + code_to_events(code), |
| 1042 | + [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))], |
| 1043 | + ) |
| 1044 | + _, _ = handle_events_unix_console(events) |
| 1045 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") |
| 1046 | + |
| 1047 | + def test_cursor_up_down(self, _os_write): |
| 1048 | + code = "1\n2+3" |
| 1049 | + events = itertools.chain( |
| 1050 | + code_to_events(code), |
| 1051 | + [ |
| 1052 | + Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| 1053 | + Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), |
| 1054 | + ], |
| 1055 | + ) |
| 1056 | + _, _ = handle_events_unix_console(events) |
| 1057 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") |
| 1058 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1") |
| 1059 | + |
| 1060 | + def test_cursor_back_write(self, _os_write): |
| 1061 | + events = itertools.chain( |
| 1062 | + code_to_events("1"), |
| 1063 | + [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], |
| 1064 | + code_to_events("2"), |
| 1065 | + ) |
| 1066 | + _, _ = handle_events_unix_console(events) |
| 1067 | + _os_write.assert_any_call(ANY, b"1") |
| 1068 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| 1069 | + _os_write.assert_any_call(ANY, b"2") |
| 1070 | + |
| 1071 | + def test_multiline_function_move_up_short_terminal(self, _os_write): |
| 1072 | + # fmt: off |
| 1073 | + code = ( |
| 1074 | + "def f():\n" |
| 1075 | + " foo" |
| 1076 | + ) |
| 1077 | + # fmt: on |
| 1078 | + |
| 1079 | + events = itertools.chain( |
| 1080 | + code_to_events(code), |
| 1081 | + [ |
| 1082 | + Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| 1083 | + Event(evt="scroll", data=None), |
| 1084 | + ], |
| 1085 | + ) |
| 1086 | + _, _ = handle_events_short_unix_console(events) |
| 1087 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") |
| 1088 | + |
| 1089 | + def test_multiline_function_move_up_down_short_terminal(self, _os_write): |
| 1090 | + # fmt: off |
| 1091 | + code = ( |
| 1092 | + "def f():\n" |
| 1093 | + " foo" |
| 1094 | + ) |
| 1095 | + # fmt: on |
| 1096 | + |
| 1097 | + events = itertools.chain( |
| 1098 | + code_to_events(code), |
| 1099 | + [ |
| 1100 | + Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| 1101 | + Event(evt="scroll", data=None), |
| 1102 | + Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), |
| 1103 | + Event(evt="scroll", data=None), |
| 1104 | + ], |
| 1105 | + ) |
| 1106 | + _, _ = handle_events_short_unix_console(events) |
| 1107 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") |
| 1108 | + _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":") |
| 1109 | + |
| 1110 | + def test_resize_bigger_on_multiline_function(self, _os_write): |
| 1111 | + # fmt: off |
| 1112 | + code = ( |
| 1113 | + "def f():\n" |
| 1114 | + " foo" |
| 1115 | + ) |
| 1116 | + # fmt: on |
| 1117 | + |
| 1118 | + events = itertools.chain(code_to_events(code)) |
| 1119 | + reader, console = handle_events_short_unix_console(events) |
| 1120 | + |
| 1121 | + console.height = 2 |
| 1122 | + |
| 1123 | + def same_reader(_): |
| 1124 | + return reader |
| 1125 | + |
| 1126 | + def same_console(events): |
| 1127 | + console.get_event = MagicMock(side_effect=events) |
| 1128 | + return console |
| 1129 | + |
| 1130 | + _, _ = handle_all_events( |
| 1131 | + [Event(evt="resize", data=None)], |
| 1132 | + prepare_reader=same_reader, |
| 1133 | + prepare_console=same_console, |
| 1134 | + ) |
| 1135 | + _os_write.assert_has_calls( |
| 1136 | + [ |
| 1137 | + call(ANY, TERM_CAPABILITIES["ri"] + b":"), |
| 1138 | + call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), |
| 1139 | + call(ANY, b"def f():"), |
| 1140 | + ] |
| 1141 | + ) |
| 1142 | + |
| 1143 | + def test_resize_smaller_on_multiline_function(self, _os_write): |
| 1144 | + # fmt: off |
| 1145 | + code = ( |
| 1146 | + "def f():\n" |
| 1147 | + " foo" |
| 1148 | + ) |
| 1149 | + # fmt: on |
| 1150 | + |
| 1151 | + events = itertools.chain(code_to_events(code)) |
| 1152 | + reader, console = handle_events_unix_console_height_3(events) |
| 1153 | + |
| 1154 | + console.height = 1 |
| 1155 | + |
| 1156 | + def same_reader(_): |
| 1157 | + return reader |
| 1158 | + |
| 1159 | + def same_console(events): |
| 1160 | + console.get_event = MagicMock(side_effect=events) |
| 1161 | + return console |
| 1162 | + |
| 1163 | + _, _ = handle_all_events( |
| 1164 | + [Event(evt="resize", data=None)], |
| 1165 | + prepare_reader=same_reader, |
| 1166 | + prepare_console=same_console, |
| 1167 | + ) |
| 1168 | + _os_write.assert_has_calls( |
| 1169 | + [ |
| 1170 | + call(ANY, TERM_CAPABILITIES["ind"] + b":"), |
| 1171 | + call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), |
| 1172 | + call(ANY, b" foo"), |
| 1173 | + ] |
| 1174 | + ) |
| 1175 | + |
| 1176 | + |
935 | 1177 | if __name__ == "__main__":
|
936 | 1178 | unittest.main()
|
0 commit comments