Wine拆分

This commit is contained in:
2026-04-23 21:53:13 +08:00
parent 439479a3fa
commit 881b62b5d8
24 changed files with 4 additions and 4552 deletions

3
.gitmodules vendored
View File

@@ -7,3 +7,6 @@
[submodule "cleonos/third-party/doomgeneric"] [submodule "cleonos/third-party/doomgeneric"]
path = cleonos/third-party/doomgeneric path = cleonos/third-party/doomgeneric
url = https://github.com/CLeonOS/doom.git url = https://github.com/CLeonOS/doom.git
[submodule "wine"]
path = wine
url = https://github.com/CLeonOS/wine

1
wine Submodule

Submodule wine added at 2071c8ea46

View File

@@ -1,51 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of this License; and
You must cause any modified files to carry prominent notices stating that You changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

View File

@@ -1,90 +0,0 @@
# CLeonOS-Wine (Native)
CLeonOS-Wine 现在改为自研运行器:基于 Python + Unicorn直接运行 CLeonOS x86_64 用户 ELF。
不再依赖 Qiling。
## 文件结构
- `wine/cleonos_wine.py`:兼容入口脚本
- `wine/cleonos_wine_lib/cli.py`:命令行参数与启动流程
- `wine/cleonos_wine_lib/runner.py`ELF 装载、执行、syscall 分发
- `wine/cleonos_wine_lib/state.py`:内核态统计与共享状态
- `wine/cleonos_wine_lib/input_pump.py`:主机键盘输入线程
- `wine/cleonos_wine_lib/constants.py`:常量与 syscall ID
- `wine/cleonos_wine_lib/platform.py`Unicorn 导入与平台适配
- `wine/requirements.txt`Python 依赖Unicorn + pygame
## 安装
```bash
pip install -r wine/requirements.txt
```
## 运行
```bash
python wine/cleonos_wine.py /hello.elf --rootfs build/x86_64/ramdisk_root
python wine/cleonos_wine.py /shell/shell.elf --rootfs build/x86_64/ramdisk_root
python wine/cleonos_wine.py /shell/qrcode.elf --rootfs build/x86_64/ramdisk_root --fb-window -- --ecc H "hello wine"
```
也支持直接传宿主路径:
```bash
python wine/cleonos_wine.py build/x86_64/ramdisk_root/shell/shell.elf --rootfs build/x86_64/ramdisk_root
```
## 支持
- ELF64 (x86_64) PT_LOAD 段装载
- CLeonOS `int 0x80` syscall 0..94(含 `FD_*``DL_*``FB_*``PROC_*``STATS_*``EXEC_PATHV_IO``KERNEL_VERSION``DISK_*`
- TTY 输出与键盘输入队列
- rootfs 文件/目录访问(`FS_*`
- `/temp` 与已挂载磁盘路径写入限制(`FS_MKDIR/WRITE/APPEND/REMOVE`
- `EXEC_PATH/EXEC_PATHV` 执行 ELF带深度限制
- `EXEC_PATHV_IO`(支持 stdio fd 继承/重定向)
- `SPAWN_PATH/SPAWN_PATHV/WAITPID/EXIT/SLEEP_TICKS/YIELD`
- 进程 `argv/env` 查询(`PROC_ARGC/PROC_ARGV/PROC_ENVC/PROC_ENV`
- 进程枚举与快照(`PROC_COUNT/PROC_PID_AT/PROC_SNAPSHOT/PROC_KILL`
- syscall 统计(`STATS_TOTAL/STATS_ID_COUNT/STATS_RECENT_*`
- 文件描述符(`FD_OPEN/FD_READ/FD_WRITE/FD_CLOSE/FD_DUP`
- 动态库兼容加载(`DL_OPEN/DL_CLOSE/DL_SYM`,基于 ELF 符号解析)
- framebuffer 兼容(`FB_INFO/FB_BLIT/FB_CLEAR`,支持内存缓冲与窗口显示)
- 内核版本查询(`KERNEL_VERSION`
- 磁盘接口兼容(`DISK_PRESENT/SIZE_BYTES/SECTOR_COUNT/FORMATTED/FORMAT_FAT32/MOUNT/MOUNTED/MOUNT_PATH/READ_SECTOR/WRITE_SECTOR`
- Wine 虚拟磁盘目录默认位于 `<rootfs>/__clks_disk0__`(格式化标记文件 `.fat32`,原始扇区后端文件 `.rawdisk.img`
- 异常退出状态编码与故障元信息(`PROC_LAST_SIGNAL/PROC_FAULT_*`
## 版本策略
- CLeonOS-Wine 版本号固定为:`85.0.0-wine`
- 该值按项目策略固定,不再随新增 syscall 变更(即使当前实现范围已扩展到 `0..94`
## 参数
- `--no-kbd`:关闭输入线程
- `--fb-window`:启用 framebuffer 窗口显示pygame
- `--fb-scale N`:窗口缩放倍数(默认 `2`
- `--fb-max-fps N`:窗口刷新上限(默认 `60`
- `--fb-hold-ms N`:程序退出后窗口保留毫秒数(默认 `2500`,静态图更容易看清)
- `--argv-line "..."`:直接指定 guest 参数行(等价于 shell 参数字符串)
- `--cwd PATH`:写入命令上下文中的工作目录(默认 `/`
- `--` 之后内容:作为 guest argv 透传(推荐)
- `--max-exec-depth N`:设置 exec 嵌套深度上限
- `--verbose`:打印更多日志
- 环境变量 `CLEONOS_WINE_DISK_SIZE_MB`:设置 Wine 虚拟磁盘容量MB默认 `64`
## `execv/spawnv` 参数格式
- `argv_line`:按空白字符分词(与内核当前实现一致,不支持引号转义)。
- `env_line`:按 `;` 或换行切分环境变量项,会去掉每项末尾空白。
- 子进程 `argv[0]` 固定为目标程序路径(如 `/shell/ls.elf`)。
## 退出状态说明
- 正常退出:返回普通退出码。
- 异常退出:最高位为 `1`,并编码:
- bits `7:0` = signal
- bits `15:8` = CPU exception vector
- bits `31:16` = error code 低 16 位

View File

@@ -1,944 +0,0 @@
#!/usr/bin/env python3
"""
CLeonOS-Wine (Qiling backend)
Run CLeonOS x86_64 user ELF binaries on host OSes with a lightweight syscall shim.
"""
from __future__ import annotations
import argparse
import collections
import os
import struct
import sys
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Deque, Optional, Tuple
try:
from qiling import Qiling
except Exception as exc:
print("[WINE][ERROR] qiling import failed. Install dependencies first:", file=sys.stderr)
print(" pip install -r wine/requirements.txt", file=sys.stderr)
raise SystemExit(1) from exc
try:
from unicorn import UC_PROT_EXEC, UC_PROT_READ, UC_PROT_WRITE
except Exception:
UC_PROT_READ = 1
UC_PROT_WRITE = 2
UC_PROT_EXEC = 4
U64_MASK = (1 << 64) - 1
MAX_CSTR = 4096
MAX_IO_READ = 1 << 20
DEFAULT_MAX_EXEC_DEPTH = 6
FS_NAME_MAX = 96
# CLeonOS syscall IDs from cleonos/c/include/cleonos_syscall.h
SYS_LOG_WRITE = 0
SYS_TIMER_TICKS = 1
SYS_TASK_COUNT = 2
SYS_CUR_TASK = 3
SYS_SERVICE_COUNT = 4
SYS_SERVICE_READY_COUNT = 5
SYS_CONTEXT_SWITCHES = 6
SYS_KELF_COUNT = 7
SYS_KELF_RUNS = 8
SYS_FS_NODE_COUNT = 9
SYS_FS_CHILD_COUNT = 10
SYS_FS_GET_CHILD_NAME = 11
SYS_FS_READ = 12
SYS_EXEC_PATH = 13
SYS_EXEC_REQUESTS = 14
SYS_EXEC_SUCCESS = 15
SYS_USER_SHELL_READY = 16
SYS_USER_EXEC_REQUESTED = 17
SYS_USER_LAUNCH_TRIES = 18
SYS_USER_LAUNCH_OK = 19
SYS_USER_LAUNCH_FAIL = 20
SYS_TTY_COUNT = 21
SYS_TTY_ACTIVE = 22
SYS_TTY_SWITCH = 23
SYS_TTY_WRITE = 24
SYS_TTY_WRITE_CHAR = 25
SYS_KBD_GET_CHAR = 26
def u64(value: int) -> int:
return value & U64_MASK
def u64_neg1() -> int:
return U64_MASK
@dataclass
class SharedKernelState:
start_ns: int = field(default_factory=time.monotonic_ns)
task_count: int = 5
current_task: int = 0
service_count: int = 7
service_ready: int = 7
context_switches: int = 0
kelf_count: int = 2
kelf_runs: int = 0
exec_requests: int = 0
exec_success: int = 0
user_shell_ready: int = 1
user_exec_requested: int = 0
user_launch_tries: int = 0
user_launch_ok: int = 0
user_launch_fail: int = 0
tty_count: int = 4
tty_active: int = 0
kbd_queue: Deque[int] = field(default_factory=collections.deque)
kbd_lock: threading.Lock = field(default_factory=threading.Lock)
def timer_ticks(self) -> int:
# Millisecond style tick is enough for shell/user utilities.
return (time.monotonic_ns() - self.start_ns) // 1_000_000
def push_key(self, key: int) -> None:
with self.kbd_lock:
if len(self.kbd_queue) >= 1024:
self.kbd_queue.popleft()
self.kbd_queue.append(key & 0xFF)
def pop_key(self) -> Optional[int]:
with self.kbd_lock:
if not self.kbd_queue:
return None
return self.kbd_queue.popleft()
class InputPump:
def __init__(self, state: SharedKernelState) -> None:
self.state = state
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self._posix_term_state = None
def start(self) -> None:
if self._thread is not None:
return
if not sys.stdin or not hasattr(sys.stdin, "isatty") or not sys.stdin.isatty():
return
self._thread = threading.Thread(target=self._run, name="cleonos-wine-input", daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=0.2)
self._thread = None
self._restore_posix_tty()
def _run(self) -> None:
if os.name == "nt":
self._run_windows()
else:
self._run_posix()
def _run_windows(self) -> None:
import msvcrt # pylint: disable=import-error
while not self._stop.is_set():
if not msvcrt.kbhit():
time.sleep(0.005)
continue
ch = msvcrt.getwch()
if ch in ("\x00", "\xe0"):
# Extended scan code second byte, ignore for now.
_ = msvcrt.getwch()
continue
norm = self._normalize_char(ch)
if norm is None:
continue
self.state.push_key(ord(norm))
def _run_posix(self) -> None:
import select
import termios
import tty
fd = sys.stdin.fileno()
self._posix_term_state = termios.tcgetattr(fd)
tty.setcbreak(fd)
try:
while not self._stop.is_set():
readable, _, _ = select.select([sys.stdin], [], [], 0.05)
if not readable:
continue
ch = sys.stdin.read(1)
norm = self._normalize_char(ch)
if norm is None:
continue
self.state.push_key(ord(norm))
finally:
self._restore_posix_tty()
def _restore_posix_tty(self) -> None:
if self._posix_term_state is None:
return
try:
import termios
fd = sys.stdin.fileno()
termios.tcsetattr(fd, termios.TCSADRAIN, self._posix_term_state)
except Exception:
pass
finally:
self._posix_term_state = None
@staticmethod
def _normalize_char(ch: str) -> Optional[str]:
if not ch:
return None
if ch == "\r":
return "\n"
return ch
class CLeonOSWine:
def __init__(
self,
elf_path: Path,
rootfs: Path,
guest_path_hint: str,
*,
state: Optional[SharedKernelState] = None,
depth: int = 0,
max_exec_depth: int = DEFAULT_MAX_EXEC_DEPTH,
no_kbd: bool = False,
verbose: bool = False,
top_level: bool = True,
) -> None:
self.elf_path = elf_path
self.rootfs = rootfs
self.guest_path_hint = guest_path_hint
self.state = state if state is not None else SharedKernelState()
self.depth = depth
self.max_exec_depth = max_exec_depth
self.no_kbd = no_kbd
self.verbose = verbose
self.top_level = top_level
self.entry = self._read_elf_entry(elf_path)
self.exit_code: Optional[int] = None
self._input_pump: Optional[InputPump] = None
self._stack_base = 0x00007FFF00000000
self._stack_size = 0x0000000000020000
self._ret_sentinel = self._stack_base + 0x1000
def run(self) -> Optional[int]:
if self.entry == 0:
print(f"[WINE][ERROR] ELF entry is 0, cannot run: {self.elf_path}", file=sys.stderr)
return None
ql = self._new_ql()
self._install_hooks(ql)
self._prepare_custom_stack(ql)
if self.top_level and not self.no_kbd:
self._input_pump = InputPump(self.state)
self._input_pump.start()
try:
try:
ql.run(begin=self.entry)
except TypeError:
ql.run()
except Exception as exc:
if self.verbose or self.top_level:
print(f"[WINE][ERROR] runtime crashed: {exc}", file=sys.stderr)
return None
finally:
if self.top_level and self._input_pump is not None:
self._input_pump.stop()
if self.exit_code is None:
self.exit_code = self._read_reg(ql, "rax")
return u64(self.exit_code)
def _new_ql(self) -> Qiling:
kwargs = {}
level = self._resolve_qiling_verbose(self.verbose)
if level is not None:
kwargs["verbose"] = level
try:
return Qiling([str(self.elf_path)], str(self.rootfs), **kwargs)
except Exception as exc:
if self.verbose or self.top_level:
print(f"[WINE][WARN] native ELF loader failed, fallback to BLOB mode: {exc}", file=sys.stderr)
return self._new_ql_blob(kwargs)
def _new_ql_blob(self, kwargs: dict) -> Qiling:
arch, ostype = self._resolve_blob_arch_os()
last_error: Optional[Exception] = None
creators = (
lambda: Qiling([], str(self.rootfs), ostype=ostype, archtype=arch, **kwargs),
lambda: Qiling(argv=[], rootfs=str(self.rootfs), ostype=ostype, archtype=arch, **kwargs),
lambda: Qiling(code=b"\x90", rootfs=str(self.rootfs), ostype=ostype, archtype=arch, **kwargs),
)
ql = None
for create in creators:
try:
ql = create()
break
except Exception as exc:
last_error = exc
if ql is None:
raise RuntimeError(f"unable to create qiling blob instance: {last_error}")
self._load_elf_segments_into_ql(ql, self.elf_path)
return ql
@staticmethod
def _resolve_qiling_verbose(user_verbose: bool):
try:
from qiling.const import QL_VERBOSE # pylint: disable=import-outside-toplevel
except Exception:
return None
if user_verbose:
for name in ("DEBUG", "DEFAULT"):
if hasattr(QL_VERBOSE, name):
return getattr(QL_VERBOSE, name)
return None
for name in ("DISABLED", "OFF", "DEFAULT"):
if hasattr(QL_VERBOSE, name):
return getattr(QL_VERBOSE, name)
return None
@staticmethod
def _resolve_blob_arch_os() -> Tuple[object, object]:
from qiling.const import QL_ARCH, QL_OS # pylint: disable=import-outside-toplevel
arch = None
for name in ("X8664", "X86_64", "X86_64BITS"):
if hasattr(QL_ARCH, name):
arch = getattr(QL_ARCH, name)
break
ostype = None
for name in ("BLOB", "NONE"):
if hasattr(QL_OS, name):
ostype = getattr(QL_OS, name)
break
if arch is None or ostype is None:
raise RuntimeError("qiling BLOB arch/os constants not found")
return arch, ostype
def _load_elf_segments_into_ql(self, ql: Qiling, path: Path) -> None:
data = path.read_bytes()
if len(data) < 64 or data[0:4] != b"\x7fELF" or data[4] != 2 or data[5] != 1:
raise RuntimeError(f"unsupported ELF format: {path}")
phoff = struct.unpack_from("<Q", data, 0x20)[0]
phentsize = struct.unpack_from("<H", data, 0x36)[0]
phnum = struct.unpack_from("<H", data, 0x38)[0]
if phentsize == 0 or phnum == 0:
raise RuntimeError(f"ELF program headers missing: {path}")
loaded = 0
for i in range(phnum):
off = phoff + i * phentsize
if off + 56 > len(data):
break
p_type, p_flags, p_offset, p_vaddr, _p_paddr, p_filesz, p_memsz, _p_align = struct.unpack_from(
"<IIQQQQQQ", data, off
)
if p_type != 1 or p_memsz == 0:
continue
map_start = int(p_vaddr & ~0xFFF)
map_end = int((p_vaddr + p_memsz + 0xFFF) & ~0xFFF)
map_size = map_end - map_start
if map_size <= 0:
continue
self._map_memory(ql, map_start, map_size)
if p_filesz > 0:
file_start = int(p_offset)
file_end = min(len(data), file_start + int(p_filesz))
if file_start < file_end:
seg_data = data[file_start:file_end]
if not self._safe_write_mem(ql, int(p_vaddr), seg_data):
raise RuntimeError(f"failed to write PT_LOAD segment at 0x{int(p_vaddr):X}")
perms = 0
if p_flags & 0x4:
perms |= UC_PROT_READ
if p_flags & 0x2:
perms |= UC_PROT_WRITE
if p_flags & 0x1:
perms |= UC_PROT_EXEC
if perms == 0:
perms = UC_PROT_READ
try:
ql.mem.protect(map_start, map_size, perms)
except Exception:
pass
loaded += 1
if loaded == 0:
raise RuntimeError(f"no PT_LOAD segments found: {path}")
def _install_hooks(self, ql: Qiling) -> None:
hooker = getattr(ql, "hook_intno", None)
if hooker is None:
hooker = getattr(ql, "hook_intr", None)
if hooker is None:
raise RuntimeError("Qiling interrupt hook API not found")
installed = False
for fn in (
lambda: hooker(self._on_int80, 0x80),
lambda: hooker(0x80, self._on_int80),
):
try:
fn()
installed = True
break
except TypeError:
continue
if not installed:
raise RuntimeError("Failed to install INT 0x80 hook")
hook_addr = getattr(ql, "hook_address", None)
if hook_addr is None:
raise RuntimeError("Qiling address hook API not found")
installed = False
for fn in (
lambda: hook_addr(self._on_entry_return, self._ret_sentinel),
lambda: hook_addr(self._ret_sentinel, self._on_entry_return),
):
try:
fn()
installed = True
break
except TypeError:
continue
if not installed:
raise RuntimeError("Failed to install return sentinel hook")
def _prepare_custom_stack(self, ql: Qiling) -> None:
self._map_memory(ql, self._stack_base, self._stack_size)
if not self._safe_write_mem(ql, self._ret_sentinel, b"\xC3"):
raise RuntimeError("failed to place return sentinel")
rsp = self._stack_base + self._stack_size - 8
if not self._safe_write_mem(ql, rsp, struct.pack("<Q", self._ret_sentinel)):
raise RuntimeError("failed to initialize custom stack")
self._write_reg(ql, "rsp", rsp)
self._write_reg(ql, "rbp", rsp)
self._write_reg(ql, "rip", self.entry)
def _map_memory(self, ql: Qiling, addr: int, size: int) -> None:
try:
ql.mem.map(addr, size, info="[cleonos-wine-stack]")
return
except TypeError:
# Older qiling/unicorn builds may not support "info".
pass
except Exception:
# Already mapped is acceptable.
return
try:
ql.mem.map(addr, size)
except Exception:
# Already mapped is acceptable.
pass
def _on_entry_return(self, ql: Qiling, *_args) -> None:
self.exit_code = self._read_reg(ql, "rax")
ql.emu_stop()
def _on_int80(self, ql: Qiling, *_args) -> None:
syscall_id = self._read_reg(ql, "rax")
arg0 = self._read_reg(ql, "rbx")
arg1 = self._read_reg(ql, "rcx")
arg2 = self._read_reg(ql, "rdx")
self.state.context_switches = u64(self.state.context_switches + 1)
ret = self._dispatch_syscall(ql, syscall_id, arg0, arg1, arg2)
self._write_reg(ql, "rax", u64(ret))
def _dispatch_syscall(self, ql: Qiling, sid: int, arg0: int, arg1: int, arg2: int) -> int:
if sid == SYS_LOG_WRITE:
data = self._read_guest_bytes(ql, arg0, arg1)
self._host_write(data.decode("utf-8", errors="replace"))
return len(data)
if sid == SYS_TIMER_TICKS:
return self.state.timer_ticks()
if sid == SYS_TASK_COUNT:
return self.state.task_count
if sid == SYS_CUR_TASK:
return self.state.current_task
if sid == SYS_SERVICE_COUNT:
return self.state.service_count
if sid == SYS_SERVICE_READY_COUNT:
return self.state.service_ready
if sid == SYS_CONTEXT_SWITCHES:
return self.state.context_switches
if sid == SYS_KELF_COUNT:
return self.state.kelf_count
if sid == SYS_KELF_RUNS:
return self.state.kelf_runs
if sid == SYS_FS_NODE_COUNT:
return self._fs_node_count()
if sid == SYS_FS_CHILD_COUNT:
return self._fs_child_count(ql, arg0)
if sid == SYS_FS_GET_CHILD_NAME:
return self._fs_get_child_name(ql, arg0, arg1, arg2)
if sid == SYS_FS_READ:
return self._fs_read(ql, arg0, arg1, arg2)
if sid == SYS_EXEC_PATH:
return self._exec_path(ql, arg0)
if sid == SYS_EXEC_REQUESTS:
return self.state.exec_requests
if sid == SYS_EXEC_SUCCESS:
return self.state.exec_success
if sid == SYS_USER_SHELL_READY:
return self.state.user_shell_ready
if sid == SYS_USER_EXEC_REQUESTED:
return self.state.user_exec_requested
if sid == SYS_USER_LAUNCH_TRIES:
return self.state.user_launch_tries
if sid == SYS_USER_LAUNCH_OK:
return self.state.user_launch_ok
if sid == SYS_USER_LAUNCH_FAIL:
return self.state.user_launch_fail
if sid == SYS_TTY_COUNT:
return self.state.tty_count
if sid == SYS_TTY_ACTIVE:
return self.state.tty_active
if sid == SYS_TTY_SWITCH:
if arg0 >= self.state.tty_count:
return u64_neg1()
self.state.tty_active = int(arg0)
return 0
if sid == SYS_TTY_WRITE:
data = self._read_guest_bytes(ql, arg0, arg1)
self._host_write(data.decode("utf-8", errors="replace"))
return len(data)
if sid == SYS_TTY_WRITE_CHAR:
ch = chr(arg0 & 0xFF)
if ch in ("\b", "\x7f"):
self._host_write("\b \b")
else:
self._host_write(ch)
return 0
if sid == SYS_KBD_GET_CHAR:
key = self.state.pop_key()
return u64_neg1() if key is None else key
# Unknown syscall: keep app running, report failure.
return u64_neg1()
def _host_write(self, text: str) -> None:
if not text:
return
sys.stdout.write(text)
sys.stdout.flush()
def _fs_node_count(self) -> int:
count = 1 # root node
for root, dirs, files in os.walk(self.rootfs):
dirs[:] = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
count += len(dirs) + len(files)
_ = root
return count
def _fs_child_count(self, ql: Qiling, dir_ptr: int) -> int:
path = self._read_guest_cstring(ql, dir_ptr)
host_dir = self._guest_to_host(path, must_exist=True)
if host_dir is None or not host_dir.is_dir():
return u64_neg1()
return len(self._list_children(host_dir))
def _fs_get_child_name(self, ql: Qiling, dir_ptr: int, index: int, out_ptr: int) -> int:
if out_ptr == 0:
return 0
path = self._read_guest_cstring(ql, dir_ptr)
host_dir = self._guest_to_host(path, must_exist=True)
if host_dir is None or not host_dir.is_dir():
return 0
children = self._list_children(host_dir)
if index >= len(children):
return 0
name = children[int(index)]
encoded = name.encode("utf-8", errors="replace")
if len(encoded) >= FS_NAME_MAX:
encoded = encoded[: FS_NAME_MAX - 1]
self._safe_write_mem(ql, out_ptr, encoded + b"\x00")
return 1
def _fs_read(self, ql: Qiling, path_ptr: int, out_ptr: int, buf_size: int) -> int:
if out_ptr == 0 or buf_size == 0:
return 0
path = self._read_guest_cstring(ql, path_ptr)
host_path = self._guest_to_host(path, must_exist=True)
if host_path is None or not host_path.is_file():
return 0
read_size = int(min(buf_size, MAX_IO_READ))
try:
data = host_path.read_bytes()[:read_size]
except Exception:
return 0
if not data:
return 0
self._safe_write_mem(ql, out_ptr, data)
return len(data)
def _exec_path(self, ql: Qiling, path_ptr: int) -> int:
path = self._read_guest_cstring(ql, path_ptr)
guest_path = self._normalize_guest_path(path)
host_path = self._guest_to_host(guest_path, must_exist=True)
self.state.exec_requests = u64(self.state.exec_requests + 1)
self.state.user_exec_requested = 1
self.state.user_launch_tries = u64(self.state.user_launch_tries + 1)
if host_path is None or not host_path.is_file():
self.state.user_launch_fail = u64(self.state.user_launch_fail + 1)
return u64_neg1()
if self.depth >= self.max_exec_depth:
print(f"[WINE][WARN] exec depth exceeded: {guest_path}", file=sys.stderr)
self.state.user_launch_fail = u64(self.state.user_launch_fail + 1)
return u64_neg1()
child = CLeonOSWine(
host_path,
self.rootfs,
guest_path_hint=guest_path,
state=self.state,
depth=self.depth + 1,
max_exec_depth=self.max_exec_depth,
no_kbd=True,
verbose=self.verbose,
top_level=False,
)
child_ret = child.run()
if child_ret is None:
self.state.user_launch_fail = u64(self.state.user_launch_fail + 1)
return u64_neg1()
self.state.exec_success = u64(self.state.exec_success + 1)
self.state.user_launch_ok = u64(self.state.user_launch_ok + 1)
if guest_path.lower().startswith("/system/"):
self.state.kelf_runs = u64(self.state.kelf_runs + 1)
return 0
def _guest_to_host(self, guest_path: str, *, must_exist: bool) -> Optional[Path]:
norm = self._normalize_guest_path(guest_path)
if norm == "/":
return self.rootfs if (not must_exist or self.rootfs.exists()) else None
current = self.rootfs
for part in [p for p in norm.split("/") if p]:
candidate = current / part
if candidate.exists():
current = candidate
continue
if current.exists() and current.is_dir():
match = self._find_case_insensitive(current, part)
if match is not None:
current = match
continue
current = candidate
if must_exist and not current.exists():
return None
return current
@staticmethod
def _find_case_insensitive(parent: Path, name: str) -> Optional[Path]:
target = name.lower()
try:
for entry in parent.iterdir():
if entry.name.lower() == target:
return entry
except Exception:
return None
return None
@staticmethod
def _normalize_guest_path(path: str) -> str:
p = (path or "").replace("\\", "/").strip()
if not p:
return "/"
if not p.startswith("/"):
p = "/" + p
parts = []
for token in p.split("/"):
if token in ("", "."):
continue
if token == "..":
if parts:
parts.pop()
continue
parts.append(token)
return "/" + "/".join(parts)
@staticmethod
def _list_children(dir_path: Path):
try:
names = [entry.name for entry in dir_path.iterdir() if not entry.name.startswith(".")]
except Exception:
return []
names.sort(key=lambda x: x.lower())
return names
def _read_guest_cstring(self, ql: Qiling, addr: int, max_len: int = MAX_CSTR) -> str:
if addr == 0:
return ""
out = bytearray()
for i in range(max_len):
try:
chunk = ql.mem.read(addr + i, 1)
except Exception:
break
if not chunk or chunk[0] == 0:
break
out.append(chunk[0])
return out.decode("utf-8", errors="replace")
def _read_guest_bytes(self, ql: Qiling, addr: int, size: int) -> bytes:
if addr == 0 or size == 0:
return b""
safe_size = int(min(size, MAX_IO_READ))
try:
return bytes(ql.mem.read(addr, safe_size))
except Exception:
return b""
@staticmethod
def _safe_write_mem(ql: Qiling, addr: int, data: bytes) -> bool:
if addr == 0 or not data:
return False
try:
ql.mem.write(addr, data)
return True
except Exception:
return False
@staticmethod
def _read_reg(ql: Qiling, reg: str) -> int:
arch_regs = getattr(getattr(ql, "arch", None), "regs", None)
if arch_regs is not None:
if hasattr(arch_regs, reg):
return int(getattr(arch_regs, reg))
if hasattr(arch_regs, "read"):
try:
return int(arch_regs.read(reg))
except Exception:
pass
reg_obj = getattr(ql, "reg", None)
if reg_obj is not None:
if hasattr(reg_obj, reg):
return int(getattr(reg_obj, reg))
if hasattr(reg_obj, "read"):
try:
return int(reg_obj.read(reg))
except Exception:
pass
raise RuntimeError(f"cannot read register: {reg}")
@staticmethod
def _write_reg(ql: Qiling, reg: str, value: int) -> None:
v = u64(value)
arch_regs = getattr(getattr(ql, "arch", None), "regs", None)
if arch_regs is not None:
if hasattr(arch_regs, reg):
setattr(arch_regs, reg, v)
return
if hasattr(arch_regs, "write"):
try:
arch_regs.write(reg, v)
return
except Exception:
pass
reg_obj = getattr(ql, "reg", None)
if reg_obj is not None:
if hasattr(reg_obj, reg):
setattr(reg_obj, reg, v)
return
if hasattr(reg_obj, "write"):
try:
reg_obj.write(reg, v)
return
except Exception:
pass
raise RuntimeError(f"cannot write register: {reg}")
@staticmethod
def _read_elf_entry(path: Path) -> int:
try:
data = path.read_bytes()[:64]
except Exception:
return 0
if len(data) < 64:
return 0
if data[0:4] != b"\x7fELF":
return 0
if data[4] != 2 or data[5] != 1:
return 0
return struct.unpack_from("<Q", data, 0x18)[0]
def resolve_rootfs(path_arg: Optional[str]) -> Path:
if path_arg:
root = Path(path_arg).expanduser().resolve()
if not root.exists() or not root.is_dir():
raise FileNotFoundError(f"rootfs not found: {root}")
return root
candidates = [
Path("build/x86_64/ramdisk_root"),
Path("ramdisk"),
]
for candidate in candidates:
if candidate.exists() and candidate.is_dir():
return candidate.resolve()
raise FileNotFoundError("rootfs not found; pass --rootfs")
def _guest_to_host_for_resolve(rootfs: Path, guest_path: str) -> Optional[Path]:
norm = CLeonOSWine._normalize_guest_path(guest_path)
if norm == "/":
return rootfs
current = rootfs
for part in [p for p in norm.split("/") if p]:
candidate = current / part
if candidate.exists():
current = candidate
continue
if current.exists() and current.is_dir():
match = None
for entry in current.iterdir():
if entry.name.lower() == part.lower():
match = entry
break
if match is not None:
current = match
continue
current = candidate
if current.exists():
return current
return None
def resolve_elf_target(elf_arg: str, rootfs: Path) -> Tuple[Path, str]:
host_candidate = Path(elf_arg).expanduser()
if host_candidate.exists():
host_path = host_candidate.resolve()
try:
rel = host_path.relative_to(rootfs)
guest_path = "/" + rel.as_posix()
except ValueError:
guest_path = "/" + host_path.name
return host_path, guest_path
# Fallback: treat as CLeonOS guest path.
guest_path = CLeonOSWine._normalize_guest_path(elf_arg)
host_path = _guest_to_host_for_resolve(rootfs, guest_path)
if host_path is None:
raise FileNotFoundError(f"ELF not found as host path or guest path: {elf_arg}")
return host_path.resolve(), guest_path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="CLeonOS-Wine: run CLeonOS ELF with Qiling.")
parser.add_argument("elf", help="Target ELF path. Supports /guest/path or host file path.")
parser.add_argument("--rootfs", help="Rootfs directory (default: build/x86_64/ramdisk_root).")
parser.add_argument("--no-kbd", action="store_true", help="Disable host keyboard input pump.")
parser.add_argument("--max-exec-depth", type=int, default=DEFAULT_MAX_EXEC_DEPTH, help="Nested exec depth guard.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose runner output.")
return parser.parse_args()
def main() -> int:
args = parse_args()
try:
rootfs = resolve_rootfs(args.rootfs)
elf_path, guest_path = resolve_elf_target(args.elf, rootfs)
except Exception as exc:
print(f"[WINE][ERROR] {exc}", file=sys.stderr)
return 2
if args.verbose:
print(f"[WINE] rootfs={rootfs}", file=sys.stderr)
print(f"[WINE] elf={elf_path}", file=sys.stderr)
print(f"[WINE] guest={guest_path}", file=sys.stderr)
state = SharedKernelState()
runner = CLeonOSWine(
elf_path=elf_path,
rootfs=rootfs,
guest_path_hint=guest_path,
state=state,
max_exec_depth=max(1, args.max_exec_depth),
no_kbd=args.no_kbd,
verbose=args.verbose,
top_level=True,
)
ret = runner.run()
if ret is None:
return 1
if args.verbose:
print(f"\n[WINE] exit=0x{ret:016X}", file=sys.stderr)
return int(ret & 0xFF)
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,7 +0,0 @@
#!/usr/bin/env python3
from cleonos_wine_lib.cli import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,5 +0,0 @@
from .cli import main
from .runner import CLeonOSWineNative
from .state import SharedKernelState
__all__ = ["main", "CLeonOSWineNative", "SharedKernelState"]

View File

@@ -1,124 +0,0 @@
from __future__ import annotations
import argparse
import shlex
import sys
from pathlib import Path
from typing import List
from .constants import DEFAULT_MAX_EXEC_DEPTH
from .runner import CLeonOSWineNative, resolve_elf_target, resolve_rootfs
from .state import SharedKernelState
def _encode_cstr(text: str, size: int) -> bytes:
if size <= 0:
return b""
data = text.encode("utf-8", errors="replace")
if len(data) >= size:
data = data[: size - 1]
return data + (b"\x00" * (size - len(data)))
def _normalize_guest_cwd(cwd: str) -> str:
value = (cwd or "").strip().replace("\\", "/")
if not value:
return "/"
if not value.startswith("/"):
value = "/" + value
return value
def _write_command_context(rootfs: Path, guest_path: str, guest_args: List[str], cwd: str) -> None:
name = Path(guest_path).name
cmd = name[:-4] if name.lower().endswith(".elf") else name
arg = " ".join(guest_args)
ctx_payload = b"".join(
(
_encode_cstr(cmd, 32),
_encode_cstr(arg, 160),
_encode_cstr(_normalize_guest_cwd(cwd), 192),
)
)
ctx_path = rootfs / "temp" / ".ush_cmd_ctx.bin"
ctx_path.parent.mkdir(parents=True, exist_ok=True)
ctx_path.write_bytes(ctx_payload)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="CLeonOS-Wine: run CLeonOS ELF with Unicorn.")
parser.add_argument("elf", help="Target ELF path. Supports /guest/path or host file path.")
parser.add_argument("--rootfs", help="Rootfs directory (default: build/x86_64/ramdisk_root).")
parser.add_argument("--no-kbd", action="store_true", help="Disable host keyboard input pump.")
parser.add_argument("--fb-window", action="store_true", help="Enable host framebuffer window (pygame backend).")
parser.add_argument("--fb-scale", type=int, default=2, help="Framebuffer window scale factor (default: 2).")
parser.add_argument("--fb-max-fps", type=int, default=60, help="Framebuffer present FPS limit (default: 60).")
parser.add_argument("--fb-hold-ms", type=int, default=2500, help="Keep fb window visible after app exits (ms).")
parser.add_argument("--argv-line", default="", help="Guest argv as one line (whitespace-separated).")
parser.add_argument("--cwd", default="/", help="Guest cwd for command-context apps.")
parser.add_argument("guest_args", nargs="*", help="Guest args (for dash-prefixed args use '--').")
parser.add_argument("--max-exec-depth", type=int, default=DEFAULT_MAX_EXEC_DEPTH, help="Nested exec depth guard.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose runner output.")
return parser.parse_args()
def main() -> int:
args = parse_args()
guest_args = list(args.guest_args or [])
argv_items: List[str]
try:
rootfs = resolve_rootfs(args.rootfs)
elf_path, guest_path = resolve_elf_target(args.elf, rootfs)
except Exception as exc:
print(f"[WINE][ERROR] {exc}", file=sys.stderr)
return 2
if len(guest_args) > 0 and guest_args[0] == "--":
guest_args = guest_args[1:]
if (args.argv_line or "").strip():
try:
guest_args = shlex.split(args.argv_line)
except Exception:
guest_args = (args.argv_line or "").split()
argv_items = [guest_path]
argv_items.extend(guest_args)
if guest_args:
try:
_write_command_context(rootfs, guest_path, guest_args, args.cwd)
except Exception as exc:
print(f"[WINE][WARN] failed to write command context: {exc}", file=sys.stderr)
if args.verbose:
print("[WINE] backend=unicorn", file=sys.stderr)
print(f"[WINE] rootfs={rootfs}", file=sys.stderr)
print(f"[WINE] elf={elf_path}", file=sys.stderr)
print(f"[WINE] guest={guest_path}", file=sys.stderr)
state = SharedKernelState()
runner = CLeonOSWineNative(
elf_path=elf_path,
rootfs=rootfs,
guest_path_hint=guest_path,
state=state,
max_exec_depth=max(1, args.max_exec_depth),
no_kbd=args.no_kbd,
verbose=args.verbose,
top_level=True,
fb_window=args.fb_window,
fb_scale=max(1, args.fb_scale),
fb_max_fps=max(1, args.fb_max_fps),
fb_hold_ms=max(0, args.fb_hold_ms),
argv_items=argv_items,
)
ret = runner.run()
if ret is None:
return 1
if args.verbose:
print(f"\n[WINE] exit=0x{ret:016X}", file=sys.stderr)
return int(ret & 0xFF)

View File

@@ -1,149 +0,0 @@
from __future__ import annotations
U64_MASK = (1 << 64) - 1
PAGE_SIZE = 0x1000
MAX_CSTR = 4096
MAX_IO_READ = 1 << 20
DEFAULT_MAX_EXEC_DEPTH = 6
FS_NAME_MAX = 96
CLKS_VERSION_STRING = "1.0.0-alpha"
CLEONOS_VERSION_STRING = "1.0.0-alpha"
WINE_IMPLEMENTED_SYSCALL_COUNT = 85
# Frozen policy: this version string must not change in future updates.
CLEONOS_WINE_VERSION_STRING = "85.0.0-wine"
# CLeonOS syscall IDs from cleonos/c/include/cleonos_syscall.h
SYS_LOG_WRITE = 0
SYS_TIMER_TICKS = 1
SYS_TASK_COUNT = 2
SYS_CUR_TASK = 3
SYS_SERVICE_COUNT = 4
SYS_SERVICE_READY_COUNT = 5
SYS_CONTEXT_SWITCHES = 6
SYS_KELF_COUNT = 7
SYS_KELF_RUNS = 8
SYS_FS_NODE_COUNT = 9
SYS_FS_CHILD_COUNT = 10
SYS_FS_GET_CHILD_NAME = 11
SYS_FS_READ = 12
SYS_EXEC_PATH = 13
SYS_EXEC_REQUESTS = 14
SYS_EXEC_SUCCESS = 15
SYS_USER_SHELL_READY = 16
SYS_USER_EXEC_REQUESTED = 17
SYS_USER_LAUNCH_TRIES = 18
SYS_USER_LAUNCH_OK = 19
SYS_USER_LAUNCH_FAIL = 20
SYS_TTY_COUNT = 21
SYS_TTY_ACTIVE = 22
SYS_TTY_SWITCH = 23
SYS_TTY_WRITE = 24
SYS_TTY_WRITE_CHAR = 25
SYS_KBD_GET_CHAR = 26
SYS_FS_STAT_TYPE = 27
SYS_FS_STAT_SIZE = 28
SYS_FS_MKDIR = 29
SYS_FS_WRITE = 30
SYS_FS_APPEND = 31
SYS_FS_REMOVE = 32
SYS_LOG_JOURNAL_COUNT = 33
SYS_LOG_JOURNAL_READ = 34
SYS_KBD_BUFFERED = 35
SYS_KBD_PUSHED = 36
SYS_KBD_POPPED = 37
SYS_KBD_DROPPED = 38
SYS_KBD_HOTKEY_SWITCHES = 39
SYS_GETPID = 40
SYS_SPAWN_PATH = 41
SYS_WAITPID = 42
SYS_EXIT = 43
SYS_SLEEP_TICKS = 44
SYS_YIELD = 45
SYS_SHUTDOWN = 46
SYS_RESTART = 47
SYS_AUDIO_AVAILABLE = 48
SYS_AUDIO_PLAY_TONE = 49
SYS_AUDIO_STOP = 50
SYS_EXEC_PATHV = 51
SYS_SPAWN_PATHV = 52
SYS_PROC_ARGC = 53
SYS_PROC_ARGV = 54
SYS_PROC_ENVC = 55
SYS_PROC_ENV = 56
SYS_PROC_LAST_SIGNAL = 57
SYS_PROC_FAULT_VECTOR = 58
SYS_PROC_FAULT_ERROR = 59
SYS_PROC_FAULT_RIP = 60
SYS_PROC_COUNT = 61
SYS_PROC_PID_AT = 62
SYS_PROC_SNAPSHOT = 63
SYS_PROC_KILL = 64
SYS_KDBG_SYM = 65
SYS_KDBG_BT = 66
SYS_KDBG_REGS = 67
SYS_STATS_TOTAL = 68
SYS_STATS_ID_COUNT = 69
SYS_STATS_RECENT_WINDOW = 70
SYS_STATS_RECENT_ID = 71
SYS_FD_OPEN = 72
SYS_FD_READ = 73
SYS_FD_WRITE = 74
SYS_FD_CLOSE = 75
SYS_FD_DUP = 76
SYS_DL_OPEN = 77
SYS_DL_CLOSE = 78
SYS_DL_SYM = 79
SYS_EXEC_PATHV_IO = 80
SYS_FB_INFO = 81
SYS_FB_BLIT = 82
SYS_FB_CLEAR = 83
SYS_KERNEL_VERSION = 84
SYS_DISK_PRESENT = 85
SYS_DISK_SIZE_BYTES = 86
SYS_DISK_SECTOR_COUNT = 87
SYS_DISK_FORMATTED = 88
SYS_DISK_FORMAT_FAT32 = 89
SYS_DISK_MOUNT = 90
SYS_DISK_MOUNTED = 91
SYS_DISK_MOUNT_PATH = 92
SYS_DISK_READ_SECTOR = 93
SYS_DISK_WRITE_SECTOR = 94
# proc states (from cleonos/c/include/cleonos_syscall.h)
PROC_STATE_UNUSED = 0
PROC_STATE_PENDING = 1
PROC_STATE_RUNNING = 2
PROC_STATE_EXITED = 3
PROC_STATE_STOPPED = 4
# signals (from cleonos/c/include/cleonos_syscall.h)
SIGKILL = 9
SIGTERM = 15
SIGCONT = 18
SIGSTOP = 19
# open flags (from cleonos/c/include/cleonos_syscall.h)
O_RDONLY = 0x0000
O_WRONLY = 0x0001
O_RDWR = 0x0002
O_CREAT = 0x0040
O_TRUNC = 0x0200
O_APPEND = 0x0400
FD_INHERIT = U64_MASK
def u64(value: int) -> int:
return value & U64_MASK
def u64_neg1() -> int:
return U64_MASK
def page_floor(addr: int) -> int:
return addr & ~(PAGE_SIZE - 1)
def page_ceil(addr: int) -> int:
return (addr + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1)

View File

@@ -1,143 +0,0 @@
from __future__ import annotations
import sys
import time
from typing import Optional
from .state import SharedKernelState
CLEONOS_KEY_LEFT = 0x01
CLEONOS_KEY_RIGHT = 0x02
CLEONOS_KEY_UP = 0x03
CLEONOS_KEY_DOWN = 0x04
class FBWindow:
def __init__(self, pygame_mod, width: int, height: int, scale: int, max_fps: int, *, verbose: bool = False) -> None:
self._pygame = pygame_mod
self.width = int(width)
self.height = int(height)
self.scale = max(1, int(scale))
self._verbose = bool(verbose)
self._closed = False
self._last_present_ns = 0
self._frame_interval_ns = int(1_000_000_000 / max_fps) if int(max_fps) > 0 else 0
window_w = self.width * self.scale
window_h = self.height * self.scale
self._pygame.display.set_caption("CLeonOS Wine Framebuffer")
self._screen = self._pygame.display.set_mode((window_w, window_h))
@classmethod
def create(
cls,
width: int,
height: int,
scale: int,
max_fps: int,
*,
verbose: bool = False,
) -> Optional["FBWindow"]:
try:
import pygame # type: ignore
except Exception as exc:
print(f"[WINE][WARN] framebuffer window disabled: pygame import failed ({exc})", file=sys.stderr)
return None
try:
pygame.init()
pygame.display.init()
return cls(pygame, width, height, scale, max_fps, verbose=verbose)
except Exception as exc:
print(f"[WINE][WARN] framebuffer window disabled: unable to init display ({exc})", file=sys.stderr)
try:
pygame.quit()
except Exception:
pass
return None
def close(self) -> None:
if self._closed:
return
self._closed = True
try:
self._pygame.display.quit()
except Exception:
pass
try:
self._pygame.quit()
except Exception:
pass
def is_closed(self) -> bool:
return self._closed
def pump_input(self, state: SharedKernelState) -> None:
if self._closed:
return
try:
events = self._pygame.event.get()
except Exception:
self.close()
return
for event in events:
if event.type == self._pygame.QUIT:
self.close()
continue
if event.type != self._pygame.KEYDOWN:
continue
key = event.key
ch = 0
if key == self._pygame.K_LEFT:
ch = CLEONOS_KEY_LEFT
elif key == self._pygame.K_RIGHT:
ch = CLEONOS_KEY_RIGHT
elif key == self._pygame.K_UP:
ch = CLEONOS_KEY_UP
elif key == self._pygame.K_DOWN:
ch = CLEONOS_KEY_DOWN
elif key in (self._pygame.K_RETURN, self._pygame.K_KP_ENTER):
ch = ord("\n")
elif key == self._pygame.K_BACKSPACE:
ch = 8
elif key == self._pygame.K_ESCAPE:
ch = 27
elif key == self._pygame.K_TAB:
ch = ord("\t")
else:
text = getattr(event, "unicode", "")
if isinstance(text, str) and len(text) == 1:
code = ord(text)
if 32 <= code <= 126:
ch = code
if ch != 0:
state.push_key(ch)
def present(self, pixels_bgra: bytearray, *, force: bool = False) -> bool:
if self._closed:
return False
now_ns = time.monotonic_ns()
if not force and self._frame_interval_ns > 0 and (now_ns - self._last_present_ns) < self._frame_interval_ns:
return False
try:
src = self._pygame.image.frombuffer(pixels_bgra, (self.width, self.height), "BGRA")
if self.scale != 1:
src = self._pygame.transform.scale(src, (self.width * self.scale, self.height * self.scale))
self._screen.blit(src, (0, 0))
self._pygame.display.flip()
self._last_present_ns = now_ns
return True
except Exception:
self.close()
return False

View File

@@ -1,99 +0,0 @@
from __future__ import annotations
import os
import sys
import threading
import time
from typing import Optional
from .state import SharedKernelState
class InputPump:
def __init__(self, state: SharedKernelState) -> None:
self.state = state
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self._posix_term_state = None
def start(self) -> None:
if self._thread is not None:
return
if not sys.stdin or not hasattr(sys.stdin, "isatty") or not sys.stdin.isatty():
return
self._thread = threading.Thread(target=self._run, name="cleonos-wine-input", daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=0.2)
self._thread = None
self._restore_posix_tty()
def _run(self) -> None:
if os.name == "nt":
self._run_windows()
else:
self._run_posix()
def _run_windows(self) -> None:
import msvcrt # pylint: disable=import-error
while not self._stop.is_set():
if not msvcrt.kbhit():
time.sleep(0.005)
continue
ch = msvcrt.getwch()
if ch in ("\x00", "\xe0"):
_ = msvcrt.getwch()
continue
norm = self._normalize_char(ch)
if norm is None:
continue
self.state.push_key(ord(norm))
def _run_posix(self) -> None:
import select
import termios
import tty
fd = sys.stdin.fileno()
self._posix_term_state = termios.tcgetattr(fd)
tty.setcbreak(fd)
try:
while not self._stop.is_set():
readable, _, _ = select.select([sys.stdin], [], [], 0.05)
if not readable:
continue
ch = sys.stdin.read(1)
norm = self._normalize_char(ch)
if norm is None:
continue
self.state.push_key(ord(norm))
finally:
self._restore_posix_tty()
def _restore_posix_tty(self) -> None:
if self._posix_term_state is None:
return
try:
import termios
fd = sys.stdin.fileno()
termios.tcsetattr(fd, termios.TCSADRAIN, self._posix_term_state)
except Exception:
pass
finally:
self._posix_term_state = None
@staticmethod
def _normalize_char(ch: str) -> Optional[str]:
if not ch:
return None
if ch == "\r":
return "\n"
return ch

View File

@@ -1,59 +0,0 @@
from __future__ import annotations
import sys
try:
from unicorn import Uc, UcError
from unicorn import UC_ARCH_X86, UC_MODE_64
from unicorn import UC_HOOK_CODE, UC_HOOK_INTR
from unicorn import UC_PROT_ALL, UC_PROT_EXEC, UC_PROT_READ, UC_PROT_WRITE
from unicorn import (
UC_ERR_FETCH_PROT,
UC_ERR_FETCH_UNMAPPED,
UC_ERR_INSN_INVALID,
UC_ERR_READ_PROT,
UC_ERR_READ_UNMAPPED,
UC_ERR_WRITE_PROT,
UC_ERR_WRITE_UNMAPPED,
)
from unicorn.x86_const import (
UC_X86_REG_RAX,
UC_X86_REG_RBX,
UC_X86_REG_RCX,
UC_X86_REG_RDX,
UC_X86_REG_RBP,
UC_X86_REG_RIP,
UC_X86_REG_RSP,
)
except Exception as exc:
print("[WINE][ERROR] unicorn import failed. Install dependencies first:", file=sys.stderr)
print(" pip install -r wine/requirements.txt", file=sys.stderr)
raise SystemExit(1) from exc
__all__ = [
"Uc",
"UcError",
"UC_ARCH_X86",
"UC_MODE_64",
"UC_HOOK_CODE",
"UC_HOOK_INTR",
"UC_PROT_ALL",
"UC_PROT_EXEC",
"UC_PROT_READ",
"UC_PROT_WRITE",
"UC_ERR_FETCH_PROT",
"UC_ERR_FETCH_UNMAPPED",
"UC_ERR_INSN_INVALID",
"UC_ERR_READ_PROT",
"UC_ERR_READ_UNMAPPED",
"UC_ERR_WRITE_PROT",
"UC_ERR_WRITE_UNMAPPED",
"UC_X86_REG_RAX",
"UC_X86_REG_RBX",
"UC_X86_REG_RCX",
"UC_X86_REG_RDX",
"UC_X86_REG_RBP",
"UC_X86_REG_RIP",
"UC_X86_REG_RSP",
]

File diff suppressed because it is too large Load Diff

View File

@@ -1,354 +0,0 @@
from __future__ import annotations
import collections
import threading
import time
from dataclasses import dataclass, field
from typing import Deque, Dict, List, Optional, Tuple
from .constants import PROC_STATE_EXITED, PROC_STATE_PENDING, PROC_STATE_RUNNING, PROC_STATE_STOPPED, u64
@dataclass
class SharedKernelState:
start_ns: int = field(default_factory=time.monotonic_ns)
task_count: int = 5
current_task: int = 0
service_count: int = 7
service_ready: int = 7
context_switches: int = 0
kelf_count: int = 2
kelf_runs: int = 0
exec_requests: int = 0
exec_success: int = 0
user_shell_ready: int = 1
user_exec_requested: int = 0
user_launch_tries: int = 0
user_launch_ok: int = 0
user_launch_fail: int = 0
tty_count: int = 4
tty_active: int = 0
kbd_queue: Deque[int] = field(default_factory=collections.deque)
kbd_lock: threading.Lock = field(default_factory=threading.Lock)
kbd_queue_cap: int = 256
kbd_drop_count: int = 0
kbd_push_count: int = 0
kbd_pop_count: int = 0
kbd_hotkey_switches: int = 0
log_journal_cap: int = 256
log_journal: Deque[str] = field(default_factory=lambda: collections.deque(maxlen=256))
fs_write_max: int = 16 * 1024 * 1024
# syscall stats
stats_lock: threading.Lock = field(default_factory=threading.Lock)
stats_total_calls: int = 0
stats_id_total: Dict[int, int] = field(default_factory=dict)
stats_recent_window_cap: int = 256
stats_recent_ring: Deque[int] = field(default_factory=lambda: collections.deque(maxlen=256))
# process table
proc_lock: threading.Lock = field(default_factory=threading.Lock)
proc_next_pid: int = 1
proc_current_pid: int = 0
proc_parents: Dict[int, int] = field(default_factory=dict)
proc_status: Dict[int, Optional[int]] = field(default_factory=dict)
proc_state: Dict[int, int] = field(default_factory=dict)
proc_started_tick: Dict[int, int] = field(default_factory=dict)
proc_exited_tick: Dict[int, int] = field(default_factory=dict)
proc_mem_bytes: Dict[int, int] = field(default_factory=dict)
proc_tty_index: Dict[int, int] = field(default_factory=dict)
proc_path: Dict[int, str] = field(default_factory=dict)
proc_argv: Dict[int, List[str]] = field(default_factory=dict)
proc_env: Dict[int, List[str]] = field(default_factory=dict)
proc_last_signal: Dict[int, int] = field(default_factory=dict)
proc_fault_vector: Dict[int, int] = field(default_factory=dict)
proc_fault_error: Dict[int, int] = field(default_factory=dict)
proc_fault_rip: Dict[int, int] = field(default_factory=dict)
def timer_ticks(self) -> int:
return (time.monotonic_ns() - self.start_ns) // 1_000_000
def record_syscall(self, sid: int) -> None:
key = int(u64(sid))
with self.stats_lock:
self.stats_total_calls = int(u64(self.stats_total_calls + 1))
self.stats_id_total[key] = int(u64(self.stats_id_total.get(key, 0) + 1))
self.stats_recent_ring.append(key)
def stats_total(self) -> int:
with self.stats_lock:
return int(self.stats_total_calls)
def stats_id_count(self, sid: int) -> int:
key = int(u64(sid))
with self.stats_lock:
return int(self.stats_id_total.get(key, 0))
def stats_recent_window(self) -> int:
with self.stats_lock:
return len(self.stats_recent_ring)
def stats_recent_id_count(self, sid: int) -> int:
key = int(u64(sid))
with self.stats_lock:
return sum(1 for item in self.stats_recent_ring if item == key)
def push_key(self, key: int) -> None:
with self.kbd_lock:
if len(self.kbd_queue) >= self.kbd_queue_cap:
self.kbd_queue.popleft()
self.kbd_drop_count = u64(self.kbd_drop_count + 1)
self.kbd_queue.append(key & 0xFF)
self.kbd_push_count = u64(self.kbd_push_count + 1)
def pop_key(self) -> Optional[int]:
with self.kbd_lock:
if not self.kbd_queue:
return None
self.kbd_pop_count = u64(self.kbd_pop_count + 1)
return self.kbd_queue.popleft()
def buffered_count(self) -> int:
with self.kbd_lock:
return len(self.kbd_queue)
def log_journal_push(self, text: str) -> None:
if text is None:
return
normalized = text.replace("\r", "")
lines = normalized.split("\n")
for line in lines:
if len(line) > 255:
line = line[:255]
self.log_journal.append(line)
def log_journal_count(self) -> int:
return len(self.log_journal)
def log_journal_read(self, index_from_oldest: int) -> Optional[str]:
if index_from_oldest < 0 or index_from_oldest >= len(self.log_journal):
return None
return list(self.log_journal)[index_from_oldest]
def alloc_pid(self, ppid: int) -> int:
now = self.timer_ticks()
with self.proc_lock:
pid = int(self.proc_next_pid)
if pid == 0:
pid = 1
self.proc_next_pid = int(u64(pid + 1))
if self.proc_next_pid == 0:
self.proc_next_pid = 1
self.proc_parents[pid] = int(ppid)
self.proc_status[pid] = None
self.proc_state[pid] = PROC_STATE_PENDING
self.proc_started_tick[pid] = now
self.proc_exited_tick[pid] = 0
self.proc_mem_bytes[pid] = 0
self.proc_tty_index[pid] = int(self.tty_active)
self.proc_path[pid] = ""
self.proc_argv[pid] = []
self.proc_env[pid] = []
self.proc_last_signal[pid] = 0
self.proc_fault_vector[pid] = 0
self.proc_fault_error[pid] = 0
self.proc_fault_rip[pid] = 0
return pid
def set_current_pid(self, pid: int) -> None:
with self.proc_lock:
self.proc_current_pid = int(pid)
def get_current_pid(self) -> int:
with self.proc_lock:
return int(self.proc_current_pid)
def set_proc_running(self, pid: int, path: Optional[str], tty_index: int) -> None:
if pid <= 0:
return
now = self.timer_ticks()
with self.proc_lock:
if pid not in self.proc_status:
return
self.proc_state[pid] = PROC_STATE_RUNNING
if self.proc_started_tick.get(pid, 0) == 0:
self.proc_started_tick[pid] = now
self.proc_tty_index[pid] = int(tty_index)
if path:
self.proc_path[pid] = str(path)
def mark_exited(self, pid: int, status: int) -> None:
if pid <= 0:
return
with self.proc_lock:
self.proc_status[int(pid)] = int(u64(status))
self.proc_state[int(pid)] = PROC_STATE_EXITED
self.proc_exited_tick[int(pid)] = self.timer_ticks()
def set_proc_stopped(self, pid: int) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED:
self.proc_state[pid] = PROC_STATE_STOPPED
def set_proc_pending(self, pid: int) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
if self.proc_state.get(pid, PROC_STATE_PENDING) != PROC_STATE_EXITED:
self.proc_state[pid] = PROC_STATE_PENDING
def wait_pid(self, pid: int) -> Tuple[int, int]:
with self.proc_lock:
if pid not in self.proc_status:
return int(u64(-1)), 0
status = self.proc_status[pid]
if status is None:
return 0, 0
return 1, int(status)
def set_proc_cmdline(self, pid: int, argv: List[str], env: List[str]) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
self.proc_argv[pid] = [str(item) for item in argv]
self.proc_env[pid] = [str(item) for item in env]
if argv:
self.proc_path[pid] = str(argv[0])
def set_proc_fault(self, pid: int, signal: int, vector: int, error_code: int, rip: int) -> None:
if pid <= 0:
return
with self.proc_lock:
if pid not in self.proc_status:
return
self.proc_last_signal[pid] = int(u64(signal))
self.proc_fault_vector[pid] = int(u64(vector))
self.proc_fault_error[pid] = int(u64(error_code))
self.proc_fault_rip[pid] = int(u64(rip))
def proc_count(self) -> int:
with self.proc_lock:
return len(self.proc_status)
def proc_pid_at(self, index: int) -> Optional[int]:
if index < 0:
return None
with self.proc_lock:
ordered = sorted(self.proc_status.keys())
if index >= len(ordered):
return None
return int(ordered[index])
def proc_state_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_state.get(int(pid), 0))
def proc_ppid(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_parents.get(int(pid), 0))
def proc_started_tick_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_started_tick.get(int(pid), 0))
def proc_exited_tick_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_exited_tick.get(int(pid), 0))
def proc_exit_status_value(self, pid: int) -> int:
with self.proc_lock:
value = self.proc_status.get(int(pid))
return int(value) if value is not None else 0
def proc_runtime_ticks(self, pid: int) -> int:
with self.proc_lock:
start = int(self.proc_started_tick.get(int(pid), 0))
state = int(self.proc_state.get(int(pid), 0))
end = int(self.proc_exited_tick.get(int(pid), 0))
if start == 0:
return 0
if state == PROC_STATE_EXITED and end >= start:
return end - start
now = self.timer_ticks()
return 0 if now < start else now - start
def proc_mem_bytes_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_mem_bytes.get(int(pid), 0))
def proc_tty_index_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_tty_index.get(int(pid), 0))
def proc_path_value(self, pid: int) -> str:
with self.proc_lock:
return str(self.proc_path.get(int(pid), ""))
def proc_argc(self, pid: int) -> int:
with self.proc_lock:
return len(self.proc_argv.get(int(pid), []))
def proc_argv_item(self, pid: int, index: int) -> Optional[str]:
with self.proc_lock:
values = self.proc_argv.get(int(pid), [])
if index < 0 or index >= len(values):
return None
return values[index]
def proc_envc(self, pid: int) -> int:
with self.proc_lock:
return len(self.proc_env.get(int(pid), []))
def proc_env_item(self, pid: int, index: int) -> Optional[str]:
with self.proc_lock:
values = self.proc_env.get(int(pid), [])
if index < 0 or index >= len(values):
return None
return values[index]
def proc_signal(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_last_signal.get(int(pid), 0))
def proc_fault_vector_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_vector.get(int(pid), 0))
def proc_fault_error_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_error.get(int(pid), 0))
def proc_fault_rip_value(self, pid: int) -> int:
with self.proc_lock:
return int(self.proc_fault_rip.get(int(pid), 0))

View File

@@ -1,2 +0,0 @@
unicorn>=2.0.1
pygame>=2.5.2