Skip to main content

flow_application/
scheduler.rs

1//! Recurring-run scheduling (roadmap E11). Translates user-friendly presets,
2//! intervals, one-shots, and raw cron expressions into persisted `next_run_at`
3//! instants for the background scheduler.
4
5use chrono::{
6    DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone,
7    Timelike, Utc, Weekday,
8};
9use chrono_tz::Tz;
10use croner::Cron;
11use serde::{Deserialize, Serialize};
12use std::str::FromStr;
13
14/// Supported recurrence cadences. Anchored on a user-chosen instant whose
15/// minute/hour/weekday/day-of-month/month select the recurring slot.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
17#[serde(rename_all = "lowercase")]
18pub enum ScheduleFrequency {
19    Hourly,
20    Daily,
21    Weekly,
22    Monthly,
23    Yearly,
24    Cron,
25    EveryNMinutes,
26    Once,
27}
28
29impl ScheduleFrequency {
30    pub fn as_str(self) -> &'static str {
31        match self {
32            Self::Hourly => "hourly",
33            Self::Daily => "daily",
34            Self::Weekly => "weekly",
35            Self::Monthly => "monthly",
36            Self::Yearly => "yearly",
37            Self::Cron => "cron",
38            Self::EveryNMinutes => "every_n_minutes",
39            Self::Once => "once",
40        }
41    }
42
43    /// Parse the wire string; `None` for anything outside the supported modes.
44    pub fn parse(s: &str) -> Option<Self> {
45        match s.trim().to_ascii_lowercase().as_str() {
46            "hourly" => Some(Self::Hourly),
47            "daily" => Some(Self::Daily),
48            "weekly" => Some(Self::Weekly),
49            "monthly" => Some(Self::Monthly),
50            "yearly" => Some(Self::Yearly),
51            "cron" => Some(Self::Cron),
52            "every_n_minutes" | "every-n-minutes" | "interval" => Some(Self::EveryNMinutes),
53            "once" => Some(Self::Once),
54            _ => None,
55        }
56    }
57}
58
59/// Missed-run policy when the app/server was not running at the scheduled time.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(rename_all = "kebab-case")]
62pub enum CatchUpPolicy {
63    Skip,
64    RunOnce,
65    RunAll,
66}
67
68impl CatchUpPolicy {
69    pub fn as_str(self) -> &'static str {
70        match self {
71            Self::Skip => "skip",
72            Self::RunOnce => "run-once",
73            Self::RunAll => "run-all",
74        }
75    }
76
77    pub fn parse(s: &str) -> Option<Self> {
78        match s.trim().to_ascii_lowercase().as_str() {
79            "skip" => Some(Self::Skip),
80            "run-once" | "run_once" => Some(Self::RunOnce),
81            "run-all" | "run_all" => Some(Self::RunAll),
82            _ => None,
83        }
84    }
85}
86
87impl Default for CatchUpPolicy {
88    fn default() -> Self {
89        Self::RunOnce
90    }
91}
92
93/// Patch from the UI to create or update a flow's schedule. camelCase to match
94/// the desktop + server command payloads.
95#[derive(Debug, Clone, Deserialize)]
96#[serde(rename_all = "camelCase")]
97pub struct SchedulePatch {
98    pub template_slug: String,
99    pub collection_slug: String,
100    pub flow_name: String,
101    pub enabled: bool,
102    pub frequency: String,
103    pub anchor_at: DateTime<Utc>,
104    pub timezone: Option<String>,
105    pub cron: Option<String>,
106    pub every_minutes: Option<u32>,
107    pub until: Option<DateTime<Utc>>,
108    pub max_runs: Option<u32>,
109    pub catchup: Option<String>,
110}
111
112/// Schedule preview request used by the UI/CLI to show upcoming fire times
113/// before saving.
114#[derive(Debug, Clone, Deserialize)]
115#[serde(rename_all = "camelCase")]
116pub struct SchedulePreviewRequest {
117    pub frequency: String,
118    pub anchor_at: DateTime<Utc>,
119    pub timezone: Option<String>,
120    pub cron: Option<String>,
121    pub every_minutes: Option<u32>,
122    pub until: Option<DateTime<Utc>>,
123    pub max_runs: Option<u32>,
124    pub run_count: Option<u32>,
125    pub limit: Option<usize>,
126}
127
128/// Calculate the next fire instant strictly after `after`, honoring optional
129/// end conditions.
130pub fn next_run_after(
131    freq: ScheduleFrequency,
132    anchor: DateTime<Utc>,
133    after: DateTime<Utc>,
134    opts: &ScheduleOptions,
135) -> Result<Option<DateTime<Utc>>, String> {
136    if let Some(max) = opts.max_runs {
137        if opts.run_count >= max {
138            return Ok(None);
139        }
140    }
141
142    let tz = parse_timezone(opts.timezone.as_deref())?;
143    let next = match freq {
144        ScheduleFrequency::Hourly => next_wall_after(anchor, after, tz, WallCadence::Hourly),
145        ScheduleFrequency::Daily => next_wall_after(anchor, after, tz, WallCadence::Daily),
146        ScheduleFrequency::Weekly => next_wall_after(anchor, after, tz, WallCadence::Weekly),
147        ScheduleFrequency::Monthly => next_wall_after(anchor, after, tz, WallCadence::Monthly),
148        ScheduleFrequency::Yearly => next_wall_after(anchor, after, tz, WallCadence::Yearly),
149        ScheduleFrequency::Cron => next_cron_after(opts.cron.as_deref(), after, tz)?,
150        ScheduleFrequency::EveryNMinutes => {
151            let minutes = opts.every_minutes.unwrap_or(15).clamp(1, 24 * 60);
152            if anchor > after {
153                anchor
154            } else {
155                step_fixed(anchor, after, Duration::minutes(minutes as i64))
156            }
157        }
158        ScheduleFrequency::Once => {
159            if anchor > after {
160                anchor
161            } else {
162                return Ok(None);
163            }
164        }
165    };
166
167    if opts.until.is_some_and(|until| next > until) {
168        return Ok(None);
169    }
170    Ok(Some(next))
171}
172
173#[derive(Debug, Clone, Default)]
174pub struct ScheduleOptions {
175    pub timezone: Option<String>,
176    pub cron: Option<String>,
177    pub every_minutes: Option<u32>,
178    pub until: Option<DateTime<Utc>>,
179    pub max_runs: Option<u32>,
180    pub run_count: u32,
181}
182
183/// A 5-field cron projection of preset schedules, for display only.
184pub fn cron_expr(
185    freq: ScheduleFrequency,
186    anchor: DateTime<Utc>,
187    timezone: Option<&str>,
188    cron: Option<&str>,
189) -> String {
190    if freq == ScheduleFrequency::Cron {
191        return cron.unwrap_or("").to_string();
192    }
193    let tz = parse_timezone(timezone).unwrap_or(chrono_tz::UTC);
194    let anchor = anchor.with_timezone(&tz);
195    let (m, h, dom, mon) = (
196        anchor.minute(),
197        anchor.hour(),
198        anchor.day(),
199        anchor.month(),
200    );
201    // cron weekday: 0 = Sunday.
202    let dow = anchor.weekday().num_days_from_sunday();
203    match freq {
204        ScheduleFrequency::Hourly => format!("{m} * * * *"),
205        ScheduleFrequency::Daily => format!("{m} {h} * * *"),
206        ScheduleFrequency::Weekly => format!("{m} {h} * * {dow}"),
207        ScheduleFrequency::Monthly => format!("{m} {h} {dom} * *"),
208        ScheduleFrequency::Yearly => format!("{m} {h} {dom} {mon} *"),
209        ScheduleFrequency::EveryNMinutes => "*/15 * * * *".into(),
210        ScheduleFrequency::Once => String::new(),
211        ScheduleFrequency::Cron => unreachable!(),
212    }
213}
214
215pub fn preview(req: SchedulePreviewRequest) -> Result<Vec<DateTime<Utc>>, String> {
216    let freq = ScheduleFrequency::parse(&req.frequency)
217        .ok_or_else(|| format!("unknown schedule frequency `{}`", req.frequency))?;
218    let limit = req.limit.unwrap_or(5).clamp(1, 20);
219    let mut opts = ScheduleOptions {
220        timezone: req.timezone,
221        cron: req.cron,
222        every_minutes: req.every_minutes,
223        until: req.until,
224        max_runs: req.max_runs,
225        run_count: req.run_count.unwrap_or(0),
226    };
227    let mut out = Vec::new();
228    let mut after = Utc::now() - Duration::seconds(1);
229    while out.len() < limit {
230        let Some(next) = next_run_after(freq, req.anchor_at, after, &opts)? else {
231            break;
232        };
233        out.push(next);
234        after = next;
235        opts.run_count = opts.run_count.saturating_add(1);
236    }
237    Ok(out)
238}
239
240/// Direct jump along a fixed-length cadence (hourly/daily/weekly) - no looping.
241fn step_fixed(anchor: DateTime<Utc>, after: DateTime<Utc>, period: Duration) -> DateTime<Utc> {
242    let period_secs = period.num_seconds().max(1);
243    let elapsed = (after - anchor).num_seconds().max(0);
244    let k = elapsed / period_secs + 1;
245    anchor + Duration::seconds(k * period_secs)
246}
247
248#[derive(Debug, Clone, Copy)]
249enum WallCadence {
250    Hourly,
251    Daily,
252    Weekly,
253    Monthly,
254    Yearly,
255}
256
257fn next_wall_after(
258    anchor: DateTime<Utc>,
259    after: DateTime<Utc>,
260    tz: Tz,
261    cadence: WallCadence,
262) -> DateTime<Utc> {
263    if anchor > after {
264        return anchor;
265    }
266    let anchor_local = anchor.with_timezone(&tz);
267    let after_local = after.with_timezone(&tz);
268    let time = NaiveTime::from_hms_opt(
269        anchor_local.hour(),
270        anchor_local.minute(),
271        anchor_local.second(),
272    )
273    .unwrap_or(NaiveTime::MIN);
274    let mut candidate = match cadence {
275        WallCadence::Hourly => after_local
276            .date_naive()
277            .and_hms_opt(after_local.hour(), anchor_local.minute(), anchor_local.second())
278            .unwrap_or_else(|| after_local.naive_local()),
279        WallCadence::Daily => after_local.date_naive().and_time(time),
280        WallCadence::Weekly => {
281            let date = next_weekday(after_local.date_naive(), anchor_local.weekday());
282            date.and_time(time)
283        }
284        WallCadence::Monthly => month_candidate(
285            after_local.year(),
286            after_local.month(),
287            anchor_local.day(),
288            time,
289        ),
290        WallCadence::Yearly => month_candidate(
291            after_local.year(),
292            anchor_local.month(),
293            anchor_local.day(),
294            time,
295        ),
296    };
297    loop {
298        let resolved = resolve_local(tz, candidate);
299        if resolved > after {
300            return resolved;
301        }
302        candidate = match cadence {
303            WallCadence::Hourly => candidate + Duration::hours(1),
304            WallCadence::Daily => candidate + Duration::days(1),
305            WallCadence::Weekly => candidate + Duration::weeks(1),
306            WallCadence::Monthly => add_months_naive(candidate, 1),
307            WallCadence::Yearly => add_months_naive(candidate, 12),
308        };
309    }
310}
311
312fn next_cron_after(expr: Option<&str>, after: DateTime<Utc>, tz: Tz) -> Result<DateTime<Utc>, String> {
313    let expr = expr
314        .map(str::trim)
315        .filter(|s| !s.is_empty())
316        .ok_or_else(|| "cron expression is required".to_string())?;
317    let cron = Cron::from_str(expr).map_err(|e| e.to_string())?;
318    cron.find_next_occurrence(&after.with_timezone(&tz), false)
319        .map(|d| d.with_timezone(&Utc))
320        .map_err(|e| e.to_string())
321}
322
323fn parse_timezone(raw: Option<&str>) -> Result<Tz, String> {
324    let raw = raw.unwrap_or("UTC").trim();
325    if raw.is_empty() {
326        return Ok(chrono_tz::UTC);
327    }
328    raw.parse::<Tz>()
329        .map_err(|_| format!("unknown timezone `{raw}`"))
330}
331
332fn next_weekday(date: NaiveDate, target: Weekday) -> NaiveDate {
333    let current = date.weekday().num_days_from_monday() as i64;
334    let target = target.num_days_from_monday() as i64;
335    date + Duration::days((target - current).rem_euclid(7))
336}
337
338fn month_candidate(year: i32, month: u32, target_day: u32, time: NaiveTime) -> NaiveDateTime {
339    let day = target_day.min(last_day_of_month(year, month));
340    NaiveDate::from_ymd_opt(year, month, day)
341        .unwrap_or(NaiveDate::MIN)
342        .and_time(time)
343}
344
345fn add_months_naive(dt: NaiveDateTime, n: u32) -> NaiveDateTime {
346    let total = dt.month0() as i32 + n as i32;
347    let year = dt.year() + total.div_euclid(12);
348    let month = total.rem_euclid(12) as u32 + 1;
349    let day = dt.day().min(last_day_of_month(year, month));
350    NaiveDate::from_ymd_opt(year, month, day)
351        .and_then(|d| d.and_hms_opt(dt.hour(), dt.minute(), dt.second()))
352        .unwrap_or(dt)
353}
354
355fn resolve_local(tz: Tz, naive: NaiveDateTime) -> DateTime<Utc> {
356    let mut candidate = naive;
357    for _ in 0..180 {
358        match tz.from_local_datetime(&candidate) {
359            LocalResult::Single(dt) => return dt.with_timezone(&Utc),
360            LocalResult::Ambiguous(a, _) => return a.with_timezone(&Utc),
361            LocalResult::None => candidate += Duration::minutes(1),
362        }
363    }
364    Utc.from_utc_datetime(&naive)
365}
366
367fn last_day_of_month(year: i32, month: u32) -> u32 {
368    let (ny, nm) = if month == 12 {
369        (year + 1, 1)
370    } else {
371        (year, month + 1)
372    };
373    let first_next = Utc
374        .with_ymd_and_hms(ny, nm, 1, 0, 0, 0)
375        .single()
376        .unwrap_or_else(Utc::now);
377    (first_next - Duration::days(1)).day()
378}
379
380#[cfg(test)]
381mod tests {
382    use super::*;
383
384    fn dt(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<Utc> {
385        Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).single().unwrap()
386    }
387
388    fn opts() -> ScheduleOptions {
389        ScheduleOptions::default()
390    }
391
392    #[test]
393    fn future_anchor_is_the_next_run() {
394        let anchor = dt(2030, 1, 1, 9, 0);
395        let now = dt(2026, 1, 1, 0, 0);
396        assert_eq!(
397            next_run_after(ScheduleFrequency::Daily, anchor, now, &opts()).unwrap(),
398            Some(anchor)
399        );
400    }
401
402    #[test]
403    fn daily_advances_to_next_day_same_time() {
404        let anchor = dt(2026, 1, 1, 9, 0);
405        let now = dt(2026, 1, 1, 10, 0);
406        assert_eq!(
407            next_run_after(ScheduleFrequency::Daily, anchor, now, &opts()).unwrap(),
408            Some(dt(2026, 1, 2, 9, 0))
409        );
410    }
411
412    #[test]
413    fn hourly_keeps_the_anchor_minute() {
414        let anchor = dt(2026, 1, 1, 9, 30);
415        let now = dt(2026, 1, 1, 9, 45);
416        assert_eq!(
417            next_run_after(ScheduleFrequency::Hourly, anchor, now, &opts()).unwrap(),
418            Some(dt(2026, 1, 1, 10, 30))
419        );
420    }
421
422    #[test]
423    fn weekly_stays_on_the_anchor_weekday() {
424        let anchor = dt(2026, 1, 1, 9, 0);
425        let now = dt(2026, 1, 5, 0, 0);
426        let next = next_run_after(ScheduleFrequency::Weekly, anchor, now, &opts())
427            .unwrap()
428            .unwrap();
429        assert_eq!(next, dt(2026, 1, 8, 9, 0));
430        assert_eq!(next.weekday(), anchor.weekday());
431    }
432
433    #[test]
434    fn monthly_clamps_short_months() {
435        let anchor = dt(2026, 1, 31, 9, 0);
436        let now = dt(2026, 2, 1, 0, 0);
437        // February clamps day 31 → 28 (2026 is not a leap year).
438        assert_eq!(
439            next_run_after(ScheduleFrequency::Monthly, anchor, now, &opts()).unwrap(),
440            Some(dt(2026, 2, 28, 9, 0))
441        );
442    }
443
444    #[test]
445    fn yearly_advances_one_year() {
446        let anchor = dt(2026, 3, 15, 9, 0);
447        let now = dt(2026, 6, 1, 0, 0);
448        assert_eq!(
449            next_run_after(ScheduleFrequency::Yearly, anchor, now, &opts()).unwrap(),
450            Some(dt(2027, 3, 15, 9, 0))
451        );
452    }
453
454    #[test]
455    fn cron_projection_matches_cadence() {
456        let anchor = dt(2026, 3, 15, 9, 30);
457        assert_eq!(cron_expr(ScheduleFrequency::Hourly, anchor, None, None), "30 * * * *");
458        assert_eq!(cron_expr(ScheduleFrequency::Daily, anchor, None, None), "30 9 * * *");
459        assert_eq!(cron_expr(ScheduleFrequency::Monthly, anchor, None, None), "30 9 15 * *");
460        assert_eq!(cron_expr(ScheduleFrequency::Yearly, anchor, None, None), "30 9 15 3 *");
461    }
462
463    #[test]
464    fn once_only_fires_in_the_future() {
465        let anchor = dt(2026, 1, 1, 9, 0);
466        assert_eq!(
467            next_run_after(ScheduleFrequency::Once, anchor, dt(2026, 1, 1, 8, 0), &opts()).unwrap(),
468            Some(anchor)
469        );
470        assert_eq!(
471            next_run_after(ScheduleFrequency::Once, anchor, dt(2026, 1, 1, 10, 0), &opts()).unwrap(),
472            None
473        );
474    }
475
476    #[test]
477    fn interval_uses_every_minutes() {
478        let anchor = dt(2026, 1, 1, 9, 0);
479        let options = ScheduleOptions {
480            every_minutes: Some(15),
481            ..Default::default()
482        };
483        assert_eq!(
484            next_run_after(
485                ScheduleFrequency::EveryNMinutes,
486                anchor,
487                dt(2026, 1, 1, 9, 10),
488                &options
489            )
490            .unwrap(),
491            Some(dt(2026, 1, 1, 9, 15))
492        );
493    }
494
495    #[test]
496    fn max_runs_stops_schedule() {
497        let anchor = dt(2026, 1, 1, 9, 0);
498        let options = ScheduleOptions {
499            max_runs: Some(3),
500            run_count: 3,
501            ..Default::default()
502        };
503        assert_eq!(
504            next_run_after(ScheduleFrequency::Daily, anchor, dt(2026, 1, 2, 0, 0), &options)
505                .unwrap(),
506            None
507        );
508    }
509
510    #[test]
511    fn cron_expression_finds_next_run() {
512        let anchor = dt(2026, 1, 1, 0, 0);
513        let options = ScheduleOptions {
514            cron: Some("*/5 * * * *".into()),
515            ..Default::default()
516        };
517        assert_eq!(
518            next_run_after(
519                ScheduleFrequency::Cron,
520                anchor,
521                dt(2026, 1, 1, 9, 2),
522                &options
523            )
524            .unwrap(),
525            Some(dt(2026, 1, 1, 9, 5))
526        );
527    }
528}