diff --git a/.gitmodules b/.gitmodules index d1354bb..efb6e95 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "cleonos/third-party/doomgeneric"] path = cleonos/third-party/doomgeneric url = https://github.com/CLeonOS/doom.git +[submodule "wine"] + path = wine + url = https://github.com/CLeonOS/wine diff --git a/wine b/wine new file mode 160000 index 0000000..2071c8e --- /dev/null +++ b/wine @@ -0,0 +1 @@ +Subproject commit 2071c8ea465fe69c31b4707d932e7c358b7df841 diff --git a/wine/License b/wine/License deleted file mode 100644 index 59711bc..0000000 --- a/wine/License +++ /dev/null @@ -1,51 +0,0 @@ -Apache License -Version 2.0, January 2004 -http://www.apache.org/licenses/ - -TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - -1. Definitions. - -"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. - -"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. - -"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. - -"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. - -"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. - -"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. - -"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). - -"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. - -"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." - -"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. - -2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. - -3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. - -4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: - -You must give any other recipients of the Work or Derivative Works a copy of this License; and -You must cause any modified files to carry prominent notices stating that You changed the files; and -You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and -If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. -You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. - -5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. - -6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. - -7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. - -8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. - -9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. - -END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/wine/README.md b/wine/README.md deleted file mode 100644 index 5d76c47..0000000 --- a/wine/README.md +++ /dev/null @@ -1,90 +0,0 @@ -# CLeonOS-Wine (Native) - -CLeonOS-Wine 现在改为自研运行器:基于 Python + Unicorn,直接运行 CLeonOS x86_64 用户 ELF。 - -不再依赖 Qiling。 - -## 文件结构 - -- `wine/cleonos_wine.py`:兼容入口脚本 -- `wine/cleonos_wine_lib/cli.py`:命令行参数与启动流程 -- `wine/cleonos_wine_lib/runner.py`:ELF 装载、执行、syscall 分发 -- `wine/cleonos_wine_lib/state.py`:内核态统计与共享状态 -- `wine/cleonos_wine_lib/input_pump.py`:主机键盘输入线程 -- `wine/cleonos_wine_lib/constants.py`:常量与 syscall ID -- `wine/cleonos_wine_lib/platform.py`:Unicorn 导入与平台适配 -- `wine/requirements.txt`:Python 依赖(Unicorn + pygame) - -## 安装 - -```bash -pip install -r wine/requirements.txt -``` - -## 运行 - -```bash -python wine/cleonos_wine.py /hello.elf --rootfs build/x86_64/ramdisk_root -python wine/cleonos_wine.py /shell/shell.elf --rootfs build/x86_64/ramdisk_root -python wine/cleonos_wine.py /shell/qrcode.elf --rootfs build/x86_64/ramdisk_root --fb-window -- --ecc H "hello wine" -``` - -也支持直接传宿主路径: - -```bash -python wine/cleonos_wine.py build/x86_64/ramdisk_root/shell/shell.elf --rootfs build/x86_64/ramdisk_root -``` - -## 支持 - -- ELF64 (x86_64) PT_LOAD 段装载 -- CLeonOS `int 0x80` syscall 0..94(含 `FD_*`、`DL_*`、`FB_*`、`PROC_*`、`STATS_*`、`EXEC_PATHV_IO`、`KERNEL_VERSION`、`DISK_*`) -- TTY 输出与键盘输入队列 -- rootfs 文件/目录访问(`FS_*`) -- `/temp` 与已挂载磁盘路径写入限制(`FS_MKDIR/WRITE/APPEND/REMOVE`) -- `EXEC_PATH/EXEC_PATHV` 执行 ELF(带深度限制) -- `EXEC_PATHV_IO`(支持 stdio fd 继承/重定向) -- `SPAWN_PATH/SPAWN_PATHV/WAITPID/EXIT/SLEEP_TICKS/YIELD` -- 进程 `argv/env` 查询(`PROC_ARGC/PROC_ARGV/PROC_ENVC/PROC_ENV`) -- 进程枚举与快照(`PROC_COUNT/PROC_PID_AT/PROC_SNAPSHOT/PROC_KILL`) -- syscall 统计(`STATS_TOTAL/STATS_ID_COUNT/STATS_RECENT_*`) -- 文件描述符(`FD_OPEN/FD_READ/FD_WRITE/FD_CLOSE/FD_DUP`) -- 动态库兼容加载(`DL_OPEN/DL_CLOSE/DL_SYM`,基于 ELF 符号解析) -- framebuffer 兼容(`FB_INFO/FB_BLIT/FB_CLEAR`,支持内存缓冲与窗口显示) -- 内核版本查询(`KERNEL_VERSION`) -- 磁盘接口兼容(`DISK_PRESENT/SIZE_BYTES/SECTOR_COUNT/FORMATTED/FORMAT_FAT32/MOUNT/MOUNTED/MOUNT_PATH/READ_SECTOR/WRITE_SECTOR`) -- Wine 虚拟磁盘目录默认位于 `/__clks_disk0__`(格式化标记文件 `.fat32`,原始扇区后端文件 `.rawdisk.img`) -- 异常退出状态编码与故障元信息(`PROC_LAST_SIGNAL/PROC_FAULT_*`) - -## 版本策略 - -- CLeonOS-Wine 版本号固定为:`85.0.0-wine` -- 该值按项目策略固定,不再随新增 syscall 变更(即使当前实现范围已扩展到 `0..94`) - -## 参数 - -- `--no-kbd`:关闭输入线程 -- `--fb-window`:启用 framebuffer 窗口显示(pygame) -- `--fb-scale N`:窗口缩放倍数(默认 `2`) -- `--fb-max-fps N`:窗口刷新上限(默认 `60`) -- `--fb-hold-ms N`:程序退出后窗口保留毫秒数(默认 `2500`,静态图更容易看清) -- `--argv-line "..."`:直接指定 guest 参数行(等价于 shell 参数字符串) -- `--cwd PATH`:写入命令上下文中的工作目录(默认 `/`) -- `--` 之后内容:作为 guest argv 透传(推荐) -- `--max-exec-depth N`:设置 exec 嵌套深度上限 -- `--verbose`:打印更多日志 -- 环境变量 `CLEONOS_WINE_DISK_SIZE_MB`:设置 Wine 虚拟磁盘容量(MB,默认 `64`) - -## `execv/spawnv` 参数格式 - -- `argv_line`:按空白字符分词(与内核当前实现一致,不支持引号转义)。 -- `env_line`:按 `;` 或换行切分环境变量项,会去掉每项末尾空白。 -- 子进程 `argv[0]` 固定为目标程序路径(如 `/shell/ls.elf`)。 - -## 退出状态说明 - -- 正常退出:返回普通退出码。 -- 异常退出:最高位为 `1`,并编码: -- bits `7:0` = signal -- bits `15:8` = CPU exception vector -- bits `31:16` = error code 低 16 位 diff --git a/wine/__pycache__/cleonos_wine.cpython-313.pyc b/wine/__pycache__/cleonos_wine.cpython-313.pyc deleted file mode 100644 index 0ec9bdd..0000000 Binary files a/wine/__pycache__/cleonos_wine.cpython-313.pyc and /dev/null differ diff --git a/wine/__tmp_raw.txt b/wine/__tmp_raw.txt deleted file mode 100644 index 954f134..0000000 --- a/wine/__tmp_raw.txt +++ /dev/null @@ -1,944 +0,0 @@ -#!/usr/bin/env python3 -""" -CLeonOS-Wine (Qiling backend) - -Run CLeonOS x86_64 user ELF binaries on host OSes with a lightweight syscall shim. -""" - -from __future__ import annotations - -import argparse -import collections -import os -import struct -import sys -import threading -import time -from dataclasses import dataclass, field -from pathlib import Path -from typing import Deque, Optional, Tuple - -try: - from qiling import Qiling -except Exception as exc: - print("[WINE][ERROR] qiling import failed. Install dependencies first:", file=sys.stderr) - print(" pip install -r wine/requirements.txt", file=sys.stderr) - raise SystemExit(1) from exc - -try: - from unicorn import UC_PROT_EXEC, UC_PROT_READ, UC_PROT_WRITE -except Exception: - UC_PROT_READ = 1 - UC_PROT_WRITE = 2 - UC_PROT_EXEC = 4 - - -U64_MASK = (1 << 64) - 1 -MAX_CSTR = 4096 -MAX_IO_READ = 1 << 20 -DEFAULT_MAX_EXEC_DEPTH = 6 -FS_NAME_MAX = 96 - -# CLeonOS syscall IDs from cleonos/c/include/cleonos_syscall.h -SYS_LOG_WRITE = 0 -SYS_TIMER_TICKS = 1 -SYS_TASK_COUNT = 2 -SYS_CUR_TASK = 3 -SYS_SERVICE_COUNT = 4 -SYS_SERVICE_READY_COUNT = 5 -SYS_CONTEXT_SWITCHES = 6 -SYS_KELF_COUNT = 7 -SYS_KELF_RUNS = 8 -SYS_FS_NODE_COUNT = 9 -SYS_FS_CHILD_COUNT = 10 -SYS_FS_GET_CHILD_NAME = 11 -SYS_FS_READ = 12 -SYS_EXEC_PATH = 13 -SYS_EXEC_REQUESTS = 14 -SYS_EXEC_SUCCESS = 15 -SYS_USER_SHELL_READY = 16 -SYS_USER_EXEC_REQUESTED = 17 -SYS_USER_LAUNCH_TRIES = 18 -SYS_USER_LAUNCH_OK = 19 -SYS_USER_LAUNCH_FAIL = 20 -SYS_TTY_COUNT = 21 -SYS_TTY_ACTIVE = 22 -SYS_TTY_SWITCH = 23 -SYS_TTY_WRITE = 24 -SYS_TTY_WRITE_CHAR = 25 -SYS_KBD_GET_CHAR = 26 - - -def u64(value: int) -> int: - return value & U64_MASK - - -def u64_neg1() -> int: - return U64_MASK - - -@dataclass -class SharedKernelState: - start_ns: int = field(default_factory=time.monotonic_ns) - task_count: int = 5 - current_task: int = 0 - service_count: int = 7 - service_ready: int = 7 - context_switches: int = 0 - kelf_count: int = 2 - kelf_runs: int = 0 - exec_requests: int = 0 - exec_success: int = 0 - user_shell_ready: int = 1 - user_exec_requested: int = 0 - user_launch_tries: int = 0 - user_launch_ok: int = 0 - user_launch_fail: int = 0 - tty_count: int = 4 - tty_active: int = 0 - kbd_queue: Deque[int] = field(default_factory=collections.deque) - kbd_lock: threading.Lock = field(default_factory=threading.Lock) - - def timer_ticks(self) -> int: - # Millisecond style tick is enough for shell/user utilities. - return (time.monotonic_ns() - self.start_ns) // 1_000_000 - - def push_key(self, key: int) -> None: - with self.kbd_lock: - if len(self.kbd_queue) >= 1024: - self.kbd_queue.popleft() - self.kbd_queue.append(key & 0xFF) - - def pop_key(self) -> Optional[int]: - with self.kbd_lock: - if not self.kbd_queue: - return None - return self.kbd_queue.popleft() - - -class InputPump: - def __init__(self, state: SharedKernelState) -> None: - self.state = state - self._stop = threading.Event() - self._thread: Optional[threading.Thread] = None - self._posix_term_state = None - - def start(self) -> None: - if self._thread is not None: - return - if not sys.stdin or not hasattr(sys.stdin, "isatty") or not sys.stdin.isatty(): - return - self._thread = threading.Thread(target=self._run, name="cleonos-wine-input", daemon=True) - self._thread.start() - - def stop(self) -> None: - self._stop.set() - if self._thread is not None: - self._thread.join(timeout=0.2) - self._thread = None - self._restore_posix_tty() - - def _run(self) -> None: - if os.name == "nt": - self._run_windows() - else: - self._run_posix() - - def _run_windows(self) -> None: - import msvcrt # pylint: disable=import-error - - while not self._stop.is_set(): - if not msvcrt.kbhit(): - time.sleep(0.005) - continue - - ch = msvcrt.getwch() - if ch in ("\x00", "\xe0"): - # Extended scan code second byte, ignore for now. - _ = msvcrt.getwch() - continue - - norm = self._normalize_char(ch) - if norm is None: - continue - self.state.push_key(ord(norm)) - - def _run_posix(self) -> None: - import select - import termios - import tty - - fd = sys.stdin.fileno() - self._posix_term_state = termios.tcgetattr(fd) - tty.setcbreak(fd) - - try: - while not self._stop.is_set(): - readable, _, _ = select.select([sys.stdin], [], [], 0.05) - if not readable: - continue - ch = sys.stdin.read(1) - norm = self._normalize_char(ch) - if norm is None: - continue - self.state.push_key(ord(norm)) - finally: - self._restore_posix_tty() - - def _restore_posix_tty(self) -> None: - if self._posix_term_state is None: - return - try: - import termios - - fd = sys.stdin.fileno() - termios.tcsetattr(fd, termios.TCSADRAIN, self._posix_term_state) - except Exception: - pass - finally: - self._posix_term_state = None - - @staticmethod - def _normalize_char(ch: str) -> Optional[str]: - if not ch: - return None - if ch == "\r": - return "\n" - return ch - - -class CLeonOSWine: - def __init__( - self, - elf_path: Path, - rootfs: Path, - guest_path_hint: str, - *, - state: Optional[SharedKernelState] = None, - depth: int = 0, - max_exec_depth: int = DEFAULT_MAX_EXEC_DEPTH, - no_kbd: bool = False, - verbose: bool = False, - top_level: bool = True, - ) -> None: - self.elf_path = elf_path - self.rootfs = rootfs - self.guest_path_hint = guest_path_hint - self.state = state if state is not None else SharedKernelState() - self.depth = depth - self.max_exec_depth = max_exec_depth - self.no_kbd = no_kbd - self.verbose = verbose - self.top_level = top_level - - self.entry = self._read_elf_entry(elf_path) - self.exit_code: Optional[int] = None - - self._input_pump: Optional[InputPump] = None - self._stack_base = 0x00007FFF00000000 - self._stack_size = 0x0000000000020000 - self._ret_sentinel = self._stack_base + 0x1000 - - def run(self) -> Optional[int]: - if self.entry == 0: - print(f"[WINE][ERROR] ELF entry is 0, cannot run: {self.elf_path}", file=sys.stderr) - return None - - ql = self._new_ql() - self._install_hooks(ql) - self._prepare_custom_stack(ql) - - if self.top_level and not self.no_kbd: - self._input_pump = InputPump(self.state) - self._input_pump.start() - - try: - try: - ql.run(begin=self.entry) - except TypeError: - ql.run() - except Exception as exc: - if self.verbose or self.top_level: - print(f"[WINE][ERROR] runtime crashed: {exc}", file=sys.stderr) - return None - finally: - if self.top_level and self._input_pump is not None: - self._input_pump.stop() - - if self.exit_code is None: - self.exit_code = self._read_reg(ql, "rax") - - return u64(self.exit_code) - - def _new_ql(self) -> Qiling: - kwargs = {} - level = self._resolve_qiling_verbose(self.verbose) - if level is not None: - kwargs["verbose"] = level - - try: - return Qiling([str(self.elf_path)], str(self.rootfs), **kwargs) - except Exception as exc: - if self.verbose or self.top_level: - print(f"[WINE][WARN] native ELF loader failed, fallback to BLOB mode: {exc}", file=sys.stderr) - return self._new_ql_blob(kwargs) - - def _new_ql_blob(self, kwargs: dict) -> Qiling: - arch, ostype = self._resolve_blob_arch_os() - - last_error: Optional[Exception] = None - creators = ( - lambda: Qiling([], str(self.rootfs), ostype=ostype, archtype=arch, **kwargs), - lambda: Qiling(argv=[], rootfs=str(self.rootfs), ostype=ostype, archtype=arch, **kwargs), - lambda: Qiling(code=b"\x90", rootfs=str(self.rootfs), ostype=ostype, archtype=arch, **kwargs), - ) - - ql = None - for create in creators: - try: - ql = create() - break - except Exception as exc: - last_error = exc - - if ql is None: - raise RuntimeError(f"unable to create qiling blob instance: {last_error}") - - self._load_elf_segments_into_ql(ql, self.elf_path) - return ql - - @staticmethod - def _resolve_qiling_verbose(user_verbose: bool): - try: - from qiling.const import QL_VERBOSE # pylint: disable=import-outside-toplevel - except Exception: - return None - - if user_verbose: - for name in ("DEBUG", "DEFAULT"): - if hasattr(QL_VERBOSE, name): - return getattr(QL_VERBOSE, name) - return None - - for name in ("DISABLED", "OFF", "DEFAULT"): - if hasattr(QL_VERBOSE, name): - return getattr(QL_VERBOSE, name) - return None - - @staticmethod - def _resolve_blob_arch_os() -> Tuple[object, object]: - from qiling.const import QL_ARCH, QL_OS # pylint: disable=import-outside-toplevel - - arch = None - for name in ("X8664", "X86_64", "X86_64BITS"): - if hasattr(QL_ARCH, name): - arch = getattr(QL_ARCH, name) - break - - ostype = None - for name in ("BLOB", "NONE"): - if hasattr(QL_OS, name): - ostype = getattr(QL_OS, name) - break - - if arch is None or ostype is None: - raise RuntimeError("qiling BLOB arch/os constants not found") - return arch, ostype - - def _load_elf_segments_into_ql(self, ql: Qiling, path: Path) -> None: - data = path.read_bytes() - if len(data) < 64 or data[0:4] != b"\x7fELF" or data[4] != 2 or data[5] != 1: - raise RuntimeError(f"unsupported ELF format: {path}") - - phoff = struct.unpack_from(" len(data): - break - - p_type, p_flags, p_offset, p_vaddr, _p_paddr, p_filesz, p_memsz, _p_align = struct.unpack_from( - " 0: - file_start = int(p_offset) - file_end = min(len(data), file_start + int(p_filesz)) - if file_start < file_end: - seg_data = data[file_start:file_end] - if not self._safe_write_mem(ql, int(p_vaddr), seg_data): - raise RuntimeError(f"failed to write PT_LOAD segment at 0x{int(p_vaddr):X}") - - perms = 0 - if p_flags & 0x4: - perms |= UC_PROT_READ - if p_flags & 0x2: - perms |= UC_PROT_WRITE - if p_flags & 0x1: - perms |= UC_PROT_EXEC - if perms == 0: - perms = UC_PROT_READ - - try: - ql.mem.protect(map_start, map_size, perms) - except Exception: - pass - - loaded += 1 - - if loaded == 0: - raise RuntimeError(f"no PT_LOAD segments found: {path}") - - def _install_hooks(self, ql: Qiling) -> None: - hooker = getattr(ql, "hook_intno", None) - if hooker is None: - hooker = getattr(ql, "hook_intr", None) - if hooker is None: - raise RuntimeError("Qiling interrupt hook API not found") - - installed = False - for fn in ( - lambda: hooker(self._on_int80, 0x80), - lambda: hooker(0x80, self._on_int80), - ): - try: - fn() - installed = True - break - except TypeError: - continue - if not installed: - raise RuntimeError("Failed to install INT 0x80 hook") - - hook_addr = getattr(ql, "hook_address", None) - if hook_addr is None: - raise RuntimeError("Qiling address hook API not found") - - installed = False - for fn in ( - lambda: hook_addr(self._on_entry_return, self._ret_sentinel), - lambda: hook_addr(self._ret_sentinel, self._on_entry_return), - ): - try: - fn() - installed = True - break - except TypeError: - continue - if not installed: - raise RuntimeError("Failed to install return sentinel hook") - - def _prepare_custom_stack(self, ql: Qiling) -> None: - self._map_memory(ql, self._stack_base, self._stack_size) - if not self._safe_write_mem(ql, self._ret_sentinel, b"\xC3"): - raise RuntimeError("failed to place return sentinel") - - rsp = self._stack_base + self._stack_size - 8 - if not self._safe_write_mem(ql, rsp, struct.pack(" None: - try: - ql.mem.map(addr, size, info="[cleonos-wine-stack]") - return - except TypeError: - # Older qiling/unicorn builds may not support "info". - pass - except Exception: - # Already mapped is acceptable. - return - - try: - ql.mem.map(addr, size) - except Exception: - # Already mapped is acceptable. - pass - - def _on_entry_return(self, ql: Qiling, *_args) -> None: - self.exit_code = self._read_reg(ql, "rax") - ql.emu_stop() - - def _on_int80(self, ql: Qiling, *_args) -> None: - syscall_id = self._read_reg(ql, "rax") - arg0 = self._read_reg(ql, "rbx") - arg1 = self._read_reg(ql, "rcx") - arg2 = self._read_reg(ql, "rdx") - - self.state.context_switches = u64(self.state.context_switches + 1) - ret = self._dispatch_syscall(ql, syscall_id, arg0, arg1, arg2) - self._write_reg(ql, "rax", u64(ret)) - - def _dispatch_syscall(self, ql: Qiling, sid: int, arg0: int, arg1: int, arg2: int) -> int: - if sid == SYS_LOG_WRITE: - data = self._read_guest_bytes(ql, arg0, arg1) - self._host_write(data.decode("utf-8", errors="replace")) - return len(data) - if sid == SYS_TIMER_TICKS: - return self.state.timer_ticks() - if sid == SYS_TASK_COUNT: - return self.state.task_count - if sid == SYS_CUR_TASK: - return self.state.current_task - if sid == SYS_SERVICE_COUNT: - return self.state.service_count - if sid == SYS_SERVICE_READY_COUNT: - return self.state.service_ready - if sid == SYS_CONTEXT_SWITCHES: - return self.state.context_switches - if sid == SYS_KELF_COUNT: - return self.state.kelf_count - if sid == SYS_KELF_RUNS: - return self.state.kelf_runs - if sid == SYS_FS_NODE_COUNT: - return self._fs_node_count() - if sid == SYS_FS_CHILD_COUNT: - return self._fs_child_count(ql, arg0) - if sid == SYS_FS_GET_CHILD_NAME: - return self._fs_get_child_name(ql, arg0, arg1, arg2) - if sid == SYS_FS_READ: - return self._fs_read(ql, arg0, arg1, arg2) - if sid == SYS_EXEC_PATH: - return self._exec_path(ql, arg0) - if sid == SYS_EXEC_REQUESTS: - return self.state.exec_requests - if sid == SYS_EXEC_SUCCESS: - return self.state.exec_success - if sid == SYS_USER_SHELL_READY: - return self.state.user_shell_ready - if sid == SYS_USER_EXEC_REQUESTED: - return self.state.user_exec_requested - if sid == SYS_USER_LAUNCH_TRIES: - return self.state.user_launch_tries - if sid == SYS_USER_LAUNCH_OK: - return self.state.user_launch_ok - if sid == SYS_USER_LAUNCH_FAIL: - return self.state.user_launch_fail - if sid == SYS_TTY_COUNT: - return self.state.tty_count - if sid == SYS_TTY_ACTIVE: - return self.state.tty_active - if sid == SYS_TTY_SWITCH: - if arg0 >= self.state.tty_count: - return u64_neg1() - self.state.tty_active = int(arg0) - return 0 - if sid == SYS_TTY_WRITE: - data = self._read_guest_bytes(ql, arg0, arg1) - self._host_write(data.decode("utf-8", errors="replace")) - return len(data) - if sid == SYS_TTY_WRITE_CHAR: - ch = chr(arg0 & 0xFF) - if ch in ("\b", "\x7f"): - self._host_write("\b \b") - else: - self._host_write(ch) - return 0 - if sid == SYS_KBD_GET_CHAR: - key = self.state.pop_key() - return u64_neg1() if key is None else key - - # Unknown syscall: keep app running, report failure. - return u64_neg1() - - def _host_write(self, text: str) -> None: - if not text: - return - sys.stdout.write(text) - sys.stdout.flush() - - def _fs_node_count(self) -> int: - count = 1 # root node - for root, dirs, files in os.walk(self.rootfs): - dirs[:] = [d for d in dirs if not d.startswith(".")] - files = [f for f in files if not f.startswith(".")] - count += len(dirs) + len(files) - _ = root - return count - - def _fs_child_count(self, ql: Qiling, dir_ptr: int) -> int: - path = self._read_guest_cstring(ql, dir_ptr) - host_dir = self._guest_to_host(path, must_exist=True) - if host_dir is None or not host_dir.is_dir(): - return u64_neg1() - return len(self._list_children(host_dir)) - - def _fs_get_child_name(self, ql: Qiling, dir_ptr: int, index: int, out_ptr: int) -> int: - if out_ptr == 0: - return 0 - path = self._read_guest_cstring(ql, dir_ptr) - host_dir = self._guest_to_host(path, must_exist=True) - if host_dir is None or not host_dir.is_dir(): - return 0 - - children = self._list_children(host_dir) - if index >= len(children): - return 0 - - name = children[int(index)] - encoded = name.encode("utf-8", errors="replace") - if len(encoded) >= FS_NAME_MAX: - encoded = encoded[: FS_NAME_MAX - 1] - self._safe_write_mem(ql, out_ptr, encoded + b"\x00") - return 1 - - def _fs_read(self, ql: Qiling, path_ptr: int, out_ptr: int, buf_size: int) -> int: - if out_ptr == 0 or buf_size == 0: - return 0 - path = self._read_guest_cstring(ql, path_ptr) - host_path = self._guest_to_host(path, must_exist=True) - if host_path is None or not host_path.is_file(): - return 0 - - read_size = int(min(buf_size, MAX_IO_READ)) - try: - data = host_path.read_bytes()[:read_size] - except Exception: - return 0 - - if not data: - return 0 - self._safe_write_mem(ql, out_ptr, data) - return len(data) - - def _exec_path(self, ql: Qiling, path_ptr: int) -> int: - path = self._read_guest_cstring(ql, path_ptr) - guest_path = self._normalize_guest_path(path) - host_path = self._guest_to_host(guest_path, must_exist=True) - - self.state.exec_requests = u64(self.state.exec_requests + 1) - self.state.user_exec_requested = 1 - self.state.user_launch_tries = u64(self.state.user_launch_tries + 1) - - if host_path is None or not host_path.is_file(): - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - if self.depth >= self.max_exec_depth: - print(f"[WINE][WARN] exec depth exceeded: {guest_path}", file=sys.stderr) - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - child = CLeonOSWine( - host_path, - self.rootfs, - guest_path_hint=guest_path, - state=self.state, - depth=self.depth + 1, - max_exec_depth=self.max_exec_depth, - no_kbd=True, - verbose=self.verbose, - top_level=False, - ) - child_ret = child.run() - if child_ret is None: - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - self.state.exec_success = u64(self.state.exec_success + 1) - self.state.user_launch_ok = u64(self.state.user_launch_ok + 1) - if guest_path.lower().startswith("/system/"): - self.state.kelf_runs = u64(self.state.kelf_runs + 1) - return 0 - - def _guest_to_host(self, guest_path: str, *, must_exist: bool) -> Optional[Path]: - norm = self._normalize_guest_path(guest_path) - if norm == "/": - return self.rootfs if (not must_exist or self.rootfs.exists()) else None - - current = self.rootfs - for part in [p for p in norm.split("/") if p]: - candidate = current / part - if candidate.exists(): - current = candidate - continue - if current.exists() and current.is_dir(): - match = self._find_case_insensitive(current, part) - if match is not None: - current = match - continue - current = candidate - - if must_exist and not current.exists(): - return None - return current - - @staticmethod - def _find_case_insensitive(parent: Path, name: str) -> Optional[Path]: - target = name.lower() - try: - for entry in parent.iterdir(): - if entry.name.lower() == target: - return entry - except Exception: - return None - return None - - @staticmethod - def _normalize_guest_path(path: str) -> str: - p = (path or "").replace("\\", "/").strip() - if not p: - return "/" - if not p.startswith("/"): - p = "/" + p - - parts = [] - for token in p.split("/"): - if token in ("", "."): - continue - if token == "..": - if parts: - parts.pop() - continue - parts.append(token) - return "/" + "/".join(parts) - - @staticmethod - def _list_children(dir_path: Path): - try: - names = [entry.name for entry in dir_path.iterdir() if not entry.name.startswith(".")] - except Exception: - return [] - names.sort(key=lambda x: x.lower()) - return names - - def _read_guest_cstring(self, ql: Qiling, addr: int, max_len: int = MAX_CSTR) -> str: - if addr == 0: - return "" - - out = bytearray() - for i in range(max_len): - try: - chunk = ql.mem.read(addr + i, 1) - except Exception: - break - if not chunk or chunk[0] == 0: - break - out.append(chunk[0]) - return out.decode("utf-8", errors="replace") - - def _read_guest_bytes(self, ql: Qiling, addr: int, size: int) -> bytes: - if addr == 0 or size == 0: - return b"" - safe_size = int(min(size, MAX_IO_READ)) - try: - return bytes(ql.mem.read(addr, safe_size)) - except Exception: - return b"" - - @staticmethod - def _safe_write_mem(ql: Qiling, addr: int, data: bytes) -> bool: - if addr == 0 or not data: - return False - try: - ql.mem.write(addr, data) - return True - except Exception: - return False - - @staticmethod - def _read_reg(ql: Qiling, reg: str) -> int: - arch_regs = getattr(getattr(ql, "arch", None), "regs", None) - if arch_regs is not None: - if hasattr(arch_regs, reg): - return int(getattr(arch_regs, reg)) - if hasattr(arch_regs, "read"): - try: - return int(arch_regs.read(reg)) - except Exception: - pass - - reg_obj = getattr(ql, "reg", None) - if reg_obj is not None: - if hasattr(reg_obj, reg): - return int(getattr(reg_obj, reg)) - if hasattr(reg_obj, "read"): - try: - return int(reg_obj.read(reg)) - except Exception: - pass - - raise RuntimeError(f"cannot read register: {reg}") - - @staticmethod - def _write_reg(ql: Qiling, reg: str, value: int) -> None: - v = u64(value) - arch_regs = getattr(getattr(ql, "arch", None), "regs", None) - if arch_regs is not None: - if hasattr(arch_regs, reg): - setattr(arch_regs, reg, v) - return - if hasattr(arch_regs, "write"): - try: - arch_regs.write(reg, v) - return - except Exception: - pass - - reg_obj = getattr(ql, "reg", None) - if reg_obj is not None: - if hasattr(reg_obj, reg): - setattr(reg_obj, reg, v) - return - if hasattr(reg_obj, "write"): - try: - reg_obj.write(reg, v) - return - except Exception: - pass - - raise RuntimeError(f"cannot write register: {reg}") - - @staticmethod - def _read_elf_entry(path: Path) -> int: - try: - data = path.read_bytes()[:64] - except Exception: - return 0 - if len(data) < 64: - return 0 - if data[0:4] != b"\x7fELF": - return 0 - if data[4] != 2 or data[5] != 1: - return 0 - return struct.unpack_from(" Path: - if path_arg: - root = Path(path_arg).expanduser().resolve() - if not root.exists() or not root.is_dir(): - raise FileNotFoundError(f"rootfs not found: {root}") - return root - - candidates = [ - Path("build/x86_64/ramdisk_root"), - Path("ramdisk"), - ] - for candidate in candidates: - if candidate.exists() and candidate.is_dir(): - return candidate.resolve() - - raise FileNotFoundError("rootfs not found; pass --rootfs") - - -def _guest_to_host_for_resolve(rootfs: Path, guest_path: str) -> Optional[Path]: - norm = CLeonOSWine._normalize_guest_path(guest_path) - if norm == "/": - return rootfs - - current = rootfs - for part in [p for p in norm.split("/") if p]: - candidate = current / part - if candidate.exists(): - current = candidate - continue - if current.exists() and current.is_dir(): - match = None - for entry in current.iterdir(): - if entry.name.lower() == part.lower(): - match = entry - break - if match is not None: - current = match - continue - current = candidate - - if current.exists(): - return current - return None - - -def resolve_elf_target(elf_arg: str, rootfs: Path) -> Tuple[Path, str]: - host_candidate = Path(elf_arg).expanduser() - if host_candidate.exists(): - host_path = host_candidate.resolve() - try: - rel = host_path.relative_to(rootfs) - guest_path = "/" + rel.as_posix() - except ValueError: - guest_path = "/" + host_path.name - return host_path, guest_path - - # Fallback: treat as CLeonOS guest path. - guest_path = CLeonOSWine._normalize_guest_path(elf_arg) - host_path = _guest_to_host_for_resolve(rootfs, guest_path) - if host_path is None: - raise FileNotFoundError(f"ELF not found as host path or guest path: {elf_arg}") - return host_path.resolve(), guest_path - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="CLeonOS-Wine: run CLeonOS ELF with Qiling.") - parser.add_argument("elf", help="Target ELF path. Supports /guest/path or host file path.") - parser.add_argument("--rootfs", help="Rootfs directory (default: build/x86_64/ramdisk_root).") - parser.add_argument("--no-kbd", action="store_true", help="Disable host keyboard input pump.") - parser.add_argument("--max-exec-depth", type=int, default=DEFAULT_MAX_EXEC_DEPTH, help="Nested exec depth guard.") - parser.add_argument("--verbose", action="store_true", help="Enable verbose runner output.") - return parser.parse_args() - - -def main() -> int: - args = parse_args() - - try: - rootfs = resolve_rootfs(args.rootfs) - elf_path, guest_path = resolve_elf_target(args.elf, rootfs) - except Exception as exc: - print(f"[WINE][ERROR] {exc}", file=sys.stderr) - return 2 - - if args.verbose: - print(f"[WINE] rootfs={rootfs}", file=sys.stderr) - print(f"[WINE] elf={elf_path}", file=sys.stderr) - print(f"[WINE] guest={guest_path}", file=sys.stderr) - - state = SharedKernelState() - runner = CLeonOSWine( - elf_path=elf_path, - rootfs=rootfs, - guest_path_hint=guest_path, - state=state, - max_exec_depth=max(1, args.max_exec_depth), - no_kbd=args.no_kbd, - verbose=args.verbose, - top_level=True, - ) - ret = runner.run() - if ret is None: - return 1 - - if args.verbose: - print(f"\n[WINE] exit=0x{ret:016X}", file=sys.stderr) - return int(ret & 0xFF) - - -if __name__ == "__main__": - raise SystemExit(main()) - - - - - - - diff --git a/wine/cleonos_wine.py b/wine/cleonos_wine.py deleted file mode 100644 index 8cfad7c..0000000 --- a/wine/cleonos_wine.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python3 - -from cleonos_wine_lib.cli import main - - -if __name__ == "__main__": - raise SystemExit(main()) \ No newline at end of file diff --git a/wine/cleonos_wine_lib/__init__.py b/wine/cleonos_wine_lib/__init__.py deleted file mode 100644 index 9e23296..0000000 --- a/wine/cleonos_wine_lib/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .cli import main -from .runner import CLeonOSWineNative -from .state import SharedKernelState - -__all__ = ["main", "CLeonOSWineNative", "SharedKernelState"] \ No newline at end of file diff --git a/wine/cleonos_wine_lib/__pycache__/__init__.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/__init__.cpython-313.pyc deleted file mode 100644 index 3b7e531..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/__init__.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/cli.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/cli.cpython-313.pyc deleted file mode 100644 index 5769293..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/cli.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc deleted file mode 100644 index a8f38d4..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/constants.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/fb_window.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/fb_window.cpython-313.pyc deleted file mode 100644 index 95fa9d0..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/fb_window.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/input_pump.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/input_pump.cpython-313.pyc deleted file mode 100644 index 9735ec2..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/input_pump.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/platform.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/platform.cpython-313.pyc deleted file mode 100644 index e8bec4d..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/platform.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc deleted file mode 100644 index 16ac5d4..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/runner.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc b/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc deleted file mode 100644 index 9359408..0000000 Binary files a/wine/cleonos_wine_lib/__pycache__/state.cpython-313.pyc and /dev/null differ diff --git a/wine/cleonos_wine_lib/cli.py b/wine/cleonos_wine_lib/cli.py deleted file mode 100644 index e04a4ed..0000000 --- a/wine/cleonos_wine_lib/cli.py +++ /dev/null @@ -1,124 +0,0 @@ -from __future__ import annotations - -import argparse -import shlex -import sys -from pathlib import Path -from typing import List - -from .constants import DEFAULT_MAX_EXEC_DEPTH -from .runner import CLeonOSWineNative, resolve_elf_target, resolve_rootfs -from .state import SharedKernelState - - -def _encode_cstr(text: str, size: int) -> bytes: - if size <= 0: - return b"" - - data = text.encode("utf-8", errors="replace") - if len(data) >= size: - data = data[: size - 1] - return data + (b"\x00" * (size - len(data))) - - -def _normalize_guest_cwd(cwd: str) -> str: - value = (cwd or "").strip().replace("\\", "/") - if not value: - return "/" - if not value.startswith("/"): - value = "/" + value - return value - - -def _write_command_context(rootfs: Path, guest_path: str, guest_args: List[str], cwd: str) -> None: - name = Path(guest_path).name - cmd = name[:-4] if name.lower().endswith(".elf") else name - arg = " ".join(guest_args) - ctx_payload = b"".join( - ( - _encode_cstr(cmd, 32), - _encode_cstr(arg, 160), - _encode_cstr(_normalize_guest_cwd(cwd), 192), - ) - ) - ctx_path = rootfs / "temp" / ".ush_cmd_ctx.bin" - ctx_path.parent.mkdir(parents=True, exist_ok=True) - ctx_path.write_bytes(ctx_payload) - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="CLeonOS-Wine: run CLeonOS ELF with Unicorn.") - parser.add_argument("elf", help="Target ELF path. Supports /guest/path or host file path.") - parser.add_argument("--rootfs", help="Rootfs directory (default: build/x86_64/ramdisk_root).") - parser.add_argument("--no-kbd", action="store_true", help="Disable host keyboard input pump.") - parser.add_argument("--fb-window", action="store_true", help="Enable host framebuffer window (pygame backend).") - parser.add_argument("--fb-scale", type=int, default=2, help="Framebuffer window scale factor (default: 2).") - parser.add_argument("--fb-max-fps", type=int, default=60, help="Framebuffer present FPS limit (default: 60).") - parser.add_argument("--fb-hold-ms", type=int, default=2500, help="Keep fb window visible after app exits (ms).") - parser.add_argument("--argv-line", default="", help="Guest argv as one line (whitespace-separated).") - parser.add_argument("--cwd", default="/", help="Guest cwd for command-context apps.") - parser.add_argument("guest_args", nargs="*", help="Guest args (for dash-prefixed args use '--').") - parser.add_argument("--max-exec-depth", type=int, default=DEFAULT_MAX_EXEC_DEPTH, help="Nested exec depth guard.") - parser.add_argument("--verbose", action="store_true", help="Enable verbose runner output.") - return parser.parse_args() - - -def main() -> int: - args = parse_args() - guest_args = list(args.guest_args or []) - argv_items: List[str] - - try: - rootfs = resolve_rootfs(args.rootfs) - elf_path, guest_path = resolve_elf_target(args.elf, rootfs) - except Exception as exc: - print(f"[WINE][ERROR] {exc}", file=sys.stderr) - return 2 - - if len(guest_args) > 0 and guest_args[0] == "--": - guest_args = guest_args[1:] - - if (args.argv_line or "").strip(): - try: - guest_args = shlex.split(args.argv_line) - except Exception: - guest_args = (args.argv_line or "").split() - - argv_items = [guest_path] - argv_items.extend(guest_args) - - if guest_args: - try: - _write_command_context(rootfs, guest_path, guest_args, args.cwd) - except Exception as exc: - print(f"[WINE][WARN] failed to write command context: {exc}", file=sys.stderr) - - if args.verbose: - print("[WINE] backend=unicorn", file=sys.stderr) - print(f"[WINE] rootfs={rootfs}", file=sys.stderr) - print(f"[WINE] elf={elf_path}", file=sys.stderr) - print(f"[WINE] guest={guest_path}", file=sys.stderr) - - state = SharedKernelState() - runner = CLeonOSWineNative( - elf_path=elf_path, - rootfs=rootfs, - guest_path_hint=guest_path, - state=state, - max_exec_depth=max(1, args.max_exec_depth), - no_kbd=args.no_kbd, - verbose=args.verbose, - top_level=True, - fb_window=args.fb_window, - fb_scale=max(1, args.fb_scale), - fb_max_fps=max(1, args.fb_max_fps), - fb_hold_ms=max(0, args.fb_hold_ms), - argv_items=argv_items, - ) - ret = runner.run() - if ret is None: - return 1 - - if args.verbose: - print(f"\n[WINE] exit=0x{ret:016X}", file=sys.stderr) - return int(ret & 0xFF) diff --git a/wine/cleonos_wine_lib/constants.py b/wine/cleonos_wine_lib/constants.py deleted file mode 100644 index 3f4e96a..0000000 --- a/wine/cleonos_wine_lib/constants.py +++ /dev/null @@ -1,149 +0,0 @@ -from __future__ import annotations - -U64_MASK = (1 << 64) - 1 -PAGE_SIZE = 0x1000 -MAX_CSTR = 4096 -MAX_IO_READ = 1 << 20 -DEFAULT_MAX_EXEC_DEPTH = 6 -FS_NAME_MAX = 96 - -CLKS_VERSION_STRING = "1.0.0-alpha" -CLEONOS_VERSION_STRING = "1.0.0-alpha" -WINE_IMPLEMENTED_SYSCALL_COUNT = 85 -# Frozen policy: this version string must not change in future updates. -CLEONOS_WINE_VERSION_STRING = "85.0.0-wine" - -# CLeonOS syscall IDs from cleonos/c/include/cleonos_syscall.h -SYS_LOG_WRITE = 0 -SYS_TIMER_TICKS = 1 -SYS_TASK_COUNT = 2 -SYS_CUR_TASK = 3 -SYS_SERVICE_COUNT = 4 -SYS_SERVICE_READY_COUNT = 5 -SYS_CONTEXT_SWITCHES = 6 -SYS_KELF_COUNT = 7 -SYS_KELF_RUNS = 8 -SYS_FS_NODE_COUNT = 9 -SYS_FS_CHILD_COUNT = 10 -SYS_FS_GET_CHILD_NAME = 11 -SYS_FS_READ = 12 -SYS_EXEC_PATH = 13 -SYS_EXEC_REQUESTS = 14 -SYS_EXEC_SUCCESS = 15 -SYS_USER_SHELL_READY = 16 -SYS_USER_EXEC_REQUESTED = 17 -SYS_USER_LAUNCH_TRIES = 18 -SYS_USER_LAUNCH_OK = 19 -SYS_USER_LAUNCH_FAIL = 20 -SYS_TTY_COUNT = 21 -SYS_TTY_ACTIVE = 22 -SYS_TTY_SWITCH = 23 -SYS_TTY_WRITE = 24 -SYS_TTY_WRITE_CHAR = 25 -SYS_KBD_GET_CHAR = 26 -SYS_FS_STAT_TYPE = 27 -SYS_FS_STAT_SIZE = 28 -SYS_FS_MKDIR = 29 -SYS_FS_WRITE = 30 -SYS_FS_APPEND = 31 -SYS_FS_REMOVE = 32 -SYS_LOG_JOURNAL_COUNT = 33 -SYS_LOG_JOURNAL_READ = 34 -SYS_KBD_BUFFERED = 35 -SYS_KBD_PUSHED = 36 -SYS_KBD_POPPED = 37 -SYS_KBD_DROPPED = 38 -SYS_KBD_HOTKEY_SWITCHES = 39 -SYS_GETPID = 40 -SYS_SPAWN_PATH = 41 -SYS_WAITPID = 42 -SYS_EXIT = 43 -SYS_SLEEP_TICKS = 44 -SYS_YIELD = 45 -SYS_SHUTDOWN = 46 -SYS_RESTART = 47 -SYS_AUDIO_AVAILABLE = 48 -SYS_AUDIO_PLAY_TONE = 49 -SYS_AUDIO_STOP = 50 -SYS_EXEC_PATHV = 51 -SYS_SPAWN_PATHV = 52 -SYS_PROC_ARGC = 53 -SYS_PROC_ARGV = 54 -SYS_PROC_ENVC = 55 -SYS_PROC_ENV = 56 -SYS_PROC_LAST_SIGNAL = 57 -SYS_PROC_FAULT_VECTOR = 58 -SYS_PROC_FAULT_ERROR = 59 -SYS_PROC_FAULT_RIP = 60 -SYS_PROC_COUNT = 61 -SYS_PROC_PID_AT = 62 -SYS_PROC_SNAPSHOT = 63 -SYS_PROC_KILL = 64 -SYS_KDBG_SYM = 65 -SYS_KDBG_BT = 66 -SYS_KDBG_REGS = 67 -SYS_STATS_TOTAL = 68 -SYS_STATS_ID_COUNT = 69 -SYS_STATS_RECENT_WINDOW = 70 -SYS_STATS_RECENT_ID = 71 -SYS_FD_OPEN = 72 -SYS_FD_READ = 73 -SYS_FD_WRITE = 74 -SYS_FD_CLOSE = 75 -SYS_FD_DUP = 76 -SYS_DL_OPEN = 77 -SYS_DL_CLOSE = 78 -SYS_DL_SYM = 79 -SYS_EXEC_PATHV_IO = 80 -SYS_FB_INFO = 81 -SYS_FB_BLIT = 82 -SYS_FB_CLEAR = 83 -SYS_KERNEL_VERSION = 84 -SYS_DISK_PRESENT = 85 -SYS_DISK_SIZE_BYTES = 86 -SYS_DISK_SECTOR_COUNT = 87 -SYS_DISK_FORMATTED = 88 -SYS_DISK_FORMAT_FAT32 = 89 -SYS_DISK_MOUNT = 90 -SYS_DISK_MOUNTED = 91 -SYS_DISK_MOUNT_PATH = 92 -SYS_DISK_READ_SECTOR = 93 -SYS_DISK_WRITE_SECTOR = 94 - -# proc states (from cleonos/c/include/cleonos_syscall.h) -PROC_STATE_UNUSED = 0 -PROC_STATE_PENDING = 1 -PROC_STATE_RUNNING = 2 -PROC_STATE_EXITED = 3 -PROC_STATE_STOPPED = 4 - -# signals (from cleonos/c/include/cleonos_syscall.h) -SIGKILL = 9 -SIGTERM = 15 -SIGCONT = 18 -SIGSTOP = 19 - -# open flags (from cleonos/c/include/cleonos_syscall.h) -O_RDONLY = 0x0000 -O_WRONLY = 0x0001 -O_RDWR = 0x0002 -O_CREAT = 0x0040 -O_TRUNC = 0x0200 -O_APPEND = 0x0400 -FD_INHERIT = U64_MASK - - -def u64(value: int) -> int: - return value & U64_MASK - - -def u64_neg1() -> int: - return U64_MASK - - -def page_floor(addr: int) -> int: - return addr & ~(PAGE_SIZE - 1) - - -def page_ceil(addr: int) -> int: - return (addr + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1) diff --git a/wine/cleonos_wine_lib/fb_window.py b/wine/cleonos_wine_lib/fb_window.py deleted file mode 100644 index 57c9865..0000000 --- a/wine/cleonos_wine_lib/fb_window.py +++ /dev/null @@ -1,143 +0,0 @@ -from __future__ import annotations - -import sys -import time -from typing import Optional - -from .state import SharedKernelState - - -CLEONOS_KEY_LEFT = 0x01 -CLEONOS_KEY_RIGHT = 0x02 -CLEONOS_KEY_UP = 0x03 -CLEONOS_KEY_DOWN = 0x04 - - -class FBWindow: - def __init__(self, pygame_mod, width: int, height: int, scale: int, max_fps: int, *, verbose: bool = False) -> None: - self._pygame = pygame_mod - self.width = int(width) - self.height = int(height) - self.scale = max(1, int(scale)) - self._verbose = bool(verbose) - self._closed = False - self._last_present_ns = 0 - self._frame_interval_ns = int(1_000_000_000 / max_fps) if int(max_fps) > 0 else 0 - - window_w = self.width * self.scale - window_h = self.height * self.scale - - self._pygame.display.set_caption("CLeonOS Wine Framebuffer") - self._screen = self._pygame.display.set_mode((window_w, window_h)) - - @classmethod - def create( - cls, - width: int, - height: int, - scale: int, - max_fps: int, - *, - verbose: bool = False, - ) -> Optional["FBWindow"]: - try: - import pygame # type: ignore - except Exception as exc: - print(f"[WINE][WARN] framebuffer window disabled: pygame import failed ({exc})", file=sys.stderr) - return None - - try: - pygame.init() - pygame.display.init() - return cls(pygame, width, height, scale, max_fps, verbose=verbose) - except Exception as exc: - print(f"[WINE][WARN] framebuffer window disabled: unable to init display ({exc})", file=sys.stderr) - try: - pygame.quit() - except Exception: - pass - return None - - def close(self) -> None: - if self._closed: - return - - self._closed = True - try: - self._pygame.display.quit() - except Exception: - pass - try: - self._pygame.quit() - except Exception: - pass - - def is_closed(self) -> bool: - return self._closed - - def pump_input(self, state: SharedKernelState) -> None: - if self._closed: - return - - try: - events = self._pygame.event.get() - except Exception: - self.close() - return - - for event in events: - if event.type == self._pygame.QUIT: - self.close() - continue - - if event.type != self._pygame.KEYDOWN: - continue - - key = event.key - ch = 0 - - if key == self._pygame.K_LEFT: - ch = CLEONOS_KEY_LEFT - elif key == self._pygame.K_RIGHT: - ch = CLEONOS_KEY_RIGHT - elif key == self._pygame.K_UP: - ch = CLEONOS_KEY_UP - elif key == self._pygame.K_DOWN: - ch = CLEONOS_KEY_DOWN - elif key in (self._pygame.K_RETURN, self._pygame.K_KP_ENTER): - ch = ord("\n") - elif key == self._pygame.K_BACKSPACE: - ch = 8 - elif key == self._pygame.K_ESCAPE: - ch = 27 - elif key == self._pygame.K_TAB: - ch = ord("\t") - else: - text = getattr(event, "unicode", "") - if isinstance(text, str) and len(text) == 1: - code = ord(text) - if 32 <= code <= 126: - ch = code - - if ch != 0: - state.push_key(ch) - - def present(self, pixels_bgra: bytearray, *, force: bool = False) -> bool: - if self._closed: - return False - - now_ns = time.monotonic_ns() - if not force and self._frame_interval_ns > 0 and (now_ns - self._last_present_ns) < self._frame_interval_ns: - return False - - try: - src = self._pygame.image.frombuffer(pixels_bgra, (self.width, self.height), "BGRA") - if self.scale != 1: - src = self._pygame.transform.scale(src, (self.width * self.scale, self.height * self.scale)) - self._screen.blit(src, (0, 0)) - self._pygame.display.flip() - self._last_present_ns = now_ns - return True - except Exception: - self.close() - return False diff --git a/wine/cleonos_wine_lib/input_pump.py b/wine/cleonos_wine_lib/input_pump.py deleted file mode 100644 index 0581a18..0000000 --- a/wine/cleonos_wine_lib/input_pump.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import annotations - -import os -import sys -import threading -import time -from typing import Optional - -from .state import SharedKernelState - - -class InputPump: - def __init__(self, state: SharedKernelState) -> None: - self.state = state - self._stop = threading.Event() - self._thread: Optional[threading.Thread] = None - self._posix_term_state = None - - def start(self) -> None: - if self._thread is not None: - return - if not sys.stdin or not hasattr(sys.stdin, "isatty") or not sys.stdin.isatty(): - return - self._thread = threading.Thread(target=self._run, name="cleonos-wine-input", daemon=True) - self._thread.start() - - def stop(self) -> None: - self._stop.set() - if self._thread is not None: - self._thread.join(timeout=0.2) - self._thread = None - self._restore_posix_tty() - - def _run(self) -> None: - if os.name == "nt": - self._run_windows() - else: - self._run_posix() - - def _run_windows(self) -> None: - import msvcrt # pylint: disable=import-error - - while not self._stop.is_set(): - if not msvcrt.kbhit(): - time.sleep(0.005) - continue - - ch = msvcrt.getwch() - if ch in ("\x00", "\xe0"): - _ = msvcrt.getwch() - continue - - norm = self._normalize_char(ch) - if norm is None: - continue - self.state.push_key(ord(norm)) - - def _run_posix(self) -> None: - import select - import termios - import tty - - fd = sys.stdin.fileno() - self._posix_term_state = termios.tcgetattr(fd) - tty.setcbreak(fd) - - try: - while not self._stop.is_set(): - readable, _, _ = select.select([sys.stdin], [], [], 0.05) - if not readable: - continue - ch = sys.stdin.read(1) - norm = self._normalize_char(ch) - if norm is None: - continue - self.state.push_key(ord(norm)) - finally: - self._restore_posix_tty() - - def _restore_posix_tty(self) -> None: - if self._posix_term_state is None: - return - try: - import termios - - fd = sys.stdin.fileno() - termios.tcsetattr(fd, termios.TCSADRAIN, self._posix_term_state) - except Exception: - pass - finally: - self._posix_term_state = None - - @staticmethod - def _normalize_char(ch: str) -> Optional[str]: - if not ch: - return None - if ch == "\r": - return "\n" - return ch \ No newline at end of file diff --git a/wine/cleonos_wine_lib/platform.py b/wine/cleonos_wine_lib/platform.py deleted file mode 100644 index cb82384..0000000 --- a/wine/cleonos_wine_lib/platform.py +++ /dev/null @@ -1,59 +0,0 @@ -from __future__ import annotations - -import sys - -try: - from unicorn import Uc, UcError - from unicorn import UC_ARCH_X86, UC_MODE_64 - from unicorn import UC_HOOK_CODE, UC_HOOK_INTR - from unicorn import UC_PROT_ALL, UC_PROT_EXEC, UC_PROT_READ, UC_PROT_WRITE - from unicorn import ( - UC_ERR_FETCH_PROT, - UC_ERR_FETCH_UNMAPPED, - UC_ERR_INSN_INVALID, - UC_ERR_READ_PROT, - UC_ERR_READ_UNMAPPED, - UC_ERR_WRITE_PROT, - UC_ERR_WRITE_UNMAPPED, - ) - from unicorn.x86_const import ( - UC_X86_REG_RAX, - UC_X86_REG_RBX, - UC_X86_REG_RCX, - UC_X86_REG_RDX, - UC_X86_REG_RBP, - UC_X86_REG_RIP, - UC_X86_REG_RSP, - ) -except Exception as exc: - print("[WINE][ERROR] unicorn import failed. Install dependencies first:", file=sys.stderr) - print(" pip install -r wine/requirements.txt", file=sys.stderr) - raise SystemExit(1) from exc - - -__all__ = [ - "Uc", - "UcError", - "UC_ARCH_X86", - "UC_MODE_64", - "UC_HOOK_CODE", - "UC_HOOK_INTR", - "UC_PROT_ALL", - "UC_PROT_EXEC", - "UC_PROT_READ", - "UC_PROT_WRITE", - "UC_ERR_FETCH_PROT", - "UC_ERR_FETCH_UNMAPPED", - "UC_ERR_INSN_INVALID", - "UC_ERR_READ_PROT", - "UC_ERR_READ_UNMAPPED", - "UC_ERR_WRITE_PROT", - "UC_ERR_WRITE_UNMAPPED", - "UC_X86_REG_RAX", - "UC_X86_REG_RBX", - "UC_X86_REG_RCX", - "UC_X86_REG_RDX", - "UC_X86_REG_RBP", - "UC_X86_REG_RIP", - "UC_X86_REG_RSP", -] diff --git a/wine/cleonos_wine_lib/runner.py b/wine/cleonos_wine_lib/runner.py deleted file mode 100644 index d7ceb6f..0000000 --- a/wine/cleonos_wine_lib/runner.py +++ /dev/null @@ -1,2525 +0,0 @@ -from __future__ import annotations - -import os -import shutil -import struct -import sys -import time -from dataclasses import dataclass, field -from pathlib import Path -from typing import Dict, List, Optional, Tuple - -from .constants import ( - CLKS_VERSION_STRING, - DEFAULT_MAX_EXEC_DEPTH, - FD_INHERIT, - FS_NAME_MAX, - MAX_CSTR, - MAX_IO_READ, - O_APPEND, - O_CREAT, - O_RDONLY, - O_RDWR, - O_TRUNC, - O_WRONLY, - PAGE_SIZE, - PROC_STATE_EXITED, - PROC_STATE_PENDING, - PROC_STATE_RUNNING, - PROC_STATE_STOPPED, - SIGCONT, - SIGKILL, - SIGSTOP, - SIGTERM, - SYS_AUDIO_AVAILABLE, - SYS_AUDIO_PLAY_TONE, - SYS_AUDIO_STOP, - SYS_CONTEXT_SWITCHES, - SYS_CUR_TASK, - SYS_DL_CLOSE, - SYS_DL_OPEN, - SYS_DL_SYM, - SYS_EXEC_PATH, - SYS_EXEC_PATHV, - SYS_EXEC_PATHV_IO, - SYS_EXEC_REQUESTS, - SYS_EXEC_SUCCESS, - SYS_EXIT, - SYS_FD_CLOSE, - SYS_FD_DUP, - SYS_FD_OPEN, - SYS_FD_READ, - SYS_FD_WRITE, - SYS_FB_BLIT, - SYS_FB_CLEAR, - SYS_FB_INFO, - SYS_FS_APPEND, - SYS_FS_CHILD_COUNT, - SYS_FS_GET_CHILD_NAME, - SYS_FS_MKDIR, - SYS_FS_NODE_COUNT, - SYS_FS_READ, - SYS_FS_REMOVE, - SYS_FS_STAT_SIZE, - SYS_FS_STAT_TYPE, - SYS_FS_WRITE, - SYS_DISK_FORMATTED, - SYS_DISK_FORMAT_FAT32, - SYS_DISK_MOUNT, - SYS_DISK_MOUNTED, - SYS_DISK_MOUNT_PATH, - SYS_DISK_PRESENT, - SYS_DISK_READ_SECTOR, - SYS_DISK_SECTOR_COUNT, - SYS_DISK_SIZE_BYTES, - SYS_DISK_WRITE_SECTOR, - SYS_GETPID, - SYS_KERNEL_VERSION, - SYS_KBD_BUFFERED, - SYS_KBD_DROPPED, - SYS_KBD_GET_CHAR, - SYS_KBD_HOTKEY_SWITCHES, - SYS_KBD_POPPED, - SYS_KBD_PUSHED, - SYS_KELF_COUNT, - SYS_KELF_RUNS, - SYS_LOG_JOURNAL_COUNT, - SYS_LOG_JOURNAL_READ, - SYS_LOG_WRITE, - SYS_PROC_ARGC, - SYS_PROC_ARGV, - SYS_PROC_COUNT, - SYS_PROC_ENVC, - SYS_PROC_ENV, - SYS_PROC_FAULT_ERROR, - SYS_PROC_FAULT_RIP, - SYS_PROC_FAULT_VECTOR, - SYS_PROC_KILL, - SYS_PROC_LAST_SIGNAL, - SYS_PROC_PID_AT, - SYS_PROC_SNAPSHOT, - SYS_RESTART, - SYS_SERVICE_COUNT, - SYS_SERVICE_READY_COUNT, - SYS_SHUTDOWN, - SYS_SLEEP_TICKS, - SYS_SPAWN_PATH, - SYS_SPAWN_PATHV, - SYS_STATS_ID_COUNT, - SYS_STATS_RECENT_ID, - SYS_STATS_RECENT_WINDOW, - SYS_STATS_TOTAL, - SYS_TASK_COUNT, - SYS_TIMER_TICKS, - SYS_TTY_ACTIVE, - SYS_TTY_COUNT, - SYS_TTY_SWITCH, - SYS_TTY_WRITE, - SYS_TTY_WRITE_CHAR, - SYS_USER_EXEC_REQUESTED, - SYS_USER_LAUNCH_FAIL, - SYS_USER_LAUNCH_OK, - SYS_USER_LAUNCH_TRIES, - SYS_USER_SHELL_READY, - SYS_WAITPID, - SYS_YIELD, - page_ceil, - page_floor, - u64, - u64_neg1, -) -from .fb_window import FBWindow -from .input_pump import InputPump -from .platform import ( - Uc, - UcError, - UC_ARCH_X86, - UC_ERR_FETCH_PROT, - UC_ERR_FETCH_UNMAPPED, - UC_ERR_INSN_INVALID, - UC_ERR_READ_PROT, - UC_ERR_READ_UNMAPPED, - UC_ERR_WRITE_PROT, - UC_ERR_WRITE_UNMAPPED, - UC_HOOK_CODE, - UC_HOOK_INTR, - UC_MODE_64, - UC_PROT_ALL, - UC_PROT_EXEC, - UC_PROT_READ, - UC_PROT_WRITE, - UC_X86_REG_RAX, - UC_X86_REG_RBP, - UC_X86_REG_RBX, - UC_X86_REG_RCX, - UC_X86_REG_RDX, - UC_X86_REG_RSP, - UC_X86_REG_RIP, -) -from .state import SharedKernelState - - -@dataclass -class ELFSegment: - vaddr: int - memsz: int - flags: int - data: bytes - - -@dataclass -class ELFImage: - entry: int - segments: List[ELFSegment] - - -@dataclass -class FDEntry: - kind: str - flags: int - offset: int = 0 - path: str = "" - tty_index: int = 0 - - -@dataclass -class DLImage: - handle: int - guest_path: str - host_path: str - owner_pid: int - ref_count: int - map_start: int - map_end: int - load_bias: int - symbols: Dict[str, int] = field(default_factory=dict) - - -EXEC_PATH_MAX = 192 -EXEC_ARG_LINE_MAX = 256 -EXEC_ENV_LINE_MAX = 512 -EXEC_MAX_ARGS = 24 -EXEC_MAX_ENVS = 24 -EXEC_ITEM_MAX = 128 -EXEC_STATUS_SIGNAL_FLAG = 1 << 63 -PROC_PATH_MAX = 192 -FD_MAX = 64 -DL_MAX_NAME = 192 -DL_MAX_SYMBOL = 128 -DL_BASE_START = 0x0000000100000000 -DL_BASE_GAP = 0x0000000000100000 -FB_DEFAULT_WIDTH = 1280 -FB_DEFAULT_HEIGHT = 800 -FB_MAX_DIM = 4096 -FB_MAX_UPLOAD_BYTES = 64 * 1024 * 1024 - - -class CLeonOSWineNative: - def __init__( - self, - elf_path: Path, - rootfs: Path, - guest_path_hint: str, - *, - state: Optional[SharedKernelState] = None, - depth: int = 0, - max_exec_depth: int = DEFAULT_MAX_EXEC_DEPTH, - no_kbd: bool = False, - verbose: bool = False, - top_level: bool = True, - pid: int = 0, - ppid: int = 0, - argv_items: Optional[List[str]] = None, - env_items: Optional[List[str]] = None, - inherited_fds: Optional[Dict[int, FDEntry]] = None, - fb_window: bool = False, - fb_scale: int = 2, - fb_max_fps: int = 60, - fb_hold_ms: int = 2500, - ) -> None: - self.elf_path = elf_path - self.rootfs = rootfs - self.guest_path_hint = guest_path_hint - self.state = state if state is not None else SharedKernelState() - self.depth = depth - self.max_exec_depth = max_exec_depth - self.no_kbd = no_kbd - self.verbose = verbose - self.top_level = top_level - self.pid = int(pid) - self.ppid = int(ppid) - self.fb_window = bool(fb_window) - self.fb_scale = max(1, int(fb_scale)) - self.fb_max_fps = max(1, int(fb_max_fps)) - self.fb_hold_ms = max(0, int(fb_hold_ms)) - self.argv_items = list(argv_items) if argv_items is not None else [] - self.env_items = list(env_items) if env_items is not None else [] - self._exit_requested = False - self._exit_status = 0 - - self.image = self._parse_elf(self.elf_path) - self.exit_code: Optional[int] = None - self._input_pump: Optional[InputPump] = None - - self._stack_base = 0x00007FFF00000000 - self._stack_size = 0x0000000000020000 - self._ret_sentinel = 0x00007FFF10000000 - self._mapped_ranges: List[Tuple[int, int]] = [] - self._tty_index = int(self.state.tty_active) - self._fds: Dict[int, FDEntry] = {} - self._fd_inherited = inherited_fds if inherited_fds is not None else {} - self._dl_images: Dict[int, DLImage] = {} - self._dl_path_to_handle: Dict[str, int] = {} - self._dl_next_handle = 1 - self._dl_next_base = DL_BASE_START - - self._fb_width = self._bounded_env_int("CLEONOS_WINE_FB_WIDTH", FB_DEFAULT_WIDTH, 64, FB_MAX_DIM) - self._fb_height = self._bounded_env_int("CLEONOS_WINE_FB_HEIGHT", FB_DEFAULT_HEIGHT, 64, FB_MAX_DIM) - self._fb_bpp = 32 - self._fb_pitch = self._fb_width * 4 - self._fb_pixels = bytearray(self._fb_pitch * self._fb_height) - self._fb_window: Optional[FBWindow] = None - self._fb_window_failed = False - self._fb_dirty = False - self._fb_presented_once = False - - self._disk_present = True - self._disk_size_bytes = self._bounded_env_int("CLEONOS_WINE_DISK_SIZE_MB", 64, 8, 4096) * 1024 * 1024 - self._disk_mount_path = "/temp/disk" - self._disk_root = (self.rootfs / "__clks_disk0__").resolve() - self._disk_marker = self._disk_root / ".fat32" - self._disk_image_file = self._disk_root / ".rawdisk.img" - self._disk_formatted = False - self._disk_mounted = False - try: - self._disk_root.mkdir(parents=True, exist_ok=True) - self._disk_prepare_raw_image() - if self._disk_marker.exists(): - self._disk_formatted = True - self._disk_mounted = True - except Exception: - self._disk_present = False - self._disk_formatted = False - self._disk_mounted = False - - default_path = self._normalize_guest_path(self.guest_path_hint or f"/{self.elf_path.name}") - self.argv_items, self.env_items = self._prepare_exec_items(default_path, self.argv_items, self.env_items) - self._init_default_fds() - - @staticmethod - def _clone_fd_entry(entry: FDEntry) -> FDEntry: - return FDEntry(kind=entry.kind, flags=int(entry.flags), offset=int(entry.offset), path=str(entry.path), tty_index=int(entry.tty_index)) - - @staticmethod - def _fd_access_mode(flags: int) -> int: - return int(flags) & 0x3 - - @classmethod - def _fd_access_mode_valid(cls, flags: int) -> bool: - mode = cls._fd_access_mode(flags) - return mode in (O_RDONLY, O_WRONLY, O_RDWR) - - @classmethod - def _fd_can_read(cls, flags: int) -> bool: - mode = cls._fd_access_mode(flags) - return mode in (O_RDONLY, O_RDWR) - - @classmethod - def _fd_can_write(cls, flags: int) -> bool: - mode = cls._fd_access_mode(flags) - return mode in (O_WRONLY, O_RDWR) - - @staticmethod - def _bounded_env_int(name: str, default: int, min_value: int, max_value: int) -> int: - raw = os.environ.get(name) - value = default - - if raw is not None: - try: - value = int(raw.strip(), 10) - except Exception: - value = default - - if value < min_value: - value = min_value - if value > max_value: - value = max_value - return value - - def _ensure_fb_window(self) -> None: - if not self.fb_window: - return - - if self._fb_window is not None or self._fb_window_failed: - return - - self._fb_window = FBWindow.create( - self._fb_width, - self._fb_height, - self.fb_scale, - self.fb_max_fps, - verbose=self.verbose, - ) - - if self._fb_window is None: - self._fb_window_failed = True - - def _fb_poll_window(self) -> None: - self._ensure_fb_window() - if self._fb_window is not None: - self._fb_window.pump_input(self.state) - if self._fb_window.is_closed(): - self._fb_window = None - - def _fb_present(self, *, force: bool = False) -> None: - self._ensure_fb_window() - if self._fb_window is None: - return - - self._fb_window.pump_input(self.state) - did_present = self._fb_window.present(self._fb_pixels, force=force) - if did_present: - self._fb_dirty = False - self._fb_presented_once = True - - if self._fb_window.is_closed(): - self._fb_window = None - - def _fb_mark_dirty(self) -> None: - self._fb_dirty = True - self._fb_present(force=False) - - def _fb_hold_after_exit(self) -> None: - end_ns: int - - if self._fb_window is None: - return - - if self.fb_hold_ms <= 0 or self._fb_presented_once is False: - return - - end_ns = time.monotonic_ns() + (self.fb_hold_ms * 1_000_000) - - while time.monotonic_ns() < end_ns: - if self._fb_window is None: - return - - self._fb_window.pump_input(self.state) - if self._fb_window.is_closed(): - self._fb_window = None - return - - self._fb_window.present(self._fb_pixels, force=True) - time.sleep(0.016) - - def _init_default_fds(self) -> None: - self._fds = { - 0: FDEntry(kind="tty", flags=O_RDONLY, offset=0, tty_index=self._tty_index), - 1: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index), - 2: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index), - } - - for target in (0, 1, 2): - inherited = self._fd_inherited.get(target) - if inherited is not None: - self._fds[target] = self._clone_fd_entry(inherited) - - for target in (0, 1, 2): - entry = self._fds.get(target) - if entry is not None and entry.kind == "tty": - self._tty_index = int(entry.tty_index) - break - - def _fd_lookup(self, fd: int) -> Optional[FDEntry]: - if fd < 0 or fd >= FD_MAX: - return None - return self._fds.get(int(fd)) - - def _fd_find_free(self) -> int: - for fd in range(FD_MAX): - if fd not in self._fds: - return fd - return -1 - - def _stdio_entry_for_child(self, target_fd: int, override_fd: int, require_read: bool, require_write: bool) -> Optional[FDEntry]: - if override_fd == FD_INHERIT: - src = self._fd_lookup(target_fd) - else: - src = self._fd_lookup(override_fd) - - if src is None: - return None - - if require_read and not self._fd_can_read(src.flags): - return None - - if require_write and not self._fd_can_write(src.flags): - return None - - return self._clone_fd_entry(src) - - def _build_child_stdio_map(self, stdin_fd: int, stdout_fd: int, stderr_fd: int) -> Optional[Dict[int, FDEntry]]: - child_map: Dict[int, FDEntry] = {} - - in_entry = self._stdio_entry_for_child(0, stdin_fd, require_read=True, require_write=False) - out_entry = self._stdio_entry_for_child(1, stdout_fd, require_read=False, require_write=True) - err_entry = self._stdio_entry_for_child(2, stderr_fd, require_read=False, require_write=True) - - if in_entry is None or out_entry is None or err_entry is None: - return None - - child_map[0] = in_entry - child_map[1] = out_entry - child_map[2] = err_entry - return child_map - - def run(self) -> Optional[int]: - if self.pid == 0: - self.pid = self.state.alloc_pid(self.ppid) - - prev_pid = self.state.get_current_pid() - self.state.set_current_pid(self.pid) - self.state.set_proc_cmdline(self.pid, self.argv_items, self.env_items) - self.state.set_proc_fault(self.pid, 0, 0, 0, 0) - self.state.set_proc_running(self.pid, self.argv_items[0] if self.argv_items else self.guest_path_hint, self._tty_index) - - uc = Uc(UC_ARCH_X86, UC_MODE_64) - self._install_hooks(uc) - self._load_segments(uc) - self._prepare_stack_and_return(uc) - self._ensure_fb_window() - self._fb_present(force=True) - - if self.top_level and not self.no_kbd: - self._input_pump = InputPump(self.state) - self._input_pump.start() - - interrupted = False - runtime_fault_status: Optional[int] = None - - try: - uc.emu_start(self.image.entry, 0) - except KeyboardInterrupt: - interrupted = True - if self.top_level: - print("\n[WINE] interrupted by user", file=sys.stderr) - except UcError as exc: - runtime_fault_status = self._status_from_uc_error(uc, exc) - if self.verbose or self.top_level: - print(f"[WINE][ERROR] runtime crashed: {exc}", file=sys.stderr) - finally: - if self.top_level and self._input_pump is not None: - self._input_pump.stop() - if self._fb_window is not None: - self._fb_hold_after_exit() - if self._fb_window is not None: - self._fb_window.close() - self._fb_window = None - - if interrupted: - self.state.mark_exited(self.pid, u64_neg1()) - self.state.set_current_pid(prev_pid) - return None - - if runtime_fault_status is not None: - self.exit_code = runtime_fault_status - - if self.exit_code is None: - self.exit_code = self._reg_read(uc, UC_X86_REG_RAX) - - if self._exit_requested: - self.exit_code = self._exit_status - - self.exit_code = u64(self.exit_code) - self.state.mark_exited(self.pid, self.exit_code) - self.state.set_current_pid(prev_pid) - return self.exit_code - - def _install_hooks(self, uc: Uc) -> None: - uc.hook_add(UC_HOOK_INTR, self._hook_intr) - uc.hook_add(UC_HOOK_CODE, self._hook_code, begin=self._ret_sentinel, end=self._ret_sentinel) - - def _hook_code(self, uc: Uc, address: int, size: int, _user_data) -> None: - _ = size - if address == self._ret_sentinel: - self.exit_code = self._reg_read(uc, UC_X86_REG_RAX) - uc.emu_stop() - - def _hook_intr(self, uc: Uc, intno: int, _user_data) -> None: - if intno != 0x80: - raise UcError(1) - - syscall_id = self._reg_read(uc, UC_X86_REG_RAX) - arg0 = self._reg_read(uc, UC_X86_REG_RBX) - arg1 = self._reg_read(uc, UC_X86_REG_RCX) - arg2 = self._reg_read(uc, UC_X86_REG_RDX) - - self.state.record_syscall(syscall_id) - self.state.context_switches = u64(self.state.context_switches + 1) - ret = self._dispatch_syscall(uc, syscall_id, arg0, arg1, arg2) - self._reg_write(uc, UC_X86_REG_RAX, u64(ret)) - - def _dispatch_syscall(self, uc: Uc, sid: int, arg0: int, arg1: int, arg2: int) -> int: - self._fb_poll_window() - - if sid == SYS_LOG_WRITE: - data = self._read_guest_bytes(uc, arg0, arg1) - text = data.decode("utf-8", errors="replace") - self._host_write(text) - self.state.log_journal_push(text) - return len(data) - if sid == SYS_TIMER_TICKS: - return self.state.timer_ticks() - if sid == SYS_TASK_COUNT: - return self.state.task_count - if sid == SYS_CUR_TASK: - return self.state.current_task - if sid == SYS_SERVICE_COUNT: - return self.state.service_count - if sid == SYS_SERVICE_READY_COUNT: - return self.state.service_ready - if sid == SYS_CONTEXT_SWITCHES: - return self.state.context_switches - if sid == SYS_KELF_COUNT: - return self.state.kelf_count - if sid == SYS_KELF_RUNS: - return self.state.kelf_runs - if sid == SYS_FS_NODE_COUNT: - return self._fs_node_count() - if sid == SYS_FS_CHILD_COUNT: - return self._fs_child_count(uc, arg0) - if sid == SYS_FS_GET_CHILD_NAME: - return self._fs_get_child_name(uc, arg0, arg1, arg2) - if sid == SYS_FS_READ: - return self._fs_read(uc, arg0, arg1, arg2) - if sid == SYS_EXEC_PATH: - return self._exec_path(uc, arg0) - if sid == SYS_EXEC_PATHV: - return self._exec_pathv(uc, arg0, arg1, arg2) - if sid == SYS_EXEC_PATHV_IO: - return self._exec_pathv_io(uc, arg0, arg1, arg2) - if sid == SYS_SPAWN_PATH: - return self._spawn_path(uc, arg0) - if sid == SYS_SPAWN_PATHV: - return self._spawn_pathv(uc, arg0, arg1, arg2) - if sid == SYS_WAITPID: - return self._wait_pid(uc, arg0, arg1) - if sid == SYS_GETPID: - return self.state.get_current_pid() - if sid == SYS_PROC_ARGC: - return self._proc_argc() - if sid == SYS_PROC_ARGV: - return self._proc_argv(uc, arg0, arg1, arg2) - if sid == SYS_PROC_ENVC: - return self._proc_envc() - if sid == SYS_PROC_ENV: - return self._proc_env(uc, arg0, arg1, arg2) - if sid == SYS_PROC_LAST_SIGNAL: - return self._proc_last_signal() - if sid == SYS_PROC_FAULT_VECTOR: - return self._proc_fault_vector() - if sid == SYS_PROC_FAULT_ERROR: - return self._proc_fault_error() - if sid == SYS_PROC_FAULT_RIP: - return self._proc_fault_rip() - if sid == SYS_PROC_COUNT: - return self._proc_count() - if sid == SYS_PROC_PID_AT: - return self._proc_pid_at(uc, arg0, arg1) - if sid == SYS_PROC_SNAPSHOT: - return self._proc_snapshot(uc, arg0, arg1, arg2) - if sid == SYS_PROC_KILL: - return self._proc_kill(uc, arg0, arg1) - if sid == SYS_EXIT: - return self._request_exit(uc, arg0) - if sid == SYS_SLEEP_TICKS: - return self._sleep_ticks(arg0) - if sid == SYS_YIELD: - return self._yield_once() - if sid == SYS_AUDIO_AVAILABLE: - return 0 - if sid == SYS_AUDIO_PLAY_TONE: - return 0 - if sid == SYS_AUDIO_STOP: - return 1 - if sid == SYS_SHUTDOWN: - self._host_write("\n[WINE] shutdown requested by guest\n") - self._exit_requested = True - self._exit_status = 0 - return 1 - if sid == SYS_RESTART: - self._host_write("\n[WINE] restart requested by guest\n") - self._exit_requested = True - self._exit_status = 0 - return 1 - if sid == SYS_EXEC_REQUESTS: - return self.state.exec_requests - if sid == SYS_EXEC_SUCCESS: - return self.state.exec_success - if sid == SYS_USER_SHELL_READY: - return self.state.user_shell_ready - if sid == SYS_USER_EXEC_REQUESTED: - return self.state.user_exec_requested - if sid == SYS_USER_LAUNCH_TRIES: - return self.state.user_launch_tries - if sid == SYS_USER_LAUNCH_OK: - return self.state.user_launch_ok - if sid == SYS_USER_LAUNCH_FAIL: - return self.state.user_launch_fail - if sid == SYS_TTY_COUNT: - return self.state.tty_count - if sid == SYS_TTY_ACTIVE: - return self.state.tty_active - if sid == SYS_TTY_SWITCH: - if arg0 >= self.state.tty_count: - return u64_neg1() - self.state.tty_active = int(arg0) - return self.state.tty_active - if sid == SYS_TTY_WRITE: - data = self._read_guest_bytes(uc, arg0, arg1) - self._host_write(data.decode("utf-8", errors="replace")) - return len(data) - if sid == SYS_TTY_WRITE_CHAR: - ch = chr(arg0 & 0xFF) - if ch in ("\b", "\x7f"): - self._host_write("\b \b") - else: - self._host_write(ch) - return 1 - if sid == SYS_KBD_GET_CHAR: - key = self.state.pop_key() - return u64_neg1() if key is None else key - if sid == SYS_FS_STAT_TYPE: - return self._fs_stat_type(uc, arg0) - if sid == SYS_FS_STAT_SIZE: - return self._fs_stat_size(uc, arg0) - if sid == SYS_FS_MKDIR: - return self._fs_mkdir(uc, arg0) - if sid == SYS_FS_WRITE: - return self._fs_write(uc, arg0, arg1, arg2) - if sid == SYS_FS_APPEND: - return self._fs_append(uc, arg0, arg1, arg2) - if sid == SYS_FS_REMOVE: - return self._fs_remove(uc, arg0) - if sid == SYS_LOG_JOURNAL_COUNT: - return self.state.log_journal_count() - if sid == SYS_LOG_JOURNAL_READ: - return self._log_journal_read(uc, arg0, arg1, arg2) - if sid == SYS_KBD_BUFFERED: - return self.state.buffered_count() - if sid == SYS_KBD_PUSHED: - return self.state.kbd_push_count - if sid == SYS_KBD_POPPED: - return self.state.kbd_pop_count - if sid == SYS_KBD_DROPPED: - return self.state.kbd_drop_count - if sid == SYS_KBD_HOTKEY_SWITCHES: - return self.state.kbd_hotkey_switches - if sid == SYS_STATS_TOTAL: - return self.state.stats_total() - if sid == SYS_STATS_ID_COUNT: - return self.state.stats_id_count(arg0) - if sid == SYS_STATS_RECENT_WINDOW: - return self.state.stats_recent_window() - if sid == SYS_STATS_RECENT_ID: - return self.state.stats_recent_id_count(arg0) - if sid == SYS_FD_OPEN: - return self._fd_open(uc, arg0, arg1, arg2) - if sid == SYS_FD_READ: - return self._fd_read(uc, arg0, arg1, arg2) - if sid == SYS_FD_WRITE: - return self._fd_write(uc, arg0, arg1, arg2) - if sid == SYS_FD_CLOSE: - return self._fd_close(arg0) - if sid == SYS_FD_DUP: - return self._fd_dup(arg0) - if sid == SYS_DL_OPEN: - return self._dl_open(uc, arg0) - if sid == SYS_DL_CLOSE: - return self._dl_close(arg0) - if sid == SYS_DL_SYM: - return self._dl_sym(uc, arg0, arg1) - if sid == SYS_FB_INFO: - return self._fb_info(uc, arg0) - if sid == SYS_FB_BLIT: - return self._fb_blit(uc, arg0) - if sid == SYS_FB_CLEAR: - return self._fb_clear(arg0) - if sid == SYS_KERNEL_VERSION: - return self._kernel_version(uc, arg0, arg1) - if sid == SYS_DISK_PRESENT: - return 1 if self._disk_present else 0 - if sid == SYS_DISK_SIZE_BYTES: - return int(u64(self._disk_size_bytes if self._disk_present else 0)) - if sid == SYS_DISK_SECTOR_COUNT: - if not self._disk_present: - return 0 - return int(u64(self._disk_size_bytes // 512)) - if sid == SYS_DISK_FORMATTED: - return 1 if (self._disk_present and self._disk_formatted) else 0 - if sid == SYS_DISK_FORMAT_FAT32: - label = self._read_guest_cstring(uc, arg0, 16) if arg0 != 0 else "" - return self._disk_format_fat32(label) - if sid == SYS_DISK_MOUNT: - return self._disk_mount(uc, arg0) - if sid == SYS_DISK_MOUNTED: - return 1 if self._disk_mounted else 0 - if sid == SYS_DISK_MOUNT_PATH: - return self._disk_mount_path_query(uc, arg0, arg1) - if sid == SYS_DISK_READ_SECTOR: - sector_data = self._disk_read_sector_raw(int(arg0)) - if sector_data is None: - return 0 - return 1 if self._write_guest_bytes(uc, int(arg1), sector_data) else 0 - if sid == SYS_DISK_WRITE_SECTOR: - sector_data = self._read_guest_bytes_exact(uc, int(arg1), 512) - if sector_data is None: - return 0 - return 1 if self._disk_write_sector_raw(int(arg0), sector_data) else 0 - - return u64_neg1() - - def _host_write(self, text: str) -> None: - if not text: - return - sys.stdout.write(text) - sys.stdout.flush() - - def _host_write_bytes(self, data: bytes) -> None: - if not data: - return - if hasattr(sys.stdout, "buffer"): - sys.stdout.buffer.write(data) - sys.stdout.flush() - return - self._host_write(data.decode("utf-8", errors="replace")) - - def _load_segments(self, uc: Uc) -> None: - for seg in self.image.segments: - start = page_floor(seg.vaddr) - end = page_ceil(seg.vaddr + seg.memsz) - self._map_region(uc, start, end - start, UC_PROT_ALL) - - for seg in self.image.segments: - if seg.data: - self._mem_write(uc, seg.vaddr, seg.data) - - for seg in self.image.segments: - start = page_floor(seg.vaddr) - end = page_ceil(seg.vaddr + seg.memsz) - size = end - start - perms = 0 - if seg.flags & 0x4: - perms |= UC_PROT_READ - if seg.flags & 0x2: - perms |= UC_PROT_WRITE - if seg.flags & 0x1: - perms |= UC_PROT_EXEC - if perms == 0: - perms = UC_PROT_READ - try: - uc.mem_protect(start, size, perms) - except Exception: - pass - - def _prepare_stack_and_return(self, uc: Uc) -> None: - self._map_region(uc, self._stack_base, self._stack_size, UC_PROT_READ | UC_PROT_WRITE) - self._map_region(uc, self._ret_sentinel, PAGE_SIZE, UC_PROT_READ | UC_PROT_EXEC) - self._mem_write(uc, self._ret_sentinel, b"\x90") - - rsp = self._stack_base + self._stack_size - 8 - self._mem_write(uc, rsp, struct.pack(" None: - if size <= 0: - return - start = page_floor(addr) - end = page_ceil(addr + size) - - if self._is_range_mapped(start, end): - return - - uc.mem_map(start, end - start, perms) - self._mapped_ranges.append((start, end)) - - def _is_range_mapped(self, start: int, end: int) -> bool: - for ms, me in self._mapped_ranges: - if start >= ms and end <= me: - return True - return False - - def _range_overlaps_mapped(self, start: int, end: int) -> bool: - for ms, me in self._mapped_ranges: - if start < me and end > ms: - return True - return False - - @staticmethod - def _reg_read(uc: Uc, reg: int) -> int: - return int(uc.reg_read(reg)) - - @staticmethod - def _reg_write(uc: Uc, reg: int, value: int) -> None: - uc.reg_write(reg, u64(value)) - - @staticmethod - def _mem_write(uc: Uc, addr: int, data: bytes) -> None: - if addr == 0 or not data: - return - uc.mem_write(addr, data) - - def _read_guest_cstring(self, uc: Uc, addr: int, max_len: int = MAX_CSTR) -> str: - if addr == 0: - return "" - - out = bytearray() - for i in range(max_len): - try: - ch = uc.mem_read(addr + i, 1) - except UcError: - break - if not ch or ch[0] == 0: - break - out.append(ch[0]) - return out.decode("utf-8", errors="replace") - - def _read_guest_bytes(self, uc: Uc, addr: int, size: int) -> bytes: - if addr == 0 or size == 0: - return b"" - safe_size = int(min(size, MAX_IO_READ)) - try: - return bytes(uc.mem_read(addr, safe_size)) - except UcError: - return b"" - - def _read_guest_bytes_exact(self, uc: Uc, addr: int, size: int) -> Optional[bytes]: - if size < 0 or addr == 0: - return None - if size == 0: - return b"" - - out = bytearray() - cursor = int(addr) - left = int(size) - - while left > 0: - chunk = min(left, MAX_IO_READ) - try: - data = uc.mem_read(cursor, chunk) - except UcError: - return None - if len(data) != chunk: - return None - out.extend(data) - cursor += chunk - left -= chunk - - return bytes(out) - - def _write_guest_bytes(self, uc: Uc, addr: int, data: bytes) -> bool: - if addr == 0: - return False - try: - uc.mem_write(addr, data) - return True - except UcError: - return False - - @staticmethod - def _parse_elf_image_from_blob(data: bytes, *, require_entry: bool) -> ELFImage: - if len(data) < 64: - raise RuntimeError("ELF too small") - if data[0:4] != b"\x7fELF": - raise RuntimeError("invalid ELF magic") - if data[4] != 2 or data[5] != 1: - raise RuntimeError("unsupported ELF class/endianness") - - entry = struct.unpack_from(" len(data): - break - - p_type, p_flags, p_offset, p_vaddr, _p_paddr, p_filesz, p_memsz, _p_align = struct.unpack_from( - " 0: - if fo >= len(data): - seg_data = b"" - else: - seg_data = data[fo : min(len(data), fo + fs)] - else: - seg_data = b"" - - segments.append(ELFSegment(vaddr=int(p_vaddr), memsz=int(p_memsz), flags=int(p_flags), data=seg_data)) - - if not segments: - raise RuntimeError("ELF has no PT_LOAD segments") - - return ELFImage(entry=int(entry), segments=segments) - - @staticmethod - def _parse_elf(path: Path) -> ELFImage: - data = path.read_bytes() - try: - return CLeonOSWineNative._parse_elf_image_from_blob(data, require_entry=True) - except RuntimeError as exc: - raise RuntimeError(f"{exc}: {path}") from exc - - def _fs_node_count(self) -> int: - count = 1 - for _root, dirs, files in os.walk(self.rootfs): - dirs[:] = [d for d in dirs if not d.startswith(".")] - files = [f for f in files if not f.startswith(".")] - count += len(dirs) + len(files) - return count - - def _fs_child_count(self, uc: Uc, dir_ptr: int) -> int: - path = self._read_guest_cstring(uc, dir_ptr) - host_dir = self._guest_to_host(path, must_exist=True) - if host_dir is None or not host_dir.is_dir(): - return u64_neg1() - return len(self._list_children(host_dir)) - - def _fs_get_child_name(self, uc: Uc, dir_ptr: int, index: int, out_ptr: int) -> int: - if out_ptr == 0: - return 0 - path = self._read_guest_cstring(uc, dir_ptr) - host_dir = self._guest_to_host(path, must_exist=True) - if host_dir is None or not host_dir.is_dir(): - return 0 - - children = self._list_children(host_dir) - if index >= len(children): - return 0 - - name = children[int(index)] - encoded = name.encode("utf-8", errors="replace") - if len(encoded) >= FS_NAME_MAX: - encoded = encoded[: FS_NAME_MAX - 1] - - return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0 - - def _fs_read(self, uc: Uc, path_ptr: int, out_ptr: int, buf_size: int) -> int: - if out_ptr == 0 or buf_size == 0: - return 0 - - path = self._read_guest_cstring(uc, path_ptr) - host_path = self._guest_to_host(path, must_exist=True) - if host_path is None or not host_path.is_file(): - return 0 - - read_size = int(min(buf_size, MAX_IO_READ)) - try: - data = host_path.read_bytes()[:read_size] - except Exception: - return 0 - - if not data: - return 0 - return len(data) if self._write_guest_bytes(uc, out_ptr, data) else 0 - - def _fs_stat_type(self, uc: Uc, path_ptr: int) -> int: - path = self._read_guest_cstring(uc, path_ptr) - host_path = self._guest_to_host(path, must_exist=True) - if host_path is None: - return u64_neg1() - if host_path.is_dir(): - return 2 - if host_path.is_file(): - return 1 - return u64_neg1() - - def _fs_stat_size(self, uc: Uc, path_ptr: int) -> int: - path = self._read_guest_cstring(uc, path_ptr) - host_path = self._guest_to_host(path, must_exist=True) - if host_path is None: - return u64_neg1() - if host_path.is_dir(): - return 0 - if host_path.is_file(): - try: - return host_path.stat().st_size - except Exception: - return u64_neg1() - return u64_neg1() - - @staticmethod - def _guest_path_is_under_temp(path: str) -> bool: - return path == "/temp" or path.startswith("/temp/") - - def _disk_path_is_under_mount(self, path: str) -> bool: - if not self._disk_mounted: - return False - normalized = self._normalize_guest_path(path) - mount = self._normalize_guest_path(self._disk_mount_path) - return normalized == mount or normalized.startswith(mount + "/") - - def _disk_guest_to_host(self, guest_path: str, *, must_exist: bool) -> Optional[Path]: - normalized = self._normalize_guest_path(guest_path) - mount = self._normalize_guest_path(self._disk_mount_path) - - if not self._disk_present or not self._disk_formatted or not self._disk_mounted: - return None - - if normalized == mount: - host = self._disk_root - elif normalized.startswith(mount + "/"): - rel = normalized[len(mount) + 1 :] - parts = [part for part in rel.split("/") if part] - host = self._disk_root.joinpath(*parts) if parts else self._disk_root - else: - return None - - if must_exist and not host.exists(): - return None - return host - - def _disk_prepare_raw_image(self) -> None: - target_size = int(self._disk_size_bytes) - - if target_size < 512: - raise RuntimeError("wine disk image size too small") - - if not self._disk_image_file.exists(): - with open(self._disk_image_file, "wb") as fp: - fp.truncate(target_size) - return - - current_size = int(self._disk_image_file.stat().st_size) - if current_size != target_size: - with open(self._disk_image_file, "r+b") as fp: - fp.truncate(target_size) - - def _disk_read_sector_raw(self, lba: int) -> Optional[bytes]: - if not self._disk_present: - return None - if lba < 0: - return None - offset = int(lba) * 512 - if offset + 512 > int(self._disk_size_bytes): - return None - try: - with open(self._disk_image_file, "rb") as fp: - fp.seek(offset) - data = fp.read(512) - if len(data) != 512: - return None - return data - except Exception: - return None - - def _disk_write_sector_raw(self, lba: int, data: bytes) -> bool: - if not self._disk_present: - return False - if lba < 0: - return False - if len(data) != 512: - return False - offset = int(lba) * 512 - if offset + 512 > int(self._disk_size_bytes): - return False - try: - with open(self._disk_image_file, "r+b") as fp: - fp.seek(offset) - fp.write(data) - fp.flush() - return True - except Exception: - return False - - def _disk_format_fat32(self, label: str) -> int: - _ = label - if not self._disk_present: - return 0 - - try: - self._disk_prepare_raw_image() - self._disk_root.mkdir(parents=True, exist_ok=True) - for child in list(self._disk_root.iterdir()): - if child.name == self._disk_marker.name or child.name == self._disk_image_file.name: - continue - if child.is_dir(): - shutil.rmtree(child) - else: - child.unlink() - with open(self._disk_image_file, "r+b") as fp: - fp.seek(0) - fp.write(b"\x00" * 512) - fp.flush() - self._disk_marker.write_text("FAT32\n", encoding="utf-8") - self._disk_formatted = True - return 1 - except Exception: - return 0 - - def _disk_mount(self, uc: Uc, mount_ptr: int) -> int: - mount_path = self._normalize_guest_path(self._read_guest_cstring(uc, mount_ptr, EXEC_PATH_MAX)) - - if not self._disk_present or not self._disk_formatted: - return 0 - - if mount_path == "/": - return 0 - - self._disk_mount_path = mount_path - self._disk_mounted = True - return 1 - - def _disk_mount_path_query(self, uc: Uc, out_ptr: int, out_size: int) -> int: - if out_ptr == 0 or out_size == 0: - return 0 - - if not self._disk_mounted: - return 0 - - payload = self._normalize_guest_path(self._disk_mount_path).encode("utf-8", errors="replace") - max_copy = int(out_size) - 1 - if max_copy < 0: - return 0 - if len(payload) > max_copy: - payload = payload[:max_copy] - return len(payload) if self._write_guest_bytes(uc, out_ptr, payload + b"\x00") else 0 - - def _fs_mkdir(self, uc: Uc, path_ptr: int) -> int: - path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr)) - is_temp_path = self._guest_path_is_under_temp(path) - is_disk_path = self._disk_path_is_under_mount(path) - - if not is_temp_path and not is_disk_path: - return 0 - - if is_disk_path and not self._disk_formatted: - return 0 - - host_path = self._guest_to_host(path, must_exist=False) - if host_path is None: - return 0 - - if host_path.exists() and host_path.is_file(): - return 0 - - try: - host_path.mkdir(parents=True, exist_ok=True) - return 1 - except Exception: - return 0 - - def _fs_write_common(self, uc: Uc, path_ptr: int, data_ptr: int, size: int, append_mode: bool) -> int: - path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr)) - is_temp_path = self._guest_path_is_under_temp(path) - is_disk_path = self._disk_path_is_under_mount(path) - disk_mount = self._normalize_guest_path(self._disk_mount_path) - - if not is_temp_path and not is_disk_path: - return 0 - - if is_disk_path and not self._disk_formatted: - return 0 - - if path == "/temp" or (is_disk_path and path == disk_mount): - return 0 - - if size < 0 or size > self.state.fs_write_max: - return 0 - - host_path = self._guest_to_host(path, must_exist=False) - if host_path is None: - return 0 - - if host_path.exists() and host_path.is_dir(): - return 0 - - data = b"" - if size > 0: - if data_ptr == 0: - return 0 - data = self._read_guest_bytes(uc, data_ptr, size) - if len(data) != int(size): - return 0 - - try: - host_path.parent.mkdir(parents=True, exist_ok=True) - mode = "ab" if append_mode else "wb" - with host_path.open(mode) as fh: - if data: - fh.write(data) - return 1 - except Exception: - return 0 - - def _fs_write(self, uc: Uc, path_ptr: int, data_ptr: int, size: int) -> int: - return self._fs_write_common(uc, path_ptr, data_ptr, size, append_mode=False) - - def _fs_append(self, uc: Uc, path_ptr: int, data_ptr: int, size: int) -> int: - return self._fs_write_common(uc, path_ptr, data_ptr, size, append_mode=True) - - def _fs_remove(self, uc: Uc, path_ptr: int) -> int: - path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr)) - is_temp_path = self._guest_path_is_under_temp(path) - is_disk_path = self._disk_path_is_under_mount(path) - disk_mount = self._normalize_guest_path(self._disk_mount_path) - - if not is_temp_path and not is_disk_path: - return 0 - - if is_disk_path and not self._disk_formatted: - return 0 - - if path == "/temp" or (is_disk_path and path == disk_mount): - return 0 - - host_path = self._guest_to_host(path, must_exist=True) - if host_path is None: - return 0 - - try: - if host_path.is_dir(): - if any(host_path.iterdir()): - return 0 - host_path.rmdir() - else: - host_path.unlink() - return 1 - except Exception: - return 0 - - def _log_journal_read(self, uc: Uc, index_from_oldest: int, out_ptr: int, out_size: int) -> int: - if out_ptr == 0 or out_size == 0: - return 0 - - line = self.state.log_journal_read(int(index_from_oldest)) - if line is None: - return 0 - - encoded = line.encode("utf-8", errors="replace") - max_payload = int(out_size) - 1 - if max_payload < 0: - return 0 - - if len(encoded) > max_payload: - encoded = encoded[:max_payload] - - return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0 - - def _exec_path(self, uc: Uc, path_ptr: int) -> int: - return self._spawn_path_common( - uc, - path_ptr, - 0, - 0, - return_pid=False, - env_line_override=None, - stdin_fd=FD_INHERIT, - stdout_fd=FD_INHERIT, - stderr_fd=FD_INHERIT, - ) - - def _exec_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int: - return self._spawn_path_common( - uc, - path_ptr, - argv_ptr, - env_ptr, - return_pid=False, - env_line_override=None, - stdin_fd=FD_INHERIT, - stdout_fd=FD_INHERIT, - stderr_fd=FD_INHERIT, - ) - - def _exec_pathv_io(self, uc: Uc, path_ptr: int, argv_ptr: int, req_ptr: int) -> int: - req_data: Optional[bytes] - env_ptr: int - stdin_fd: int - stdout_fd: int - stderr_fd: int - env_line: str - - if req_ptr == 0: - return u64_neg1() - - req_data = self._read_guest_bytes_exact(uc, req_ptr, 32) - if req_data is None or len(req_data) != 32: - return u64_neg1() - - env_ptr, stdin_fd, stdout_fd, stderr_fd = struct.unpack(" int: - return self._spawn_path_common( - uc, - path_ptr, - 0, - 0, - return_pid=True, - env_line_override=None, - stdin_fd=FD_INHERIT, - stdout_fd=FD_INHERIT, - stderr_fd=FD_INHERIT, - ) - - def _spawn_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int: - return self._spawn_path_common( - uc, - path_ptr, - argv_ptr, - env_ptr, - return_pid=True, - env_line_override=None, - stdin_fd=FD_INHERIT, - stdout_fd=FD_INHERIT, - stderr_fd=FD_INHERIT, - ) - - def _spawn_path_common( - self, - uc: Uc, - path_ptr: int, - argv_ptr: int, - env_ptr: int, - *, - return_pid: bool, - env_line_override: Optional[str], - stdin_fd: int, - stdout_fd: int, - stderr_fd: int, - ) -> int: - path = self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX) - guest_path = self._normalize_guest_path(path) - argv_line = self._read_guest_cstring(uc, argv_ptr, EXEC_ARG_LINE_MAX) if argv_ptr != 0 else "" - env_line = ( - env_line_override - if env_line_override is not None - else (self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else "") - ) - host_path = self._guest_to_host(guest_path, must_exist=True) - child_stdio = self._build_child_stdio_map(int(stdin_fd), int(stdout_fd), int(stderr_fd)) - - self.state.exec_requests = u64(self.state.exec_requests + 1) - self.state.user_exec_requested = 1 - self.state.user_launch_tries = u64(self.state.user_launch_tries + 1) - - if child_stdio is None: - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - if host_path is None or not host_path.is_file(): - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - if self.depth >= self.max_exec_depth: - print(f"[WINE][WARN] exec depth exceeded: {guest_path}", file=sys.stderr) - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - parent_pid = self.state.get_current_pid() - child_pid = self.state.alloc_pid(parent_pid) - argv_items, env_items = self._build_child_exec_items(guest_path, argv_line, env_line) - self.state.set_proc_cmdline(child_pid, argv_items, env_items) - self.state.set_proc_fault(child_pid, 0, 0, 0, 0) - - child = CLeonOSWineNative( - elf_path=host_path, - rootfs=self.rootfs, - guest_path_hint=guest_path, - state=self.state, - depth=self.depth + 1, - max_exec_depth=self.max_exec_depth, - no_kbd=True, - verbose=self.verbose, - top_level=False, - pid=child_pid, - ppid=parent_pid, - argv_items=argv_items, - env_items=env_items, - inherited_fds=child_stdio, - fb_window=self.fb_window, - fb_scale=self.fb_scale, - fb_max_fps=self.fb_max_fps, - fb_hold_ms=self.fb_hold_ms, - ) - child_ret = child.run() - - if child_ret is None: - self.state.user_launch_fail = u64(self.state.user_launch_fail + 1) - return u64_neg1() - - self.state.exec_success = u64(self.state.exec_success + 1) - self.state.user_launch_ok = u64(self.state.user_launch_ok + 1) - - if guest_path.lower().startswith("/system/"): - self.state.kelf_runs = u64(self.state.kelf_runs + 1) - - if return_pid: - return child_pid - - return u64(child_ret) - - def _proc_argc(self) -> int: - return self.state.proc_argc(self.state.get_current_pid()) - - def _proc_argv(self, uc: Uc, index: int, out_ptr: int, out_size: int) -> int: - return self._copy_proc_item_to_guest( - uc, - self.state.proc_argv_item(self.state.get_current_pid(), int(index)), - out_ptr, - out_size, - ) - - def _proc_envc(self) -> int: - return self.state.proc_envc(self.state.get_current_pid()) - - def _proc_env(self, uc: Uc, index: int, out_ptr: int, out_size: int) -> int: - return self._copy_proc_item_to_guest( - uc, - self.state.proc_env_item(self.state.get_current_pid(), int(index)), - out_ptr, - out_size, - ) - - def _proc_last_signal(self) -> int: - return self.state.proc_signal(self.state.get_current_pid()) - - def _proc_fault_vector(self) -> int: - return self.state.proc_fault_vector_value(self.state.get_current_pid()) - - def _proc_fault_error(self) -> int: - return self.state.proc_fault_error_value(self.state.get_current_pid()) - - def _proc_fault_rip(self) -> int: - return self.state.proc_fault_rip_value(self.state.get_current_pid()) - - def _proc_count(self) -> int: - return self.state.proc_count() - - def _proc_pid_at(self, uc: Uc, index: int, out_ptr: int) -> int: - if out_ptr == 0: - return 0 - - pid = self.state.proc_pid_at(int(index)) - if pid is None: - return 0 - - return 1 if self._write_guest_bytes(uc, out_ptr, struct.pack(" int: - if out_ptr == 0 or out_size < (13 * 8 + PROC_PATH_MAX): - return 0 - - target = int(pid) - state_value = self.state.proc_state_value(target) - if state_value == 0: - return 0 - - path = self.state.proc_path_value(target) - encoded_path = path.encode("utf-8", errors="replace") - if len(encoded_path) >= PROC_PATH_MAX: - encoded_path = encoded_path[: PROC_PATH_MAX - 1] - path_buf = encoded_path + b"\x00" + (b"\x00" * (PROC_PATH_MAX - len(encoded_path) - 1)) - - blob = struct.pack( - "<13Q", - u64(target), - u64(self.state.proc_ppid(target)), - u64(state_value), - u64(self.state.proc_started_tick_value(target)), - u64(self.state.proc_exited_tick_value(target)), - u64(self.state.proc_exit_status_value(target)), - u64(self.state.proc_runtime_ticks(target)), - u64(self.state.proc_mem_bytes_value(target)), - u64(self.state.proc_tty_index_value(target)), - u64(self.state.proc_signal(target)), - u64(self.state.proc_fault_vector_value(target)), - u64(self.state.proc_fault_error_value(target)), - u64(self.state.proc_fault_rip_value(target)), - ) + path_buf - - return 1 if self._write_guest_bytes(uc, out_ptr, blob) else 0 - - def _proc_kill(self, uc: Uc, pid: int, signal: int) -> int: - target = int(pid) - if target <= 0: - return u64_neg1() - - current_state = self.state.proc_state_value(target) - if current_state == 0: - return u64_neg1() - - effective_signal = int(signal) & 0xFF - if effective_signal == 0: - effective_signal = SIGTERM - - self.state.set_proc_fault(target, effective_signal, 0, 0, 0) - - if current_state == PROC_STATE_EXITED: - return 1 - - if effective_signal == SIGCONT: - if current_state == PROC_STATE_STOPPED: - self.state.set_proc_pending(target) - return 1 - - if effective_signal == SIGSTOP and current_state in (PROC_STATE_PENDING, PROC_STATE_STOPPED): - self.state.set_proc_stopped(target) - return 1 - - status = self._encode_signal_status(effective_signal, 0, 0) - - if target == self.state.get_current_pid(): - self._exit_requested = True - self._exit_status = u64(status) - if effective_signal == SIGSTOP: - self.state.set_proc_stopped(target) - uc.emu_stop() - return 1 - - if effective_signal == SIGSTOP: - self.state.set_proc_stopped(target) - return 1 - - self.state.mark_exited(target, status) - return 1 - - def _wait_pid(self, uc: Uc, pid: int, out_ptr: int) -> int: - wait_ret, status = self.state.wait_pid(int(pid)) - - if wait_ret == 1 and out_ptr != 0: - self._write_guest_bytes(uc, out_ptr, struct.pack(" int: - self._exit_requested = True - self._exit_status = u64(status) - uc.emu_stop() - return 1 - - def _sleep_ticks(self, ticks: int) -> int: - ticks = int(u64(ticks)) - - if ticks == 0: - return 0 - - start = self.state.timer_ticks() - - while (self.state.timer_ticks() - start) < ticks: - time.sleep(0.001) - - return self.state.timer_ticks() - start - - def _yield_once(self) -> int: - time.sleep(0) - return self.state.timer_ticks() - - def _fd_open(self, uc: Uc, path_ptr: int, flags: int, mode: int) -> int: - _ = mode - guest_path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX)) - open_flags = int(u64(flags)) - lower_path = guest_path.lower() - fd_slot: int - entry: FDEntry - - if not guest_path.startswith("/"): - return u64_neg1() - - if not self._fd_access_mode_valid(open_flags): - return u64_neg1() - - if ((open_flags & O_TRUNC) != 0 or (open_flags & O_APPEND) != 0) and not self._fd_can_write(open_flags): - return u64_neg1() - - fd_slot = self._fd_find_free() - if fd_slot < 0: - return u64_neg1() - - if lower_path == "/dev/tty": - entry = FDEntry(kind="tty", flags=open_flags, offset=0, tty_index=self._tty_index) - self._fds[fd_slot] = entry - return fd_slot - - if lower_path == "/dev/null": - entry = FDEntry(kind="dev_null", flags=open_flags, offset=0, tty_index=self._tty_index) - self._fds[fd_slot] = entry - return fd_slot - - if lower_path == "/dev/zero": - entry = FDEntry(kind="dev_zero", flags=open_flags, offset=0, tty_index=self._tty_index) - self._fds[fd_slot] = entry - return fd_slot - - if lower_path == "/dev/random": - entry = FDEntry(kind="dev_random", flags=open_flags, offset=0, tty_index=self._tty_index) - self._fds[fd_slot] = entry - return fd_slot - - host_path = self._guest_to_host(guest_path, must_exist=False) - if host_path is None: - return u64_neg1() - - try: - if not host_path.exists(): - if (open_flags & O_CREAT) == 0 or not self._fd_can_write(open_flags): - return u64_neg1() - host_path.parent.mkdir(parents=True, exist_ok=True) - host_path.write_bytes(b"") - - if host_path.is_dir(): - return u64_neg1() - - if (open_flags & O_TRUNC) != 0: - host_path.write_bytes(b"") - - offset = int(host_path.stat().st_size) if (open_flags & O_APPEND) != 0 else 0 - except Exception: - return u64_neg1() - - entry = FDEntry(kind="file", flags=open_flags, offset=offset, path=str(host_path), tty_index=self._tty_index) - self._fds[fd_slot] = entry - return fd_slot - - def _fd_read(self, uc: Uc, fd: int, out_ptr: int, size: int) -> int: - req = int(u64(size)) - entry = self._fd_lookup(int(fd)) - data: bytes - - if req == 0: - return 0 - - if out_ptr == 0: - return u64_neg1() - - if entry is None or not self._fd_can_read(entry.flags): - return u64_neg1() - - req = min(req, MAX_IO_READ) - - if entry.kind == "tty": - out = bytearray() - while len(out) < req: - key = self.state.pop_key() - if key is None: - break - out.append(key & 0xFF) - data = bytes(out) - elif entry.kind == "dev_null": - return 0 - elif entry.kind == "dev_zero": - data = b"\x00" * req - elif entry.kind == "dev_random": - data = os.urandom(req) - elif entry.kind == "file": - try: - with open(entry.path, "rb") as fh: - fh.seek(entry.offset) - data = fh.read(req) - except Exception: - return u64_neg1() - else: - return u64_neg1() - - if len(data) == 0: - return 0 - - if not self._write_guest_bytes(uc, int(out_ptr), data): - return u64_neg1() - - entry.offset += len(data) - return len(data) - - def _fd_write(self, uc: Uc, fd: int, buf_ptr: int, size: int) -> int: - req = int(u64(size)) - entry = self._fd_lookup(int(fd)) - data: Optional[bytes] - write_pos: int - - if req == 0: - return 0 - - if buf_ptr == 0: - return u64_neg1() - - if entry is None or not self._fd_can_write(entry.flags): - return u64_neg1() - - req = min(req, MAX_IO_READ) - data = self._read_guest_bytes_exact(uc, int(buf_ptr), req) - if data is None: - return u64_neg1() - - if entry.kind == "tty": - self._host_write_bytes(data) - entry.offset += len(data) - return len(data) - - if entry.kind in ("dev_null", "dev_zero", "dev_random"): - entry.offset += len(data) - return len(data) - - if entry.kind != "file": - return u64_neg1() - - try: - host_path = Path(entry.path) - if not host_path.exists(): - if (entry.flags & O_CREAT) == 0 or not self._fd_can_write(entry.flags): - return u64_neg1() - host_path.parent.mkdir(parents=True, exist_ok=True) - host_path.write_bytes(b"") - - with open(host_path, "r+b") as fh: - write_pos = int(entry.offset) - fh.seek(write_pos) - fh.write(data) - except Exception: - return u64_neg1() - - entry.offset += len(data) - return len(data) - - def _fd_close(self, fd: int) -> int: - key = int(fd) - if key not in self._fds: - return u64_neg1() - del self._fds[key] - return 0 - - def _fd_dup(self, fd: int) -> int: - src = self._fd_lookup(int(fd)) - if src is None: - return u64_neg1() - - slot = self._fd_find_free() - if slot < 0: - return u64_neg1() - - self._fds[slot] = self._clone_fd_entry(src) - return slot - - def _dl_alloc_handle(self) -> int: - handle = int(u64(self._dl_next_handle)) - if handle == 0: - handle = 1 - - while handle in self._dl_images or handle == 0: - handle = int(u64(handle + 1)) - if handle == 0: - handle = 1 - - self._dl_next_handle = int(u64(handle + 1)) - if self._dl_next_handle == 0: - self._dl_next_handle = 1 - - return handle - - def _dl_pick_base(self, min_vaddr: int, max_vaddr: int) -> Tuple[int, int, int]: - old_base = page_floor(min_vaddr) - span = page_ceil(max_vaddr - old_base) - candidate = page_ceil(self._dl_next_base) - retries = 0 - - if span <= 0: - return 0, 0, 0 - - while retries < 1024: - start = candidate - end = start + span - - if not self._range_overlaps_mapped(start, end): - self._dl_next_base = end + DL_BASE_GAP - return start, end, start - old_base - - candidate = end + DL_BASE_GAP - retries += 1 - - return 0, 0, 0 - - @staticmethod - def _dl_rebase_non_exec_segment(data: bytes, old_base: int, old_end: int, delta: int) -> bytes: - if not data or delta == 0: - return data - - patched = bytearray(data) - limit = len(patched) - (len(patched) % 8) - - for off in range(0, limit, 8): - value = struct.unpack_from(" str: - if start < 0 or end <= start or start >= len(blob): - return "" - - limit = min(end, len(blob)) - cur = start - while cur < limit and blob[cur] != 0: - cur += 1 - - if cur <= start: - return "" - - return blob[start:cur].decode("utf-8", errors="ignore") - - @classmethod - def _dl_extract_symbols(cls, blob: bytes, load_bias: int) -> Dict[str, int]: - symbols: Dict[str, int] = {} - sections: List[Tuple[int, int, int, int, int, int, int, int, int, int]] = [] - - if len(blob) < 0x40: - return symbols - - try: - shoff = struct.unpack_from(" len(blob): - return symbols - - for idx in range(shnum): - off = shoff + (idx * shentsize) - try: - sh = struct.unpack_from("= len(sections): - continue - - if sh_entsize < 24: - sh_entsize = 24 - - if sh_offset < 0 or sh_size <= 0 or sh_offset >= len(blob): - continue - - sym_end = min(len(blob), sh_offset + sh_size) - if sym_end <= sh_offset: - continue - - strtab = sections[sh_link] - str_off = int(strtab[4]) - str_size = int(strtab[5]) - - if str_size <= 0 or str_off < 0 or str_off >= len(blob): - continue - - str_end = min(len(blob), str_off + str_size) - count = (sym_end - sh_offset) // sh_entsize - - for i in range(count): - ent_off = sh_offset + (i * sh_entsize) - if ent_off + 24 > len(blob): - break - - try: - st_name, _st_info, _st_other, st_shndx, st_value, _st_size = struct.unpack_from( - " int: - guest_path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr, DL_MAX_NAME)) - cached_handle = self._dl_path_to_handle.get(guest_path) - host_path: Optional[Path] - file_blob: bytes - image: ELFImage - e_type: int - min_vaddr: int - max_vaddr: int - map_start: int - map_end: int - load_bias: int - handle: int - owner_pid: int - - if not guest_path.startswith("/") or guest_path == "/": - return u64_neg1() - if len(guest_path.encode("utf-8", errors="replace")) >= DL_MAX_NAME: - return u64_neg1() - - if cached_handle is not None: - cached = self._dl_images.get(cached_handle) - if cached is not None: - cached.ref_count += 1 - return cached.handle - - host_path = self._guest_to_host(guest_path, must_exist=True) - if host_path is None or not host_path.is_file(): - return u64_neg1() - - try: - file_blob = host_path.read_bytes() - image = self._parse_elf_image_from_blob(file_blob, require_entry=False) - e_type = struct.unpack_from(" int: - key = int(handle) - image = self._dl_images.get(key) - - if key == 0 or image is None: - return u64_neg1() - - if image.ref_count > 1: - image.ref_count -= 1 - return 0 - - del self._dl_images[key] - if self._dl_path_to_handle.get(image.guest_path) == key: - del self._dl_path_to_handle[image.guest_path] - return 0 - - def _dl_sym(self, uc: Uc, handle: int, symbol_ptr: int) -> int: - symbol = self._read_guest_cstring(uc, symbol_ptr, DL_MAX_SYMBOL) - image = self._dl_images.get(int(handle)) - addr: Optional[int] - - if image is None or not symbol: - return 0 - - addr = image.symbols.get(symbol) - if addr is None and not symbol.startswith("_"): - addr = image.symbols.get(f"_{symbol}") - - return int(u64(addr)) if addr is not None else 0 - - def _kernel_version(self, uc: Uc, out_ptr: int, out_size: int) -> int: - if out_ptr == 0 or out_size <= 0: - return 0 - - max_copy = int(out_size) - 1 - if max_copy < 0: - return 0 - - payload = CLKS_VERSION_STRING.encode("utf-8", errors="replace") - if len(payload) > max_copy: - payload = payload[:max_copy] - - if not self._write_guest_bytes(uc, int(out_ptr), payload + b"\x00"): - return 0 - - return len(payload) - - def _fb_info(self, uc: Uc, out_ptr: int) -> int: - if out_ptr == 0: - return 0 - - self._ensure_fb_window() - - payload = struct.pack( - " int: - if len(self._fb_pixels) == 0: - return 0 - - pixel = struct.pack(" int: - req_blob = self._read_guest_bytes_exact(uc, int(req_ptr), 56) - pixels_ptr: int - src_width: int - src_height: int - src_pitch_bytes: int - dst_x: int - dst_y: int - scale: int - total_src_bytes: int - src_blob: Optional[bytes] - max_src_w: int - max_src_h: int - copy_w: int - copy_h: int - - if req_ptr == 0 or req_blob is None or len(req_blob) != 56: - return 0 - - pixels_ptr, src_width, src_height, src_pitch_bytes, dst_x, dst_y, scale = struct.unpack(" 4096 or src_height > 4096 or scale > 8: - return 0 - - if src_pitch_bytes == 0: - src_pitch_bytes = src_width * 4 - - if src_pitch_bytes < (src_width * 4): - return 0 - - if src_height > (u64_neg1() // src_pitch_bytes): - return 0 - - if dst_x >= self._fb_width or dst_y >= self._fb_height: - return 0 - - total_src_bytes = src_pitch_bytes * src_height - if total_src_bytes == 0 or total_src_bytes > FB_MAX_UPLOAD_BYTES: - return 0 - - src_blob = self._read_guest_bytes_exact(uc, pixels_ptr, total_src_bytes) - if src_blob is None: - return 0 - - max_src_w = (self._fb_width - dst_x + scale - 1) // scale - max_src_h = (self._fb_height - dst_y + scale - 1) // scale - copy_w = min(src_width, max_src_w) - copy_h = min(src_height, max_src_h) - - if copy_w <= 0 or copy_h <= 0: - return 0 - - if scale == 1: - row_bytes = copy_w * 4 - for y in range(copy_h): - src_off = y * src_pitch_bytes - dst_off = ((dst_y + y) * self._fb_width + dst_x) * 4 - self._fb_pixels[dst_off : dst_off + row_bytes] = src_blob[src_off : src_off + row_bytes] - else: - draw_row_pixels = min(self._fb_width - dst_x, copy_w * scale) - draw_row_bytes = draw_row_pixels * 4 - - for y in range(copy_h): - src_row_off = y * src_pitch_bytes - src_row = memoryview(src_blob)[src_row_off : src_row_off + (copy_w * 4)] - expanded = bytearray(copy_w * scale * 4) - write_off = 0 - - for x in range(copy_w): - pixel = bytes(src_row[x * 4 : (x + 1) * 4]) - expanded[write_off : write_off + (scale * 4)] = pixel * scale - write_off += scale * 4 - - draw_y = dst_y + (y * scale) - repeat_h = min(scale, self._fb_height - draw_y) - row_data = expanded[:draw_row_bytes] - - for sy in range(repeat_h): - dst_off = ((draw_y + sy) * self._fb_width + dst_x) * 4 - self._fb_pixels[dst_off : dst_off + draw_row_bytes] = row_data - - self._fb_mark_dirty() - - return 1 - - @staticmethod - def _truncate_item_text(text: str, max_bytes: int = EXEC_ITEM_MAX) -> str: - if max_bytes <= 1: - return "" - encoded = (text or "").encode("utf-8", errors="replace") - if len(encoded) >= max_bytes: - encoded = encoded[: max_bytes - 1] - return encoded.decode("utf-8", errors="ignore") - - @staticmethod - def _parse_whitespace_items(line: str, max_count: int) -> List[str]: - out: List[str] = [] - i = 0 - text = line or "" - length = len(text) - - while i < length: - while i < length and text[i] in (" ", "\t", "\r", "\n"): - i += 1 - if i >= length: - break - start = i - while i < length and text[i] not in (" ", "\t", "\r", "\n"): - i += 1 - out.append(text[start:i]) - if len(out) >= max_count: - break - - return out - - @staticmethod - def _parse_env_items(line: str, max_count: int) -> List[str]: - out: List[str] = [] - i = 0 - text = line or "" - length = len(text) - - while i < length: - while i < length and text[i] in (" ", "\t", ";", "\r", "\n"): - i += 1 - if i >= length: - break - - start = i - while i < length and text[i] not in (";", "\r", "\n"): - i += 1 - value = text[start:i].rstrip(" \t") - if value: - out.append(value) - if len(out) >= max_count: - break - - return out - - @classmethod - def _prepare_exec_items(cls, path: str, argv_items: List[str], env_items: List[str]) -> Tuple[List[str], List[str]]: - normalized_path = cls._normalize_guest_path(path) - - prepared_argv: List[str] = [] - for item in argv_items[:EXEC_MAX_ARGS]: - value = cls._truncate_item_text(item) - if value: - prepared_argv.append(value) - - if not prepared_argv: - prepared_argv = [cls._truncate_item_text(normalized_path)] - elif prepared_argv[0] == "": - prepared_argv[0] = cls._truncate_item_text(normalized_path) - - prepared_env: List[str] = [] - for item in env_items[:EXEC_MAX_ENVS]: - value = cls._truncate_item_text(item) - if value: - prepared_env.append(value) - - return prepared_argv[:EXEC_MAX_ARGS], prepared_env[:EXEC_MAX_ENVS] - - @classmethod - def _build_child_exec_items(cls, guest_path: str, argv_line: str, env_line: str) -> Tuple[List[str], List[str]]: - argv_items = [guest_path] - argv_items.extend(cls._parse_whitespace_items(argv_line or "", EXEC_MAX_ARGS - 1)) - env_items = cls._parse_env_items(env_line or "", EXEC_MAX_ENVS) - return cls._prepare_exec_items(guest_path, argv_items, env_items) - - def _copy_proc_item_to_guest(self, uc: Uc, item: Optional[str], out_ptr: int, out_size: int) -> int: - if out_ptr == 0 or out_size == 0 or item is None: - return 0 - - max_out = int(min(int(out_size), EXEC_ITEM_MAX)) - if max_out <= 0: - return 0 - - encoded = self._truncate_item_text(item, max_out + 1).encode("utf-8", errors="replace") - if len(encoded) >= max_out: - encoded = encoded[: max_out - 1] - - return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0 - - @staticmethod - def _signal_from_vector(vector: int) -> int: - if vector in (0, 16, 19): - return 8 - if vector == 6: - return 4 - if vector == 3: - return 5 - if vector in (10, 11, 12, 13, 14, 17): - return 11 - return 6 - - @staticmethod - def _encode_signal_status(signal: int, vector: int, error_code: int) -> int: - return u64( - EXEC_STATUS_SIGNAL_FLAG - | (int(signal) & 0xFF) - | ((int(vector) & 0xFF) << 8) - | ((int(error_code) & 0xFFFF) << 16) - ) - - def _status_from_uc_error(self, uc: Uc, exc: UcError) -> int: - errno = int(getattr(exc, "errno", 0)) - vector = 13 - error_code = 0 - - if errno in (UC_ERR_READ_UNMAPPED, UC_ERR_WRITE_UNMAPPED, UC_ERR_FETCH_UNMAPPED): - vector = 14 - if errno == UC_ERR_WRITE_UNMAPPED: - error_code = 0x2 - elif errno == UC_ERR_FETCH_UNMAPPED: - error_code = 0x10 - elif errno in (UC_ERR_READ_PROT, UC_ERR_WRITE_PROT, UC_ERR_FETCH_PROT): - vector = 14 - error_code = 0x1 - if errno == UC_ERR_WRITE_PROT: - error_code |= 0x2 - elif errno == UC_ERR_FETCH_PROT: - error_code |= 0x10 - elif errno == UC_ERR_INSN_INVALID: - vector = 6 - error_code = 0 - - rip = 0 - try: - rip = self._reg_read(uc, UC_X86_REG_RIP) - except Exception: - rip = 0 - - signal = self._signal_from_vector(vector) - self.state.set_proc_fault(self.pid, signal, vector, error_code, rip) - return self._encode_signal_status(signal, vector, error_code) - - def _guest_to_host(self, guest_path: str, *, must_exist: bool) -> Optional[Path]: - norm = self._normalize_guest_path(guest_path) - disk_host = self._disk_guest_to_host(norm, must_exist=must_exist) - - if disk_host is not None: - return disk_host - - if norm == "/": - return self.rootfs if (not must_exist or self.rootfs.exists()) else None - - current = self.rootfs - for part in [p for p in norm.split("/") if p]: - candidate = current / part - if candidate.exists(): - current = candidate - continue - - if current.exists() and current.is_dir(): - match = self._find_case_insensitive(current, part) - if match is not None: - current = match - continue - - current = candidate - - if must_exist and not current.exists(): - return None - return current - - @staticmethod - def _find_case_insensitive(parent: Path, name: str) -> Optional[Path]: - target = name.lower() - try: - for entry in parent.iterdir(): - if entry.name.lower() == target: - return entry - except Exception: - return None - return None - - @staticmethod - def _normalize_guest_path(path: str) -> str: - p = (path or "").replace("\\", "/").strip() - if not p: - return "/" - if not p.startswith("/"): - p = "/" + p - - parts = [] - for token in p.split("/"): - if token in ("", "."): - continue - if token == "..": - if parts: - parts.pop() - continue - parts.append(token) - - return "/" + "/".join(parts) - - @staticmethod - def _list_children(dir_path: Path) -> List[str]: - try: - names = [entry.name for entry in dir_path.iterdir() if not entry.name.startswith(".")] - except Exception: - return [] - names.sort(key=lambda x: x.lower()) - return names - - -def resolve_rootfs(path_arg: Optional[str]) -> Path: - if path_arg: - root = Path(path_arg).expanduser().resolve() - if not root.exists() or not root.is_dir(): - raise FileNotFoundError(f"rootfs not found: {root}") - return root - - candidates = [ - Path("build/x86_64/ramdisk_root"), - Path("ramdisk"), - ] - for candidate in candidates: - if candidate.exists() and candidate.is_dir(): - return candidate.resolve() - - raise FileNotFoundError("rootfs not found; pass --rootfs") - - -def _guest_to_host_for_resolve(rootfs: Path, guest_path: str) -> Optional[Path]: - norm = CLeonOSWineNative._normalize_guest_path(guest_path) - if norm == "/": - return rootfs - - current = rootfs - for part in [p for p in norm.split("/") if p]: - candidate = current / part - if candidate.exists(): - current = candidate - continue - - if current.exists() and current.is_dir(): - match = None - for entry in current.iterdir(): - if entry.name.lower() == part.lower(): - match = entry - break - if match is not None: - current = match - continue - - current = candidate - - return current if current.exists() else None - - -def resolve_elf_target(elf_arg: str, rootfs: Path) -> Tuple[Path, str]: - host_candidate = Path(elf_arg).expanduser() - if host_candidate.exists(): - host_path = host_candidate.resolve() - try: - rel = host_path.relative_to(rootfs) - guest_path = "/" + rel.as_posix() - except ValueError: - guest_path = "/" + host_path.name - return host_path, guest_path - - guest_path = CLeonOSWineNative._normalize_guest_path(elf_arg) - host_path = _guest_to_host_for_resolve(rootfs, guest_path) - if host_path is None: - raise FileNotFoundError(f"ELF not found as host path or guest path: {elf_arg}") - return host_path.resolve(), guest_path diff --git a/wine/cleonos_wine_lib/state.py b/wine/cleonos_wine_lib/state.py deleted file mode 100644 index 7c5ce38..0000000 --- a/wine/cleonos_wine_lib/state.py +++ /dev/null @@ -1,354 +0,0 @@ -from __future__ import annotations - -import collections -import threading -import time -from dataclasses import dataclass, field -from typing import Deque, Dict, List, Optional, Tuple - -from .constants import PROC_STATE_EXITED, PROC_STATE_PENDING, PROC_STATE_RUNNING, PROC_STATE_STOPPED, u64 - - -@dataclass -class SharedKernelState: - start_ns: int = field(default_factory=time.monotonic_ns) - task_count: int = 5 - current_task: int = 0 - service_count: int = 7 - service_ready: int = 7 - context_switches: int = 0 - kelf_count: int = 2 - kelf_runs: int = 0 - exec_requests: int = 0 - exec_success: int = 0 - user_shell_ready: int = 1 - user_exec_requested: int = 0 - user_launch_tries: int = 0 - user_launch_ok: int = 0 - user_launch_fail: int = 0 - tty_count: int = 4 - tty_active: int = 0 - kbd_queue: Deque[int] = field(default_factory=collections.deque) - kbd_lock: threading.Lock = field(default_factory=threading.Lock) - kbd_queue_cap: int = 256 - kbd_drop_count: int = 0 - kbd_push_count: int = 0 - kbd_pop_count: int = 0 - kbd_hotkey_switches: int = 0 - log_journal_cap: int = 256 - log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256)) - fs_write_max: int = 16 * 1024 * 1024 - - # syscall stats - stats_lock: threading.Lock = field(default_factory=threading.Lock) - stats_total_calls: int = 0 - stats_id_total: Dict[int, int] = field(default_factory=dict) - stats_recent_window_cap: int = 256 - stats_recent_ring: Deque[int] = field(default_factory=lambda: collections.deque(maxlen=256)) - - # process table - proc_lock: threading.Lock = field(default_factory=threading.Lock) - proc_next_pid: int = 1 - proc_current_pid: int = 0 - proc_parents: Dict[int, int] = field(default_factory=dict) - proc_status: Dict[int, Optional[int]] = field(default_factory=dict) - proc_state: Dict[int, int] = field(default_factory=dict) - proc_started_tick: Dict[int, int] = field(default_factory=dict) - proc_exited_tick: Dict[int, int] = field(default_factory=dict) - proc_mem_bytes: Dict[int, int] = field(default_factory=dict) - proc_tty_index: Dict[int, int] = field(default_factory=dict) - proc_path: Dict[int, str] = field(default_factory=dict) - proc_argv: Dict[int, List[str]] = field(default_factory=dict) - proc_env: Dict[int, List[str]] = field(default_factory=dict) - proc_last_signal: Dict[int, int] = field(default_factory=dict) - proc_fault_vector: Dict[int, int] = field(default_factory=dict) - proc_fault_error: Dict[int, int] = field(default_factory=dict) - proc_fault_rip: Dict[int, int] = field(default_factory=dict) - - def timer_ticks(self) -> int: - return (time.monotonic_ns() - self.start_ns) // 1_000_000 - - def record_syscall(self, sid: int) -> None: - key = int(u64(sid)) - - with self.stats_lock: - self.stats_total_calls = int(u64(self.stats_total_calls + 1)) - self.stats_id_total[key] = int(u64(self.stats_id_total.get(key, 0) + 1)) - self.stats_recent_ring.append(key) - - def stats_total(self) -> int: - with self.stats_lock: - return int(self.stats_total_calls) - - def stats_id_count(self, sid: int) -> int: - key = int(u64(sid)) - with self.stats_lock: - return int(self.stats_id_total.get(key, 0)) - - def stats_recent_window(self) -> int: - with self.stats_lock: - return len(self.stats_recent_ring) - - def stats_recent_id_count(self, sid: int) -> int: - key = int(u64(sid)) - with self.stats_lock: - return sum(1 for item in self.stats_recent_ring if item == key) - - def push_key(self, key: int) -> None: - with self.kbd_lock: - if len(self.kbd_queue) >= self.kbd_queue_cap: - self.kbd_queue.popleft() - self.kbd_drop_count = u64(self.kbd_drop_count + 1) - self.kbd_queue.append(key & 0xFF) - self.kbd_push_count = u64(self.kbd_push_count + 1) - - def pop_key(self) -> Optional[int]: - with self.kbd_lock: - if not self.kbd_queue: - return None - self.kbd_pop_count = u64(self.kbd_pop_count + 1) - return self.kbd_queue.popleft() - - def buffered_count(self) -> int: - with self.kbd_lock: - return len(self.kbd_queue) - - def log_journal_push(self, text: str) -> None: - if text is None: - return - - normalized = text.replace("\r", "") - lines = normalized.split("\n") - - for line in lines: - if len(line) > 255: - line = line[:255] - self.log_journal.append(line) - - def log_journal_count(self) -> int: - return len(self.log_journal) - - def log_journal_read(self, index_from_oldest: int) -> Optional[str]: - if index_from_oldest < 0 or index_from_oldest >= len(self.log_journal): - return None - return list(self.log_journal)[index_from_oldest] - - def alloc_pid(self, ppid: int) -> int: - now = self.timer_ticks() - - with self.proc_lock: - pid = int(self.proc_next_pid) - - if pid == 0: - pid = 1 - - self.proc_next_pid = int(u64(pid + 1)) - - if self.proc_next_pid == 0: - self.proc_next_pid = 1 - - self.proc_parents[pid] = int(ppid) - self.proc_status[pid] = None - self.proc_state[pid] = PROC_STATE_PENDING - self.proc_started_tick[pid] = now - self.proc_exited_tick[pid] = 0 - self.proc_mem_bytes[pid] = 0 - self.proc_tty_index[pid] = int(self.tty_active) - self.proc_path[pid] = "" - self.proc_argv[pid] = [] - self.proc_env[pid] = [] - self.proc_last_signal[pid] = 0 - self.proc_fault_vector[pid] = 0 - self.proc_fault_error[pid] = 0 - self.proc_fault_rip[pid] = 0 - return pid - - def set_current_pid(self, pid: int) -> None: - with self.proc_lock: - self.proc_current_pid = int(pid) - - def get_current_pid(self) -> int: - with self.proc_lock: - return int(self.proc_current_pid) - - def set_proc_running(self, pid: int, path: Optional[str], tty_index: int) -> None: - if pid <= 0: - return - - now = self.timer_ticks() - - with self.proc_lock: - if pid not in self.proc_status: - return - - self.proc_state[pid] = PROC_STATE_RUNNING - if self.proc_started_tick.get(pid, 0) == 0: - self.proc_started_tick[pid] = now - self.proc_tty_index[pid] = int(tty_index) - if path: - self.proc_path[pid] = str(path) - - def mark_exited(self, pid: int, status: int) -> None: - if pid <= 0: - return - - with self.proc_lock: - self.proc_status[int(pid)] = int(u64(status)) - self.proc_state[int(pid)] = PROC_STATE_EXITED - self.proc_exited_tick[int(pid)] = self.timer_ticks() - - def set_proc_stopped(self, pid: int) -> None: - if pid <= 0: - return - - with self.proc_lock: - if pid not in self.proc_status: - return - if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED: - self.proc_state[pid] = PROC_STATE_STOPPED - - def set_proc_pending(self, pid: int) -> None: - if pid <= 0: - return - - with self.proc_lock: - if pid not in self.proc_status: - return - if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED: - self.proc_state[pid] = PROC_STATE_PENDING - - def wait_pid(self, pid: int) -> Tuple[int, int]: - with self.proc_lock: - if pid not in self.proc_status: - return int(u64(-1)), 0 - - status = self.proc_status[pid] - - if status is None: - return 0, 0 - - return 1, int(status) - - def set_proc_cmdline(self, pid: int, argv: List[str], env: List[str]) -> None: - if pid <= 0: - return - - with self.proc_lock: - if pid not in self.proc_status: - return - self.proc_argv[pid] = [str(item) for item in argv] - self.proc_env[pid] = [str(item) for item in env] - if argv: - self.proc_path[pid] = str(argv[0]) - - def set_proc_fault(self, pid: int, signal: int, vector: int, error_code: int, rip: int) -> None: - if pid <= 0: - return - - with self.proc_lock: - if pid not in self.proc_status: - return - self.proc_last_signal[pid] = int(u64(signal)) - self.proc_fault_vector[pid] = int(u64(vector)) - self.proc_fault_error[pid] = int(u64(error_code)) - self.proc_fault_rip[pid] = int(u64(rip)) - - def proc_count(self) -> int: - with self.proc_lock: - return len(self.proc_status) - - def proc_pid_at(self, index: int) -> Optional[int]: - if index < 0: - return None - - with self.proc_lock: - ordered = sorted(self.proc_status.keys()) - if index >= len(ordered): - return None - return int(ordered[index]) - - def proc_state_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_state.get(int(pid), 0)) - - def proc_ppid(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_parents.get(int(pid), 0)) - - def proc_started_tick_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_started_tick.get(int(pid), 0)) - - def proc_exited_tick_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_exited_tick.get(int(pid), 0)) - - def proc_exit_status_value(self, pid: int) -> int: - with self.proc_lock: - value = self.proc_status.get(int(pid)) - return int(value) if value is not None else 0 - - def proc_runtime_ticks(self, pid: int) -> int: - with self.proc_lock: - start = int(self.proc_started_tick.get(int(pid), 0)) - state = int(self.proc_state.get(int(pid), 0)) - end = int(self.proc_exited_tick.get(int(pid), 0)) - - if start == 0: - return 0 - - if state == PROC_STATE_EXITED and end >= start: - return end - start - - now = self.timer_ticks() - return 0 if now < start else now - start - - def proc_mem_bytes_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_mem_bytes.get(int(pid), 0)) - - def proc_tty_index_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_tty_index.get(int(pid), 0)) - - def proc_path_value(self, pid: int) -> str: - with self.proc_lock: - return str(self.proc_path.get(int(pid), "")) - - def proc_argc(self, pid: int) -> int: - with self.proc_lock: - return len(self.proc_argv.get(int(pid), [])) - - def proc_argv_item(self, pid: int, index: int) -> Optional[str]: - with self.proc_lock: - values = self.proc_argv.get(int(pid), []) - if index < 0 or index >= len(values): - return None - return values[index] - - def proc_envc(self, pid: int) -> int: - with self.proc_lock: - return len(self.proc_env.get(int(pid), [])) - - def proc_env_item(self, pid: int, index: int) -> Optional[str]: - with self.proc_lock: - values = self.proc_env.get(int(pid), []) - if index < 0 or index >= len(values): - return None - return values[index] - - def proc_signal(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_last_signal.get(int(pid), 0)) - - def proc_fault_vector_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_fault_vector.get(int(pid), 0)) - - def proc_fault_error_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_fault_error.get(int(pid), 0)) - - def proc_fault_rip_value(self, pid: int) -> int: - with self.proc_lock: - return int(self.proc_fault_rip.get(int(pid), 0)) diff --git a/wine/requirements.txt b/wine/requirements.txt deleted file mode 100644 index c501798..0000000 --- a/wine/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -unicorn>=2.0.1 -pygame>=2.5.2