// flow_models_server/fetch.rs

//! Engine provisioning for the managed local-model server.
//!
//! On first use we fetch the prebuilt server engine for the host's OS/arch and
//! unpack it into the data dir (`engines/`), so no external setup is needed.
//! This replaces the old Node fetch script.
//!
//! The release coordinates — which distribution, and which artifact per
//! platform — live in `engine_catalog.json` (operator-maintained data that is
//! embedded at build time but kept out of the Rust source). This module holds
//! only the generic resolve → download → extract → install logic, reporting
//! progress so a long first download is never a silent wait.
12
13use std::io::Write;
14use std::path::{Path, PathBuf};
15
16use futures_util::StreamExt;
17use serde::Deserialize;
18
/// A progress update emitted while fetching the engine.
///
/// Delivered to the `on_progress` callback passed to [`ensure_engine`].
#[derive(Debug, Clone)]
pub struct EngineFetchProgress {
    /// Which phase of the fetch this update belongs to.
    pub stage: EngineStage,
    /// Bytes done so far (meaningful during `Downloading`; updates for the
    /// other stages report `0`).
    pub current: u64,
    /// Total download size when the server reports it. `None` for the other
    /// stages, and during `Downloading` when the length is unknown.
    pub total: Option<u64>,
    /// Human-readable status text, e.g. "Downloading engine…".
    pub message: String,
}
29
/// Coarse phase of the engine fetch, listed in the order [`ensure_engine`]
/// reports them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineStage {
    /// The host's catalog entry was found; release metadata is being fetched.
    Resolving,
    /// The release archive is streaming to disk; updates carry byte counts.
    Downloading,
    /// The archive is being unpacked and the engine copied into place.
    Extracting,
    /// Reported once, after the engine has been installed ("Engine ready").
    Installing,
}
38
39/// Ensure a managed engine binary exists under `dest` (the data-dir
40/// `engines/`), fetching the release for this host's OS/arch if absent.
41/// Returns the path to the engine binary; reports progress through
42/// `on_progress`.
43pub async fn ensure_engine(
44    dest: &Path,
45    on_progress: impl Fn(EngineFetchProgress) + Send + Sync,
46) -> Result<PathBuf, String> {
47    let binary = dest.join(exe_name());
48    if binary.is_file() {
49        return Ok(binary);
50    }
51    std::fs::create_dir_all(dest).map_err(|e| format!("create {}: {e}", dest.display()))?;
52
53    let catalog = load_catalog()?;
54    let spec = catalog
55        .assets
56        .iter()
57        .find(|a| a.os == host_os() && a.arch == host_arch())
58        .ok_or_else(|| {
59            format!(
60                "no engine release configured for {}/{}",
61                host_os(),
62                host_arch()
63            )
64        })?;
65
66    on_progress(EngineFetchProgress {
67        stage: EngineStage::Resolving,
68        current: 0,
69        total: None,
70        message: "Resolving engine release…".into(),
71    });
72
73    let client = reqwest::Client::builder()
74        .user_agent(catalog.user_agent.clone())
75        .build()
76        .map_err(|e| format!("http client: {e}"))?;
77
78    let release: Release = client
79        .get(&catalog.release_api)
80        .header(reqwest::header::ACCEPT, "application/vnd.github+json")
81        .send()
82        .await
83        .and_then(|r| r.error_for_status())
84        .map_err(|e| format!("fetch release metadata: {e}"))?
85        .json()
86        .await
87        .map_err(|e| format!("parse release metadata: {e}"))?;
88
89    let ext = spec.archive.ext();
90    let asset = release
91        .assets
92        .iter()
93        .find(|a| a.name.contains(&spec.name) && a.name.ends_with(ext))
94        .ok_or_else(|| {
95            format!(
96                "no '{}'*{} asset in release {}",
97                spec.name, ext, release.tag_name
98            )
99        })?;
100
101    // Download (streamed) into a temp file, reporting byte progress.
102    let tmp = std::env::temp_dir().join(format!("flow-engine-{}", std::process::id()));
103    std::fs::create_dir_all(&tmp).map_err(|e| format!("create temp dir: {e}"))?;
104    let archive_path = tmp.join(&asset.name);
105    download(
106        &client,
107        &asset.browser_download_url,
108        &archive_path,
109        &on_progress,
110    )
111    .await?;
112
113    on_progress(EngineFetchProgress {
114        stage: EngineStage::Extracting,
115        current: 0,
116        total: None,
117        message: "Extracting engine…".into(),
118    });
119
120    // Extraction + install is blocking I/O; keep it off the async runtime.
121    let dest_owned = dest.to_path_buf();
122    let archive = spec.archive;
123    let tmp_for_task = tmp.clone();
124    let binary = tokio::task::spawn_blocking(move || {
125        extract_and_install(&archive_path, &dest_owned, archive, &tmp_for_task)
126    })
127    .await
128    .map_err(|e| format!("extract task: {e}"))??;
129
130    let _ = std::fs::remove_dir_all(&tmp);
131
132    on_progress(EngineFetchProgress {
133        stage: EngineStage::Installing,
134        current: 0,
135        total: None,
136        message: "Engine ready".into(),
137    });
138
139    Ok(binary)
140}
141
/// File name of the engine executable for the platform this crate targets.
fn exe_name() -> &'static str {
    if cfg!(not(windows)) {
        "llama-server"
    } else {
        "llama-server.exe"
    }
}
149
/// Operating-system name as spelled in the engine catalog.
///
/// Supported hosts report `macos`, `windows` or `linux`. Any other target
/// reports its own Rust name (e.g. `freebsd`) instead of posing as Linux. The
/// catalog lookup then fails with a clear "no engine release configured" error
/// rather than fetching a binary that cannot run here.
fn host_os() -> &'static str {
    // `std::env::consts::OS` already uses the catalog's spellings for every
    // supported platform.
    std::env::consts::OS
}
159
/// CPU-architecture name as spelled in the engine catalog.
///
/// `aarch64` maps to `arm64` and `x86_64` maps to `x64`. Any other target
/// reports its Rust name (e.g. `x86`, `riscv64`) instead of posing as `x64`.
/// The catalog lookup then fails with a clear error rather than downloading an
/// incompatible x64 build.
fn host_arch() -> &'static str {
    match std::env::consts::ARCH {
        "aarch64" => "arm64",
        "x86_64" => "x64",
        other => other,
    }
}
167
/// Release coordinates parsed from the embedded `engine_catalog.json`.
#[derive(Deserialize)]
struct EngineCatalog {
    /// URL of the release-metadata endpoint (requested with a GitHub JSON
    /// `Accept` header).
    release_api: String,
    /// `User-Agent` sent on every request made by the fetch client.
    user_agent: String,
    /// Per-platform artifact specs; the first entry matching the host is used.
    assets: Vec<AssetSpec>,
}
174
/// How to pick one platform's artifact out of a release.
#[derive(Deserialize)]
struct AssetSpec {
    /// Operating system this entry applies to; compared with `host_os()`.
    os: String,
    /// CPU architecture this entry applies to; compared with `host_arch()`.
    arch: String,
    /// Fragment that identifies this host's artifact within the release. A
    /// release asset matches when its name contains this fragment and ends
    /// with the archive kind's extension.
    name: String,
    /// Archive format of the artifact, which fixes its extension and how it
    /// is unpacked.
    archive: ArchiveKind,
}
183
/// Archive format of a release artifact, spelled `"targz"` or `"zip"` in the
/// catalog JSON.
#[derive(Deserialize, Clone, Copy)]
#[serde(rename_all = "lowercase")]
enum ArchiveKind {
    /// Gzip-compressed tarball (`.tar.gz`).
    Targz,
    /// Zip archive (`.zip`).
    Zip,
}
190
191impl ArchiveKind {
192    fn ext(self) -> &'static str {
193        match self {
194            ArchiveKind::Targz => ".tar.gz",
195            ArchiveKind::Zip => ".zip",
196        }
197    }
198}
199
200fn load_catalog() -> Result<EngineCatalog, String> {
201    serde_json::from_str(include_str!("engine_catalog.json"))
202        .map_err(|e| format!("engine catalog: {e}"))
203}
204
/// The subset of the release-metadata response that this module reads.
#[derive(Deserialize)]
struct Release {
    /// Release tag; used in error messages.
    tag_name: String,
    /// Downloadable artifacts attached to the release.
    assets: Vec<ReleaseAsset>,
}
210
/// One downloadable artifact of a release.
#[derive(Deserialize)]
struct ReleaseAsset {
    /// Artifact file name. It is matched against the catalog spec and reused
    /// as the file name of the downloaded archive.
    name: String,
    /// URL the artifact is downloaded from.
    browser_download_url: String,
}
216
217async fn download(
218    client: &reqwest::Client,
219    url: &str,
220    dest_file: &Path,
221    on_progress: &(impl Fn(EngineFetchProgress) + Send + Sync),
222) -> Result<(), String> {
223    let resp = client
224        .get(url)
225        .send()
226        .await
227        .and_then(|r| r.error_for_status())
228        .map_err(|e| format!("download: {e}"))?;
229    let total = resp.content_length();
230    let mut file =
231        std::fs::File::create(dest_file).map_err(|e| format!("create {}: {e}", dest_file.display()))?;
232    let mut stream = resp.bytes_stream();
233    let mut downloaded: u64 = 0;
234    let mut last_emit: u64 = 0;
235    on_progress(EngineFetchProgress {
236        stage: EngineStage::Downloading,
237        current: 0,
238        total,
239        message: "Downloading engine…".into(),
240    });
241    while let Some(chunk) = stream.next().await {
242        let chunk = chunk.map_err(|e| format!("download stream: {e}"))?;
243        file.write_all(&chunk).map_err(|e| format!("write: {e}"))?;
244        downloaded += chunk.len() as u64;
245        // Throttle to ~1 MiB steps so the event channel isn't flooded.
246        if downloaded - last_emit >= (1 << 20) {
247            last_emit = downloaded;
248            on_progress(EngineFetchProgress {
249                stage: EngineStage::Downloading,
250                current: downloaded,
251                total,
252                message: "Downloading engine…".into(),
253            });
254        }
255    }
256    file.flush().map_err(|e| format!("flush: {e}"))?;
257    Ok(())
258}
259
260fn extract_and_install(
261    archive: &Path,
262    dest: &Path,
263    kind: ArchiveKind,
264    tmp: &Path,
265) -> Result<PathBuf, String> {
266    let unpack = tmp.join("unpack");
267    std::fs::create_dir_all(&unpack).map_err(|e| format!("create unpack dir: {e}"))?;
268    match kind {
269        ArchiveKind::Targz => {
270            let f = std::fs::File::open(archive).map_err(|e| format!("open archive: {e}"))?;
271            let gz = flate2::read::GzDecoder::new(f);
272            tar::Archive::new(gz)
273                .unpack(&unpack)
274                .map_err(|e| format!("untar: {e}"))?;
275        }
276        ArchiveKind::Zip => {
277            let f = std::fs::File::open(archive).map_err(|e| format!("open archive: {e}"))?;
278            zip::ZipArchive::new(f)
279                .map_err(|e| format!("open zip: {e}"))?
280                .extract(&unpack)
281                .map_err(|e| format!("unzip: {e}"))?;
282        }
283    }
284
285    let engine_dir =
286        find_engine_dir(&unpack).ok_or_else(|| format!("{} not found in archive", exe_name()))?;
287
288    // Copy the engine dir's contents into `dest`, flattening symlinks to real
289    // files (the engine resolves sibling shared libraries by name).
290    copy_flat(&engine_dir, dest)?;
291
292    let binary = dest.join(exe_name());
293    if !binary.is_file() {
294        return Err(format!(
295            "engine binary missing after install: {}",
296            binary.display()
297        ));
298    }
299    make_runnable(dest, &binary);
300    Ok(binary)
301}
302
303/// Depth-first search for the directory that holds the engine binary.
304fn find_engine_dir(root: &Path) -> Option<PathBuf> {
305    let mut stack = vec![root.to_path_buf()];
306    while let Some(dir) = stack.pop() {
307        let mut subdirs = Vec::new();
308        for entry in std::fs::read_dir(&dir).ok()?.flatten() {
309            if entry.file_name() == std::ffi::OsStr::new(exe_name()) {
310                return Some(dir);
311            }
312            if entry.path().is_dir() {
313                subdirs.push(entry.path());
314            }
315        }
316        stack.extend(subdirs);
317    }
318    None
319}
320
/// Mirror the tree at `src` into `dest`, creating directories as needed.
///
/// Every symlink becomes a plain copy of the file it resolves to, so each name
/// the engine loads exists as a real file. Symlinks that resolve to something
/// other than a file are skipped.
fn copy_flat(src: &Path, dest: &Path) -> Result<(), String> {
    std::fs::create_dir_all(dest).map_err(|e| format!("mkdir {}: {e}", dest.display()))?;
    std::fs::read_dir(src)
        .map_err(|e| format!("read {}: {e}", src.display()))?
        .try_for_each(|item| -> Result<(), String> {
            let entry = item.map_err(|e| format!("dir entry: {e}"))?;
            let source = entry.path();
            let target = dest.join(entry.file_name());
            let kind = std::fs::symlink_metadata(&source)
                .map_err(|e| format!("stat: {e}"))?
                .file_type();
            if kind.is_dir() {
                return copy_flat(&source, &target);
            }
            let origin = if kind.is_symlink() {
                let resolved = std::fs::canonicalize(&source)
                    .map_err(|e| format!("resolve symlink: {e}"))?;
                if !resolved.is_file() {
                    return Ok(());
                }
                resolved
            } else {
                source
            };
            std::fs::copy(&origin, &target).map_err(|e| format!("copy: {e}"))?;
            Ok(())
        })
}
343
/// Prepare an installed engine for launch. Every step is best-effort, and
/// failures are ignored.
///
/// - Unix: `binary` is set to mode `0o755`.
/// - macOS: the `com.apple.quarantine` attribute is stripped recursively from
///   `dest`, and each top-level file in `dest` is ad-hoc signed, so Gatekeeper
///   doesn't kill the unsigned binary on launch.
fn make_runnable(dest: &Path, binary: &Path) {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        if let Ok(meta) = std::fs::metadata(binary) {
            let mut mode = meta.permissions();
            mode.set_mode(0o755);
            let _ = std::fs::set_permissions(binary, mode);
        }
    }
    #[cfg(target_os = "macos")]
    {
        use std::process::Command;
        let _ = Command::new("xattr")
            .args(["-dr", "com.apple.quarantine"])
            .arg(dest)
            .status();
        let top_level_files = std::fs::read_dir(dest)
            .into_iter()
            .flatten()
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| path.is_file());
        for file in top_level_files {
            let _ = Command::new("codesign")
                .args(["--force", "--sign", "-"])
                .arg(&file)
                .status();
        }
    }
    // Keep both parameters "used" on targets where the blocks above compile out.
    let _ = (dest, binary);
}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// The embedded catalog must parse and offer an artifact for the host
    /// running the tests.
    #[test]
    fn catalog_parses_and_covers_this_host() {
        let catalog = load_catalog().expect("engine_catalog.json parses");
        let (os, arch) = (host_os(), host_arch());
        let covered = catalog
            .assets
            .iter()
            .any(|spec| spec.os == os && spec.arch == arch);
        assert!(covered, "no engine asset configured for {}/{}", os, arch);
        assert!(catalog.release_api.starts_with("http"));
    }

    /// Each archive kind maps to the suffix its release assets carry.
    #[test]
    fn archive_ext_matches_kind() {
        for (kind, expected) in [(ArchiveKind::Targz, ".tar.gz"), (ArchiveKind::Zip, ".zip")] {
            assert_eq!(kind.ext(), expected);
        }
    }
}