mirror of
https://github.com/Leonmmcoset/cleonos.git
synced 2026-04-21 18:44:01 +00:00
99 lines
2.7 KiB
Python
99 lines
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from .state import SharedKernelState
|
|
|
|
|
|
class InputPump:
    """Forward host-terminal keystrokes to the shared kernel state.

    A daemon thread polls the console (``msvcrt`` on Windows, ``select`` on
    the stdin file descriptor elsewhere) and hands every key to
    ``state.push_key`` as an integer code point.  The pump only runs when
    ``sys.stdin`` is an interactive terminal.
    """

    def __init__(self, state: SharedKernelState) -> None:
        """Create an idle pump.

        Args:
            state: Object receiving the keys; only its ``push_key(int)``
                method is used by this class.
        """
        self.state = state
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None
        # Terminal attributes saved before switching to cbreak mode (POSIX
        # only); ``None`` whenever there is nothing to restore.
        self._posix_term_state = None

    def start(self) -> None:
        """Start the reader thread; no-op if already running or stdin is not a TTY."""
        if self._thread is not None:
            return
        try:
            interactive = bool(
                sys.stdin and hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
            )
        except ValueError:
            # isatty() raises ValueError on a closed stream: no terminal.
            interactive = False
        if not interactive:
            return

        # A previous stop() leaves the event set; clear it so the new thread
        # does not exit on its first loop check.
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="cleonos-wine-input", daemon=True
        )
        self._thread.start()

    def stop(self) -> None:
        """Ask the reader thread to exit and restore the terminal settings.

        The join is bounded (0.2 s) so shutdown cannot hang on the reader;
        the thread is a daemon and is dropped either way.
        """
        self._stop.set()
        thread = self._thread
        if thread is not None:
            thread.join(timeout=0.2)
            self._thread = None
        # Restore from here as well, in case the thread did not finish in time.
        self._restore_posix_tty()

    def _run(self) -> None:
        """Thread entry point: dispatch to the platform-specific loop."""
        if os.name == "nt":
            self._run_windows()
        else:
            self._run_posix()

    def _run_windows(self) -> None:
        """Poll the Windows console until ``stop()`` is requested."""
        import msvcrt  # pylint: disable=import-error

        while not self._stop.is_set():
            if not msvcrt.kbhit():
                time.sleep(0.005)
                continue

            ch = msvcrt.getwch()
            if ch in ("\x00", "\xe0"):
                # Prefix of a two-part function/arrow key: consume and drop
                # the key code that follows.
                # NOTE(review): "\xe0" is also the character 'à' when read
                # with getwch(); it is discarded here as before.
                _ = msvcrt.getwch()
                continue

            self._push_text(ch)

    def _run_posix(self) -> None:
        """Put stdin in cbreak mode and pump keys until ``stop()`` is requested."""
        import termios
        import tty

        fd = sys.stdin.fileno()
        self._posix_term_state = termios.tcgetattr(fd)
        try:
            # Inside the try so a failure while switching modes still
            # triggers the restore below.
            tty.setcbreak(fd)
            self._pump_posix_fd(fd)
        finally:
            self._restore_posix_tty()

    def _pump_posix_fd(self, fd: int) -> None:
        """Read bytes from *fd* and push the decoded keys until stop or EOF.

        Bytes are read with ``os.read`` rather than ``sys.stdin.read(1)``:
        the buffered text layer may consume more bytes from the descriptor
        than it returns, and ``select`` cannot see data that already sits in
        Python's buffer, which delays pasted text and escape sequences until
        the next key press.
        """
        import codecs
        import select

        encoding = getattr(sys.stdin, "encoding", None) or "utf-8"
        try:
            decoder_factory = codecs.getincrementaldecoder(encoding)
        except LookupError:
            decoder_factory = codecs.getincrementaldecoder("utf-8")
        # Incremental decoding keeps a multi-byte character that is split
        # across two reads intact.
        decoder = decoder_factory(errors="replace")

        while not self._stop.is_set():
            readable, _, _ = select.select([fd], [], [], 0.05)
            if not readable:
                continue
            data = os.read(fd, 1024)
            if not data:
                # EOF: select() would report "readable" forever, so leave
                # the loop instead of spinning.
                break
            self._push_text(decoder.decode(data))

    def _push_text(self, text: str) -> None:
        """Normalize each character of *text* and push it to the state."""
        for ch in text:
            norm = self._normalize_char(ch)
            if norm is not None:
                self.state.push_key(ord(norm))

    def _restore_posix_tty(self) -> None:
        """Restore the saved terminal attributes, if any (best effort)."""
        saved = self._posix_term_state
        if saved is None:
            return
        # Clear first so stop() and the reader thread's ``finally`` do not
        # both apply the same saved state.
        self._posix_term_state = None
        try:
            import termios

            termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, saved)
        except Exception:  # pylint: disable=broad-except
            # Deliberately best effort: this runs during shutdown, when stdin
            # may already be closed or gone, and must never raise.
            pass

    @staticmethod
    def _normalize_char(ch: str) -> Optional[str]:
        """Map a raw character to the one to forward.

        Returns ``None`` for an empty string, ``"\\n"`` for a carriage
        return, and the character unchanged otherwise.
        """
        if not ch:
            return None
        if ch == "\r":
            return "\n"
        return ch