diff --git a/CMakeLists.txt b/CMakeLists.txt index 93afb8a..b4d9011 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -125,10 +125,27 @@ if(EXISTS "${CLEONOS_MENUCONFIG_CMAKE}") endif() function(cl_set_bool_cache VAR_NAME DEFAULT_VALUE DOC_TEXT) - if(NOT DEFINED ${VAR_NAME}) + set(_raw_value "") + set(_has_value OFF) + + if(DEFINED ${VAR_NAME}) + set(_raw_value "${${VAR_NAME}}") + set(_has_value ON) + elseif(DEFINED ${VAR_NAME}_IS_ENABLED) + set(_raw_value "${${VAR_NAME}_IS_ENABLED}") + set(_has_value ON) + endif() + + if(NOT _has_value) set(${VAR_NAME} ${DEFAULT_VALUE} CACHE BOOL "${DOC_TEXT}") else() - set(${VAR_NAME} ${${VAR_NAME}} CACHE BOOL "${DOC_TEXT}" FORCE) + string(TOUPPER "${_raw_value}" _raw_upper) + if(_raw_upper STREQUAL "Y" OR _raw_upper STREQUAL "M" OR _raw_upper STREQUAL "ON" OR _raw_upper STREQUAL "TRUE" OR _raw_upper STREQUAL "1") + set(_bool_value ON) + else() + set(_bool_value OFF) + endif() + set(${VAR_NAME} ${_bool_value} CACHE BOOL "${DOC_TEXT}" FORCE) endif() endfunction() diff --git a/clks/kernel/userland.c b/clks/kernel/userland.c index d891088..6869b27 100644 --- a/clks/kernel/userland.c +++ b/clks/kernel/userland.c @@ -53,6 +53,7 @@ static clks_bool clks_userland_probe_elf(const char *path, const char *tag) { return CLKS_TRUE; } +#if CLKS_CFG_USER_INIT_SCRIPT_PROBE static void clks_userland_probe_init_script(void) { const void *data; u64 size = 0ULL; @@ -67,6 +68,7 @@ static void clks_userland_probe_init_script(void) { clks_log(CLKS_LOG_INFO, "USER", "INIT SCRIPT READY /SHELL/INIT.CMD"); clks_log_hex(CLKS_LOG_INFO, "USER", "INIT_SCRIPT_SIZE", size); } +#endif static clks_bool clks_userland_request_shell_exec(void) { u64 status = (u64)-1; diff --git a/configs/menuconfig/clks_features.json b/configs/menuconfig/clks_features.json index 32d4bdc..5715d31 100644 --- a/configs/menuconfig/clks_features.json +++ b/configs/menuconfig/clks_features.json @@ -4,193 +4,256 @@ "key": "CLEONOS_CLKS_ENABLE_AUDIO", "title": "Audio Driver Init", "description": "Initialize kernel audio subsystem during boot.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_MOUSE", "title": "PS/2 Mouse Input", "description": "Initialize kernel PS/2 mouse input subsystem.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_DESKTOP", "title": "TTY2 Desktop", "description": "Enable desktop compositor tick/update path on TTY2.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_MOUSE" }, { "key": "CLEONOS_CLKS_ENABLE_DRIVER_MANAGER", "title": "Driver Manager", "description": "Initialize kernel ELF driver manager.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_KELF", "title": "KELF Executor", "description": "Enable kernel ELF app dispatcher and kelfd task.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_DRIVER_MANAGER && CLEONOS_CLKS_ENABLE_ELFRUNNER_INIT", + "imply": [ + "CLEONOS_CLKS_ENABLE_ELFRUNNER_PROBE", + "CLEONOS_CLKS_ENABLE_USER_SYSTEM_APP_PROBE" + ] }, { "key": "CLEONOS_CLKS_ENABLE_USERLAND_AUTO_EXEC", "title": "Auto Enter User Shell", "description": "Auto-exec /shell/shell.elf after kernel boot.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_USRD_TASK && CLEONOS_CLKS_ENABLE_KEYBOARD", + "select": [ + "CLEONOS_CLKS_ENABLE_USRD_TASK", + "CLEONOS_CLKS_ENABLE_KEYBOARD" + ], + "imply": [ + "CLEONOS_CLKS_ENABLE_USER_INIT_SCRIPT_PROBE", + "CLEONOS_CLKS_ENABLE_SHELL_MODE_LOG" + ] }, { "key": "CLEONOS_CLKS_ENABLE_HEAP_SELFTEST", "title": "Heap Selftest", "description": "Run kmalloc/kfree selftest during kernel boot.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_EXTERNAL_PSF", "title": "Load External PSF Font", "description": "Load /system/tty.psf and apply it to framebuffer TTY.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_KEYBOARD", "title": "PS/2 Keyboard Input", "description": "Initialize PS/2 keyboard input subsystem.", - "default": true + "type": "bool", + "default": true, + "imply": [ + "CLEONOS_CLKS_ENABLE_KBD_TTY_SWITCH_HOTKEY", + "CLEONOS_CLKS_ENABLE_KBD_CTRL_SHORTCUTS" + ] }, { "key": "CLEONOS_CLKS_ENABLE_ELFRUNNER_PROBE", "title": "ELFRUNNER Probe", "description": "Probe kernel ELF runtime metadata after ELFRUNNER init.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_ELFRUNNER_INIT && CLEONOS_CLKS_ENABLE_KELF" }, { "key": "CLEONOS_CLKS_ENABLE_KLOGD_TASK", "title": "Scheduler Task: klogd", "description": "Enable periodic klogd maintenance task.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_KWORKER_TASK", "title": "Scheduler Task: kworker", "description": "Enable periodic kernel worker service-heartbeat task.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_USRD_TASK", "title": "Scheduler Task: usrd", "description": "Enable user/runtime dispatch task (shell tick, tty tick, exec tick).", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_BOOT_VIDEO_LOG", "title": "Boot Video Geometry Logs", "description": "Print framebuffer width/height/pitch/bpp logs at boot.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_PMM_STATS_LOG", "title": "PMM Stats Logs", "description": "Print PMM managed/free/used/dropped pages at boot.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_HEAP_STATS_LOG", "title": "Heap Stats Logs", "description": "Print heap total/free bytes at boot.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_FS_ROOT_LOG", "title": "FS Root Children Log", "description": "Print root directory children count during FS init.", - "default": true + "type": "tristate", + "default": "y", + "depends_on": "CLEONOS_CLKS_ENABLE_SYSTEM_DIR_CHECK" }, { "key": "CLEONOS_CLKS_ENABLE_SYSTEM_DIR_CHECK", "title": "FS /SYSTEM Sanity Check", "description": "Require /system directory check during boot.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_ELFRUNNER_INIT", "title": "ELFRUNNER Init", "description": "Initialize ELFRUNNER framework in kernel boot path.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_SYSCALL_TICK_QUERY", "title": "SYSCALL Tick Query", "description": "Query timer ticks via syscall and log result during boot.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_PROCFS" }, { "key": "CLEONOS_CLKS_ENABLE_TTY_READY_LOG", "title": "TTY Ready Logs", "description": "Print TTY count/active/cursor ready logs.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_IDLE_DEBUG_LOG", "title": "Idle Loop Debug Log", "description": "Print debug log before entering kernel idle loop.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_PROCFS", "title": "Virtual /proc", "description": "Enable virtual procfs paths (/proc, /proc/list, /proc/self, /proc/) in syscall FS layer.", + "type": "bool", "default": true }, { "key": "CLEONOS_CLKS_ENABLE_EXEC_SERIAL_LOG", "title": "EXEC Serial Logs", "description": "Print EXEC run/return/path logs to serial output.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_KBD_TTY_SWITCH_HOTKEY", "title": "Keyboard TTY Switch Hotkey", "description": "Enable ALT+F1..F4 keyboard hotkey for active TTY switching.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_KEYBOARD" }, { "key": "CLEONOS_CLKS_ENABLE_KBD_CTRL_SHORTCUTS", "title": "Keyboard Ctrl Shortcuts", "description": "Enable Ctrl+A/C/V shortcuts for input selection/copy/paste.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_KEYBOARD", + "imply": [ + "CLEONOS_CLKS_ENABLE_KBD_TTY_SWITCH_HOTKEY" + ] }, { "key": "CLEONOS_CLKS_ENABLE_KBD_FORCE_STOP_HOTKEY", "title": "Keyboard Force-Stop Hotkey", "description": "Enable Ctrl+Alt+C force-stop for current running user process.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_KEYBOARD && CLEONOS_CLKS_ENABLE_PROCFS" }, { "key": "CLEONOS_CLKS_ENABLE_USER_INIT_SCRIPT_PROBE", "title": "User Init Script Probe", "description": "Probe and log /shell/init.cmd presence during userland init.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_USERLAND_AUTO_EXEC" }, { "key": "CLEONOS_CLKS_ENABLE_USER_SYSTEM_APP_PROBE", "title": "User System App Probe", "description": "Probe /system/elfrunner.elf and /system/memc.elf during userland init.", - "default": true + "type": "bool", + "default": true, + "depends_on": "CLEONOS_CLKS_ENABLE_KELF" }, { "key": "CLEONOS_CLKS_ENABLE_SCHED_TASK_COUNT_LOG", "title": "Scheduler Task Count Log", "description": "Print scheduler task count after scheduler initialization.", - "default": true + "type": "tristate", + "default": "y", + "depends_on": "CLEONOS_CLKS_ENABLE_KLOGD_TASK || CLEONOS_CLKS_ENABLE_KWORKER_TASK || CLEONOS_CLKS_ENABLE_USRD_TASK" }, { "key": "CLEONOS_CLKS_ENABLE_INTERRUPT_READY_LOG", "title": "Interrupt Ready Log", "description": "Print IDT/PIC initialized log after interrupt setup.", - "default": true + "type": "tristate", + "default": "y" }, { "key": "CLEONOS_CLKS_ENABLE_SHELL_MODE_LOG", "title": "Shell Mode Log", "description": "Print whether boot default mode is user shell or kernel shell.", - "default": true + "type": "tristate", + "default": "y" } ] } diff --git a/scripts/__pycache__/menuconfig.cpython-312.pyc b/scripts/__pycache__/menuconfig.cpython-312.pyc deleted file mode 100644 index 33898a0..0000000 Binary files a/scripts/__pycache__/menuconfig.cpython-312.pyc and /dev/null differ diff --git a/scripts/__pycache__/menuconfig.cpython-313.pyc b/scripts/__pycache__/menuconfig.cpython-313.pyc new file mode 100644 index 0000000..9a5fb60 Binary files /dev/null and b/scripts/__pycache__/menuconfig.cpython-313.pyc differ diff --git a/scripts/menuconfig.py b/scripts/menuconfig.py index 6d18ad3..1e6baff 100644 --- a/scripts/menuconfig.py +++ b/scripts/menuconfig.py @@ -21,7 +21,7 @@ import sys import textwrap from dataclasses import dataclass from pathlib import Path -from typing import Dict, Iterable, List, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple try: import curses @@ -51,21 +51,239 @@ class OptionItem: key: str title: str description: str - default: bool + kind: str + default: int + depends_on: str + selects: Tuple[str, ...] + implies: Tuple[str, ...] + + +@dataclass +class EvalResult: + effective: Dict[str, int] + visible: Dict[str, bool] + min_required: Dict[str, int] + max_selectable: Dict[str, int] + selected_by: Dict[str, List[str]] + implied_by: Dict[str, List[str]] + depends_symbols: Dict[str, List[str]] + + +TRI_N = 0 +TRI_M = 1 +TRI_Y = 2 + + +def tri_char(value: int) -> str: + if value >= TRI_Y: + return "y" + if value >= TRI_M: + return "m" + return "n" + + +def tri_text(value: int) -> str: + ch = tri_char(value) + if ch == "y": + return "Y" + if ch == "m": + return "M" + return "N" + + +def normalize_kind(raw: object) -> str: + text = str(raw or "bool").strip().lower() + if text in {"tristate", "tri"}: + return "tristate" + return "bool" + + +def _tri_from_bool(value: bool) -> int: + return TRI_Y if value else TRI_N + + +def normalize_tri(raw: object, default: int, kind: str) -> int: + if isinstance(raw, bool): + value = TRI_Y if raw else TRI_N + elif isinstance(raw, (int, float)): + iv = int(raw) + if iv <= 0: + value = TRI_N + elif iv == 1: + value = TRI_M + else: + value = TRI_Y + elif isinstance(raw, str): + text = raw.strip().lower() + if text in {"1", "on", "true", "yes", "y"}: + value = TRI_Y + elif text in {"m", "mod", "module"}: + value = TRI_M + elif text in {"0", "off", "false", "no", "n"}: + value = TRI_N + else: + value = default + else: + value = default + + if kind == "bool": + return TRI_Y if value == TRI_Y else TRI_N + if value < TRI_N: + return TRI_N + if value > TRI_Y: + return TRI_Y + return value + + +def _tokenize_dep_expr(expr: str) -> List[str]: + tokens: List[str] = [] + i = 0 + n = len(expr) + + while i < n: + ch = expr[i] + if ch.isspace(): + i += 1 + continue + if ch in "()!": + tokens.append(ch) + i += 1 + continue + if i + 1 < n: + pair = expr[i : i + 2] + if pair in {"&&", "||"}: + tokens.append(pair) + i += 2 + continue + m = re.match(r"[A-Za-z_][A-Za-z0-9_]*", expr[i:]) + if m: + tok = m.group(0) + tokens.append(tok) + i += len(tok) + continue + raise RuntimeError(f"invalid token in depends expression: {expr!r}") + return tokens + + +class _DepExprParser: + def __init__(self, tokens: List[str], resolver: Callable[[str], int]): + self.tokens = tokens + self.pos = 0 + self.resolver = resolver + + def _peek(self) -> Optional[str]: + if self.pos >= len(self.tokens): + return None + return self.tokens[self.pos] + + def _take(self) -> str: + if self.pos >= len(self.tokens): + raise RuntimeError("unexpected end of expression") + tok = self.tokens[self.pos] + self.pos += 1 + return tok + + def parse(self) -> int: + value = self._parse_or() + if self._peek() is not None: + raise RuntimeError("unexpected token in expression") + return value + + def _parse_or(self) -> int: + value = self._parse_and() + while self._peek() == "||": + self._take() + value = max(value, self._parse_and()) + return value + + def _parse_and(self) -> int: + value = self._parse_unary() + while self._peek() == "&&": + self._take() + value = min(value, self._parse_unary()) + return value + + def _parse_unary(self) -> int: + tok = self._peek() + if tok == "!": + self._take() + value = self._parse_unary() + if value == TRI_Y: + return TRI_N + if value == TRI_N: + return TRI_Y + return TRI_M + return self._parse_primary() + + def _parse_primary(self) -> int: + tok = self._take() + if tok == "(": + value = self._parse_or() + if self._take() != ")": + raise RuntimeError("missing ')' in expression") + return value + + lowered = tok.lower() + if lowered == "y": + return TRI_Y + if lowered == "m": + return TRI_M + if lowered == "n": + return TRI_N + return self.resolver(tok) + + +def eval_dep_expr(expr: str, resolver: Callable[[str], int]) -> int: + text = (expr or "").strip() + if not text: + return TRI_Y + tokens = _tokenize_dep_expr(text) + parser = _DepExprParser(tokens, resolver) + return parser.parse() + + +def extract_dep_symbols(expr: str) -> List[str]: + text = (expr or "").strip() + if not text: + return [] + + out: List[str] = [] + seen: Set[str] = set() + for tok in _tokenize_dep_expr(text): + if tok in {"&&", "||", "!", "(", ")"}: + continue + low = tok.lower() + if low in {"y", "m", "n"}: + continue + if tok in seen: + continue + seen.add(tok) + out.append(tok) + return out def normalize_bool(raw: object, default: bool) -> bool: - if isinstance(raw, bool): - return raw - if isinstance(raw, (int, float)): - return raw != 0 + tri_default = TRI_Y if default else TRI_N + return normalize_tri(raw, tri_default, "bool") == TRI_Y + + +def _normalize_key_list(raw: object) -> Tuple[str, ...]: + items: List[str] = [] + if isinstance(raw, str): - text = raw.strip().lower() - if text in {"1", "on", "true", "yes", "y"}: - return True - if text in {"0", "off", "false", "no", "n"}: - return False - return default + source = re.split(r"[,\s]+", raw.strip()) + elif isinstance(raw, (list, tuple)): + source = [str(x) for x in raw] + else: + return () + + for item in source: + token = str(item).strip() + if not token: + continue + items.append(token) + + return tuple(items) def sanitize_token(name: str) -> str: @@ -89,10 +307,25 @@ def load_clks_options() -> List[OptionItem]: key = str(entry.get("key", "")).strip() title = str(entry.get("title", key)).strip() description = str(entry.get("description", "")).strip() - default = normalize_bool(entry.get("default", True), True) + kind = normalize_kind(entry.get("type", "bool")) + default = normalize_tri(entry.get("default", TRI_Y), TRI_Y, kind) + depends_on = str(entry.get("depends_on", entry.get("depends", ""))).strip() + selects = _normalize_key_list(entry.get("select", entry.get("selects", ()))) + implies = _normalize_key_list(entry.get("imply", entry.get("implies", ()))) if not key: continue - options.append(OptionItem(key=key, title=title, description=description, default=default)) + options.append( + OptionItem( + key=key, + title=title, + description=description, + kind=kind, + default=default, + depends_on=depends_on, + selects=selects, + implies=implies, + ) + ) if not options: raise RuntimeError(f"no CLKS feature options in {CLKS_FEATURES_PATH}") @@ -135,12 +368,23 @@ def discover_user_apps() -> List[OptionItem]: key = f"CLEONOS_USER_APP_{sanitize_token(app)}" title = f"{app}.elf [{section}]" description = f"Build and package user app '{app}' into ramdisk/{section}." - options.append(OptionItem(key=key, title=title, description=description, default=True)) + options.append( + OptionItem( + key=key, + title=title, + description=description, + kind="bool", + default=TRI_Y, + depends_on="", + selects=(), + implies=(), + ) + ) return options -def load_previous_values() -> Dict[str, bool]: +def load_previous_values() -> Dict[str, int]: if not CONFIG_JSON_PATH.exists(): return {} try: @@ -151,54 +395,62 @@ def load_previous_values() -> Dict[str, bool]: if not isinstance(raw, dict): return {} - out: Dict[str, bool] = {} + out: Dict[str, int] = {} for key, value in raw.items(): if not isinstance(key, str): continue - out[key] = normalize_bool(value, False) + out[key] = normalize_tri(value, TRI_N, "tristate") return out -def init_values(options: Iterable[OptionItem], previous: Dict[str, bool], use_defaults: bool) -> Dict[str, bool]: - values: Dict[str, bool] = {} +def init_values(options: Iterable[OptionItem], previous: Dict[str, int], use_defaults: bool) -> Dict[str, int]: + values: Dict[str, int] = {} for item in options: if not use_defaults and item.key in previous: - values[item.key] = previous[item.key] + values[item.key] = normalize_tri(previous[item.key], item.default, item.kind) else: values[item.key] = item.default return values -def _set_option_if_exists(values: Dict[str, bool], key: str, enabled: bool) -> None: +def _build_index(options: Iterable[OptionItem]) -> Dict[str, OptionItem]: + return {item.key: item for item in options} + + +def _set_option_if_exists(values: Dict[str, int], option_index: Dict[str, OptionItem], key: str, level: int) -> None: if key in values: - values[key] = enabled + item = option_index.get(key) + if item is None: + return + values[key] = normalize_tri(level, item.default, item.kind) -def _set_all_options(values: Dict[str, bool], options: List[OptionItem], enabled: bool) -> None: +def _set_all_options(values: Dict[str, int], options: List[OptionItem], level: int) -> None: for item in options: - values[item.key] = enabled + values[item.key] = normalize_tri(level, item.default, item.kind) -def apply_preset(preset: str, clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> None: +def apply_preset(preset: str, clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> None: + option_index = _build_index(clks_options + user_options) preset_name = preset.strip().lower() if preset_name == "full": - _set_all_options(values, clks_options, True) - _set_all_options(values, user_options, True) + _set_all_options(values, clks_options, TRI_Y) + _set_all_options(values, user_options, TRI_Y) return if preset_name == "dev": - _set_all_options(values, clks_options, True) - _set_all_options(values, user_options, True) - _set_option_if_exists(values, "CLEONOS_CLKS_ENABLE_USERLAND_AUTO_EXEC", False) - _set_option_if_exists(values, "CLEONOS_CLKS_ENABLE_EXEC_SERIAL_LOG", True) - _set_option_if_exists(values, "CLEONOS_CLKS_ENABLE_PROCFS", True) - _set_option_if_exists(values, "CLEONOS_CLKS_ENABLE_IDLE_DEBUG_LOG", True) + _set_all_options(values, clks_options, TRI_Y) + _set_all_options(values, user_options, TRI_Y) + _set_option_if_exists(values, option_index, "CLEONOS_CLKS_ENABLE_USERLAND_AUTO_EXEC", TRI_N) + _set_option_if_exists(values, option_index, "CLEONOS_CLKS_ENABLE_EXEC_SERIAL_LOG", TRI_Y) + _set_option_if_exists(values, option_index, "CLEONOS_CLKS_ENABLE_PROCFS", TRI_Y) + _set_option_if_exists(values, option_index, "CLEONOS_CLKS_ENABLE_IDLE_DEBUG_LOG", TRI_Y) return if preset_name == "minimal": - _set_all_options(values, clks_options, True) - _set_all_options(values, user_options, False) + _set_all_options(values, clks_options, TRI_Y) + _set_all_options(values, user_options, TRI_N) clks_disable = [ "CLEONOS_CLKS_ENABLE_AUDIO", @@ -222,7 +474,7 @@ def apply_preset(preset: str, clks_options: List[OptionItem], user_options: List "CLEONOS_CLKS_ENABLE_SCHED_TASK_COUNT_LOG", ] for key in clks_disable: - _set_option_if_exists(values, key, False) + _set_option_if_exists(values, option_index, key, TRI_N) clks_enable = [ "CLEONOS_CLKS_ENABLE_KEYBOARD", @@ -240,7 +492,7 @@ def apply_preset(preset: str, clks_options: List[OptionItem], user_options: List "CLEONOS_CLKS_ENABLE_SHELL_MODE_LOG", ] for key in clks_enable: - _set_option_if_exists(values, key, True) + _set_option_if_exists(values, option_index, key, TRI_Y) user_enable_tokens = [ "SHELL", @@ -265,24 +517,271 @@ def apply_preset(preset: str, clks_options: List[OptionItem], user_options: List "TTYDRV", ] for token in user_enable_tokens: - _set_option_if_exists(values, f"CLEONOS_USER_APP_{token}", True) + _set_option_if_exists(values, option_index, f"CLEONOS_USER_APP_{token}", TRI_Y) return raise RuntimeError(f"unknown preset: {preset}") -def print_section(title: str, options: List[OptionItem], values: Dict[str, bool]) -> None: +def _allowed_values(item: OptionItem, min_required: int, max_selectable: int) -> List[int]: + upper = normalize_tri(max_selectable, TRI_N, item.kind) + lower = normalize_tri(min_required, TRI_N, item.kind) + if lower > upper: + lower = upper + + if item.kind == "tristate": + base = [TRI_N, TRI_M, TRI_Y] + else: + base = [TRI_N, TRI_Y] + values = [v for v in base if lower <= v <= upper] + if values: + return values + return [lower] + + +def _choose_select_level(src_value: int, dst_kind: str, is_imply: bool) -> int: + if src_value <= TRI_N: + return TRI_N + if dst_kind == "bool": + if is_imply: + return TRI_Y if src_value == TRI_Y else TRI_N + return TRI_Y + return src_value + + +def evaluate_config(options: List[OptionItem], values: Dict[str, int]) -> EvalResult: + option_index = _build_index(options) + depends_symbols = {item.key: extract_dep_symbols(item.depends_on) for item in options} + + effective: Dict[str, int] = {} + for item in options: + effective[item.key] = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + + visible = {item.key: True for item in options} + min_required = {item.key: TRI_N for item in options} + max_selectable = {item.key: TRI_Y for item in options} + selected_by = {item.key: [] for item in options} + implied_by = {item.key: [] for item in options} + + max_rounds = max(1, len(options) * 8) + for _ in range(max_rounds): + dep_value: Dict[str, int] = {} + for item in options: + def _resolver(symbol: str) -> int: + return effective.get(symbol, TRI_N) + + try: + dep_value[item.key] = normalize_tri(eval_dep_expr(item.depends_on, _resolver), TRI_Y, "tristate") + except Exception: + dep_value[item.key] = TRI_N + + new_visible: Dict[str, bool] = {} + new_max: Dict[str, int] = {} + for item in options: + dep = dep_value[item.key] + new_visible[item.key] = dep > TRI_N + if item.kind == "bool": + new_max[item.key] = TRI_Y if dep == TRI_Y else TRI_N + else: + new_max[item.key] = dep + + new_min = {item.key: TRI_N for item in options} + new_selected_by = {item.key: [] for item in options} + new_implied_by = {item.key: [] for item in options} + + for src in options: + src_value = effective[src.key] + if src_value <= TRI_N: + continue + + for dst_key in src.selects: + dst = option_index.get(dst_key) + if dst is None: + continue + level = _choose_select_level(src_value, dst.kind, is_imply=False) + if level > new_min[dst_key]: + new_min[dst_key] = level + new_selected_by[dst_key].append(f"{src.key}={tri_char(src_value)}") + + for dst_key in src.implies: + dst = option_index.get(dst_key) + if dst is None: + continue + new_implied_by[dst_key].append(f"{src.key}={tri_char(src_value)}") + + changed = False + next_effective: Dict[str, int] = {} + for item in options: + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + upper = normalize_tri(new_max[item.key], TRI_N, item.kind) + lower = normalize_tri(new_min[item.key], TRI_N, item.kind) + if lower > upper: + lower = upper + eff = req + if eff < lower: + eff = lower + if eff > upper: + eff = upper + eff = normalize_tri(eff, item.default, item.kind) + next_effective[item.key] = eff + if eff != effective[item.key]: + changed = True + + effective = next_effective + visible = new_visible + min_required = new_min + max_selectable = new_max + selected_by = new_selected_by + implied_by = new_implied_by + if not changed: + break + + for key in selected_by: + selected_by[key].sort() + implied_by[key].sort() + + return EvalResult( + effective=effective, + visible=visible, + min_required=min_required, + max_selectable=max_selectable, + selected_by=selected_by, + implied_by=implied_by, + depends_symbols=depends_symbols, + ) + + +def _option_on(value: int) -> bool: + return value > TRI_N + + +def _set_all(values: Dict[str, int], options: List[OptionItem], level: int) -> None: + for item in options: + values[item.key] = normalize_tri(level, item.default, item.kind) + + +def _set_option_value(values: Dict[str, int], item: OptionItem, level: int) -> None: + values[item.key] = normalize_tri(level, item.default, item.kind) + + +def _cycle_option_value(values: Dict[str, int], item: OptionItem, evaluation: EvalResult, step: int = 1) -> None: + allowed = _allowed_values(item, evaluation.min_required[item.key], evaluation.max_selectable[item.key]) + if not allowed: + return + current = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + if current not in allowed: + current = evaluation.effective.get(item.key, item.default) + if current not in allowed: + current = allowed[0] + pos = allowed.index(current) + values[item.key] = allowed[(pos + step) % len(allowed)] + + +def _detail_lines(item: OptionItem, values: Dict[str, int], ev: EvalResult) -> List[str]: + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + eff = ev.effective.get(item.key, item.default) + visible = ev.visible.get(item.key, True) + floor_val = ev.min_required.get(item.key, TRI_N) + ceil_val = ev.max_selectable.get(item.key, TRI_Y) + allowed = ",".join(tri_char(v) for v in _allowed_values(item, floor_val, ceil_val)) + + lines = [ + f"kind: {item.kind}", + f"requested: {tri_char(req)}", + f"effective: {tri_char(eff)}", + f"visible: {'yes' if visible else 'no'}", + f"allowed: {allowed}", + f"depends: {item.depends_on or ''}", + ] + + symbols = ev.depends_symbols.get(item.key, []) + if symbols: + parts = [f"{sym}={tri_char(ev.effective.get(sym, TRI_N))}" for sym in symbols] + lines.append("depends values: " + ", ".join(parts)) + else: + lines.append("depends values: ") + + sel = ev.selected_by.get(item.key, []) + imp = ev.implied_by.get(item.key, []) + lines.append("selected by: " + (", ".join(sel) if sel else "")) + lines.append("implied by: " + (", ".join(imp) if imp else "")) + return lines + + +def _tri_word(value: int) -> str: + if value >= TRI_Y: + return "Enabled" + if value >= TRI_M: + return "Module" + return "Disabled" + + +def _detail_lines_human(item: OptionItem, values: Dict[str, int], ev: EvalResult) -> List[str]: + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + eff = ev.effective.get(item.key, item.default) + visible = ev.visible.get(item.key, True) + floor_val = ev.min_required.get(item.key, TRI_N) + ceil_val = ev.max_selectable.get(item.key, TRI_Y) + allowed_vals = _allowed_values(item, floor_val, ceil_val) + + symbols = ev.depends_symbols.get(item.key, []) + if symbols: + dep_values = ", ".join(f"{sym}={tri_char(ev.effective.get(sym, TRI_N))}" for sym in symbols) + else: + dep_values = "" + + selected_chain = ", ".join(ev.selected_by.get(item.key, [])) or "" + implied_chain = ", ".join(ev.implied_by.get(item.key, [])) or "" + allowed_text = "/".join(f"{tri_char(v)}({_tri_word(v)})" for v in allowed_vals) + + return [ + "State:", + f" Running now : {tri_char(eff)} ({_tri_word(eff)})", + f" Your choice : {tri_char(req)} ({_tri_word(req)})", + f" Type : {item.kind}", + f" Visible : {'yes' if visible else 'no'}", + f" Allowed now : {allowed_text}", + "Why:", + f" depends on : {item.depends_on or ''}", + f" depends value : {dep_values}", + f" selected by : {selected_chain}", + f" implied by : {implied_chain}", + "Notes:", + " [a/b] in list means [effective/requested].", + f" {item.description}", + ] + + +def print_section(title: str, section_options: List[OptionItem], all_options: List[OptionItem], values: Dict[str, int]) -> None: + ev = evaluate_config(all_options, values) print() print(f"== {title} ==") - for idx, item in enumerate(options, start=1): - mark = "x" if values.get(item.key, item.default) else " " - print(f"{idx:3d}. [{mark}] {item.title}") - print("Commands: toggle, a enable-all, n disable-all, i info, b back") + for idx, item in enumerate(section_options, start=1): + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + eff = ev.effective.get(item.key, item.default) + visible = ev.visible.get(item.key, True) + floor_val = ev.min_required.get(item.key, TRI_N) + ceil_val = ev.max_selectable.get(item.key, TRI_Y) + locked = len(_allowed_values(item, floor_val, ceil_val)) <= 1 + flags: List[str] = [] + if not visible: + flags.append("hidden") + if locked: + flags.append("locked") + if ev.selected_by.get(item.key): + flags.append("selected") + if ev.implied_by.get(item.key): + flags.append("implied") + flag_text = f" ({', '.join(flags)})" if flags else "" + print(f"{idx:3d}. [{tri_char(eff)}|{tri_char(req)}] {item.title}{flag_text}") + print("Commands: cycle, a all->y, n all->n, m all->m, i info, b back") + print("Legend: [effective|requested], states: y/m/n") -def section_loop(title: str, options: List[OptionItem], values: Dict[str, bool]) -> None: +def section_loop(title: str, section_options: List[OptionItem], all_options: List[OptionItem], values: Dict[str, int]) -> None: while True: - print_section(title, options, values) + ev = evaluate_config(all_options, values) + print_section(title, section_options, all_options, values) raw = input(f"{title}> ").strip() if not raw: continue @@ -291,24 +790,28 @@ def section_loop(title: str, options: List[OptionItem], values: Dict[str, bool]) if lower in {"b", "back", "q", "quit"}: return if lower in {"a", "all", "on"}: - for item in options: - values[item.key] = True + for item in section_options: + _set_option_value(values, item, TRI_Y) continue if lower in {"n", "none", "off"}: - for item in options: - values[item.key] = False + for item in section_options: + _set_option_value(values, item, TRI_N) + continue + if lower in {"m", "mod", "module"}: + for item in section_options: + _set_option_value(values, item, TRI_M) continue if lower.startswith("i "): token = lower[2:].strip() if token.isdigit(): idx = int(token) - if 1 <= idx <= len(options): - item = options[idx - 1] - state = "ON" if values.get(item.key, item.default) else "OFF" + if 1 <= idx <= len(section_options): + item = section_options[idx - 1] print() print(f"[{idx}] {item.title}") print(f"key: {item.key}") - print(f"state: {state}") + for line in _detail_lines(item, values, ev): + print(line) print(f"desc: {item.description}") continue print("invalid info index") @@ -316,9 +819,9 @@ def section_loop(title: str, options: List[OptionItem], values: Dict[str, bool]) if raw.isdigit(): idx = int(raw) - if 1 <= idx <= len(options): - item = options[idx - 1] - values[item.key] = not values.get(item.key, item.default) + if 1 <= idx <= len(section_options): + item = section_options[idx - 1] + _cycle_option_value(values, item, ev) else: print("invalid index") continue @@ -476,13 +979,23 @@ def _draw_progress_bar( _safe_addnstr(stdscr, y, x + bar_w + 1, f"{enabled_count:>3}/{total_count:<3}", off_attr | curses.A_BOLD) -def _option_enabled(values: Dict[str, bool], item: OptionItem) -> bool: - return values.get(item.key, item.default) +def _option_enabled(ev: EvalResult, item: OptionItem) -> bool: + return ev.effective.get(item.key, item.default) > TRI_N -def _set_all(values: Dict[str, bool], options: List[OptionItem], enabled: bool) -> None: - for item in options: - values[item.key] = enabled +def _option_flags(item: OptionItem, ev: EvalResult) -> str: + flags: List[str] = [] + if not ev.visible.get(item.key, True): + flags.append("hidden") + if len(_allowed_values(item, ev.min_required.get(item.key, TRI_N), ev.max_selectable.get(item.key, TRI_Y))) <= 1: + flags.append("locked") + if ev.selected_by.get(item.key): + flags.append("selected") + if ev.implied_by.get(item.key): + flags.append("implied") + if not flags: + return "-" + return ",".join(flags) def _draw_scrollbar(stdscr, y: int, x: int, height: int, total: int, top: int, visible: int, track_attr: int, thumb_attr: int) -> None: @@ -508,11 +1021,19 @@ def _draw_scrollbar(stdscr, y: int, x: int, height: int, total: int, top: int, v _safe_addch(stdscr, y + thumb_y + r, x, "#", thumb_attr) -def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: List[OptionItem], values: Dict[str, bool]) -> None: +def _run_ncurses_section( + stdscr, + theme: Dict[str, int], + title: str, + section_options: List[OptionItem], + all_options: List[OptionItem], + values: Dict[str, int], +) -> None: selected = 0 top = 0 while True: + ev = evaluate_config(all_options, values) stdscr.erase() h, w = stdscr.getmaxyx() @@ -541,12 +1062,13 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis detail_box_w = w - left_w _safe_addnstr(stdscr, 0, 0, f" CLeonOS menuconfig / {title} ", theme["header"]) - enabled_count = sum(1 for item in options if _option_enabled(values, item)) + enabled_count = sum(1 for item in section_options if _option_enabled(ev, item)) + module_count = sum(1 for item in section_options if ev.effective.get(item.key, TRI_N) == TRI_M) _safe_addnstr( stdscr, 1, 0, - f" {enabled_count}/{len(options)} enabled | Arrow/jk move Space toggle a/n all PgUp/PgDn Enter/ESC back ", + f" on:{enabled_count} mod:{module_count} total:{len(section_options)} | Space cycle a/n/m all Enter/ESC back ", theme["subtitle"], ) @@ -561,8 +1083,8 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis if selected < 0: selected = 0 - if selected >= len(options): - selected = max(0, len(options) - 1) + if selected >= len(section_options): + selected = max(0, len(section_options) - 1) if selected < top: top = selected @@ -573,14 +1095,17 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis for row in range(visible): idx = top + row - if idx >= len(options): + if idx >= len(section_options): break - item = options[idx] - enabled = _option_enabled(values, item) - mark = "x" if enabled else " " + item = section_options[idx] + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + eff = ev.effective.get(item.key, item.default) + flags = _option_flags(item, ev) prefix = ">" if idx == selected else " " - line = f"{prefix} {idx + 1:03d} [{mark}] {item.title}" - base_attr = theme["enabled"] if enabled else theme["disabled"] + line = f"{prefix} {idx + 1:03d} [{tri_char(eff)}|{tri_char(req)}] {item.title}" + if flags != "-": + line += f" [{flags}]" + base_attr = theme["enabled"] if eff > TRI_N else theme["disabled"] attr = theme["selected"] if idx == selected else base_attr _safe_addnstr(stdscr, list_inner_y + row, list_inner_x, line, attr) @@ -589,22 +1114,24 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis list_inner_y, list_box_x + list_box_w - 2, list_inner_h, - len(options), + len(section_options), top, visible, theme["scroll_track"], theme["scroll_thumb"], ) - if options: - cur = options[selected] + if section_options: + cur = section_options[selected] detail_inner_y = detail_box_y + 1 detail_inner_x = detail_box_x + 2 detail_inner_w = detail_box_w - 4 detail_inner_h = detail_box_h - 2 - state_text = "ENABLED" if _option_enabled(values, cur) else "DISABLED" - state_attr = theme["status_ok"] if _option_enabled(values, cur) else theme["status_warn"] + eff = ev.effective.get(cur.key, cur.default) + req = normalize_tri(values.get(cur.key, cur.default), cur.default, cur.kind) + state_text = f"effective={tri_char(eff)} requested={tri_char(req)} kind={cur.kind}" + state_attr = theme["status_ok"] if eff > TRI_N else theme["status_warn"] _safe_addnstr(stdscr, detail_inner_y + 0, detail_inner_x, cur.title, theme["value_label"]) _safe_addnstr(stdscr, detail_inner_y + 1, detail_inner_x, cur.key, theme["value_key"]) @@ -613,7 +1140,7 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis stdscr, detail_inner_y + 3, detail_inner_x, - f"Item: {selected + 1}/{len(options)}", + f"Item: {selected + 1}/{len(section_options)} flags: {_option_flags(cur, ev)}", theme["value_label"], ) _draw_progress_bar( @@ -622,19 +1149,30 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis detail_inner_x, max(12, detail_inner_w), enabled_count, - max(1, len(options)), + max(1, len(section_options)), theme["progress_on"], theme["progress_off"], ) desc_title_y = detail_inner_y + 6 - _safe_addnstr(stdscr, desc_title_y, detail_inner_x, "Description:", theme["value_label"]) - wrapped = textwrap.wrap(cur.description, max(12, detail_inner_w)) + _safe_addnstr(stdscr, desc_title_y, detail_inner_x, "Details:", theme["value_label"]) + raw_lines = _detail_lines_human(cur, values, ev) + wrapped_lines: List[str] = [] + wrap_width = max(12, detail_inner_w) + for raw_line in raw_lines: + if not raw_line: + wrapped_lines.append("") + continue + chunks = textwrap.wrap(raw_line, wrap_width) + if chunks: + wrapped_lines.extend(chunks) + else: + wrapped_lines.append(raw_line) max_desc_lines = max(1, detail_inner_h - 8) - for i, part in enumerate(wrapped[:max_desc_lines]): + for i, part in enumerate(wrapped_lines[:max_desc_lines]): _safe_addnstr(stdscr, desc_title_y + 1 + i, detail_inner_x, part, 0) - _safe_addnstr(stdscr, h - 1, 0, " Space:toggle a:all-on n:all-off Enter/ESC:back ", theme["help"]) + _safe_addnstr(stdscr, h - 1, 0, " Space:cycle a:all-y n:all-n m:all-m Enter/ESC:back ", theme["help"]) stdscr.refresh() key = stdscr.getch() @@ -657,23 +1195,27 @@ def _run_ncurses_section(stdscr, theme: Dict[str, int], title: str, options: Lis selected = 0 continue if key == curses.KEY_END: - selected = max(0, len(options) - 1) + selected = max(0, len(section_options) - 1) continue if key == ord(" "): - if options: - item = options[selected] - values[item.key] = not _option_enabled(values, item) + if section_options: + item = section_options[selected] + _cycle_option_value(values, item, ev) continue if key in (ord("a"), ord("A")): - _set_all(values, options, True) + _set_all(values, section_options, TRI_Y) continue if key in (ord("n"), ord("N")): - _set_all(values, options, False) + _set_all(values, section_options, TRI_N) + continue + if key in (ord("m"), ord("M")): + _set_all(values, section_options, TRI_M) continue -def _run_ncurses_main(stdscr, clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> bool: +def _run_ncurses_main(stdscr, clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> bool: theme = _curses_theme() + all_options = clks_options + user_options try: curses.curs_set(0) except Exception: @@ -685,8 +1227,9 @@ def _run_ncurses_main(stdscr, clks_options: List[OptionItem], user_options: List stdscr.erase() h, w = stdscr.getmaxyx() - clks_on = sum(1 for item in clks_options if _option_enabled(values, item)) - user_on = sum(1 for item in user_options if _option_enabled(values, item)) + ev = evaluate_config(all_options, values) + clks_on = sum(1 for item in clks_options if _option_enabled(ev, item)) + user_on = sum(1 for item in user_options if _option_enabled(ev, item)) total_items = len(clks_options) + len(user_options) total_on = clks_on + user_on @@ -745,9 +1288,9 @@ def _run_ncurses_main(stdscr, clks_options: List[OptionItem], user_options: List continue if key in (curses.KEY_ENTER, 10, 13): if selected == 0: - _run_ncurses_section(stdscr, theme, "CLKS", clks_options, values) + _run_ncurses_section(stdscr, theme, "CLKS", clks_options, all_options, values) elif selected == 1: - _run_ncurses_section(stdscr, theme, "USER", user_options, values) + _run_ncurses_section(stdscr, theme, "USER", user_options, all_options, values) elif selected == 2: return True else: @@ -755,7 +1298,7 @@ def _run_ncurses_main(stdscr, clks_options: List[OptionItem], user_options: List continue -def interactive_menu_ncurses(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> bool: +def interactive_menu_ncurses(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> bool: if curses is None: raise RuntimeError("python curses module unavailable (install python3-curses / ncurses)") if "TERM" not in os.environ or not os.environ["TERM"]: @@ -763,7 +1306,7 @@ def interactive_menu_ncurses(clks_options: List[OptionItem], user_options: List[ return bool(curses.wrapper(lambda stdscr: _run_ncurses_main(stdscr, clks_options, user_options, values))) -def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> bool: +def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> bool: if QtWidgets is None or QtCore is None: raise RuntimeError("python PySide unavailable (install PySide6, or use --plain)") @@ -777,12 +1320,9 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti app = QtWidgets.QApplication(["menuconfig-gui"]) owns_app = True - qt_checked = getattr(QtCore.Qt, "Checked", QtCore.Qt.CheckState.Checked) - qt_unchecked = getattr(QtCore.Qt, "Unchecked", QtCore.Qt.CheckState.Unchecked) qt_horizontal = getattr(QtCore.Qt, "Horizontal", QtCore.Qt.Orientation.Horizontal) qt_item_enabled = getattr(QtCore.Qt, "ItemIsEnabled", QtCore.Qt.ItemFlag.ItemIsEnabled) qt_item_selectable = getattr(QtCore.Qt, "ItemIsSelectable", QtCore.Qt.ItemFlag.ItemIsSelectable) - qt_item_checkable = getattr(QtCore.Qt, "ItemIsUserCheckable", QtCore.Qt.ItemFlag.ItemIsUserCheckable) resize_to_contents = getattr( QtWidgets.QHeaderView, @@ -833,14 +1373,16 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti tabs = QtWidgets.QTabWidget() root_layout.addWidget(tabs, 1) + all_options = clks_options + user_options def update_summary() -> None: - clks_on = sum(1 for item in clks_options if values.get(item.key, item.default)) - user_on = sum(1 for item in user_options if values.get(item.key, item.default)) + ev = evaluate_config(all_options, values) + clks_on = sum(1 for item in clks_options if ev.effective.get(item.key, item.default) > TRI_N) + user_on = sum(1 for item in user_options if ev.effective.get(item.key, item.default) > TRI_N) total = len(clks_options) + len(user_options) summary_label.setText( - f"CLKS: {clks_on}/{len(clks_options)} enabled " - f"User: {user_on}/{len(user_options)} enabled " + f"CLKS: {clks_on}/{len(clks_options)} on " + f"User: {user_on}/{len(user_options)} on " f"Total: {clks_on + user_on}/{total}" ) @@ -862,11 +1404,17 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti toolbar.addWidget(title_label) toolbar.addStretch(1) - toggle_btn = QtWidgets.QPushButton("Toggle Selected") - enable_all_btn = QtWidgets.QPushButton("Enable All") - disable_all_btn = QtWidgets.QPushButton("Disable All") + toggle_btn = QtWidgets.QPushButton("Cycle Selected") + set_y_btn = QtWidgets.QPushButton("Set Y") + set_m_btn = QtWidgets.QPushButton("Set M") + set_n_btn = QtWidgets.QPushButton("Set N") + enable_all_btn = QtWidgets.QPushButton("All Y") + disable_all_btn = QtWidgets.QPushButton("All N") toolbar.addWidget(enable_all_btn) toolbar.addWidget(disable_all_btn) + toolbar.addWidget(set_m_btn) + toolbar.addWidget(set_y_btn) + toolbar.addWidget(set_n_btn) toolbar.addWidget(toggle_btn) layout.addLayout(toolbar) @@ -876,11 +1424,12 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti left = QtWidgets.QWidget() left_layout = QtWidgets.QVBoxLayout(left) left_layout.setContentsMargins(0, 0, 0, 0) - self.table = QtWidgets.QTableWidget(len(options), 2) - self.table.setHorizontalHeaderLabels(["On", "Option"]) + self.table = QtWidgets.QTableWidget(len(options), 3) + self.table.setHorizontalHeaderLabels(["Value", "Option", "Status"]) self.table.verticalHeader().setVisible(False) self.table.horizontalHeader().setSectionResizeMode(0, resize_to_contents) self.table.horizontalHeader().setSectionResizeMode(1, stretch_mode) + self.table.horizontalHeader().setSectionResizeMode(2, resize_to_contents) self.table.setSelectionBehavior(select_rows) self.table.setSelectionMode(extended_selection) self.table.setAlternatingRowColors(True) @@ -902,10 +1451,13 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti splitter.setStretchFactor(1, 2) toggle_btn.clicked.connect(self.toggle_selected) + set_y_btn.clicked.connect(lambda: self.set_selected(TRI_Y)) + set_m_btn.clicked.connect(lambda: self.set_selected(TRI_M)) + set_n_btn.clicked.connect(lambda: self.set_selected(TRI_N)) enable_all_btn.clicked.connect(self.enable_all) disable_all_btn.clicked.connect(self.disable_all) self.table.itemSelectionChanged.connect(self._on_selection_changed) - self.table.itemChanged.connect(self._on_item_changed) + self.table.itemDoubleClicked.connect(self._on_item_activated) self.refresh(keep_selection=False) if self.options: @@ -935,10 +1487,14 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti return item = self.options[row] - enabled = values.get(item.key, item.default) - self.state_label.setText(f"State: {'ENABLED' if enabled else 'DISABLED'}") + ev = evaluate_config(all_options, values) + eff = ev.effective.get(item.key, item.default) + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + self.state_label.setText( + f"State: eff={tri_char(eff)} req={tri_char(req)} kind={item.kind} flags={_option_flags(item, ev)}" + ) self.key_label.setText(f"Key: {item.key}") - self.detail_text.setPlainText(f"{item.title}\n\n{item.description}") + self.detail_text.setPlainText("\n".join([item.title, ""] + _detail_lines(item, values, ev) + ["", item.description])) def _on_selection_changed(self) -> None: rows = self._selected_rows() @@ -950,44 +1506,37 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti if len(rows) > 1: self.state_label.setText(f"State: {len(rows)} items selected") self.key_label.setText("Key: ") - self.detail_text.setPlainText("Multiple options selected.\nUse Toggle Selected to flip all selected entries.") + self.detail_text.setPlainText("Multiple options selected.\nUse Cycle/Set buttons to update selected entries.") return self._show_detail(-1) - def _on_item_changed(self, changed_item) -> None: + def _on_item_activated(self, changed_item) -> None: if self._updating or changed_item is None: return - - if changed_item.column() != 0: - return - row = changed_item.row() - if row < 0 or row >= len(self.options): return - - self.options[row] - values[self.options[row].key] = changed_item.checkState() == qt_checked - self._on_selection_changed() - update_summary() + ev = evaluate_config(all_options, values) + _cycle_option_value(values, self.options[row], ev) + self.refresh(keep_selection=True) def refresh(self, keep_selection: bool = True) -> None: prev_rows = self._selected_rows() if keep_selection else [] self._updating = True self.table.setRowCount(len(self.options)) + ev = evaluate_config(all_options, values) for row, item in enumerate(self.options): - enabled = values.get(item.key, item.default) - check_item = self.table.item(row, 0) - - if check_item is None: - check_item = QtWidgets.QTableWidgetItem("") - check_item.setFlags(qt_item_enabled | qt_item_selectable | qt_item_checkable) - self.table.setItem(row, 0, check_item) - - check_item.setCheckState(qt_checked if enabled else qt_unchecked) + req = normalize_tri(values.get(item.key, item.default), item.default, item.kind) + eff = ev.effective.get(item.key, item.default) + value_item = self.table.item(row, 0) + if value_item is None: + value_item = QtWidgets.QTableWidgetItem("") + value_item.setFlags(qt_item_enabled | qt_item_selectable) + self.table.setItem(row, 0, value_item) + value_item.setText(f"{tri_char(eff)} (req:{tri_char(req)})") title_item = self.table.item(row, 1) if title_item is None: @@ -997,6 +1546,13 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti else: title_item.setText(item.title) + status_item = self.table.item(row, 2) + if status_item is None: + status_item = QtWidgets.QTableWidgetItem("") + status_item.setFlags(qt_item_enabled | qt_item_selectable) + self.table.setItem(row, 2, status_item) + status_item.setText(_option_flags(item, ev)) + self._updating = False self.table.clearSelection() @@ -1013,39 +1569,30 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti if not rows: return - self._updating = True for row in rows: item = self.options[row] - new_state = not values.get(item.key, item.default) - values[item.key] = new_state - check_item = self.table.item(row, 0) - if check_item is not None: - check_item.setCheckState(qt_checked if new_state else qt_unchecked) - self._updating = False - self._on_selection_changed() - update_summary() + ev = evaluate_config(all_options, values) + _cycle_option_value(values, item, ev) + self.refresh(keep_selection=True) + + def set_selected(self, state: int) -> None: + rows = self._selected_rows() + if not rows: + return + for row in rows: + item = self.options[row] + _set_option_value(values, item, state) + self.refresh(keep_selection=True) def enable_all(self) -> None: - self._updating = True - for row, item in enumerate(self.options): - values[item.key] = True - check_item = self.table.item(row, 0) - if check_item is not None: - check_item.setCheckState(qt_checked) - self._updating = False - self._on_selection_changed() - update_summary() + for item in self.options: + _set_option_value(values, item, TRI_Y) + self.refresh(keep_selection=False) def disable_all(self) -> None: - self._updating = True - for row, item in enumerate(self.options): - values[item.key] = False - check_item = self.table.item(row, 0) - if check_item is not None: - check_item.setCheckState(qt_unchecked) - self._updating = False - self._on_selection_changed() - update_summary() + for item in self.options: + _set_option_value(values, item, TRI_N) + self.refresh(keep_selection=False) clks_panel = _SectionPanel("CLKS Features", clks_options) user_panel = _SectionPanel("User Apps", user_options) @@ -1054,7 +1601,7 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti update_summary() footer = QtWidgets.QHBoxLayout() - footer.addWidget(QtWidgets.QLabel("Tip: select rows and click Toggle Selected.")) + footer.addWidget(QtWidgets.QLabel("Tip: double-click a row to cycle, or use Set/Cycle buttons.")) footer.addStretch(1) save_btn = QtWidgets.QPushButton("Save and Exit") @@ -1082,11 +1629,18 @@ def interactive_menu_gui(clks_options: List[OptionItem], user_options: List[Opti return result["save"] -def write_outputs(all_values: Dict[str, bool], ordered_options: List[OptionItem]) -> None: +def write_outputs(all_values: Dict[str, int], ordered_options: List[OptionItem]) -> None: MENUCONFIG_DIR.mkdir(parents=True, exist_ok=True) - ordered_keys = [item.key for item in ordered_options] - output_values: Dict[str, bool] = {key: all_values[key] for key in ordered_keys if key in all_values} + output_values: Dict[str, object] = {} + for item in ordered_options: + if item.key not in all_values: + continue + value = normalize_tri(all_values[item.key], item.default, item.kind) + if item.kind == "bool": + output_values[item.key] = value == TRI_Y + else: + output_values[item.key] = tri_char(value) CONFIG_JSON_PATH.write_text( json.dumps(output_values, ensure_ascii=True, indent=2, sort_keys=True) + "\n", @@ -1099,32 +1653,46 @@ def write_outputs(all_values: Dict[str, bool], ordered_options: List[OptionItem] 'set(CLEONOS_MENUCONFIG_LOADED ON CACHE BOOL "CLeonOS menuconfig loaded" FORCE)', ] for item in ordered_options: - value = "ON" if all_values.get(item.key, item.default) else "OFF" - lines.append(f'set({item.key} {value} CACHE BOOL "{item.title}" FORCE)') + value = normalize_tri(all_values.get(item.key, item.default), item.default, item.kind) + if item.kind == "bool": + cmake_value = "ON" if value == TRI_Y else "OFF" + lines.append(f'set({item.key} {cmake_value} CACHE BOOL "{item.title}" FORCE)') + else: + cmake_value = tri_char(value).upper() + lines.append(f'set({item.key} "{cmake_value}" CACHE STRING "{item.title}" FORCE)') + lines.append( + f'set({item.key}_IS_ENABLED {"ON" if value > TRI_N else "OFF"} ' + f'CACHE BOOL "{item.title} enabled(y|m)" FORCE)' + ) CONFIG_CMAKE_PATH.write_text("\n".join(lines) + "\n", encoding="utf-8") -def show_summary(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> None: - clks_on = sum(1 for item in clks_options if values.get(item.key, item.default)) - user_on = sum(1 for item in user_options if values.get(item.key, item.default)) +def show_summary(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> None: + all_options = clks_options + user_options + ev = evaluate_config(all_options, values) + clks_on = sum(1 for item in clks_options if ev.effective.get(item.key, item.default) > TRI_N) + user_on = sum(1 for item in user_options if ev.effective.get(item.key, item.default) > TRI_N) + clks_m = sum(1 for item in clks_options if ev.effective.get(item.key, item.default) == TRI_M) + user_m = sum(1 for item in user_options if ev.effective.get(item.key, item.default) == TRI_M) print() print("========== CLeonOS menuconfig ==========") - print(f"1) CLKS features : {clks_on}/{len(clks_options)} enabled") - print(f"2) User features : {user_on}/{len(user_options)} enabled") + print(f"1) CLKS features : on={clks_on} m={clks_m} total={len(clks_options)}") + print(f"2) User features : on={user_on} m={user_m} total={len(user_options)}") print("s) Save and exit") print("q) Quit without saving") -def interactive_menu(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, bool]) -> bool: +def interactive_menu(clks_options: List[OptionItem], user_options: List[OptionItem], values: Dict[str, int]) -> bool: + all_options = clks_options + user_options while True: show_summary(clks_options, user_options, values) choice = input("Select> ").strip().lower() if choice == "1": - section_loop("CLKS", clks_options, values) + section_loop("CLKS", clks_options, all_options, values) continue if choice == "2": - section_loop("USER", user_options, values) + section_loop("USER", user_options, all_options, values) continue if choice in {"s", "save"}: return True @@ -1133,15 +1701,19 @@ def interactive_menu(clks_options: List[OptionItem], user_options: List[OptionIt print("unknown selection") -def parse_set_overrides(values: Dict[str, bool], kv_pairs: List[str]) -> None: +def parse_set_overrides(values: Dict[str, int], option_index: Dict[str, OptionItem], kv_pairs: List[str]) -> None: for pair in kv_pairs: if "=" not in pair: - raise RuntimeError(f"invalid --set entry: {pair!r}, expected KEY=ON|OFF") + raise RuntimeError(f"invalid --set entry: {pair!r}, expected KEY=Y|M|N") key, raw = pair.split("=", 1) key = key.strip() if not key: raise RuntimeError(f"invalid --set entry: {pair!r}, empty key") - values[key] = normalize_bool(raw, False) + item = option_index.get(key) + if item is None: + values[key] = normalize_tri(raw, TRI_N, "tristate") + else: + values[key] = normalize_tri(raw, item.default, item.kind) def parse_args() -> argparse.Namespace: @@ -1159,7 +1731,7 @@ def parse_args() -> argparse.Namespace: "--set", action="append", default=[], - metavar="KEY=ON|OFF", + metavar="KEY=Y|M|N", help="override one option before save (can be repeated)", ) return parser.parse_args() @@ -1181,7 +1753,8 @@ def main() -> int: if args.preset: apply_preset(args.preset, clks_options, user_options, values) - parse_set_overrides(values, args.set) + option_index = _build_index(all_options) + parse_set_overrides(values, option_index, args.set) should_save = args.non_interactive if not args.non_interactive: @@ -1199,7 +1772,8 @@ def main() -> int: print("menuconfig: no changes saved") return 0 - write_outputs(values, all_options) + final_eval = evaluate_config(all_options, values) + write_outputs(final_eval.effective, all_options) print(f"menuconfig: wrote {CONFIG_JSON_PATH}") print(f"menuconfig: wrote {CONFIG_CMAKE_PATH}") return 0 diff --git a/wine/README.md b/wine/README.md index 64cacaf..0aaaff7 100644 --- a/wine/README.md +++ b/wine/README.md @@ -37,13 +37,17 @@ python wine/cleonos_wine.py build/x86_64/ramdisk_root/shell/shell.elf --rootfs b ## 支持 - ELF64 (x86_64) PT_LOAD 段装载 -- CLeonOS `int 0x80` syscall 0..60 +- CLeonOS `int 0x80` syscall 0..80(含 `FD_*`、`PROC_*`、`STATS_*`、`EXEC_PATHV_IO`) - TTY 输出与键盘输入队列 - rootfs 文件/目录访问(`FS_*`) - `/temp` 写入限制(`FS_MKDIR/WRITE/APPEND/REMOVE`) - `EXEC_PATH/EXEC_PATHV` 执行 ELF(带深度限制) +- `EXEC_PATHV_IO`(支持 stdio fd 继承/重定向) - `SPAWN_PATH/SPAWN_PATHV/WAITPID/EXIT/SLEEP_TICKS/YIELD` - 进程 `argv/env` 查询(`PROC_ARGC/PROC_ARGV/PROC_ENVC/PROC_ENV`) +- 进程枚举与快照(`PROC_COUNT/PROC_PID_AT/PROC_SNAPSHOT/PROC_KILL`) +- syscall 统计(`STATS_TOTAL/STATS_ID_COUNT/STATS_RECENT_*`) +- 文件描述符(`FD_OPEN/FD_READ/FD_WRITE/FD_CLOSE/FD_DUP`) - 异常退出状态编码与故障元信息(`PROC_LAST_SIGNAL/PROC_FAULT_*`) ## 参数 diff --git a/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc index a68ee79..ede9991 100644 Binary files a/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc and b/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc differ diff --git a/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc index 34881e1..dea5b2a 100644 Binary files a/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc and b/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc differ diff --git a/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc index 04393ef..9359408 100644 Binary files a/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc and b/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc differ diff --git a/wine/cleonos_wine_lib/constants.py b/wine/cleonos_wine_lib/constants.py index d070515..310aa0f 100644 --- a/wine/cleonos_wine_lib/constants.py +++ b/wine/cleonos_wine_lib/constants.py @@ -69,6 +69,48 @@ SYS_PROC_LAST_SIGNAL = 57 SYS_PROC_FAULT_VECTOR = 58 SYS_PROC_FAULT_ERROR = 59 SYS_PROC_FAULT_RIP = 60 +SYS_PROC_COUNT = 61 +SYS_PROC_PID_AT = 62 +SYS_PROC_SNAPSHOT = 63 +SYS_PROC_KILL = 64 +SYS_KDBG_SYM = 65 +SYS_KDBG_BT = 66 +SYS_KDBG_REGS = 67 +SYS_STATS_TOTAL = 68 +SYS_STATS_ID_COUNT = 69 +SYS_STATS_RECENT_WINDOW = 70 +SYS_STATS_RECENT_ID = 71 +SYS_FD_OPEN = 72 +SYS_FD_READ = 73 +SYS_FD_WRITE = 74 +SYS_FD_CLOSE = 75 +SYS_FD_DUP = 76 +SYS_DL_OPEN = 77 +SYS_DL_CLOSE = 78 +SYS_DL_SYM = 79 +SYS_EXEC_PATHV_IO = 80 + +# proc states (from cleonos/c/include/cleonos_syscall.h) +PROC_STATE_UNUSED = 0 +PROC_STATE_PENDING = 1 +PROC_STATE_RUNNING = 2 +PROC_STATE_EXITED = 3 +PROC_STATE_STOPPED = 4 + +# signals (from cleonos/c/include/cleonos_syscall.h) +SIGKILL = 9 +SIGTERM = 15 +SIGCONT = 18 +SIGSTOP = 19 + +# open flags (from cleonos/c/include/cleonos_syscall.h) +O_RDONLY = 0x0000 +O_WRONLY = 0x0001 +O_RDWR = 0x0002 +O_CREAT = 0x0040 +O_TRUNC = 0x0200 +O_APPEND = 0x0400 +FD_INHERIT = U64_MASK def u64(value: int) -> int: diff --git a/wine/cleonos_wine_lib/runner.py b/wine/cleonos_wine_lib/runner.py index 0b7601b..57b1384 100644 --- a/wine/cleonos_wine_lib/runner.py +++ b/wine/cleonos_wine_lib/runner.py @@ -6,40 +6,48 @@ import sys import time from dataclasses import dataclass from pathlib import Path -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from .constants import ( DEFAULT_MAX_EXEC_DEPTH, + FD_INHERIT, FS_NAME_MAX, MAX_CSTR, MAX_IO_READ, + O_APPEND, + O_CREAT, + O_RDONLY, + O_RDWR, + O_TRUNC, + O_WRONLY, PAGE_SIZE, + PROC_STATE_EXITED, + PROC_STATE_PENDING, + PROC_STATE_RUNNING, + PROC_STATE_STOPPED, + SIGCONT, + SIGKILL, + SIGSTOP, + SIGTERM, SYS_AUDIO_AVAILABLE, SYS_AUDIO_PLAY_TONE, SYS_AUDIO_STOP, SYS_CONTEXT_SWITCHES, SYS_CUR_TASK, + SYS_DL_CLOSE, + SYS_DL_OPEN, + SYS_DL_SYM, SYS_EXEC_PATH, SYS_EXEC_PATHV, + SYS_EXEC_PATHV_IO, SYS_EXEC_REQUESTS, SYS_EXEC_SUCCESS, SYS_EXIT, - SYS_GETPID, - SYS_PROC_ARGC, - SYS_PROC_ARGV, - SYS_PROC_ENVC, - SYS_PROC_ENV, - SYS_PROC_FAULT_ERROR, - SYS_PROC_FAULT_RIP, - SYS_PROC_FAULT_VECTOR, - SYS_PROC_LAST_SIGNAL, - SYS_SLEEP_TICKS, - SYS_SPAWN_PATH, - SYS_SPAWN_PATHV, - SYS_WAITPID, - SYS_YIELD, - SYS_SHUTDOWN, - SYS_RESTART, + SYS_FD_CLOSE, + SYS_FD_DUP, + SYS_FD_OPEN, + SYS_FD_READ, + SYS_FD_WRITE, SYS_FS_APPEND, SYS_FS_CHILD_COUNT, SYS_FS_GET_CHILD_NAME, @@ -50,6 +58,7 @@ from .constants import ( SYS_FS_STAT_SIZE, SYS_FS_STAT_TYPE, SYS_FS_WRITE, + SYS_GETPID, SYS_KBD_BUFFERED, SYS_KBD_DROPPED, SYS_KBD_GET_CHAR, @@ -61,8 +70,29 @@ from .constants import ( SYS_LOG_JOURNAL_COUNT, SYS_LOG_JOURNAL_READ, SYS_LOG_WRITE, + SYS_PROC_ARGC, + SYS_PROC_ARGV, + SYS_PROC_COUNT, + SYS_PROC_ENVC, + SYS_PROC_ENV, + SYS_PROC_FAULT_ERROR, + SYS_PROC_FAULT_RIP, + SYS_PROC_FAULT_VECTOR, + SYS_PROC_KILL, + SYS_PROC_LAST_SIGNAL, + SYS_PROC_PID_AT, + SYS_PROC_SNAPSHOT, + SYS_RESTART, SYS_SERVICE_COUNT, SYS_SERVICE_READY_COUNT, + SYS_SHUTDOWN, + SYS_SLEEP_TICKS, + SYS_SPAWN_PATH, + SYS_SPAWN_PATHV, + SYS_STATS_ID_COUNT, + SYS_STATS_RECENT_ID, + SYS_STATS_RECENT_WINDOW, + SYS_STATS_TOTAL, SYS_TASK_COUNT, SYS_TIMER_TICKS, SYS_TTY_ACTIVE, @@ -75,6 +105,8 @@ from .constants import ( SYS_USER_LAUNCH_OK, SYS_USER_LAUNCH_TRIES, SYS_USER_SHELL_READY, + SYS_WAITPID, + SYS_YIELD, page_ceil, page_floor, u64, @@ -124,6 +156,15 @@ class ELFImage: segments: List[ELFSegment] +@dataclass +class FDEntry: + kind: str + flags: int + offset: int = 0 + path: str = "" + tty_index: int = 0 + + EXEC_PATH_MAX = 192 EXEC_ARG_LINE_MAX = 256 EXEC_ENV_LINE_MAX = 512 @@ -131,6 +172,8 @@ EXEC_MAX_ARGS = 24 EXEC_MAX_ENVS = 24 EXEC_ITEM_MAX = 128 EXEC_STATUS_SIGNAL_FLAG = 1 << 63 +PROC_PATH_MAX = 192 +FD_MAX = 64 class CLeonOSWineNative: @@ -150,6 +193,7 @@ class CLeonOSWineNative: ppid: int = 0, argv_items: Optional[List[str]] = None, env_items: Optional[List[str]] = None, + inherited_fds: Optional[Dict[int, FDEntry]] = None, ) -> None: self.elf_path = elf_path self.rootfs = rootfs @@ -175,9 +219,97 @@ class CLeonOSWineNative: self._stack_size = 0x0000000000020000 self._ret_sentinel = 0x00007FFF10000000 self._mapped_ranges: List[Tuple[int, int]] = [] + self._tty_index = int(self.state.tty_active) + self._fds: Dict[int, FDEntry] = {} + self._fd_inherited = inherited_fds if inherited_fds is not None else {} default_path = self._normalize_guest_path(self.guest_path_hint or f"/{self.elf_path.name}") self.argv_items, self.env_items = self._prepare_exec_items(default_path, self.argv_items, self.env_items) + self._init_default_fds() + + @staticmethod + def _clone_fd_entry(entry: FDEntry) -> FDEntry: + return FDEntry(kind=entry.kind, flags=int(entry.flags), offset=int(entry.offset), path=str(entry.path), tty_index=int(entry.tty_index)) + + @staticmethod + def _fd_access_mode(flags: int) -> int: + return int(flags) & 0x3 + + @classmethod + def _fd_access_mode_valid(cls, flags: int) -> bool: + mode = cls._fd_access_mode(flags) + return mode in (O_RDONLY, O_WRONLY, O_RDWR) + + @classmethod + def _fd_can_read(cls, flags: int) -> bool: + mode = cls._fd_access_mode(flags) + return mode in (O_RDONLY, O_RDWR) + + @classmethod + def _fd_can_write(cls, flags: int) -> bool: + mode = cls._fd_access_mode(flags) + return mode in (O_WRONLY, O_RDWR) + + def _init_default_fds(self) -> None: + self._fds = { + 0: FDEntry(kind="tty", flags=O_RDONLY, offset=0, tty_index=self._tty_index), + 1: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index), + 2: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index), + } + + for target in (0, 1, 2): + inherited = self._fd_inherited.get(target) + if inherited is not None: + self._fds[target] = self._clone_fd_entry(inherited) + + for target in (0, 1, 2): + entry = self._fds.get(target) + if entry is not None and entry.kind == "tty": + self._tty_index = int(entry.tty_index) + break + + def _fd_lookup(self, fd: int) -> Optional[FDEntry]: + if fd < 0 or fd >= FD_MAX: + return None + return self._fds.get(int(fd)) + + def _fd_find_free(self) -> int: + for fd in range(FD_MAX): + if fd not in self._fds: + return fd + return -1 + + def _stdio_entry_for_child(self, target_fd: int, override_fd: int, require_read: bool, require_write: bool) -> Optional[FDEntry]: + if override_fd == FD_INHERIT: + src = self._fd_lookup(target_fd) + else: + src = self._fd_lookup(override_fd) + + if src is None: + return None + + if require_read and not self._fd_can_read(src.flags): + return None + + if require_write and not self._fd_can_write(src.flags): + return None + + return self._clone_fd_entry(src) + + def _build_child_stdio_map(self, stdin_fd: int, stdout_fd: int, stderr_fd: int) -> Optional[Dict[int, FDEntry]]: + child_map: Dict[int, FDEntry] = {} + + in_entry = self._stdio_entry_for_child(0, stdin_fd, require_read=True, require_write=False) + out_entry = self._stdio_entry_for_child(1, stdout_fd, require_read=False, require_write=True) + err_entry = self._stdio_entry_for_child(2, stderr_fd, require_read=False, require_write=True) + + if in_entry is None or out_entry is None or err_entry is None: + return None + + child_map[0] = in_entry + child_map[1] = out_entry + child_map[2] = err_entry + return child_map def run(self) -> Optional[int]: if self.pid == 0: @@ -187,6 +319,7 @@ class CLeonOSWineNative: self.state.set_current_pid(self.pid) self.state.set_proc_cmdline(self.pid, self.argv_items, self.env_items) self.state.set_proc_fault(self.pid, 0, 0, 0, 0) + self.state.set_proc_running(self.pid, self.argv_items[0] if self.argv_items else self.guest_path_hint, self._tty_index) uc = Uc(UC_ARCH_X86, UC_MODE_64) self._install_hooks(uc) @@ -252,6 +385,7 @@ class CLeonOSWineNative: arg1 = self._reg_read(uc, UC_X86_REG_RCX) arg2 = self._reg_read(uc, UC_X86_REG_RDX) + self.state.record_syscall(syscall_id) self.state.context_switches = u64(self.state.context_switches + 1) ret = self._dispatch_syscall(uc, syscall_id, arg0, arg1, arg2) self._reg_write(uc, UC_X86_REG_RAX, u64(ret)) @@ -291,6 +425,8 @@ class CLeonOSWineNative: return self._exec_path(uc, arg0) if sid == SYS_EXEC_PATHV: return self._exec_pathv(uc, arg0, arg1, arg2) + if sid == SYS_EXEC_PATHV_IO: + return self._exec_pathv_io(uc, arg0, arg1, arg2) if sid == SYS_SPAWN_PATH: return self._spawn_path(uc, arg0) if sid == SYS_SPAWN_PATHV: @@ -315,6 +451,14 @@ class CLeonOSWineNative: return self._proc_fault_error() if sid == SYS_PROC_FAULT_RIP: return self._proc_fault_rip() + if sid == SYS_PROC_COUNT: + return self._proc_count() + if sid == SYS_PROC_PID_AT: + return self._proc_pid_at(uc, arg0, arg1) + if sid == SYS_PROC_SNAPSHOT: + return self._proc_snapshot(uc, arg0, arg1, arg2) + if sid == SYS_PROC_KILL: + return self._proc_kill(uc, arg0, arg1) if sid == SYS_EXIT: return self._request_exit(uc, arg0) if sid == SYS_SLEEP_TICKS: @@ -400,6 +544,30 @@ class CLeonOSWineNative: return self.state.kbd_drop_count if sid == SYS_KBD_HOTKEY_SWITCHES: return self.state.kbd_hotkey_switches + if sid == SYS_STATS_TOTAL: + return self.state.stats_total() + if sid == SYS_STATS_ID_COUNT: + return self.state.stats_id_count(arg0) + if sid == SYS_STATS_RECENT_WINDOW: + return self.state.stats_recent_window() + if sid == SYS_STATS_RECENT_ID: + return self.state.stats_recent_id_count(arg0) + if sid == SYS_FD_OPEN: + return self._fd_open(uc, arg0, arg1, arg2) + if sid == SYS_FD_READ: + return self._fd_read(uc, arg0, arg1, arg2) + if sid == SYS_FD_WRITE: + return self._fd_write(uc, arg0, arg1, arg2) + if sid == SYS_FD_CLOSE: + return self._fd_close(arg0) + if sid == SYS_FD_DUP: + return self._fd_dup(arg0) + if sid == SYS_DL_OPEN: + return u64_neg1() + if sid == SYS_DL_CLOSE: + return 0 + if sid == SYS_DL_SYM: + return 0 return u64_neg1() @@ -409,6 +577,15 @@ class CLeonOSWineNative: sys.stdout.write(text) sys.stdout.flush() + def _host_write_bytes(self, data: bytes) -> None: + if not data: + return + if hasattr(sys.stdout, "buffer"): + sys.stdout.buffer.write(data) + sys.stdout.flush() + return + self._host_write(data.decode("utf-8", errors="replace")) + def _load_segments(self, uc: Uc) -> None: for seg in self.image.segments: start = page_floor(seg.vaddr) @@ -504,6 +681,30 @@ class CLeonOSWineNative: except UcError: return b"" + def _read_guest_bytes_exact(self, uc: Uc, addr: int, size: int) -> Optional[bytes]: + if size < 0 or addr == 0: + return None + if size == 0: + return b"" + + out = bytearray() + cursor = int(addr) + left = int(size) + + while left > 0: + chunk = min(left, MAX_IO_READ) + try: + data = uc.mem_read(cursor, chunk) + except UcError: + return None + if len(data) != chunk: + return None + out.extend(data) + cursor += chunk + left -= chunk + + return bytes(out) + def _write_guest_bytes(self, uc: Uc, addr: int, data: bytes) -> bool: if addr == 0: return False @@ -743,16 +944,86 @@ class CLeonOSWineNative: return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0 def _exec_path(self, uc: Uc, path_ptr: int) -> int: - return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=False) + return self._spawn_path_common( + uc, + path_ptr, + 0, + 0, + return_pid=False, + env_line_override=None, + stdin_fd=FD_INHERIT, + stdout_fd=FD_INHERIT, + stderr_fd=FD_INHERIT, + ) def _exec_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int: - return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=False) + return self._spawn_path_common( + uc, + path_ptr, + argv_ptr, + env_ptr, + return_pid=False, + env_line_override=None, + stdin_fd=FD_INHERIT, + stdout_fd=FD_INHERIT, + stderr_fd=FD_INHERIT, + ) + + def _exec_pathv_io(self, uc: Uc, path_ptr: int, argv_ptr: int, req_ptr: int) -> int: + req_data: Optional[bytes] + env_ptr: int + stdin_fd: int + stdout_fd: int + stderr_fd: int + env_line: str + + if req_ptr == 0: + return u64_neg1() + + req_data = self._read_guest_bytes_exact(uc, req_ptr, 32) + if req_data is None or len(req_data) != 32: + return u64_neg1() + + env_ptr, stdin_fd, stdout_fd, stderr_fd = struct.unpack(" int: - return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=True) + return self._spawn_path_common( + uc, + path_ptr, + 0, + 0, + return_pid=True, + env_line_override=None, + stdin_fd=FD_INHERIT, + stdout_fd=FD_INHERIT, + stderr_fd=FD_INHERIT, + ) def _spawn_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int: - return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=True) + return self._spawn_path_common( + uc, + path_ptr, + argv_ptr, + env_ptr, + return_pid=True, + env_line_override=None, + stdin_fd=FD_INHERIT, + stdout_fd=FD_INHERIT, + stderr_fd=FD_INHERIT, + ) def _spawn_path_common( self, @@ -762,17 +1033,30 @@ class CLeonOSWineNative: env_ptr: int, *, return_pid: bool, + env_line_override: Optional[str], + stdin_fd: int, + stdout_fd: int, + stderr_fd: int, ) -> int: path = self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX) guest_path = self._normalize_guest_path(path) argv_line = self._read_guest_cstring(uc, argv_ptr, EXEC_ARG_LINE_MAX) if argv_ptr != 0 else "" - env_line = self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else "" + env_line = ( + env_line_override + if env_line_override is not None + else (self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else "") + ) host_path = self._guest_to_host(guest_path, must_exist=True) + child_stdio = self._build_child_stdio_map(int(stdin_fd), int(stdout_fd), int(stderr_fd)) self.state.exec_requests = u64(self.state.exec_requests + 1) self.state.user_exec_requested = 1 self.state.user_launch_tries = u64(self.state.user_launch_tries + 1) + if child_stdio is None: + self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) + return u64_neg1() + if host_path is None or not host_path.is_file(): self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) return u64_neg1() @@ -802,6 +1086,7 @@ class CLeonOSWineNative: ppid=parent_pid, argv_items=argv_items, env_items=env_items, + inherited_fds=child_stdio, ) child_ret = child.run() @@ -854,6 +1139,97 @@ class CLeonOSWineNative: def _proc_fault_rip(self) -> int: return self.state.proc_fault_rip_value(self.state.get_current_pid()) + def _proc_count(self) -> int: + return self.state.proc_count() + + def _proc_pid_at(self, uc: Uc, index: int, out_ptr: int) -> int: + if out_ptr == 0: + return 0 + + pid = self.state.proc_pid_at(int(index)) + if pid is None: + return 0 + + return 1 if self._write_guest_bytes(uc, out_ptr, struct.pack(" int: + if out_ptr == 0 or out_size < (13 * 8 + PROC_PATH_MAX): + return 0 + + target = int(pid) + state_value = self.state.proc_state_value(target) + if state_value == 0: + return 0 + + path = self.state.proc_path_value(target) + encoded_path = path.encode("utf-8", errors="replace") + if len(encoded_path) >= PROC_PATH_MAX: + encoded_path = encoded_path[: PROC_PATH_MAX - 1] + path_buf = encoded_path + b"\x00" + (b"\x00" * (PROC_PATH_MAX - len(encoded_path) - 1)) + + blob = struct.pack( + "<13Q", + u64(target), + u64(self.state.proc_ppid(target)), + u64(state_value), + u64(self.state.proc_started_tick_value(target)), + u64(self.state.proc_exited_tick_value(target)), + u64(self.state.proc_exit_status_value(target)), + u64(self.state.proc_runtime_ticks(target)), + u64(self.state.proc_mem_bytes_value(target)), + u64(self.state.proc_tty_index_value(target)), + u64(self.state.proc_signal(target)), + u64(self.state.proc_fault_vector_value(target)), + u64(self.state.proc_fault_error_value(target)), + u64(self.state.proc_fault_rip_value(target)), + ) + path_buf + + return 1 if self._write_guest_bytes(uc, out_ptr, blob) else 0 + + def _proc_kill(self, uc: Uc, pid: int, signal: int) -> int: + target = int(pid) + if target <= 0: + return u64_neg1() + + current_state = self.state.proc_state_value(target) + if current_state == 0: + return u64_neg1() + + effective_signal = int(signal) & 0xFF + if effective_signal == 0: + effective_signal = SIGTERM + + self.state.set_proc_fault(target, effective_signal, 0, 0, 0) + + if current_state == PROC_STATE_EXITED: + return 1 + + if effective_signal == SIGCONT: + if current_state == PROC_STATE_STOPPED: + self.state.set_proc_pending(target) + return 1 + + if effective_signal == SIGSTOP and current_state in (PROC_STATE_PENDING, PROC_STATE_STOPPED): + self.state.set_proc_stopped(target) + return 1 + + status = self._encode_signal_status(effective_signal, 0, 0) + + if target == self.state.get_current_pid(): + self._exit_requested = True + self._exit_status = u64(status) + if effective_signal == SIGSTOP: + self.state.set_proc_stopped(target) + uc.emu_stop() + return 1 + + if effective_signal == SIGSTOP: + self.state.set_proc_stopped(target) + return 1 + + self.state.mark_exited(target, status) + return 1 + def _wait_pid(self, uc: Uc, pid: int, out_ptr: int) -> int: wait_ret, status = self.state.wait_pid(int(pid)) @@ -885,6 +1261,190 @@ class CLeonOSWineNative: time.sleep(0) return self.state.timer_ticks() + def _fd_open(self, uc: Uc, path_ptr: int, flags: int, mode: int) -> int: + _ = mode + guest_path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX)) + open_flags = int(u64(flags)) + lower_path = guest_path.lower() + fd_slot: int + entry: FDEntry + + if not guest_path.startswith("/"): + return u64_neg1() + + if not self._fd_access_mode_valid(open_flags): + return u64_neg1() + + if ((open_flags & O_TRUNC) != 0 or (open_flags & O_APPEND) != 0) and not self._fd_can_write(open_flags): + return u64_neg1() + + fd_slot = self._fd_find_free() + if fd_slot < 0: + return u64_neg1() + + if lower_path == "/dev/tty": + entry = FDEntry(kind="tty", flags=open_flags, offset=0, tty_index=self._tty_index) + self._fds[fd_slot] = entry + return fd_slot + + if lower_path == "/dev/null": + entry = FDEntry(kind="dev_null", flags=open_flags, offset=0, tty_index=self._tty_index) + self._fds[fd_slot] = entry + return fd_slot + + if lower_path == "/dev/zero": + entry = FDEntry(kind="dev_zero", flags=open_flags, offset=0, tty_index=self._tty_index) + self._fds[fd_slot] = entry + return fd_slot + + if lower_path == "/dev/random": + entry = FDEntry(kind="dev_random", flags=open_flags, offset=0, tty_index=self._tty_index) + self._fds[fd_slot] = entry + return fd_slot + + host_path = self._guest_to_host(guest_path, must_exist=False) + if host_path is None: + return u64_neg1() + + try: + if not host_path.exists(): + if (open_flags & O_CREAT) == 0 or not self._fd_can_write(open_flags): + return u64_neg1() + host_path.parent.mkdir(parents=True, exist_ok=True) + host_path.write_bytes(b"") + + if host_path.is_dir(): + return u64_neg1() + + if (open_flags & O_TRUNC) != 0: + host_path.write_bytes(b"") + + offset = int(host_path.stat().st_size) if (open_flags & O_APPEND) != 0 else 0 + except Exception: + return u64_neg1() + + entry = FDEntry(kind="file", flags=open_flags, offset=offset, path=str(host_path), tty_index=self._tty_index) + self._fds[fd_slot] = entry + return fd_slot + + def _fd_read(self, uc: Uc, fd: int, out_ptr: int, size: int) -> int: + req = int(u64(size)) + entry = self._fd_lookup(int(fd)) + data: bytes + + if req == 0: + return 0 + + if out_ptr == 0: + return u64_neg1() + + if entry is None or not self._fd_can_read(entry.flags): + return u64_neg1() + + req = min(req, MAX_IO_READ) + + if entry.kind == "tty": + out = bytearray() + while len(out) < req: + key = self.state.pop_key() + if key is None: + break + out.append(key & 0xFF) + data = bytes(out) + elif entry.kind == "dev_null": + return 0 + elif entry.kind == "dev_zero": + data = b"\x00" * req + elif entry.kind == "dev_random": + data = os.urandom(req) + elif entry.kind == "file": + try: + with open(entry.path, "rb") as fh: + fh.seek(entry.offset) + data = fh.read(req) + except Exception: + return u64_neg1() + else: + return u64_neg1() + + if len(data) == 0: + return 0 + + if not self._write_guest_bytes(uc, int(out_ptr), data): + return u64_neg1() + + entry.offset += len(data) + return len(data) + + def _fd_write(self, uc: Uc, fd: int, buf_ptr: int, size: int) -> int: + req = int(u64(size)) + entry = self._fd_lookup(int(fd)) + data: Optional[bytes] + write_pos: int + + if req == 0: + return 0 + + if buf_ptr == 0: + return u64_neg1() + + if entry is None or not self._fd_can_write(entry.flags): + return u64_neg1() + + req = min(req, MAX_IO_READ) + data = self._read_guest_bytes_exact(uc, int(buf_ptr), req) + if data is None: + return u64_neg1() + + if entry.kind == "tty": + self._host_write_bytes(data) + entry.offset += len(data) + return len(data) + + if entry.kind in ("dev_null", "dev_zero", "dev_random"): + entry.offset += len(data) + return len(data) + + if entry.kind != "file": + return u64_neg1() + + try: + host_path = Path(entry.path) + if not host_path.exists(): + if (entry.flags & O_CREAT) == 0 or not self._fd_can_write(entry.flags): + return u64_neg1() + host_path.parent.mkdir(parents=True, exist_ok=True) + host_path.write_bytes(b"") + + with open(host_path, "r+b") as fh: + write_pos = int(entry.offset) + fh.seek(write_pos) + fh.write(data) + except Exception: + return u64_neg1() + + entry.offset += len(data) + return len(data) + + def _fd_close(self, fd: int) -> int: + key = int(fd) + if key not in self._fds: + return u64_neg1() + del self._fds[key] + return 0 + + def _fd_dup(self, fd: int) -> int: + src = self._fd_lookup(int(fd)) + if src is None: + return u64_neg1() + + slot = self._fd_find_free() + if slot < 0: + return u64_neg1() + + self._fds[slot] = self._clone_fd_entry(src) + return slot + @staticmethod def _truncate_item_text(text: str, max_bytes: int = EXEC_ITEM_MAX) -> str: if max_bytes <= 1: diff --git a/wine/cleonos_wine_lib/state.py b/wine/cleonos_wine_lib/state.py index 3739786..7c5ce38 100644 --- a/wine/cleonos_wine_lib/state.py +++ b/wine/cleonos_wine_lib/state.py @@ -6,7 +6,7 @@ import time from dataclasses import dataclass, field from typing import Deque, Dict, List, Optional, Tuple -from .constants import u64 +from .constants import PROC_STATE_EXITED, PROC_STATE_PENDING, PROC_STATE_RUNNING, PROC_STATE_STOPPED, u64 @dataclass @@ -37,13 +37,27 @@ class SharedKernelState: kbd_hotkey_switches: int = 0 log_journal_cap: int = 256 log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256)) - fs_write_max: int = 65536 + fs_write_max: int = 16 * 1024 * 1024 + # syscall stats + stats_lock: threading.Lock = field(default_factory=threading.Lock) + stats_total_calls: int = 0 + stats_id_total: Dict[int, int] = field(default_factory=dict) + stats_recent_window_cap: int = 256 + stats_recent_ring: Deque[int] = field(default_factory=lambda: collections.deque(maxlen=256)) + + # process table proc_lock: threading.Lock = field(default_factory=threading.Lock) proc_next_pid: int = 1 proc_current_pid: int = 0 proc_parents: Dict[int, int] = field(default_factory=dict) proc_status: Dict[int, Optional[int]] = field(default_factory=dict) + proc_state: Dict[int, int] = field(default_factory=dict) + proc_started_tick: Dict[int, int] = field(default_factory=dict) + proc_exited_tick: Dict[int, int] = field(default_factory=dict) + proc_mem_bytes: Dict[int, int] = field(default_factory=dict) + proc_tty_index: Dict[int, int] = field(default_factory=dict) + proc_path: Dict[int, str] = field(default_factory=dict) proc_argv: Dict[int, List[str]] = field(default_factory=dict) proc_env: Dict[int, List[str]] = field(default_factory=dict) proc_last_signal: Dict[int, int] = field(default_factory=dict) @@ -54,6 +68,32 @@ class SharedKernelState: def timer_ticks(self) -> int: return (time.monotonic_ns() - self.start_ns) // 1_000_000 + def record_syscall(self, sid: int) -> None: + key = int(u64(sid)) + + with self.stats_lock: + self.stats_total_calls = int(u64(self.stats_total_calls + 1)) + self.stats_id_total[key] = int(u64(self.stats_id_total.get(key, 0) + 1)) + self.stats_recent_ring.append(key) + + def stats_total(self) -> int: + with self.stats_lock: + return int(self.stats_total_calls) + + def stats_id_count(self, sid: int) -> int: + key = int(u64(sid)) + with self.stats_lock: + return int(self.stats_id_total.get(key, 0)) + + def stats_recent_window(self) -> int: + with self.stats_lock: + return len(self.stats_recent_ring) + + def stats_recent_id_count(self, sid: int) -> int: + key = int(u64(sid)) + with self.stats_lock: + return sum(1 for item in self.stats_recent_ring if item == key) + def push_key(self, key: int) -> None: with self.kbd_lock: if len(self.kbd_queue) >= self.kbd_queue_cap: @@ -94,6 +134,8 @@ class SharedKernelState: return list(self.log_journal)[index_from_oldest] def alloc_pid(self, ppid: int) -> int: + now = self.timer_ticks() + with self.proc_lock: pid = int(self.proc_next_pid) @@ -107,6 +149,12 @@ class SharedKernelState: self.proc_parents[pid] = int(ppid) self.proc_status[pid] = None + self.proc_state[pid] = PROC_STATE_PENDING + self.proc_started_tick[pid] = now + self.proc_exited_tick[pid] = 0 + self.proc_mem_bytes[pid] = 0 + self.proc_tty_index[pid] = int(self.tty_active) + self.proc_path[pid] = "" self.proc_argv[pid] = [] self.proc_env[pid] = [] self.proc_last_signal[pid] = 0 @@ -123,12 +171,51 @@ class SharedKernelState: with self.proc_lock: return int(self.proc_current_pid) + def set_proc_running(self, pid: int, path: Optional[str], tty_index: int) -> None: + if pid <= 0: + return + + now = self.timer_ticks() + + with self.proc_lock: + if pid not in self.proc_status: + return + + self.proc_state[pid] = PROC_STATE_RUNNING + if self.proc_started_tick.get(pid, 0) == 0: + self.proc_started_tick[pid] = now + self.proc_tty_index[pid] = int(tty_index) + if path: + self.proc_path[pid] = str(path) + def mark_exited(self, pid: int, status: int) -> None: if pid <= 0: return with self.proc_lock: self.proc_status[int(pid)] = int(u64(status)) + self.proc_state[int(pid)] = PROC_STATE_EXITED + self.proc_exited_tick[int(pid)] = self.timer_ticks() + + def set_proc_stopped(self, pid: int) -> None: + if pid <= 0: + return + + with self.proc_lock: + if pid not in self.proc_status: + return + if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED: + self.proc_state[pid] = PROC_STATE_STOPPED + + def set_proc_pending(self, pid: int) -> None: + if pid <= 0: + return + + with self.proc_lock: + if pid not in self.proc_status: + return + if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED: + self.proc_state[pid] = PROC_STATE_PENDING def wait_pid(self, pid: int) -> Tuple[int, int]: with self.proc_lock: @@ -151,6 +238,8 @@ class SharedKernelState: return self.proc_argv[pid] = [str(item) for item in argv] self.proc_env[pid] = [str(item) for item in env] + if argv: + self.proc_path[pid] = str(argv[0]) def set_proc_fault(self, pid: int, signal: int, vector: int, error_code: int, rip: int) -> None: if pid <= 0: @@ -164,6 +253,68 @@ class SharedKernelState: self.proc_fault_error[pid] = int(u64(error_code)) self.proc_fault_rip[pid] = int(u64(rip)) + def proc_count(self) -> int: + with self.proc_lock: + return len(self.proc_status) + + def proc_pid_at(self, index: int) -> Optional[int]: + if index < 0: + return None + + with self.proc_lock: + ordered = sorted(self.proc_status.keys()) + if index >= len(ordered): + return None + return int(ordered[index]) + + def proc_state_value(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_state.get(int(pid), 0)) + + def proc_ppid(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_parents.get(int(pid), 0)) + + def proc_started_tick_value(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_started_tick.get(int(pid), 0)) + + def proc_exited_tick_value(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_exited_tick.get(int(pid), 0)) + + def proc_exit_status_value(self, pid: int) -> int: + with self.proc_lock: + value = self.proc_status.get(int(pid)) + return int(value) if value is not None else 0 + + def proc_runtime_ticks(self, pid: int) -> int: + with self.proc_lock: + start = int(self.proc_started_tick.get(int(pid), 0)) + state = int(self.proc_state.get(int(pid), 0)) + end = int(self.proc_exited_tick.get(int(pid), 0)) + + if start == 0: + return 0 + + if state == PROC_STATE_EXITED and end >= start: + return end - start + + now = self.timer_ticks() + return 0 if now < start else now - start + + def proc_mem_bytes_value(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_mem_bytes.get(int(pid), 0)) + + def proc_tty_index_value(self, pid: int) -> int: + with self.proc_lock: + return int(self.proc_tty_index.get(int(pid), 0)) + + def proc_path_value(self, pid: int) -> str: + with self.proc_lock: + return str(self.proc_path.get(int(pid), "")) + def proc_argc(self, pid: int) -> int: with self.proc_lock: return len(self.proc_argv.get(int(pid), []))