This commit is contained in:
2026-04-16 22:29:08 +08:00
parent 2d843a100e
commit 589a83cb07
29 changed files with 1397 additions and 49 deletions

View File

@@ -14,15 +14,28 @@ from .constants import (
MAX_CSTR,
MAX_IO_READ,
PAGE_SIZE,
SYS_AUDIO_AVAILABLE,
SYS_AUDIO_PLAY_TONE,
SYS_AUDIO_STOP,
SYS_CONTEXT_SWITCHES,
SYS_CUR_TASK,
SYS_EXEC_PATH,
SYS_EXEC_PATHV,
SYS_EXEC_REQUESTS,
SYS_EXEC_SUCCESS,
SYS_EXIT,
SYS_GETPID,
SYS_PROC_ARGC,
SYS_PROC_ARGV,
SYS_PROC_ENVC,
SYS_PROC_ENV,
SYS_PROC_FAULT_ERROR,
SYS_PROC_FAULT_RIP,
SYS_PROC_FAULT_VECTOR,
SYS_PROC_LAST_SIGNAL,
SYS_SLEEP_TICKS,
SYS_SPAWN_PATH,
SYS_SPAWN_PATHV,
SYS_WAITPID,
SYS_YIELD,
SYS_SHUTDOWN,
@@ -72,6 +85,13 @@ from .platform import (
Uc,
UcError,
UC_ARCH_X86,
UC_ERR_FETCH_PROT,
UC_ERR_FETCH_UNMAPPED,
UC_ERR_INSN_INVALID,
UC_ERR_READ_PROT,
UC_ERR_READ_UNMAPPED,
UC_ERR_WRITE_PROT,
UC_ERR_WRITE_UNMAPPED,
UC_HOOK_CODE,
UC_HOOK_INTR,
UC_MODE_64,
@@ -85,6 +105,7 @@ from .platform import (
UC_X86_REG_RCX,
UC_X86_REG_RDX,
UC_X86_REG_RSP,
UC_X86_REG_RIP,
)
from .state import SharedKernelState
@@ -103,6 +124,15 @@ class ELFImage:
segments: List[ELFSegment]
EXEC_PATH_MAX = 192
EXEC_ARG_LINE_MAX = 256
EXEC_ENV_LINE_MAX = 512
EXEC_MAX_ARGS = 24
EXEC_MAX_ENVS = 24
EXEC_ITEM_MAX = 128
EXEC_STATUS_SIGNAL_FLAG = 1 << 63
class CLeonOSWineNative:
def __init__(
self,
@@ -118,6 +148,8 @@ class CLeonOSWineNative:
top_level: bool = True,
pid: int = 0,
ppid: int = 0,
argv_items: Optional[List[str]] = None,
env_items: Optional[List[str]] = None,
) -> None:
self.elf_path = elf_path
self.rootfs = rootfs
@@ -130,6 +162,8 @@ class CLeonOSWineNative:
self.top_level = top_level
self.pid = int(pid)
self.ppid = int(ppid)
self.argv_items = list(argv_items) if argv_items is not None else []
self.env_items = list(env_items) if env_items is not None else []
self._exit_requested = False
self._exit_status = 0
@@ -142,12 +176,17 @@ class CLeonOSWineNative:
self._ret_sentinel = 0x00007FFF10000000
self._mapped_ranges: List[Tuple[int, int]] = []
default_path = self._normalize_guest_path(self.guest_path_hint or f"/{self.elf_path.name}")
self.argv_items, self.env_items = self._prepare_exec_items(default_path, self.argv_items, self.env_items)
def run(self) -> Optional[int]:
if self.pid == 0:
self.pid = self.state.alloc_pid(self.ppid)
prev_pid = self.state.get_current_pid()
self.state.set_current_pid(self.pid)
self.state.set_proc_cmdline(self.pid, self.argv_items, self.env_items)
self.state.set_proc_fault(self.pid, 0, 0, 0, 0)
uc = Uc(UC_ARCH_X86, UC_MODE_64)
self._install_hooks(uc)
@@ -158,27 +197,31 @@ class CLeonOSWineNative:
self._input_pump = InputPump(self.state)
self._input_pump.start()
run_failed = False
interrupted = False
runtime_fault_status: Optional[int] = None
try:
uc.emu_start(self.image.entry, 0)
except KeyboardInterrupt:
run_failed = True
interrupted = True
if self.top_level:
print("\n[WINE] interrupted by user", file=sys.stderr)
except UcError as exc:
run_failed = True
runtime_fault_status = self._status_from_uc_error(uc, exc)
if self.verbose or self.top_level:
print(f"[WINE][ERROR] runtime crashed: {exc}", file=sys.stderr)
finally:
if self.top_level and self._input_pump is not None:
self._input_pump.stop()
if run_failed:
if interrupted:
self.state.mark_exited(self.pid, u64_neg1())
self.state.set_current_pid(prev_pid)
return None
if runtime_fault_status is not None:
self.exit_code = runtime_fault_status
if self.exit_code is None:
self.exit_code = self._reg_read(uc, UC_X86_REG_RAX)
@@ -246,18 +289,44 @@ class CLeonOSWineNative:
return self._fs_read(uc, arg0, arg1, arg2)
if sid == SYS_EXEC_PATH:
return self._exec_path(uc, arg0)
if sid == SYS_EXEC_PATHV:
return self._exec_pathv(uc, arg0, arg1, arg2)
if sid == SYS_SPAWN_PATH:
return self._spawn_path(uc, arg0)
if sid == SYS_SPAWN_PATHV:
return self._spawn_pathv(uc, arg0, arg1, arg2)
if sid == SYS_WAITPID:
return self._wait_pid(uc, arg0, arg1)
if sid == SYS_GETPID:
return self.state.get_current_pid()
if sid == SYS_PROC_ARGC:
return self._proc_argc()
if sid == SYS_PROC_ARGV:
return self._proc_argv(uc, arg0, arg1, arg2)
if sid == SYS_PROC_ENVC:
return self._proc_envc()
if sid == SYS_PROC_ENV:
return self._proc_env(uc, arg0, arg1, arg2)
if sid == SYS_PROC_LAST_SIGNAL:
return self._proc_last_signal()
if sid == SYS_PROC_FAULT_VECTOR:
return self._proc_fault_vector()
if sid == SYS_PROC_FAULT_ERROR:
return self._proc_fault_error()
if sid == SYS_PROC_FAULT_RIP:
return self._proc_fault_rip()
if sid == SYS_EXIT:
return self._request_exit(uc, arg0)
if sid == SYS_SLEEP_TICKS:
return self._sleep_ticks(arg0)
if sid == SYS_YIELD:
return self._yield_once()
if sid == SYS_AUDIO_AVAILABLE:
return 0
if sid == SYS_AUDIO_PLAY_TONE:
return 0
if sid == SYS_AUDIO_STOP:
return 1
if sid == SYS_SHUTDOWN:
self._host_write("\n[WINE] shutdown requested by guest\n")
self._exit_requested = True
@@ -674,14 +743,30 @@ class CLeonOSWineNative:
return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0
def _exec_path(self, uc: Uc, path_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, return_pid=False)
return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=False)
def _exec_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=False)
def _spawn_path(self, uc: Uc, path_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, return_pid=True)
return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=True)
def _spawn_path_common(self, uc: Uc, path_ptr: int, *, return_pid: bool) -> int:
path = self._read_guest_cstring(uc, path_ptr)
def _spawn_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=True)
def _spawn_path_common(
self,
uc: Uc,
path_ptr: int,
argv_ptr: int,
env_ptr: int,
*,
return_pid: bool,
) -> int:
path = self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX)
guest_path = self._normalize_guest_path(path)
argv_line = self._read_guest_cstring(uc, argv_ptr, EXEC_ARG_LINE_MAX) if argv_ptr != 0 else ""
env_line = self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else ""
host_path = self._guest_to_host(guest_path, must_exist=True)
self.state.exec_requests = u64(self.state.exec_requests + 1)
@@ -699,6 +784,9 @@ class CLeonOSWineNative:
parent_pid = self.state.get_current_pid()
child_pid = self.state.alloc_pid(parent_pid)
argv_items, env_items = self._build_child_exec_items(guest_path, argv_line, env_line)
self.state.set_proc_cmdline(child_pid, argv_items, env_items)
self.state.set_proc_fault(child_pid, 0, 0, 0, 0)
child = CLeonOSWineNative(
elf_path=host_path,
@@ -712,6 +800,8 @@ class CLeonOSWineNative:
top_level=False,
pid=child_pid,
ppid=parent_pid,
argv_items=argv_items,
env_items=env_items,
)
child_ret = child.run()
@@ -728,7 +818,41 @@ class CLeonOSWineNative:
if return_pid:
return child_pid
return 0
return u64(child_ret)
def _proc_argc(self) -> int:
return self.state.proc_argc(self.state.get_current_pid())
def _proc_argv(self, uc: Uc, index: int, out_ptr: int, out_size: int) -> int:
return self._copy_proc_item_to_guest(
uc,
self.state.proc_argv_item(self.state.get_current_pid(), int(index)),
out_ptr,
out_size,
)
def _proc_envc(self) -> int:
return self.state.proc_envc(self.state.get_current_pid())
def _proc_env(self, uc: Uc, index: int, out_ptr: int, out_size: int) -> int:
return self._copy_proc_item_to_guest(
uc,
self.state.proc_env_item(self.state.get_current_pid(), int(index)),
out_ptr,
out_size,
)
def _proc_last_signal(self) -> int:
return self.state.proc_signal(self.state.get_current_pid())
def _proc_fault_vector(self) -> int:
return self.state.proc_fault_vector_value(self.state.get_current_pid())
def _proc_fault_error(self) -> int:
return self.state.proc_fault_error_value(self.state.get_current_pid())
def _proc_fault_rip(self) -> int:
return self.state.proc_fault_rip_value(self.state.get_current_pid())
def _wait_pid(self, uc: Uc, pid: int, out_ptr: int) -> int:
wait_ret, status = self.state.wait_pid(int(pid))
@@ -761,6 +885,157 @@ class CLeonOSWineNative:
time.sleep(0)
return self.state.timer_ticks()
@staticmethod
def _truncate_item_text(text: str, max_bytes: int = EXEC_ITEM_MAX) -> str:
if max_bytes <= 1:
return ""
encoded = (text or "").encode("utf-8", errors="replace")
if len(encoded) >= max_bytes:
encoded = encoded[: max_bytes - 1]
return encoded.decode("utf-8", errors="ignore")
@staticmethod
def _parse_whitespace_items(line: str, max_count: int) -> List[str]:
out: List[str] = []
i = 0
text = line or ""
length = len(text)
while i < length:
while i < length and text[i] in (" ", "\t", "\r", "\n"):
i += 1
if i >= length:
break
start = i
while i < length and text[i] not in (" ", "\t", "\r", "\n"):
i += 1
out.append(text[start:i])
if len(out) >= max_count:
break
return out
@staticmethod
def _parse_env_items(line: str, max_count: int) -> List[str]:
out: List[str] = []
i = 0
text = line or ""
length = len(text)
while i < length:
while i < length and text[i] in (" ", "\t", ";", "\r", "\n"):
i += 1
if i >= length:
break
start = i
while i < length and text[i] not in (";", "\r", "\n"):
i += 1
value = text[start:i].rstrip(" \t")
if value:
out.append(value)
if len(out) >= max_count:
break
return out
@classmethod
def _prepare_exec_items(cls, path: str, argv_items: List[str], env_items: List[str]) -> Tuple[List[str], List[str]]:
normalized_path = cls._normalize_guest_path(path)
prepared_argv: List[str] = []
for item in argv_items[:EXEC_MAX_ARGS]:
value = cls._truncate_item_text(item)
if value:
prepared_argv.append(value)
if not prepared_argv:
prepared_argv = [cls._truncate_item_text(normalized_path)]
elif prepared_argv[0] == "":
prepared_argv[0] = cls._truncate_item_text(normalized_path)
prepared_env: List[str] = []
for item in env_items[:EXEC_MAX_ENVS]:
value = cls._truncate_item_text(item)
if value:
prepared_env.append(value)
return prepared_argv[:EXEC_MAX_ARGS], prepared_env[:EXEC_MAX_ENVS]
@classmethod
def _build_child_exec_items(cls, guest_path: str, argv_line: str, env_line: str) -> Tuple[List[str], List[str]]:
argv_items = [guest_path]
argv_items.extend(cls._parse_whitespace_items(argv_line or "", EXEC_MAX_ARGS - 1))
env_items = cls._parse_env_items(env_line or "", EXEC_MAX_ENVS)
return cls._prepare_exec_items(guest_path, argv_items, env_items)
def _copy_proc_item_to_guest(self, uc: Uc, item: Optional[str], out_ptr: int, out_size: int) -> int:
if out_ptr == 0 or out_size == 0 or item is None:
return 0
max_out = int(min(int(out_size), EXEC_ITEM_MAX))
if max_out <= 0:
return 0
encoded = self._truncate_item_text(item, max_out + 1).encode("utf-8", errors="replace")
if len(encoded) >= max_out:
encoded = encoded[: max_out - 1]
return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0
@staticmethod
def _signal_from_vector(vector: int) -> int:
if vector in (0, 16, 19):
return 8
if vector == 6:
return 4
if vector == 3:
return 5
if vector in (10, 11, 12, 13, 14, 17):
return 11
return 6
@staticmethod
def _encode_signal_status(signal: int, vector: int, error_code: int) -> int:
return u64(
EXEC_STATUS_SIGNAL_FLAG
| (int(signal) & 0xFF)
| ((int(vector) & 0xFF) << 8)
| ((int(error_code) & 0xFFFF) << 16)
)
def _status_from_uc_error(self, uc: Uc, exc: UcError) -> int:
errno = int(getattr(exc, "errno", 0))
vector = 13
error_code = 0
if errno in (UC_ERR_READ_UNMAPPED, UC_ERR_WRITE_UNMAPPED, UC_ERR_FETCH_UNMAPPED):
vector = 14
if errno == UC_ERR_WRITE_UNMAPPED:
error_code = 0x2
elif errno == UC_ERR_FETCH_UNMAPPED:
error_code = 0x10
elif errno in (UC_ERR_READ_PROT, UC_ERR_WRITE_PROT, UC_ERR_FETCH_PROT):
vector = 14
error_code = 0x1
if errno == UC_ERR_WRITE_PROT:
error_code |= 0x2
elif errno == UC_ERR_FETCH_PROT:
error_code |= 0x10
elif errno == UC_ERR_INSN_INVALID:
vector = 6
error_code = 0
rip = 0
try:
rip = self._reg_read(uc, UC_X86_REG_RIP)
except Exception:
rip = 0
signal = self._signal_from_vector(vector)
self.state.set_proc_fault(self.pid, signal, vector, error_code, rip)
return self._encode_signal_status(signal, vector, error_code)
def _guest_to_host(self, guest_path: str, *, must_exist: bool) -> Optional[Path]:
norm = self._normalize_guest_path(guest_path)
if norm == "/":