1use std::io::Write;
14use std::path::{Path, PathBuf};
15
16use futures_util::StreamExt;
17use serde::Deserialize;
18
/// A single progress event handed to the `on_progress` callback of `ensure_engine`.
#[derive(Debug, Clone)]
pub struct EngineFetchProgress {
    /// Phase of the fetch that produced this event.
    pub stage: EngineStage,
    /// Bytes written so far while `stage` is `Downloading`; every other stage reports `0`.
    pub current: u64,
    /// Body length reported by the HTTP response while downloading, when known;
    /// `None` for all other stages.
    pub total: Option<u64>,
    /// Human-readable status text accompanying the event.
    pub message: String,
}
29
/// Phases reported through `EngineFetchProgress::stage` while the engine is being fetched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineStage {
    /// Emitted before the release metadata is requested.
    Resolving,
    /// Emitted while the release archive is streamed to disk.
    Downloading,
    /// Emitted before the downloaded archive is unpacked.
    Extracting,
    /// Emitted once the binary is in place (with the message "Engine ready").
    Installing,
}
38
39pub async fn ensure_engine(
44 dest: &Path,
45 on_progress: impl Fn(EngineFetchProgress) + Send + Sync,
46) -> Result<PathBuf, String> {
47 let binary = dest.join(exe_name());
48 if binary.is_file() {
49 return Ok(binary);
50 }
51 std::fs::create_dir_all(dest).map_err(|e| format!("create {}: {e}", dest.display()))?;
52
53 let catalog = load_catalog()?;
54 let spec = catalog
55 .assets
56 .iter()
57 .find(|a| a.os == host_os() && a.arch == host_arch())
58 .ok_or_else(|| {
59 format!(
60 "no engine release configured for {}/{}",
61 host_os(),
62 host_arch()
63 )
64 })?;
65
66 on_progress(EngineFetchProgress {
67 stage: EngineStage::Resolving,
68 current: 0,
69 total: None,
70 message: "Resolving engine release…".into(),
71 });
72
73 let client = reqwest::Client::builder()
74 .user_agent(catalog.user_agent.clone())
75 .build()
76 .map_err(|e| format!("http client: {e}"))?;
77
78 let release: Release = client
79 .get(&catalog.release_api)
80 .header(reqwest::header::ACCEPT, "application/vnd.github+json")
81 .send()
82 .await
83 .and_then(|r| r.error_for_status())
84 .map_err(|e| format!("fetch release metadata: {e}"))?
85 .json()
86 .await
87 .map_err(|e| format!("parse release metadata: {e}"))?;
88
89 let ext = spec.archive.ext();
90 let asset = release
91 .assets
92 .iter()
93 .find(|a| a.name.contains(&spec.name) && a.name.ends_with(ext))
94 .ok_or_else(|| {
95 format!(
96 "no '{}'*{} asset in release {}",
97 spec.name, ext, release.tag_name
98 )
99 })?;
100
101 let tmp = std::env::temp_dir().join(format!("flow-engine-{}", std::process::id()));
103 std::fs::create_dir_all(&tmp).map_err(|e| format!("create temp dir: {e}"))?;
104 let archive_path = tmp.join(&asset.name);
105 download(
106 &client,
107 &asset.browser_download_url,
108 &archive_path,
109 &on_progress,
110 )
111 .await?;
112
113 on_progress(EngineFetchProgress {
114 stage: EngineStage::Extracting,
115 current: 0,
116 total: None,
117 message: "Extracting engine…".into(),
118 });
119
120 let dest_owned = dest.to_path_buf();
122 let archive = spec.archive;
123 let tmp_for_task = tmp.clone();
124 let binary = tokio::task::spawn_blocking(move || {
125 extract_and_install(&archive_path, &dest_owned, archive, &tmp_for_task)
126 })
127 .await
128 .map_err(|e| format!("extract task: {e}"))??;
129
130 let _ = std::fs::remove_dir_all(&tmp);
131
132 on_progress(EngineFetchProgress {
133 stage: EngineStage::Installing,
134 current: 0,
135 total: None,
136 message: "Engine ready".into(),
137 });
138
139 Ok(binary)
140}
141
/// File name of the engine executable for the platform this crate was compiled for.
fn exe_name() -> &'static str {
    const PLAIN: &str = "llama-server";
    const WITH_EXE_SUFFIX: &str = "llama-server.exe";

    // Only Windows builds use the `.exe` suffix.
    if cfg!(not(windows)) {
        PLAIN
    } else {
        WITH_EXE_SUFFIX
    }
}
149
/// Operating-system key used to look up this host in the engine catalog.
///
/// Any target that is neither macOS nor Windows is reported as `"linux"`.
fn host_os() -> &'static str {
    match (cfg!(target_os = "macos"), cfg!(target_os = "windows")) {
        (true, _) => "macos",
        (false, true) => "windows",
        (false, false) => "linux",
    }
}
159
/// CPU-architecture key used to look up this host in the engine catalog.
///
/// Every target other than `aarch64` is reported as `"x64"`.
fn host_arch() -> &'static str {
    if cfg!(target_arch = "aarch64") {
        return "arm64";
    }
    "x64"
}
167
/// Engine download configuration, deserialized from the bundled `engine_catalog.json`.
#[derive(Deserialize)]
struct EngineCatalog {
    /// URL queried for the release metadata JSON.
    release_api: String,
    /// `User-Agent` value set on the HTTP client used for all requests.
    user_agent: String,
    /// Per-platform asset descriptions; the first entry matching the host OS and arch is used.
    assets: Vec<AssetSpec>,
}
174
/// Describes which release asset to download for one OS/architecture combination.
#[derive(Deserialize)]
struct AssetSpec {
    /// Operating-system key, compared against `host_os()`.
    os: String,
    /// Architecture key, compared against `host_arch()`.
    arch: String,
    /// Substring that the release asset's file name must contain.
    name: String,
    /// Archive format of the asset; also determines the file extension the asset must end with.
    archive: ArchiveKind,
}
183
/// Archive formats the installer can unpack.
///
/// Deserialized from lowercase variant names (`"targz"`, `"zip"`).
#[derive(Deserialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
enum ArchiveKind {
    /// Gzip-compressed tarball (`.tar.gz`).
    Targz,
    /// Zip archive (`.zip`).
    Zip,
}
190
191impl ArchiveKind {
192 fn ext(self) -> &'static str {
193 match self {
194 ArchiveKind::Targz => ".tar.gz",
195 ArchiveKind::Zip => ".zip",
196 }
197 }
198}
199
200fn load_catalog() -> Result<EngineCatalog, String> {
201 serde_json::from_str(include_str!("engine_catalog.json"))
202 .map_err(|e| format!("engine catalog: {e}"))
203}
204
/// The fields read from the release metadata JSON returned by the catalog's `release_api`.
#[derive(Deserialize)]
struct Release {
    /// Release tag; only used in the error message when no matching asset is found.
    tag_name: String,
    /// Files attached to the release.
    assets: Vec<ReleaseAsset>,
}
210
/// One downloadable file attached to a release.
#[derive(Deserialize)]
struct ReleaseAsset {
    /// Asset file name; matched against the catalog name and archive extension, and reused as
    /// the local archive file name.
    name: String,
    /// URL the archive is downloaded from.
    browser_download_url: String,
}
216
217async fn download(
218 client: &reqwest::Client,
219 url: &str,
220 dest_file: &Path,
221 on_progress: &(impl Fn(EngineFetchProgress) + Send + Sync),
222) -> Result<(), String> {
223 let resp = client
224 .get(url)
225 .send()
226 .await
227 .and_then(|r| r.error_for_status())
228 .map_err(|e| format!("download: {e}"))?;
229 let total = resp.content_length();
230 let mut file =
231 std::fs::File::create(dest_file).map_err(|e| format!("create {}: {e}", dest_file.display()))?;
232 let mut stream = resp.bytes_stream();
233 let mut downloaded: u64 = 0;
234 let mut last_emit: u64 = 0;
235 on_progress(EngineFetchProgress {
236 stage: EngineStage::Downloading,
237 current: 0,
238 total,
239 message: "Downloading engine…".into(),
240 });
241 while let Some(chunk) = stream.next().await {
242 let chunk = chunk.map_err(|e| format!("download stream: {e}"))?;
243 file.write_all(&chunk).map_err(|e| format!("write: {e}"))?;
244 downloaded += chunk.len() as u64;
245 if downloaded - last_emit >= (1 << 20) {
247 last_emit = downloaded;
248 on_progress(EngineFetchProgress {
249 stage: EngineStage::Downloading,
250 current: downloaded,
251 total,
252 message: "Downloading engine…".into(),
253 });
254 }
255 }
256 file.flush().map_err(|e| format!("flush: {e}"))?;
257 Ok(())
258}
259
260fn extract_and_install(
261 archive: &Path,
262 dest: &Path,
263 kind: ArchiveKind,
264 tmp: &Path,
265) -> Result<PathBuf, String> {
266 let unpack = tmp.join("unpack");
267 std::fs::create_dir_all(&unpack).map_err(|e| format!("create unpack dir: {e}"))?;
268 match kind {
269 ArchiveKind::Targz => {
270 let f = std::fs::File::open(archive).map_err(|e| format!("open archive: {e}"))?;
271 let gz = flate2::read::GzDecoder::new(f);
272 tar::Archive::new(gz)
273 .unpack(&unpack)
274 .map_err(|e| format!("untar: {e}"))?;
275 }
276 ArchiveKind::Zip => {
277 let f = std::fs::File::open(archive).map_err(|e| format!("open archive: {e}"))?;
278 zip::ZipArchive::new(f)
279 .map_err(|e| format!("open zip: {e}"))?
280 .extract(&unpack)
281 .map_err(|e| format!("unzip: {e}"))?;
282 }
283 }
284
285 let engine_dir =
286 find_engine_dir(&unpack).ok_or_else(|| format!("{} not found in archive", exe_name()))?;
287
288 copy_flat(&engine_dir, dest)?;
291
292 let binary = dest.join(exe_name());
293 if !binary.is_file() {
294 return Err(format!(
295 "engine binary missing after install: {}",
296 binary.display()
297 ));
298 }
299 make_runnable(dest, &binary);
300 Ok(binary)
301}
302
303fn find_engine_dir(root: &Path) -> Option<PathBuf> {
305 let mut stack = vec![root.to_path_buf()];
306 while let Some(dir) = stack.pop() {
307 let mut subdirs = Vec::new();
308 for entry in std::fs::read_dir(&dir).ok()?.flatten() {
309 if entry.file_name() == std::ffi::OsStr::new(exe_name()) {
310 return Some(dir);
311 }
312 if entry.path().is_dir() {
313 subdirs.push(entry.path());
314 }
315 }
316 stack.extend(subdirs);
317 }
318 None
319}
320
/// Copies the contents of `src` into `dest`, creating `dest` if needed.
///
/// Subdirectories are copied recursively under the same name. Symlinks are resolved and copied
/// as regular files when they point at a file; symlinks to anything else are skipped.
///
/// # Errors
///
/// Returns a message when a directory cannot be created or read, an entry cannot be inspected,
/// a symlink cannot be resolved, or a file copy fails.
fn copy_flat(src: &Path, dest: &Path) -> Result<(), String> {
    std::fs::create_dir_all(dest).map_err(|e| format!("mkdir {}: {e}", dest.display()))?;
    let listing = std::fs::read_dir(src).map_err(|e| format!("read {}: {e}", src.display()))?;

    for item in listing {
        let item = item.map_err(|e| format!("dir entry: {e}"))?;
        let source = item.path();
        let target = dest.join(item.file_name());
        // `symlink_metadata` does not follow links, so links can be told apart from real entries.
        let kind = std::fs::symlink_metadata(&source)
            .map_err(|e| format!("stat: {e}"))?
            .file_type();

        if kind.is_dir() {
            copy_flat(&source, &target)?;
            continue;
        }

        let origin = if kind.is_symlink() {
            let resolved =
                std::fs::canonicalize(&source).map_err(|e| format!("resolve symlink: {e}"))?;
            if !resolved.is_file() {
                continue;
            }
            resolved
        } else {
            source
        };
        std::fs::copy(&origin, &target).map_err(|e| format!("copy: {e}"))?;
    }
    Ok(())
}
343
/// Best-effort preparation of the installed engine so the OS will execute it.
///
/// On Unix the binary's mode is set to `0o755`. On macOS the quarantine attribute is removed
/// recursively from `dest` and every file directly inside `dest` is ad-hoc code-signed. All
/// failures are ignored.
fn make_runnable(dest: &Path, binary: &Path) {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let _ = std::fs::metadata(binary).and_then(|info| {
            let mut mode = info.permissions();
            mode.set_mode(0o755);
            std::fs::set_permissions(binary, mode)
        });
    }
    #[cfg(target_os = "macos")]
    {
        let _ = std::process::Command::new("xattr")
            .args(["-dr", "com.apple.quarantine"])
            .arg(dest)
            .status();
        std::fs::read_dir(dest)
            .into_iter()
            .flatten()
            .flatten()
            .map(|item| item.path())
            .filter(|candidate| candidate.is_file())
            .for_each(|candidate| {
                let _ = std::process::Command::new("codesign")
                    .args(["--force", "--sign", "-"])
                    .arg(&candidate)
                    .status();
            });
    }
    // Keeps both parameters "used" on targets where the blocks above are compiled out.
    let _ = dest;
    let _ = binary;
}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// The bundled catalog must deserialize and list an asset for the platform running the tests.
    #[test]
    fn catalog_parses_and_covers_this_host() {
        let catalog = load_catalog().expect("engine_catalog.json parses");
        let (os, arch) = (host_os(), host_arch());
        let covered = catalog
            .assets
            .iter()
            .any(|spec| spec.os == os && spec.arch == arch);
        assert!(covered, "no engine asset configured for {}/{}", os, arch);
        assert!(catalog.release_api.starts_with("http"));
    }

    /// Each archive kind maps to the extension used to pick the release asset.
    #[test]
    fn archive_ext_matches_kind() {
        let expectations = [
            (ArchiveKind::Targz, ".tar.gz"),
            (ArchiveKind::Zip, ".zip"),
        ];
        for (kind, suffix) in expectations {
            assert_eq!(kind.ext(), suffix);
        }
    }
}