mirror of
https://github.com/Leonmmcoset/cleonos.git
synced 2026-04-21 18:44:01 +00:00
204 lines
6.7 KiB
Python
204 lines
6.7 KiB
Python
from __future__ import annotations
|
|
|
|
import collections
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Deque, Dict, List, Optional, Tuple
|
|
|
|
from .constants import u64
|
|
|
|
|
|
@dataclass
|
|
class SharedKernelState:
|
|
start_ns: int = field(default_factory=time.monotonic_ns)
|
|
task_count: int = 5
|
|
current_task: int = 0
|
|
service_count: int = 7
|
|
service_ready: int = 7
|
|
context_switches: int = 0
|
|
kelf_count: int = 2
|
|
kelf_runs: int = 0
|
|
exec_requests: int = 0
|
|
exec_success: int = 0
|
|
user_shell_ready: int = 1
|
|
user_exec_requested: int = 0
|
|
user_launch_tries: int = 0
|
|
user_launch_ok: int = 0
|
|
user_launch_fail: int = 0
|
|
tty_count: int = 4
|
|
tty_active: int = 0
|
|
kbd_queue: Deque[int] = field(default_factory=collections.deque)
|
|
kbd_lock: threading.Lock = field(default_factory=threading.Lock)
|
|
kbd_queue_cap: int = 256
|
|
kbd_drop_count: int = 0
|
|
kbd_push_count: int = 0
|
|
kbd_pop_count: int = 0
|
|
kbd_hotkey_switches: int = 0
|
|
log_journal_cap: int = 256
|
|
log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256))
|
|
fs_write_max: int = 65536
|
|
|
|
proc_lock: threading.Lock = field(default_factory=threading.Lock)
|
|
proc_next_pid: int = 1
|
|
proc_current_pid: int = 0
|
|
proc_parents: Dict[int, int] = field(default_factory=dict)
|
|
proc_status: Dict[int, Optional[int]] = field(default_factory=dict)
|
|
proc_argv: Dict[int, List[str]] = field(default_factory=dict)
|
|
proc_env: Dict[int, List[str]] = field(default_factory=dict)
|
|
proc_last_signal: Dict[int, int] = field(default_factory=dict)
|
|
proc_fault_vector: Dict[int, int] = field(default_factory=dict)
|
|
proc_fault_error: Dict[int, int] = field(default_factory=dict)
|
|
proc_fault_rip: Dict[int, int] = field(default_factory=dict)
|
|
|
|
def timer_ticks(self) -> int:
|
|
return (time.monotonic_ns() - self.start_ns) // 1_000_000
|
|
|
|
def push_key(self, key: int) -> None:
|
|
with self.kbd_lock:
|
|
if len(self.kbd_queue) >= self.kbd_queue_cap:
|
|
self.kbd_queue.popleft()
|
|
self.kbd_drop_count = u64(self.kbd_drop_count + 1)
|
|
self.kbd_queue.append(key & 0xFF)
|
|
self.kbd_push_count = u64(self.kbd_push_count + 1)
|
|
|
|
def pop_key(self) -> Optional[int]:
|
|
with self.kbd_lock:
|
|
if not self.kbd_queue:
|
|
return None
|
|
self.kbd_pop_count = u64(self.kbd_pop_count + 1)
|
|
return self.kbd_queue.popleft()
|
|
|
|
def buffered_count(self) -> int:
|
|
with self.kbd_lock:
|
|
return len(self.kbd_queue)
|
|
|
|
def log_journal_push(self, text: str) -> None:
|
|
if text is None:
|
|
return
|
|
|
|
normalized = text.replace("\r", "")
|
|
lines = normalized.split("\n")
|
|
|
|
for line in lines:
|
|
if len(line) > 255:
|
|
line = line[:255]
|
|
self.log_journal.append(line)
|
|
|
|
def log_journal_count(self) -> int:
|
|
return len(self.log_journal)
|
|
|
|
def log_journal_read(self, index_from_oldest: int) -> Optional[str]:
|
|
if index_from_oldest < 0 or index_from_oldest >= len(self.log_journal):
|
|
return None
|
|
return list(self.log_journal)[index_from_oldest]
|
|
|
|
def alloc_pid(self, ppid: int) -> int:
|
|
with self.proc_lock:
|
|
pid = int(self.proc_next_pid)
|
|
|
|
if pid == 0:
|
|
pid = 1
|
|
|
|
self.proc_next_pid = int(u64(pid + 1))
|
|
|
|
if self.proc_next_pid == 0:
|
|
self.proc_next_pid = 1
|
|
|
|
self.proc_parents[pid] = int(ppid)
|
|
self.proc_status[pid] = None
|
|
self.proc_argv[pid] = []
|
|
self.proc_env[pid] = []
|
|
self.proc_last_signal[pid] = 0
|
|
self.proc_fault_vector[pid] = 0
|
|
self.proc_fault_error[pid] = 0
|
|
self.proc_fault_rip[pid] = 0
|
|
return pid
|
|
|
|
def set_current_pid(self, pid: int) -> None:
|
|
with self.proc_lock:
|
|
self.proc_current_pid = int(pid)
|
|
|
|
def get_current_pid(self) -> int:
|
|
with self.proc_lock:
|
|
return int(self.proc_current_pid)
|
|
|
|
def mark_exited(self, pid: int, status: int) -> None:
|
|
if pid <= 0:
|
|
return
|
|
|
|
with self.proc_lock:
|
|
self.proc_status[int(pid)] = int(u64(status))
|
|
|
|
def wait_pid(self, pid: int) -> Tuple[int, int]:
|
|
with self.proc_lock:
|
|
if pid not in self.proc_status:
|
|
return int(u64(-1)), 0
|
|
|
|
status = self.proc_status[pid]
|
|
|
|
if status is None:
|
|
return 0, 0
|
|
|
|
return 1, int(status)
|
|
|
|
def set_proc_cmdline(self, pid: int, argv: List[str], env: List[str]) -> None:
|
|
if pid <= 0:
|
|
return
|
|
|
|
with self.proc_lock:
|
|
if pid not in self.proc_status:
|
|
return
|
|
self.proc_argv[pid] = [str(item) for item in argv]
|
|
self.proc_env[pid] = [str(item) for item in env]
|
|
|
|
def set_proc_fault(self, pid: int, signal: int, vector: int, error_code: int, rip: int) -> None:
|
|
if pid <= 0:
|
|
return
|
|
|
|
with self.proc_lock:
|
|
if pid not in self.proc_status:
|
|
return
|
|
self.proc_last_signal[pid] = int(u64(signal))
|
|
self.proc_fault_vector[pid] = int(u64(vector))
|
|
self.proc_fault_error[pid] = int(u64(error_code))
|
|
self.proc_fault_rip[pid] = int(u64(rip))
|
|
|
|
def proc_argc(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return len(self.proc_argv.get(int(pid), []))
|
|
|
|
def proc_argv_item(self, pid: int, index: int) -> Optional[str]:
|
|
with self.proc_lock:
|
|
values = self.proc_argv.get(int(pid), [])
|
|
if index < 0 or index >= len(values):
|
|
return None
|
|
return values[index]
|
|
|
|
def proc_envc(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return len(self.proc_env.get(int(pid), []))
|
|
|
|
def proc_env_item(self, pid: int, index: int) -> Optional[str]:
|
|
with self.proc_lock:
|
|
values = self.proc_env.get(int(pid), [])
|
|
if index < 0 or index >= len(values):
|
|
return None
|
|
return values[index]
|
|
|
|
def proc_signal(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return int(self.proc_last_signal.get(int(pid), 0))
|
|
|
|
def proc_fault_vector_value(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return int(self.proc_fault_vector.get(int(pid), 0))
|
|
|
|
def proc_fault_error_value(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return int(self.proc_fault_error.get(int(pid), 0))
|
|
|
|
def proc_fault_rip_value(self, pid: int) -> int:
|
|
with self.proc_lock:
|
|
return int(self.proc_fault_rip.get(int(pid), 0))
|