更新Wine+更新menuconfig

This commit is contained in:
2026-04-19 16:07:44 +08:00
parent 863f85ae4f
commit 739be1d7f7
13 changed files with 1642 additions and 229 deletions

View File

@@ -6,40 +6,48 @@ import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
from typing import Dict, List, Optional, Tuple
from .constants import (
DEFAULT_MAX_EXEC_DEPTH,
FD_INHERIT,
FS_NAME_MAX,
MAX_CSTR,
MAX_IO_READ,
O_APPEND,
O_CREAT,
O_RDONLY,
O_RDWR,
O_TRUNC,
O_WRONLY,
PAGE_SIZE,
PROC_STATE_EXITED,
PROC_STATE_PENDING,
PROC_STATE_RUNNING,
PROC_STATE_STOPPED,
SIGCONT,
SIGKILL,
SIGSTOP,
SIGTERM,
SYS_AUDIO_AVAILABLE,
SYS_AUDIO_PLAY_TONE,
SYS_AUDIO_STOP,
SYS_CONTEXT_SWITCHES,
SYS_CUR_TASK,
SYS_DL_CLOSE,
SYS_DL_OPEN,
SYS_DL_SYM,
SYS_EXEC_PATH,
SYS_EXEC_PATHV,
SYS_EXEC_PATHV_IO,
SYS_EXEC_REQUESTS,
SYS_EXEC_SUCCESS,
SYS_EXIT,
SYS_GETPID,
SYS_PROC_ARGC,
SYS_PROC_ARGV,
SYS_PROC_ENVC,
SYS_PROC_ENV,
SYS_PROC_FAULT_ERROR,
SYS_PROC_FAULT_RIP,
SYS_PROC_FAULT_VECTOR,
SYS_PROC_LAST_SIGNAL,
SYS_SLEEP_TICKS,
SYS_SPAWN_PATH,
SYS_SPAWN_PATHV,
SYS_WAITPID,
SYS_YIELD,
SYS_SHUTDOWN,
SYS_RESTART,
SYS_FD_CLOSE,
SYS_FD_DUP,
SYS_FD_OPEN,
SYS_FD_READ,
SYS_FD_WRITE,
SYS_FS_APPEND,
SYS_FS_CHILD_COUNT,
SYS_FS_GET_CHILD_NAME,
@@ -50,6 +58,7 @@ from .constants import (
SYS_FS_STAT_SIZE,
SYS_FS_STAT_TYPE,
SYS_FS_WRITE,
SYS_GETPID,
SYS_KBD_BUFFERED,
SYS_KBD_DROPPED,
SYS_KBD_GET_CHAR,
@@ -61,8 +70,29 @@ from .constants import (
SYS_LOG_JOURNAL_COUNT,
SYS_LOG_JOURNAL_READ,
SYS_LOG_WRITE,
SYS_PROC_ARGC,
SYS_PROC_ARGV,
SYS_PROC_COUNT,
SYS_PROC_ENVC,
SYS_PROC_ENV,
SYS_PROC_FAULT_ERROR,
SYS_PROC_FAULT_RIP,
SYS_PROC_FAULT_VECTOR,
SYS_PROC_KILL,
SYS_PROC_LAST_SIGNAL,
SYS_PROC_PID_AT,
SYS_PROC_SNAPSHOT,
SYS_RESTART,
SYS_SERVICE_COUNT,
SYS_SERVICE_READY_COUNT,
SYS_SHUTDOWN,
SYS_SLEEP_TICKS,
SYS_SPAWN_PATH,
SYS_SPAWN_PATHV,
SYS_STATS_ID_COUNT,
SYS_STATS_RECENT_ID,
SYS_STATS_RECENT_WINDOW,
SYS_STATS_TOTAL,
SYS_TASK_COUNT,
SYS_TIMER_TICKS,
SYS_TTY_ACTIVE,
@@ -75,6 +105,8 @@ from .constants import (
SYS_USER_LAUNCH_OK,
SYS_USER_LAUNCH_TRIES,
SYS_USER_SHELL_READY,
SYS_WAITPID,
SYS_YIELD,
page_ceil,
page_floor,
u64,
@@ -124,6 +156,15 @@ class ELFImage:
segments: List[ELFSegment]
@dataclass
class FDEntry:
kind: str
flags: int
offset: int = 0
path: str = ""
tty_index: int = 0
EXEC_PATH_MAX = 192
EXEC_ARG_LINE_MAX = 256
EXEC_ENV_LINE_MAX = 512
@@ -131,6 +172,8 @@ EXEC_MAX_ARGS = 24
EXEC_MAX_ENVS = 24
EXEC_ITEM_MAX = 128
EXEC_STATUS_SIGNAL_FLAG = 1 << 63
PROC_PATH_MAX = 192
FD_MAX = 64
class CLeonOSWineNative:
@@ -150,6 +193,7 @@ class CLeonOSWineNative:
ppid: int = 0,
argv_items: Optional[List[str]] = None,
env_items: Optional[List[str]] = None,
inherited_fds: Optional[Dict[int, FDEntry]] = None,
) -> None:
self.elf_path = elf_path
self.rootfs = rootfs
@@ -175,9 +219,97 @@ class CLeonOSWineNative:
self._stack_size = 0x0000000000020000
self._ret_sentinel = 0x00007FFF10000000
self._mapped_ranges: List[Tuple[int, int]] = []
self._tty_index = int(self.state.tty_active)
self._fds: Dict[int, FDEntry] = {}
self._fd_inherited = inherited_fds if inherited_fds is not None else {}
default_path = self._normalize_guest_path(self.guest_path_hint or f"/{self.elf_path.name}")
self.argv_items, self.env_items = self._prepare_exec_items(default_path, self.argv_items, self.env_items)
self._init_default_fds()
@staticmethod
def _clone_fd_entry(entry: FDEntry) -> FDEntry:
return FDEntry(kind=entry.kind, flags=int(entry.flags), offset=int(entry.offset), path=str(entry.path), tty_index=int(entry.tty_index))
@staticmethod
def _fd_access_mode(flags: int) -> int:
return int(flags) & 0x3
@classmethod
def _fd_access_mode_valid(cls, flags: int) -> bool:
mode = cls._fd_access_mode(flags)
return mode in (O_RDONLY, O_WRONLY, O_RDWR)
@classmethod
def _fd_can_read(cls, flags: int) -> bool:
mode = cls._fd_access_mode(flags)
return mode in (O_RDONLY, O_RDWR)
@classmethod
def _fd_can_write(cls, flags: int) -> bool:
mode = cls._fd_access_mode(flags)
return mode in (O_WRONLY, O_RDWR)
def _init_default_fds(self) -> None:
self._fds = {
0: FDEntry(kind="tty", flags=O_RDONLY, offset=0, tty_index=self._tty_index),
1: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index),
2: FDEntry(kind="tty", flags=O_WRONLY, offset=0, tty_index=self._tty_index),
}
for target in (0, 1, 2):
inherited = self._fd_inherited.get(target)
if inherited is not None:
self._fds[target] = self._clone_fd_entry(inherited)
for target in (0, 1, 2):
entry = self._fds.get(target)
if entry is not None and entry.kind == "tty":
self._tty_index = int(entry.tty_index)
break
def _fd_lookup(self, fd: int) -> Optional[FDEntry]:
if fd < 0 or fd >= FD_MAX:
return None
return self._fds.get(int(fd))
def _fd_find_free(self) -> int:
for fd in range(FD_MAX):
if fd not in self._fds:
return fd
return -1
def _stdio_entry_for_child(self, target_fd: int, override_fd: int, require_read: bool, require_write: bool) -> Optional[FDEntry]:
if override_fd == FD_INHERIT:
src = self._fd_lookup(target_fd)
else:
src = self._fd_lookup(override_fd)
if src is None:
return None
if require_read and not self._fd_can_read(src.flags):
return None
if require_write and not self._fd_can_write(src.flags):
return None
return self._clone_fd_entry(src)
def _build_child_stdio_map(self, stdin_fd: int, stdout_fd: int, stderr_fd: int) -> Optional[Dict[int, FDEntry]]:
child_map: Dict[int, FDEntry] = {}
in_entry = self._stdio_entry_for_child(0, stdin_fd, require_read=True, require_write=False)
out_entry = self._stdio_entry_for_child(1, stdout_fd, require_read=False, require_write=True)
err_entry = self._stdio_entry_for_child(2, stderr_fd, require_read=False, require_write=True)
if in_entry is None or out_entry is None or err_entry is None:
return None
child_map[0] = in_entry
child_map[1] = out_entry
child_map[2] = err_entry
return child_map
def run(self) -> Optional[int]:
if self.pid == 0:
@@ -187,6 +319,7 @@ class CLeonOSWineNative:
self.state.set_current_pid(self.pid)
self.state.set_proc_cmdline(self.pid, self.argv_items, self.env_items)
self.state.set_proc_fault(self.pid, 0, 0, 0, 0)
self.state.set_proc_running(self.pid, self.argv_items[0] if self.argv_items else self.guest_path_hint, self._tty_index)
uc = Uc(UC_ARCH_X86, UC_MODE_64)
self._install_hooks(uc)
@@ -252,6 +385,7 @@ class CLeonOSWineNative:
arg1 = self._reg_read(uc, UC_X86_REG_RCX)
arg2 = self._reg_read(uc, UC_X86_REG_RDX)
self.state.record_syscall(syscall_id)
self.state.context_switches = u64(self.state.context_switches + 1)
ret = self._dispatch_syscall(uc, syscall_id, arg0, arg1, arg2)
self._reg_write(uc, UC_X86_REG_RAX, u64(ret))
@@ -291,6 +425,8 @@ class CLeonOSWineNative:
return self._exec_path(uc, arg0)
if sid == SYS_EXEC_PATHV:
return self._exec_pathv(uc, arg0, arg1, arg2)
if sid == SYS_EXEC_PATHV_IO:
return self._exec_pathv_io(uc, arg0, arg1, arg2)
if sid == SYS_SPAWN_PATH:
return self._spawn_path(uc, arg0)
if sid == SYS_SPAWN_PATHV:
@@ -315,6 +451,14 @@ class CLeonOSWineNative:
return self._proc_fault_error()
if sid == SYS_PROC_FAULT_RIP:
return self._proc_fault_rip()
if sid == SYS_PROC_COUNT:
return self._proc_count()
if sid == SYS_PROC_PID_AT:
return self._proc_pid_at(uc, arg0, arg1)
if sid == SYS_PROC_SNAPSHOT:
return self._proc_snapshot(uc, arg0, arg1, arg2)
if sid == SYS_PROC_KILL:
return self._proc_kill(uc, arg0, arg1)
if sid == SYS_EXIT:
return self._request_exit(uc, arg0)
if sid == SYS_SLEEP_TICKS:
@@ -400,6 +544,30 @@ class CLeonOSWineNative:
return self.state.kbd_drop_count
if sid == SYS_KBD_HOTKEY_SWITCHES:
return self.state.kbd_hotkey_switches
if sid == SYS_STATS_TOTAL:
return self.state.stats_total()
if sid == SYS_STATS_ID_COUNT:
return self.state.stats_id_count(arg0)
if sid == SYS_STATS_RECENT_WINDOW:
return self.state.stats_recent_window()
if sid == SYS_STATS_RECENT_ID:
return self.state.stats_recent_id_count(arg0)
if sid == SYS_FD_OPEN:
return self._fd_open(uc, arg0, arg1, arg2)
if sid == SYS_FD_READ:
return self._fd_read(uc, arg0, arg1, arg2)
if sid == SYS_FD_WRITE:
return self._fd_write(uc, arg0, arg1, arg2)
if sid == SYS_FD_CLOSE:
return self._fd_close(arg0)
if sid == SYS_FD_DUP:
return self._fd_dup(arg0)
if sid == SYS_DL_OPEN:
return u64_neg1()
if sid == SYS_DL_CLOSE:
return 0
if sid == SYS_DL_SYM:
return 0
return u64_neg1()
@@ -409,6 +577,15 @@ class CLeonOSWineNative:
sys.stdout.write(text)
sys.stdout.flush()
def _host_write_bytes(self, data: bytes) -> None:
if not data:
return
if hasattr(sys.stdout, "buffer"):
sys.stdout.buffer.write(data)
sys.stdout.flush()
return
self._host_write(data.decode("utf-8", errors="replace"))
def _load_segments(self, uc: Uc) -> None:
for seg in self.image.segments:
start = page_floor(seg.vaddr)
@@ -504,6 +681,30 @@ class CLeonOSWineNative:
except UcError:
return b""
def _read_guest_bytes_exact(self, uc: Uc, addr: int, size: int) -> Optional[bytes]:
if size < 0 or addr == 0:
return None
if size == 0:
return b""
out = bytearray()
cursor = int(addr)
left = int(size)
while left > 0:
chunk = min(left, MAX_IO_READ)
try:
data = uc.mem_read(cursor, chunk)
except UcError:
return None
if len(data) != chunk:
return None
out.extend(data)
cursor += chunk
left -= chunk
return bytes(out)
def _write_guest_bytes(self, uc: Uc, addr: int, data: bytes) -> bool:
if addr == 0:
return False
@@ -743,16 +944,86 @@ class CLeonOSWineNative:
return 1 if self._write_guest_bytes(uc, out_ptr, encoded + b"\x00") else 0
def _exec_path(self, uc: Uc, path_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=False)
return self._spawn_path_common(
uc,
path_ptr,
0,
0,
return_pid=False,
env_line_override=None,
stdin_fd=FD_INHERIT,
stdout_fd=FD_INHERIT,
stderr_fd=FD_INHERIT,
)
def _exec_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=False)
return self._spawn_path_common(
uc,
path_ptr,
argv_ptr,
env_ptr,
return_pid=False,
env_line_override=None,
stdin_fd=FD_INHERIT,
stdout_fd=FD_INHERIT,
stderr_fd=FD_INHERIT,
)
def _exec_pathv_io(self, uc: Uc, path_ptr: int, argv_ptr: int, req_ptr: int) -> int:
req_data: Optional[bytes]
env_ptr: int
stdin_fd: int
stdout_fd: int
stderr_fd: int
env_line: str
if req_ptr == 0:
return u64_neg1()
req_data = self._read_guest_bytes_exact(uc, req_ptr, 32)
if req_data is None or len(req_data) != 32:
return u64_neg1()
env_ptr, stdin_fd, stdout_fd, stderr_fd = struct.unpack("<QQQQ", req_data)
env_line = self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else ""
return self._spawn_path_common(
uc,
path_ptr,
argv_ptr,
0,
return_pid=False,
env_line_override=env_line,
stdin_fd=stdin_fd,
stdout_fd=stdout_fd,
stderr_fd=stderr_fd,
)
def _spawn_path(self, uc: Uc, path_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, 0, 0, return_pid=True)
return self._spawn_path_common(
uc,
path_ptr,
0,
0,
return_pid=True,
env_line_override=None,
stdin_fd=FD_INHERIT,
stdout_fd=FD_INHERIT,
stderr_fd=FD_INHERIT,
)
def _spawn_pathv(self, uc: Uc, path_ptr: int, argv_ptr: int, env_ptr: int) -> int:
return self._spawn_path_common(uc, path_ptr, argv_ptr, env_ptr, return_pid=True)
return self._spawn_path_common(
uc,
path_ptr,
argv_ptr,
env_ptr,
return_pid=True,
env_line_override=None,
stdin_fd=FD_INHERIT,
stdout_fd=FD_INHERIT,
stderr_fd=FD_INHERIT,
)
def _spawn_path_common(
self,
@@ -762,17 +1033,30 @@ class CLeonOSWineNative:
env_ptr: int,
*,
return_pid: bool,
env_line_override: Optional[str],
stdin_fd: int,
stdout_fd: int,
stderr_fd: int,
) -> int:
path = self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX)
guest_path = self._normalize_guest_path(path)
argv_line = self._read_guest_cstring(uc, argv_ptr, EXEC_ARG_LINE_MAX) if argv_ptr != 0 else ""
env_line = self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else ""
env_line = (
env_line_override
if env_line_override is not None
else (self._read_guest_cstring(uc, env_ptr, EXEC_ENV_LINE_MAX) if env_ptr != 0 else "")
)
host_path = self._guest_to_host(guest_path, must_exist=True)
child_stdio = self._build_child_stdio_map(int(stdin_fd), int(stdout_fd), int(stderr_fd))
self.state.exec_requests = u64(self.state.exec_requests + 1)
self.state.user_exec_requested = 1
self.state.user_launch_tries = u64(self.state.user_launch_tries + 1)
if child_stdio is None:
self.state.user_launch_fail = u64(self.state.user_launch_fail + 1)
return u64_neg1()
if host_path is None or not host_path.is_file():
self.state.user_launch_fail = u64(self.state.user_launch_fail + 1)
return u64_neg1()
@@ -802,6 +1086,7 @@ class CLeonOSWineNative:
ppid=parent_pid,
argv_items=argv_items,
env_items=env_items,
inherited_fds=child_stdio,
)
child_ret = child.run()
@@ -854,6 +1139,97 @@ class CLeonOSWineNative:
def _proc_fault_rip(self) -> int:
return self.state.proc_fault_rip_value(self.state.get_current_pid())
def _proc_count(self) -> int:
return self.state.proc_count()
def _proc_pid_at(self, uc: Uc, index: int, out_ptr: int) -> int:
if out_ptr == 0:
return 0
pid = self.state.proc_pid_at(int(index))
if pid is None:
return 0
return 1 if self._write_guest_bytes(uc, out_ptr, struct.pack("<Q", u64(pid))) else 0
def _proc_snapshot(self, uc: Uc, pid: int, out_ptr: int, out_size: int) -> int:
if out_ptr == 0 or out_size < (13 * 8 + PROC_PATH_MAX):
return 0
target = int(pid)
state_value = self.state.proc_state_value(target)
if state_value == 0:
return 0
path = self.state.proc_path_value(target)
encoded_path = path.encode("utf-8", errors="replace")
if len(encoded_path) >= PROC_PATH_MAX:
encoded_path = encoded_path[: PROC_PATH_MAX - 1]
path_buf = encoded_path + b"\x00" + (b"\x00" * (PROC_PATH_MAX - len(encoded_path) - 1))
blob = struct.pack(
"<13Q",
u64(target),
u64(self.state.proc_ppid(target)),
u64(state_value),
u64(self.state.proc_started_tick_value(target)),
u64(self.state.proc_exited_tick_value(target)),
u64(self.state.proc_exit_status_value(target)),
u64(self.state.proc_runtime_ticks(target)),
u64(self.state.proc_mem_bytes_value(target)),
u64(self.state.proc_tty_index_value(target)),
u64(self.state.proc_signal(target)),
u64(self.state.proc_fault_vector_value(target)),
u64(self.state.proc_fault_error_value(target)),
u64(self.state.proc_fault_rip_value(target)),
) + path_buf
return 1 if self._write_guest_bytes(uc, out_ptr, blob) else 0
def _proc_kill(self, uc: Uc, pid: int, signal: int) -> int:
target = int(pid)
if target <= 0:
return u64_neg1()
current_state = self.state.proc_state_value(target)
if current_state == 0:
return u64_neg1()
effective_signal = int(signal) & 0xFF
if effective_signal == 0:
effective_signal = SIGTERM
self.state.set_proc_fault(target, effective_signal, 0, 0, 0)
if current_state == PROC_STATE_EXITED:
return 1
if effective_signal == SIGCONT:
if current_state == PROC_STATE_STOPPED:
self.state.set_proc_pending(target)
return 1
if effective_signal == SIGSTOP and current_state in (PROC_STATE_PENDING, PROC_STATE_STOPPED):
self.state.set_proc_stopped(target)
return 1
status = self._encode_signal_status(effective_signal, 0, 0)
if target == self.state.get_current_pid():
self._exit_requested = True
self._exit_status = u64(status)
if effective_signal == SIGSTOP:
self.state.set_proc_stopped(target)
uc.emu_stop()
return 1
if effective_signal == SIGSTOP:
self.state.set_proc_stopped(target)
return 1
self.state.mark_exited(target, status)
return 1
def _wait_pid(self, uc: Uc, pid: int, out_ptr: int) -> int:
wait_ret, status = self.state.wait_pid(int(pid))
@@ -885,6 +1261,190 @@ class CLeonOSWineNative:
time.sleep(0)
return self.state.timer_ticks()
def _fd_open(self, uc: Uc, path_ptr: int, flags: int, mode: int) -> int:
_ = mode
guest_path = self._normalize_guest_path(self._read_guest_cstring(uc, path_ptr, EXEC_PATH_MAX))
open_flags = int(u64(flags))
lower_path = guest_path.lower()
fd_slot: int
entry: FDEntry
if not guest_path.startswith("/"):
return u64_neg1()
if not self._fd_access_mode_valid(open_flags):
return u64_neg1()
if ((open_flags & O_TRUNC) != 0 or (open_flags & O_APPEND) != 0) and not self._fd_can_write(open_flags):
return u64_neg1()
fd_slot = self._fd_find_free()
if fd_slot < 0:
return u64_neg1()
if lower_path == "/dev/tty":
entry = FDEntry(kind="tty", flags=open_flags, offset=0, tty_index=self._tty_index)
self._fds[fd_slot] = entry
return fd_slot
if lower_path == "/dev/null":
entry = FDEntry(kind="dev_null", flags=open_flags, offset=0, tty_index=self._tty_index)
self._fds[fd_slot] = entry
return fd_slot
if lower_path == "/dev/zero":
entry = FDEntry(kind="dev_zero", flags=open_flags, offset=0, tty_index=self._tty_index)
self._fds[fd_slot] = entry
return fd_slot
if lower_path == "/dev/random":
entry = FDEntry(kind="dev_random", flags=open_flags, offset=0, tty_index=self._tty_index)
self._fds[fd_slot] = entry
return fd_slot
host_path = self._guest_to_host(guest_path, must_exist=False)
if host_path is None:
return u64_neg1()
try:
if not host_path.exists():
if (open_flags & O_CREAT) == 0 or not self._fd_can_write(open_flags):
return u64_neg1()
host_path.parent.mkdir(parents=True, exist_ok=True)
host_path.write_bytes(b"")
if host_path.is_dir():
return u64_neg1()
if (open_flags & O_TRUNC) != 0:
host_path.write_bytes(b"")
offset = int(host_path.stat().st_size) if (open_flags & O_APPEND) != 0 else 0
except Exception:
return u64_neg1()
entry = FDEntry(kind="file", flags=open_flags, offset=offset, path=str(host_path), tty_index=self._tty_index)
self._fds[fd_slot] = entry
return fd_slot
def _fd_read(self, uc: Uc, fd: int, out_ptr: int, size: int) -> int:
req = int(u64(size))
entry = self._fd_lookup(int(fd))
data: bytes
if req == 0:
return 0
if out_ptr == 0:
return u64_neg1()
if entry is None or not self._fd_can_read(entry.flags):
return u64_neg1()
req = min(req, MAX_IO_READ)
if entry.kind == "tty":
out = bytearray()
while len(out) < req:
key = self.state.pop_key()
if key is None:
break
out.append(key & 0xFF)
data = bytes(out)
elif entry.kind == "dev_null":
return 0
elif entry.kind == "dev_zero":
data = b"\x00" * req
elif entry.kind == "dev_random":
data = os.urandom(req)
elif entry.kind == "file":
try:
with open(entry.path, "rb") as fh:
fh.seek(entry.offset)
data = fh.read(req)
except Exception:
return u64_neg1()
else:
return u64_neg1()
if len(data) == 0:
return 0
if not self._write_guest_bytes(uc, int(out_ptr), data):
return u64_neg1()
entry.offset += len(data)
return len(data)
def _fd_write(self, uc: Uc, fd: int, buf_ptr: int, size: int) -> int:
req = int(u64(size))
entry = self._fd_lookup(int(fd))
data: Optional[bytes]
write_pos: int
if req == 0:
return 0
if buf_ptr == 0:
return u64_neg1()
if entry is None or not self._fd_can_write(entry.flags):
return u64_neg1()
req = min(req, MAX_IO_READ)
data = self._read_guest_bytes_exact(uc, int(buf_ptr), req)
if data is None:
return u64_neg1()
if entry.kind == "tty":
self._host_write_bytes(data)
entry.offset += len(data)
return len(data)
if entry.kind in ("dev_null", "dev_zero", "dev_random"):
entry.offset += len(data)
return len(data)
if entry.kind != "file":
return u64_neg1()
try:
host_path = Path(entry.path)
if not host_path.exists():
if (entry.flags & O_CREAT) == 0 or not self._fd_can_write(entry.flags):
return u64_neg1()
host_path.parent.mkdir(parents=True, exist_ok=True)
host_path.write_bytes(b"")
with open(host_path, "r+b") as fh:
write_pos = int(entry.offset)
fh.seek(write_pos)
fh.write(data)
except Exception:
return u64_neg1()
entry.offset += len(data)
return len(data)
def _fd_close(self, fd: int) -> int:
key = int(fd)
if key not in self._fds:
return u64_neg1()
del self._fds[key]
return 0
def _fd_dup(self, fd: int) -> int:
src = self._fd_lookup(int(fd))
if src is None:
return u64_neg1()
slot = self._fd_find_free()
if slot < 0:
return u64_neg1()
self._fds[slot] = self._clone_fd_entry(src)
return slot
@staticmethod
def _truncate_item_text(text: str, max_bytes: int = EXEC_ITEM_MAX) -> str:
if max_bytes <= 1: