Merge pull request #117 from someoneidoknow/main

support progress bars in `gurt.download` while using GURT://
This commit is contained in:
Face
2025-09-28 19:30:59 +03:00
committed by GitHub
10 changed files with 446 additions and 91 deletions

View File

@@ -85,7 +85,7 @@ func _start_download(download_id: String, url: String, save_path: String, downlo
}
if url.begins_with("gurt://"):
_download_gurt_resource(download_id, url)
_start_gurt_download(download_id, url)
else:
_start_http_download(download_id, url)
@@ -122,56 +122,108 @@ func _start_http_download(download_id: String, url: String):
var timer = Timer.new()
timer.name = "ProgressTimer_" + download_id
timer.wait_time = 0.5
timer.wait_time = 0.2
timer.timeout.connect(func(): _update_download_progress(download_id))
main_node.add_child(timer)
timer.start()
func _download_gurt_resource(download_id: String, url: String):
func _start_gurt_download(download_id: String, url: String):
if not active_downloads.has(download_id):
return
var progress_ui = active_downloads[download_id]["progress_ui"]
var save_path = active_downloads[download_id]["save_path"]
var client = GurtProtocolClient.new()
for ca in CertificateManager.trusted_ca_certificates:
client.add_ca_certificate(ca)
if not client.create_client_with_dns(30, GurtProtocol.DNS_SERVER_IP, GurtProtocol.DNS_SERVER_PORT):
if progress_ui:
progress_ui.update_progress(0, 0, -1) # -1 indicates unknown total size
progress_ui.set_error("Failed to create GURT client")
active_downloads.erase(download_id)
return
var resource_data = await Network.fetch_gurt_resource(url, true)
active_downloads[download_id]["gurt_client"] = client
client.download_started.connect(_on_gurt_download_started)
client.download_progress.connect(_on_gurt_download_progress)
client.download_completed.connect(_on_gurt_download_completed)
client.download_failed.connect(_on_gurt_download_failed)
client.start_download(download_id, url, save_path)
var poll_timer = Timer.new()
poll_timer.wait_time = 0.2
poll_timer.one_shot = false
poll_timer.name = "GurtPoll_" + download_id
poll_timer.timeout.connect(func():
if not active_downloads.has(download_id):
poll_timer.queue_free()
return
var c = active_downloads[download_id].get("gurt_client", null)
if c:
c.poll_events()
else:
poll_timer.queue_free()
)
main_node.add_child(poll_timer)
poll_timer.start()
func _on_gurt_download_started(download_id: String, total_bytes: int):
if not active_downloads.has(download_id):
return
var info = active_downloads[download_id]
info.total_bytes = max(total_bytes, 0)
info.downloaded_bytes = 0
var ui = info.progress_ui
if ui:
ui.update_progress(0.0, 0, info.total_bytes)
if resource_data.is_empty():
var error_msg = "Failed to fetch gurt:// resource"
print(error_msg)
if progress_ui:
progress_ui.set_error(error_msg)
active_downloads.erase(download_id)
func _on_gurt_download_progress(download_id: String, downloaded_bytes: int, total_bytes: int):
if not active_downloads.has(download_id):
return
var info = active_downloads[download_id]
if total_bytes > 0:
info.total_bytes = total_bytes
info.downloaded_bytes = downloaded_bytes
var total = info.total_bytes
var p = 0.0
if total > 0:
p = float(downloaded_bytes) / float(total) * 100.0
var ui = info.progress_ui
if ui:
ui.update_progress(p, downloaded_bytes, total)
var file = FileAccess.open(save_path, FileAccess.WRITE)
if not file:
var error_msg = "Failed to create download file: " + save_path
print(error_msg)
if progress_ui:
progress_ui.set_error(error_msg)
active_downloads.erase(download_id)
func _on_gurt_download_completed(download_id: String, save_path: String):
if not active_downloads.has(download_id):
return
var info = active_downloads[download_id]
var path = save_path if not save_path.is_empty() else info.save_path
var size = 0
if FileAccess.file_exists(path):
var f = FileAccess.open(path, FileAccess.READ)
if f:
size = f.get_length()
f.close()
info.total_bytes = size
info.downloaded_bytes = size
var ui = info.progress_ui
if ui:
ui.set_completed(path)
_add_to_download_history(info, size, path)
active_downloads.erase(download_id)
file.store_buffer(resource_data)
file.close()
var file_size = resource_data.size()
active_downloads[download_id]["total_bytes"] = file_size
active_downloads[download_id]["downloaded_bytes"] = file_size
if progress_ui:
progress_ui.set_completed(save_path)
_add_to_download_history(active_downloads[download_id], file_size, save_path)
func _on_gurt_download_failed(download_id: String, message: String):
if not active_downloads.has(download_id):
return
var info = active_downloads[download_id]
var ui = info.progress_ui
if ui:
ui.set_error(message)
var path = info.save_path
if FileAccess.file_exists(path):
DirAccess.remove_absolute(path)
active_downloads.erase(download_id)
func _update_download_progress(download_id: String):
@@ -240,6 +292,10 @@ func _on_download_progress_cancelled(download_id: String):
return
var download_info = active_downloads[download_id]
if download_info.has("gurt_client"):
var c = download_info["gurt_client"]
c.cancel_download(download_id)
return
var http_request = download_info.get("http_request", null)
if http_request:

View File

@@ -211,8 +211,10 @@ static func format_bytes(given_size: int) -> String:
return str(given_size) + " B"
elif given_size < 1024 * 1024:
return str(given_size / 1024) + " KB"
elif given_size < 1024 * 1024 * 1024:
return "%.1f MB" % (given_size / (1024.0 * 1024.0))
else:
return str(given_size / (1024.0 * 1024)) + " MB"
return "%.2f GB" % (given_size / (1024.0 * 1024.0 * 1024.0))
func get_time_display() -> String:
if status == RequestStatus.PENDING:

View File

@@ -1,6 +1,6 @@
[package]
name = "gurt-godot"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
authors = ["FaceDev"]
license = "MIT"

View File

@@ -1,9 +1,13 @@
use godot::prelude::*;
use gurtlib::prelude::*;
use gurtlib::{GurtMethod, GurtClientConfig, GurtRequest};
use tokio::runtime::Runtime;
use std::sync::Arc;
use gurtlib::{GurtMethod, GurtClientConfig, GurtRequest, GurtResponseHead};
use std::cell::RefCell;
use std::fs::File;
use std::io::Write;
use std::sync::Arc;
use std::sync::Mutex;
use std::collections::HashMap;
use tokio::runtime::Runtime;
struct GurtGodotExtension;
@@ -18,6 +22,8 @@ struct GurtProtocolClient {
client: Arc<RefCell<Option<GurtClient>>>,
runtime: Arc<RefCell<Option<Runtime>>>,
ca_certificates: Arc<RefCell<Vec<String>>>,
cancel_flags: Arc<Mutex<HashMap<String, bool>>>,
event_queue: Arc<Mutex<Vec<DownloadEvent>>>,
}
#[derive(GodotClass)]
@@ -93,6 +99,17 @@ struct GurtProtocolServer {
base: Base<RefCounted>,
}
struct DLState { file: Option<std::fs::File>, total_bytes: i64, downloaded: i64 }
#[derive(Clone)]
enum DownloadEvent {
Started(String, i64),
Progress(String, i64, i64),
Completed(String, String),
Failed(String, String),
}
#[godot_api]
impl GurtProtocolClient {
fn init(base: Base<RefCounted>) -> Self {
@@ -101,12 +118,26 @@ impl GurtProtocolClient {
client: Arc::new(RefCell::new(None)),
runtime: Arc::new(RefCell::new(None)),
ca_certificates: Arc::new(RefCell::new(Vec::new())),
cancel_flags: Arc::new(Mutex::new(HashMap::new())),
event_queue: Arc::new(Mutex::new(Vec::new())),
}
}
#[signal]
fn request_completed(response: Gd<GurtGDResponse>);
#[signal]
fn download_started(download_id: GString, total_bytes: i64);
#[signal]
fn download_progress(download_id: GString, downloaded_bytes: i64, total_bytes: i64);
#[signal]
fn download_completed(download_id: GString, save_path: GString);
#[signal]
fn download_failed(download_id: GString, message: GString);
#[func]
fn create_client(&mut self, timeout_seconds: i32) -> bool {
let runtime = match Runtime::new() {
@@ -253,6 +284,120 @@ impl GurtProtocolClient {
Some(self.convert_response(response))
}
#[func]
fn start_download(&mut self, download_id: GString, url: GString, save_path: GString) -> bool {
let runtime_handle = {
let runtime_binding = self.runtime.borrow();
match runtime_binding.as_ref() {
Some(rt) => rt.handle().clone(),
None => { godot_print!("No runtime available"); return false; }
}
};
let client_instance = {
let client_binding = self.client.borrow();
match client_binding.as_ref() {
Some(c) => c.clone(),
None => { godot_print!("No client available"); return false; }
}
};
let url_str = url.to_string();
let save_path_str = save_path.to_string();
let download_id_string = download_id.to_string();
let cancel_flags = self.cancel_flags.clone();
let event_queue = self.event_queue.clone();
runtime_handle.spawn(async move {
let event_queue_main = event_queue.clone();
let parsed_url = match url::Url::parse(&url_str) { Ok(u) => u, Err(e) => {
if let Ok(mut q) = event_queue.lock() { q.push(DownloadEvent::Failed(download_id_string.clone(), format!("Invalid URL: {}", e))); }
return;
}};
let host = match parsed_url.host_str() { Some(h) => h.to_string(), None => {
if let Ok(mut q) = event_queue.lock() { q.push(DownloadEvent::Failed(download_id_string.clone(), "URL must have a host".to_string())); }
return;
}};
let port = parsed_url.port().unwrap_or(4878);
let path_with_query = if parsed_url.path().is_empty() { "/".to_string() } else { parsed_url.path().to_string() };
let path = match parsed_url.query() { Some(query) => format!("{}?{}", path_with_query, query), None => path_with_query };
let state = Arc::new(Mutex::new(DLState { file: None, total_bytes: -1, downloaded: 0 }));
let request = GurtRequest::new(GurtMethod::GET, path).with_header("User-Agent", "GURT-Client/1.0.0");
let state_head = state.clone();
let event_queue_head = event_queue.clone();
let id_for_head = download_id_string.clone();
let sp_for_head = save_path_str.clone();
let on_head = move |head: &GurtResponseHead| {
if head.status_code < 200 || head.status_code >= 300 {
if let Ok(mut q) = event_queue_head.lock() { q.push(DownloadEvent::Failed(id_for_head.clone(), format!("{} {}", head.status_code, head.status_message))); }
return;
}
let mut total: i64 = -1;
if let Some(cl) = head.headers.get("content-length").or_else(|| head.headers.get("Content-Length")) {
if let Ok(v) = cl.parse::<i64>() { total = v; }
}
match File::create(&sp_for_head) {
Ok(f) => {
if let Ok(mut st) = state_head.lock() { st.file = Some(f); st.total_bytes = total; }
}
Err(e) => { if let Ok(mut q) = event_queue_head.lock() { q.push(DownloadEvent::Failed(id_for_head.clone(), format!("File error: {}", e))); } }
}
if let Ok(mut q) = event_queue_head.lock() { q.push(DownloadEvent::Started(id_for_head.clone(), total)); }
};
let state_chunk = state.clone();
let event_queue_chunk = event_queue.clone();
let id_for_chunk = download_id_string.clone();
let on_chunk = move |chunk: &[u8]| -> bool {
if let Ok(map) = cancel_flags.lock() {
if map.get(&id_for_chunk).copied().unwrap_or(false) { return false; }
}
let mut down = 0i64; let mut total = -1i64; let mut write_result: std::io::Result<()> = Ok(());
if let Ok(mut st) = state_chunk.lock() {
if let Some(f) = st.file.as_mut() { write_result = f.write_all(chunk); }
st.downloaded += chunk.len() as i64; down = st.downloaded; total = st.total_bytes;
}
if let Err(e) = write_result { if let Ok(mut q) = event_queue_chunk.lock() { q.push(DownloadEvent::Failed(id_for_chunk.clone(), format!("Write error: {}", e))); } return false; }
if let Ok(mut q) = event_queue_chunk.lock() { q.push(DownloadEvent::Progress(id_for_chunk.clone(), down, total)); }
true
};
let result = client_instance.stream_request(host.as_str(), port, request, on_head, on_chunk).await;
match result {
Ok(()) => {
if let Ok(mut st) = state.lock() { if let Some(f) = st.file.as_mut() { let _ = f.flush(); } }
if let Ok(mut q) = event_queue_main.lock() { q.push(DownloadEvent::Completed(download_id_string.clone(), save_path_str.clone())); }
}
Err(e) => {
if let Ok(mut q) = event_queue_main.lock() { q.push(DownloadEvent::Failed(download_id_string.clone(), format!("{}", e))); }
}
}
});
true
}
#[func]
fn cancel_download(&mut self, download_id: GString) {
if let Ok(mut map) = self.cancel_flags.lock() { map.insert(download_id.to_string(), true); }
}
#[func]
fn poll_events(&mut self) {
let mut drained: Vec<DownloadEvent> = Vec::new();
if let Ok(mut q) = self.event_queue.lock() { drained.append(&mut *q); }
for ev in drained.into_iter() {
match ev {
DownloadEvent::Started(id, total) => { let mut owner = self.base.to_gd(); let args = [GString::from(id).to_variant(), (total as i64).to_variant()]; owner.emit_signal("download_started".into(), &args); }
DownloadEvent::Progress(id, down, total) => { let mut owner = self.base.to_gd(); let args = [GString::from(id).to_variant(), (down as i64).to_variant(), (total as i64).to_variant()]; owner.emit_signal("download_progress".into(), &args); }
DownloadEvent::Completed(id, path) => { let mut owner = self.base.to_gd(); let args = [GString::from(id).to_variant(), GString::from(path).to_variant()]; owner.emit_signal("download_completed".into(), &args); }
DownloadEvent::Failed(id, msg) => { let mut owner = self.base.to_gd(); let args = [GString::from(id).to_variant(), GString::from(msg).to_variant()]; owner.emit_signal("download_failed".into(), &args); }
}
}
}
#[func]
fn disconnect(&mut self) {
*self.client.borrow_mut() = None;

View File

@@ -1,6 +1,6 @@
[package]
name = "gurtlib"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
authors = ["FaceDev"]
license = "MIT"

View File

@@ -26,6 +26,7 @@ pub struct GurtClientConfig {
pub custom_ca_certificates: Vec<String>,
pub dns_server_ip: String,
pub dns_server_port: u16,
pub read_timeout: Duration,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -52,6 +53,7 @@ impl Default for GurtClientConfig {
custom_ca_certificates: Vec::new(),
dns_server_ip: "135.125.163.131".to_string(),
dns_server_port: 4878,
read_timeout: Duration::from_secs(5),
}
}
}
@@ -530,6 +532,132 @@ impl GurtClient {
Ok((host, port, path))
}
pub async fn stream_request<HeadCb, ChunkCb>(&self,
host: &str,
port: u16,
mut request: GurtRequest,
mut on_head: HeadCb,
mut on_chunk: ChunkCb,
) -> Result<()>
where
HeadCb: FnMut(&crate::message::GurtResponseHead) + Send,
ChunkCb: FnMut(&[u8]) -> bool + Send,
{
let resolved_host = self.resolve_domain(host).await?;
request = request.with_header("Host", host);
let mut tls_stream = self.get_pooled_connection(&resolved_host, port, Some(host)).await?;
let request_data = request.to_string();
tls_stream.write_all(request_data.as_bytes()).await
.map_err(|e| GurtError::connection(format!("Failed to write request: {}", e)))?;
let mut buffer: Vec<u8> = Vec::new();
let mut temp_buffer = [0u8; 8192];
let start_time = std::time::Instant::now();
let mut headers_parsed = false;
let mut expected_body_length: Option<usize> = None;
let mut headers_end_pos: Option<usize> = None;
let mut head_emitted = false;
let mut delivered: usize = 0;
loop {
if start_time.elapsed() > self.config.request_timeout {
return Err(GurtError::timeout("Request timeout"));
}
match timeout(self.config.read_timeout, tls_stream.read(&mut temp_buffer)).await {
Ok(Ok(0)) => {
if headers_parsed && !head_emitted {
return Err(GurtError::connection("Connection closed before response headers were fully received"));
}
break;
}
Ok(Ok(n)) => {
buffer.extend_from_slice(&temp_buffer[..n]);
if !headers_parsed {
if let Some(pos) = buffer.windows(4).position(|w| w == b"\r\n\r\n") {
headers_end_pos = Some(pos + 4);
headers_parsed = true;
let headers_section = std::str::from_utf8(&buffer[..pos])
.map_err(|e| GurtError::invalid_message(format!("Invalid UTF-8 in headers: {}", e)))?;
let mut lines = headers_section.split("\r\n");
let status_line = lines.next().unwrap_or("");
let parts: Vec<&str> = status_line.splitn(3, ' ').collect();
let mut version = String::new();
let mut status_code: u16 = 0;
let mut status_message = String::new();
if parts.len() >= 2 {
version = parts[0].to_string();
status_code = parts[1].parse().unwrap_or(0);
if parts.len() > 2 { status_message = parts[2].to_string(); }
}
let mut headers = std::collections::HashMap::new();
for line in lines {
if line.is_empty() { break; }
if let Some(colon) = line.find(':') {
let key = line[..colon].trim().to_lowercase();
let value = line[colon+1..].trim().to_string();
if key == "content-length" { expected_body_length = value.parse().ok(); }
headers.insert(key, value);
}
}
let head = crate::message::GurtResponseHead {
version,
status_code,
status_message,
headers,
};
on_head(&head);
head_emitted = true;
if let Some(end) = headers_end_pos {
if buffer.len() > end {
let body_slice = &buffer[end..];
if !on_chunk(body_slice) {
return Err(GurtError::Cancelled);
}
delivered = body_slice.len();
}
}
}
} else {
if let Some(end) = headers_end_pos {
let available = buffer.len().saturating_sub(end + delivered);
if available > 0 {
let start = end + delivered;
let end_pos = end + delivered + available;
if !on_chunk(&buffer[start..end_pos]) {
return Err(GurtError::Cancelled);
}
delivered += available;
}
if let Some(expected_len) = expected_body_length {
if delivered >= expected_len { break; }
}
}
}
}
Ok(Err(e)) => return Err(GurtError::connection(format!("Read error: {}", e))),
Err(_) => continue,
}
}
if let (Some(end), Some(expected_len)) = (headers_end_pos, expected_body_length) {
if delivered >= expected_len {
self.return_connection_to_pool(&resolved_host, port, tls_stream);
}
}
Ok(())
}
async fn resolve_domain(&self, domain: &str) -> Result<String> {
match self.dns_cache.lock() {
Ok(cache) => {

View File

@@ -32,6 +32,9 @@ pub enum GurtError {
#[error("Client error: {0}")]
Client(String),
#[error("Cancelled")]
Cancelled,
}
pub type Result<T> = std::result::Result<T, GurtError>;

View File

@@ -6,7 +6,7 @@ pub mod error;
pub mod message;
pub use error::{GurtError, Result};
pub use message::{GurtMessage, GurtRequest, GurtResponse, GurtMethod};
pub use message::{GurtMessage, GurtRequest, GurtResponse, GurtResponseHead, GurtMethod};
pub use protocol::{GurtStatusCode, GURT_VERSION, DEFAULT_PORT};
pub use crypto::{CryptoManager, TlsConfig, GURT_ALPN, TLS_VERSION};
pub use server::{GurtServer, GurtHandler, ServerContext, Route};
@@ -15,7 +15,7 @@ pub use client::{GurtClient, GurtClientConfig};
pub mod prelude {
pub use crate::{
GurtError, Result,
GurtMessage, GurtRequest, GurtResponse,
GurtMessage, GurtRequest, GurtResponse, GurtResponseHead,
GURT_VERSION, DEFAULT_PORT,
CryptoManager, TlsConfig, GURT_ALPN, TLS_VERSION,
GurtServer, GurtHandler, ServerContext, Route,

View File

@@ -225,6 +225,14 @@ pub struct GurtResponse {
pub body: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct GurtResponseHead {
pub version: String,
pub status_code: u16,
pub status_message: String,
pub headers: GurtHeaders,
}
impl GurtResponse {
pub fn new(status_code: GurtStatusCode) -> Self {
Self {

View File

@@ -27,7 +27,7 @@
local downloadShitBtn = gurt.select('#download-shit')
local downloadTextBtn = gurt.select('#download-text')
local downloadJsonBtn = gurt.select('#download-json')
local downloadPeakBtn = gurt.select('#download-peak')
trace.log('Download API demo script started.')
local logMessages = {}
@@ -74,6 +74,13 @@
addLog("⚠️ Warning: This is a large file (~5GB)!")
end)
-- no ai slop btw
downloadPeakBtn:on('click', function()
-- i expect you to host this yourself
local downloadId = gurt.download("gurt://127.0.0.1", "peakshit.iso")
addLog(`started your peak download {downloadId}`)
end)
-- Clear log button
clearLogBtn:on('click', function()
logMessages = {}
@@ -107,6 +114,7 @@
</div>
<div style="button-group mb-4">
<button id="download-shit" style="special-button">💿 Download Shit (Ubuntu ISO)</button>
<button id="download-peak" style="special-button">Download shit over gurt :tada:</button>
</div>
<div style="code-block">
@@ -128,6 +136,10 @@ end)
gurt.select("#download-shit"):on("click", function()
local downloadId = gurt.download("https://releases.ubuntu.com/24.04.3/ubuntu-24.04.3-desktop-amd64.iso", "linux.iso")
end)
gurt.select("#download-peak"):on("click", function()
local downloadId = gurt.download("gurt://127.0.0.1", "peakshit.iso")
end)
</div>
</div>
@@ -146,6 +158,7 @@ end)
<li><strong>Return Value:</strong> Returns a unique download ID for tracking</li>
<li><strong>File Types:</strong> Supports any file type (images, text, binary, etc.)</li>
<li><strong>Large Files:</strong> Can handle large downloads like OS images</li>
<li><strong>GURT Protocol:</strong> Supports downloading over the gurt protocol</li>
</ul>
<h3 style="text-[#0369a1] font-semibold mt-4 mb-2">Test Cases:</h3>
<ul style="text-[#075985] space-y-1 text-sm">