Files
cleonos/wine/cleonos_wine_lib/state.py

204 lines
6.7 KiB
Python
Raw Normal View History

2026-04-13 18:32:22 +08:00
from __future__ import annotations
import collections
import threading
import time
from dataclasses import dataclass, field
2026-04-16 22:29:08 +08:00
from typing import Deque, Dict, List, Optional, Tuple
2026-04-13 18:32:22 +08:00
from .constants import u64
@dataclass
class SharedKernelState:
start_ns: int = field(default_factory=time.monotonic_ns)
task_count: int = 5
current_task: int = 0
service_count: int = 7
service_ready: int = 7
context_switches: int = 0
kelf_count: int = 2
kelf_runs: int = 0
exec_requests: int = 0
exec_success: int = 0
user_shell_ready: int = 1
user_exec_requested: int = 0
user_launch_tries: int = 0
user_launch_ok: int = 0
user_launch_fail: int = 0
tty_count: int = 4
tty_active: int = 0
kbd_queue: Deque[int] = field(default_factory=collections.deque)
kbd_lock: threading.Lock = field(default_factory=threading.Lock)
kbd_queue_cap: int = 256
kbd_drop_count: int = 0
kbd_push_count: int = 0
kbd_pop_count: int = 0
kbd_hotkey_switches: int = 0
log_journal_cap: int = 256
log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256))
fs_write_max: int = 65536
2026-04-13 19:08:35 +08:00
proc_lock: threading.Lock = field(default_factory=threading.Lock)
proc_next_pid: int = 1
proc_current_pid: int = 0
proc_parents: Dict[int, int] = field(default_factory=dict)
proc_status: Dict[int, Optional[int]] = field(default_factory=dict)
2026-04-16 22:29:08 +08:00
proc_argv: Dict[int, List[str]] = field(default_factory=dict)
proc_env: Dict[int, List[str]] = field(default_factory=dict)
proc_last_signal: Dict[int, int] = field(default_factory=dict)
proc_fault_vector: Dict[int, int] = field(default_factory=dict)
proc_fault_error: Dict[int, int] = field(default_factory=dict)
proc_fault_rip: Dict[int, int] = field(default_factory=dict)
2026-04-13 19:08:35 +08:00
2026-04-13 18:32:22 +08:00
def timer_ticks(self) -> int:
return (time.monotonic_ns() - self.start_ns) // 1_000_000
def push_key(self, key: int) -> None:
with self.kbd_lock:
if len(self.kbd_queue) >= self.kbd_queue_cap:
self.kbd_queue.popleft()
self.kbd_drop_count = u64(self.kbd_drop_count + 1)
self.kbd_queue.append(key & 0xFF)
self.kbd_push_count = u64(self.kbd_push_count + 1)
def pop_key(self) -> Optional[int]:
with self.kbd_lock:
if not self.kbd_queue:
return None
self.kbd_pop_count = u64(self.kbd_pop_count + 1)
return self.kbd_queue.popleft()
def buffered_count(self) -> int:
with self.kbd_lock:
return len(self.kbd_queue)
def log_journal_push(self, text: str) -> None:
if text is None:
return
normalized = text.replace("\r", "")
lines = normalized.split("\n")
for line in lines:
if len(line) > 255:
line = line[:255]
self.log_journal.append(line)
def log_journal_count(self) -> int:
return len(self.log_journal)
def log_journal_read(self, index_from_oldest: int) -> Optional[str]:
if index_from_oldest < 0 or index_from_oldest >= len(self.log_journal):
return None
2026-04-13 19:08:35 +08:00
return list(self.log_journal)[index_from_oldest]
def alloc_pid(self, ppid: int) -> int:
with self.proc_lock:
pid = int(self.proc_next_pid)
if pid == 0:
pid = 1
self.proc_next_pid = int(u64(pid + 1))
if self.proc_next_pid == 0:
self.proc_next_pid = 1
self.proc_parents[pid] = int(ppid)
self.proc_status[pid] = None
2026-04-16 22:29:08 +08:00
self.proc_argv[pid] = []
self.proc_env[pid] = []
self.proc_last_signal[pid] = 0
self.proc_fault_vector[pid] = 0
self.proc_fault_error[pid] = 0
self.proc_fault_rip[pid] = 0
2026-04-13 19:08:35 +08:00
return pid
def set_current_pid(self, pid: int) -> None:
with self.proc_lock:
self.proc_current_pid = int(pid)
def get_current_pid(self) -> int:
with self.proc_lock:
return int(self.proc_current_pid)
def mark_exited(self, pid: int, status: int) -> None:
if pid <= 0:
return
with self.proc_lock:
self.proc_status[int(pid)] = int(u64(status))
def wait_pid(self, pid: int) -> Tuple[int, int]:
with self.proc_lock:
if pid not in self.proc_status:
return int(u64(-1)), 0
status = self.proc_status[pid]
if status is None:
return 0, 0
2026-04-16 22:29:08 +08:00
return 1, int(status)
def set_proc_cmdline(self, pid: int, argv: List[str], env: List[str]) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
self.proc_argv[pid] = [str(item) for item in argv]
self.proc_env[pid] = [str(item) for item in env]
def set_proc_fault(self, pid: int, signal: int, vector: int, error_code: int, rip: int) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
self.proc_last_signal[pid] = int(u64(signal))
self.proc_fault_vector[pid] = int(u64(vector))
self.proc_fault_error[pid] = int(u64(error_code))
self.proc_fault_rip[pid] = int(u64(rip))
def proc_argc(self, pid: int) -> int:
with self.proc_lock:
return len(self.proc_argv.get(int(pid), []))
def proc_argv_item(self, pid: int, index: int) -> Optional[str]:
with self.proc_lock:
values = self.proc_argv.get(int(pid), [])
if index < 0 or index >= len(values):
return None
return values[index]
def proc_envc(self, pid: int) -> int:
with self.proc_lock:
return len(self.proc_env.get(int(pid), []))
def proc_env_item(self, pid: int, index: int) -> Optional[str]:
with self.proc_lock:
values = self.proc_env.get(int(pid), [])
if index < 0 or index >= len(values):
return None
return values[index]
def proc_signal(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_last_signal.get(int(pid), 0))
def proc_fault_vector_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_vector.get(int(pid), 0))
def proc_fault_error_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_error.get(int(pid), 0))
def proc_fault_rip_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_rip.get(int(pid), 0))