Files
cleonos/wine/cleonos_wine_lib/state.py
2026-04-13 19:12:22 +08:00

131 lines
3.9 KiB
Python

from __future__ import annotations
import collections
import threading
import time
from dataclasses import dataclass, field
from typing import Deque, Dict, Optional, Tuple
from .constants import u64
@dataclass
class SharedKernelState:
    """Mutable kernel-state record: counters, keyboard queue, log journal, process table.

    Besides plain integer counters and flags, the record carries three small
    subsystems with helper methods:

    * a bounded keyboard queue, accessed under ``kbd_lock``;
    * a bounded journal of log lines (no lock is taken by its helpers);
    * a process table (parent pid and exit status per pid), accessed under
      ``proc_lock``.

    Several counters are passed through ``u64`` from ``.constants`` after
    each update — presumably to wrap them to unsigned 64-bit range; confirm
    against that helper.
    """

    # Longest line kept in the journal; longer lines are truncated.
    # Deliberately un-annotated so @dataclass does not turn it into a field.
    _LOG_LINE_MAX = 255

    # Reference point for timer_ticks(), captured when the record is created.
    start_ns: int = field(default_factory=time.monotonic_ns)

    # Task / service / ELF / exec counters. Only their default values are
    # visible here; the code that updates them lives outside this class.
    task_count: int = 5
    current_task: int = 0
    service_count: int = 7
    service_ready: int = 7
    context_switches: int = 0
    kelf_count: int = 2
    kelf_runs: int = 0
    exec_requests: int = 0
    exec_success: int = 0

    # User-shell launch bookkeeping.
    user_shell_ready: int = 1
    user_exec_requested: int = 0
    user_launch_tries: int = 0
    user_launch_ok: int = 0
    user_launch_fail: int = 0

    # TTY bookkeeping.
    tty_count: int = 4
    tty_active: int = 0

    # Keyboard queue: byte values, oldest on the left, guarded by kbd_lock.
    kbd_queue: Deque[int] = field(default_factory=collections.deque)
    kbd_lock: threading.Lock = field(default_factory=threading.Lock)
    kbd_queue_cap: int = 256
    kbd_drop_count: int = 0
    kbd_push_count: int = 0
    kbd_pop_count: int = 0
    kbd_hotkey_switches: int = 0

    # Log journal: oldest line on the left. __post_init__ re-bounds the deque
    # so that its maxlen always agrees with log_journal_cap.
    log_journal_cap: int = 256
    log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256))

    fs_write_max: int = 65536

    # Process table, guarded by proc_lock. proc_status[pid] is None while the
    # process has not been marked exited, otherwise its exit status.
    proc_lock: threading.Lock = field(default_factory=threading.Lock)
    proc_next_pid: int = 1
    proc_current_pid: int = 0
    proc_parents: Dict[int, int] = field(default_factory=dict)
    proc_status: Dict[int, Optional[int]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Bound ``log_journal`` by ``log_journal_cap`` instead of a fixed 256.

        The field's default factory cannot see other fields, so the deque is
        rebuilt here whenever its ``maxlen`` disagrees with the configured
        cap. Existing entries are preserved (newest win if they do not fit).
        """
        wanted = max(int(self.log_journal_cap), 0)  # deque rejects negative maxlen
        if getattr(self.log_journal, "maxlen", None) != wanted:
            self.log_journal = collections.deque(self.log_journal, maxlen=wanted)

    def timer_ticks(self) -> int:
        """Return the whole milliseconds elapsed since ``start_ns``."""
        return (time.monotonic_ns() - self.start_ns) // 1_000_000

    def push_key(self, key: int) -> None:
        """Append the low byte of *key* to the keyboard queue.

        When the queue already holds ``kbd_queue_cap`` entries the oldest one
        is discarded first and ``kbd_drop_count`` is bumped.
        """
        with self.kbd_lock:
            # The non-empty check keeps a non-positive cap from calling
            # popleft() on an empty deque (IndexError); with such a cap the
            # queue simply holds the most recent key.
            if self.kbd_queue and len(self.kbd_queue) >= self.kbd_queue_cap:
                self.kbd_queue.popleft()
                self.kbd_drop_count = u64(self.kbd_drop_count + 1)
            self.kbd_queue.append(key & 0xFF)
            self.kbd_push_count = u64(self.kbd_push_count + 1)

    def pop_key(self) -> Optional[int]:
        """Remove and return the oldest queued key, or ``None`` if empty."""
        with self.kbd_lock:
            if not self.kbd_queue:
                return None
            self.kbd_pop_count = u64(self.kbd_pop_count + 1)
            return self.kbd_queue.popleft()

    def buffered_count(self) -> int:
        """Return how many keys are currently queued."""
        with self.kbd_lock:
            return len(self.kbd_queue)

    def log_journal_push(self, text: Optional[str]) -> None:
        """Append *text* to the journal, one entry per line.

        Carriage returns are removed, the text is split on ``"\\n"``, and each
        line is truncated to ``_LOG_LINE_MAX`` characters. ``None`` is ignored.
        Once the journal is full the oldest entries fall off.
        """
        if text is None:
            return
        # NOTE(review): text that ends in "\n" (or is empty) produces an empty
        # trailing entry; behaviour kept as-is — confirm whether intended.
        limit = self._LOG_LINE_MAX
        self.log_journal.extend(
            line[:limit] for line in text.replace("\r", "").split("\n")
        )

    def log_journal_count(self) -> int:
        """Return the number of lines currently held in the journal."""
        return len(self.log_journal)

    def log_journal_read(self, index_from_oldest: int) -> Optional[str]:
        """Return the journal line at *index_from_oldest* (0 = oldest).

        Returns ``None`` when the index is negative or past the newest line.
        """
        if 0 <= index_from_oldest < len(self.log_journal):
            # Index the deque directly; no need to copy it into a list.
            return self.log_journal[index_from_oldest]
        return None

    def alloc_pid(self, ppid: int) -> int:
        """Allocate the next pid, record *ppid* as its parent, and return it.

        Pid 0 is never handed out: both the allocated pid and the stored
        "next pid" skip it (including after ``u64`` wrap-around). The new
        pid's status starts as ``None`` (not exited).
        """
        with self.proc_lock:
            pid = int(self.proc_next_pid)
            if pid == 0:
                pid = 1
            self.proc_next_pid = int(u64(pid + 1))
            if self.proc_next_pid == 0:
                self.proc_next_pid = 1
            self.proc_parents[pid] = int(ppid)
            self.proc_status[pid] = None
            return pid

    def set_current_pid(self, pid: int) -> None:
        """Record *pid* as the current process id."""
        with self.proc_lock:
            self.proc_current_pid = int(pid)

    def get_current_pid(self) -> int:
        """Return the current process id."""
        with self.proc_lock:
            return int(self.proc_current_pid)

    def mark_exited(self, pid: int, status: int) -> None:
        """Store *status* (passed through ``u64``) as the exit status of *pid*.

        Non-positive pids are ignored.
        """
        if pid <= 0:
            return
        with self.proc_lock:
            self.proc_status[int(pid)] = int(u64(status))

    def wait_pid(self, pid: int) -> Tuple[int, int]:
        """Report the state of *pid* as a ``(code, status)`` pair.

        * ``(u64(-1), 0)`` — *pid* is not in the process table;
        * ``(0, 0)`` — *pid* is known but has not been marked exited;
        * ``(1, status)`` — *pid* has exited with *status*.
        """
        key = int(pid)  # same normalisation as alloc_pid / mark_exited
        with self.proc_lock:
            if key not in self.proc_status:
                return int(u64(-1)), 0
            status = self.proc_status[key]
            if status is None:
                return 0, 0
            return 1, int(status)