1use chrono::{
6 DateTime, Datelike, Duration, LocalResult, NaiveDate, NaiveDateTime, NaiveTime, TimeZone,
7 Timelike, Utc, Weekday,
8};
9use chrono_tz::Tz;
10use croner::Cron;
11use serde::{Deserialize, Serialize};
12use std::str::FromStr;
13
/// Recurrence kind of a schedule.
///
/// Serde reads and writes each variant as its lowercased name
/// (`rename_all = "lowercase"`).
// NOTE(review): with `rename_all = "lowercase"` the `EveryNMinutes` variant is
// spelled `everynminutes` on the wire, while `as_str` yields
// `every_n_minutes` — confirm which spelling consumers expect.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScheduleFrequency {
    /// Every hour, at the anchor's local minute and second.
    Hourly,
    /// Every day, at the anchor's local time of day.
    Daily,
    /// Every week, on the anchor's local weekday and time of day.
    Weekly,
    /// Every month, on the anchor's local day of month (clamped to the month's length).
    Monthly,
    /// Every year, on the anchor's local month and day.
    Yearly,
    /// Driven by a user-supplied cron expression.
    Cron,
    /// Every fixed number of minutes, counted from the anchor.
    EveryNMinutes,
    /// A single run at the anchor instant.
    Once,
}
28
29impl ScheduleFrequency {
30 pub fn as_str(self) -> &'static str {
31 match self {
32 Self::Hourly => "hourly",
33 Self::Daily => "daily",
34 Self::Weekly => "weekly",
35 Self::Monthly => "monthly",
36 Self::Yearly => "yearly",
37 Self::Cron => "cron",
38 Self::EveryNMinutes => "every_n_minutes",
39 Self::Once => "once",
40 }
41 }
42
43 pub fn parse(s: &str) -> Option<Self> {
45 match s.trim().to_ascii_lowercase().as_str() {
46 "hourly" => Some(Self::Hourly),
47 "daily" => Some(Self::Daily),
48 "weekly" => Some(Self::Weekly),
49 "monthly" => Some(Self::Monthly),
50 "yearly" => Some(Self::Yearly),
51 "cron" => Some(Self::Cron),
52 "every_n_minutes" | "every-n-minutes" | "interval" => Some(Self::EveryNMinutes),
53 "once" => Some(Self::Once),
54 _ => None,
55 }
56 }
57}
58
/// Policy selecting how runs that were missed should be caught up.
///
/// Serde reads and writes the variants in kebab-case (`skip`, `run-once`,
/// `run-all`).
// NOTE(review): the variant descriptions below are inferred from the names
// only; nothing in this file acts on the policy — confirm against the
// scheduler that consumes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CatchUpPolicy {
    /// Presumably: drop missed runs.
    Skip,
    /// Presumably: perform a single catch-up run.
    RunOnce,
    /// Presumably: perform every missed run.
    RunAll,
}
67
68impl CatchUpPolicy {
69 pub fn as_str(self) -> &'static str {
70 match self {
71 Self::Skip => "skip",
72 Self::RunOnce => "run-once",
73 Self::RunAll => "run-all",
74 }
75 }
76
77 pub fn parse(s: &str) -> Option<Self> {
78 match s.trim().to_ascii_lowercase().as_str() {
79 "skip" => Some(Self::Skip),
80 "run-once" | "run_once" => Some(Self::RunOnce),
81 "run-all" | "run_all" => Some(Self::RunAll),
82 _ => None,
83 }
84 }
85}
86
87impl Default for CatchUpPolicy {
88 fn default() -> Self {
89 Self::RunOnce
90 }
91}
92
/// Deserializable description of a schedule; field names are read in camelCase.
// NOTE(review): nothing in this file consumes `SchedulePatch`. The per-field
// notes are inferred from the field names and from the matching fields of
// `ScheduleOptions` — confirm against the code that applies the patch.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SchedulePatch {
    /// Presumably the slug of the template the schedule belongs to.
    pub template_slug: String,
    /// Presumably the slug of the collection the schedule belongs to.
    pub collection_slug: String,
    /// Presumably the name of the flow to run.
    pub flow_name: String,
    /// Presumably whether the schedule is active.
    pub enabled: bool,
    /// Frequency name as text; presumably parsed with `ScheduleFrequency::parse`.
    pub frequency: String,
    /// Anchor instant of the schedule, in UTC.
    pub anchor_at: DateTime<Utc>,
    /// Optional timezone name.
    pub timezone: Option<String>,
    /// Optional cron expression.
    pub cron: Option<String>,
    /// Optional interval in minutes.
    pub every_minutes: Option<u32>,
    /// Optional instant after which the schedule should stop.
    pub until: Option<DateTime<Utc>>,
    /// Optional cap on the number of runs.
    pub max_runs: Option<u32>,
    /// Catch-up policy name as text; presumably parsed with `CatchUpPolicy::parse`.
    pub catchup: Option<String>,
}
111
/// Input of [`preview`]: a schedule definition plus how many upcoming runs to list.
///
/// Field names are read in camelCase.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SchedulePreviewRequest {
    /// Frequency name; must be accepted by `ScheduleFrequency::parse`.
    pub frequency: String,
    /// Anchor instant of the schedule, in UTC.
    pub anchor_at: DateTime<Utc>,
    /// Timezone name; copied into `ScheduleOptions::timezone`.
    pub timezone: Option<String>,
    /// Cron expression; copied into `ScheduleOptions::cron`.
    pub cron: Option<String>,
    /// Interval in minutes; copied into `ScheduleOptions::every_minutes`.
    pub every_minutes: Option<u32>,
    /// Cut-off instant; copied into `ScheduleOptions::until`.
    pub until: Option<DateTime<Utc>>,
    /// Cap on the number of runs; copied into `ScheduleOptions::max_runs`.
    pub max_runs: Option<u32>,
    /// Number of runs already counted; `preview` treats a missing value as 0.
    pub run_count: Option<u32>,
    /// Number of runs to list; `preview` defaults to 5 and clamps to `1..=20`.
    pub limit: Option<usize>,
}
127
128pub fn next_run_after(
131 freq: ScheduleFrequency,
132 anchor: DateTime<Utc>,
133 after: DateTime<Utc>,
134 opts: &ScheduleOptions,
135) -> Result<Option<DateTime<Utc>>, String> {
136 if let Some(max) = opts.max_runs {
137 if opts.run_count >= max {
138 return Ok(None);
139 }
140 }
141
142 let tz = parse_timezone(opts.timezone.as_deref())?;
143 let next = match freq {
144 ScheduleFrequency::Hourly => next_wall_after(anchor, after, tz, WallCadence::Hourly),
145 ScheduleFrequency::Daily => next_wall_after(anchor, after, tz, WallCadence::Daily),
146 ScheduleFrequency::Weekly => next_wall_after(anchor, after, tz, WallCadence::Weekly),
147 ScheduleFrequency::Monthly => next_wall_after(anchor, after, tz, WallCadence::Monthly),
148 ScheduleFrequency::Yearly => next_wall_after(anchor, after, tz, WallCadence::Yearly),
149 ScheduleFrequency::Cron => next_cron_after(opts.cron.as_deref(), after, tz)?,
150 ScheduleFrequency::EveryNMinutes => {
151 let minutes = opts.every_minutes.unwrap_or(15).clamp(1, 24 * 60);
152 if anchor > after {
153 anchor
154 } else {
155 step_fixed(anchor, after, Duration::minutes(minutes as i64))
156 }
157 }
158 ScheduleFrequency::Once => {
159 if anchor > after {
160 anchor
161 } else {
162 return Ok(None);
163 }
164 }
165 };
166
167 if opts.until.is_some_and(|until| next > until) {
168 return Ok(None);
169 }
170 Ok(Some(next))
171}
172
/// Optional parameters that refine how [`next_run_after`] computes a run.
#[derive(Debug, Clone, Default)]
pub struct ScheduleOptions {
    /// Timezone name used for wall-clock cadences and cron; `None` or blank means UTC.
    pub timezone: Option<String>,
    /// Cron expression; required (non-blank) when the frequency is `Cron`.
    pub cron: Option<String>,
    /// Interval for `EveryNMinutes`; defaults to 15 and is clamped to `1..=1440`.
    pub every_minutes: Option<u32>,
    /// When set, no run later than this instant is returned.
    pub until: Option<DateTime<Utc>>,
    /// When set, no run is returned once `run_count` has reached this value.
    pub max_runs: Option<u32>,
    /// Number of runs counted so far, compared against `max_runs`.
    pub run_count: u32,
}
182
183pub fn cron_expr(
185 freq: ScheduleFrequency,
186 anchor: DateTime<Utc>,
187 timezone: Option<&str>,
188 cron: Option<&str>,
189) -> String {
190 if freq == ScheduleFrequency::Cron {
191 return cron.unwrap_or("").to_string();
192 }
193 let tz = parse_timezone(timezone).unwrap_or(chrono_tz::UTC);
194 let anchor = anchor.with_timezone(&tz);
195 let (m, h, dom, mon) = (
196 anchor.minute(),
197 anchor.hour(),
198 anchor.day(),
199 anchor.month(),
200 );
201 let dow = anchor.weekday().num_days_from_sunday();
203 match freq {
204 ScheduleFrequency::Hourly => format!("{m} * * * *"),
205 ScheduleFrequency::Daily => format!("{m} {h} * * *"),
206 ScheduleFrequency::Weekly => format!("{m} {h} * * {dow}"),
207 ScheduleFrequency::Monthly => format!("{m} {h} {dom} * *"),
208 ScheduleFrequency::Yearly => format!("{m} {h} {dom} {mon} *"),
209 ScheduleFrequency::EveryNMinutes => "*/15 * * * *".into(),
210 ScheduleFrequency::Once => String::new(),
211 ScheduleFrequency::Cron => unreachable!(),
212 }
213}
214
215pub fn preview(req: SchedulePreviewRequest) -> Result<Vec<DateTime<Utc>>, String> {
216 let freq = ScheduleFrequency::parse(&req.frequency)
217 .ok_or_else(|| format!("unknown schedule frequency `{}`", req.frequency))?;
218 let limit = req.limit.unwrap_or(5).clamp(1, 20);
219 let mut opts = ScheduleOptions {
220 timezone: req.timezone,
221 cron: req.cron,
222 every_minutes: req.every_minutes,
223 until: req.until,
224 max_runs: req.max_runs,
225 run_count: req.run_count.unwrap_or(0),
226 };
227 let mut out = Vec::new();
228 let mut after = Utc::now() - Duration::seconds(1);
229 while out.len() < limit {
230 let Some(next) = next_run_after(freq, req.anchor_at, after, &opts)? else {
231 break;
232 };
233 out.push(next);
234 after = next;
235 opts.run_count = opts.run_count.saturating_add(1);
236 }
237 Ok(out)
238}
239
240fn step_fixed(anchor: DateTime<Utc>, after: DateTime<Utc>, period: Duration) -> DateTime<Utc> {
242 let period_secs = period.num_seconds().max(1);
243 let elapsed = (after - anchor).num_seconds().max(0);
244 let k = elapsed / period_secs + 1;
245 anchor + Duration::seconds(k * period_secs)
246}
247
/// Cadences that `next_wall_after` evaluates against local wall-clock time.
#[derive(Debug, Clone, Copy)]
enum WallCadence {
    /// Steps one hour at a time.
    Hourly,
    /// Steps one day at a time.
    Daily,
    /// Steps one week at a time.
    Weekly,
    /// Steps one calendar month at a time.
    Monthly,
    /// Steps twelve calendar months at a time.
    Yearly,
}
256
257fn next_wall_after(
258 anchor: DateTime<Utc>,
259 after: DateTime<Utc>,
260 tz: Tz,
261 cadence: WallCadence,
262) -> DateTime<Utc> {
263 if anchor > after {
264 return anchor;
265 }
266 let anchor_local = anchor.with_timezone(&tz);
267 let after_local = after.with_timezone(&tz);
268 let time = NaiveTime::from_hms_opt(
269 anchor_local.hour(),
270 anchor_local.minute(),
271 anchor_local.second(),
272 )
273 .unwrap_or(NaiveTime::MIN);
274 let mut candidate = match cadence {
275 WallCadence::Hourly => after_local
276 .date_naive()
277 .and_hms_opt(after_local.hour(), anchor_local.minute(), anchor_local.second())
278 .unwrap_or_else(|| after_local.naive_local()),
279 WallCadence::Daily => after_local.date_naive().and_time(time),
280 WallCadence::Weekly => {
281 let date = next_weekday(after_local.date_naive(), anchor_local.weekday());
282 date.and_time(time)
283 }
284 WallCadence::Monthly => month_candidate(
285 after_local.year(),
286 after_local.month(),
287 anchor_local.day(),
288 time,
289 ),
290 WallCadence::Yearly => month_candidate(
291 after_local.year(),
292 anchor_local.month(),
293 anchor_local.day(),
294 time,
295 ),
296 };
297 loop {
298 let resolved = resolve_local(tz, candidate);
299 if resolved > after {
300 return resolved;
301 }
302 candidate = match cadence {
303 WallCadence::Hourly => candidate + Duration::hours(1),
304 WallCadence::Daily => candidate + Duration::days(1),
305 WallCadence::Weekly => candidate + Duration::weeks(1),
306 WallCadence::Monthly => add_months_naive(candidate, 1),
307 WallCadence::Yearly => add_months_naive(candidate, 12),
308 };
309 }
310}
311
312fn next_cron_after(expr: Option<&str>, after: DateTime<Utc>, tz: Tz) -> Result<DateTime<Utc>, String> {
313 let expr = expr
314 .map(str::trim)
315 .filter(|s| !s.is_empty())
316 .ok_or_else(|| "cron expression is required".to_string())?;
317 let cron = Cron::from_str(expr).map_err(|e| e.to_string())?;
318 cron.find_next_occurrence(&after.with_timezone(&tz), false)
319 .map(|d| d.with_timezone(&Utc))
320 .map_err(|e| e.to_string())
321}
322
323fn parse_timezone(raw: Option<&str>) -> Result<Tz, String> {
324 let raw = raw.unwrap_or("UTC").trim();
325 if raw.is_empty() {
326 return Ok(chrono_tz::UTC);
327 }
328 raw.parse::<Tz>()
329 .map_err(|_| format!("unknown timezone `{raw}`"))
330}
331
332fn next_weekday(date: NaiveDate, target: Weekday) -> NaiveDate {
333 let current = date.weekday().num_days_from_monday() as i64;
334 let target = target.num_days_from_monday() as i64;
335 date + Duration::days((target - current).rem_euclid(7))
336}
337
338fn month_candidate(year: i32, month: u32, target_day: u32, time: NaiveTime) -> NaiveDateTime {
339 let day = target_day.min(last_day_of_month(year, month));
340 NaiveDate::from_ymd_opt(year, month, day)
341 .unwrap_or(NaiveDate::MIN)
342 .and_time(time)
343}
344
345fn add_months_naive(dt: NaiveDateTime, n: u32) -> NaiveDateTime {
346 let total = dt.month0() as i32 + n as i32;
347 let year = dt.year() + total.div_euclid(12);
348 let month = total.rem_euclid(12) as u32 + 1;
349 let day = dt.day().min(last_day_of_month(year, month));
350 NaiveDate::from_ymd_opt(year, month, day)
351 .and_then(|d| d.and_hms_opt(dt.hour(), dt.minute(), dt.second()))
352 .unwrap_or(dt)
353}
354
355fn resolve_local(tz: Tz, naive: NaiveDateTime) -> DateTime<Utc> {
356 let mut candidate = naive;
357 for _ in 0..180 {
358 match tz.from_local_datetime(&candidate) {
359 LocalResult::Single(dt) => return dt.with_timezone(&Utc),
360 LocalResult::Ambiguous(a, _) => return a.with_timezone(&Utc),
361 LocalResult::None => candidate += Duration::minutes(1),
362 }
363 }
364 Utc.from_utc_datetime(&naive)
365}
366
/// Returns the number of days in `month` (1–12) of `year` in the proleptic
/// Gregorian calendar.
///
/// The result is computed arithmetically, so it never depends on the current
/// time and cannot fail for years outside the range a date type can hold.
/// For a `month` outside `1..=12` it returns 28, the largest day number that
/// is valid in every month.
fn last_day_of_month(year: i32, month: u32) -> u32 {
    match month {
        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
        4 | 6 | 9 | 11 => 30,
        2 => {
            // Leap years: divisible by 4, except centuries not divisible by 400.
            let is_leap = (year % 4 == 0 && year % 100 != 0) || year % 400 == 0;
            if is_leap {
                29
            } else {
                28
            }
        }
        // Not a calendar month; stay within what every month can represent.
        _ => 28,
    }
}
379
// Unit tests for the schedule computations. All instants are UTC and the
// default options (no timezone, i.e. UTC) are used unless stated otherwise.
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a UTC instant at second 0 of the given minute.
    fn dt(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).single().unwrap()
    }

    // Default options: UTC, no cron, no interval, no cut-off, no run budget.
    fn opts() -> ScheduleOptions {
        ScheduleOptions::default()
    }

    // An anchor that is still ahead of "now" is itself the next run.
    #[test]
    fn future_anchor_is_the_next_run() {
        let anchor = dt(2030, 1, 1, 9, 0);
        let now = dt(2026, 1, 1, 0, 0);
        assert_eq!(
            next_run_after(ScheduleFrequency::Daily, anchor, now, &opts()).unwrap(),
            Some(anchor)
        );
    }

    // Today's 09:00 has passed, so the next daily run is tomorrow at 09:00.
    #[test]
    fn daily_advances_to_next_day_same_time() {
        let anchor = dt(2026, 1, 1, 9, 0);
        let now = dt(2026, 1, 1, 10, 0);
        assert_eq!(
            next_run_after(ScheduleFrequency::Daily, anchor, now, &opts()).unwrap(),
            Some(dt(2026, 1, 2, 9, 0))
        );
    }

    // Hourly runs fire at the anchor's minute (:30) of each hour.
    #[test]
    fn hourly_keeps_the_anchor_minute() {
        let anchor = dt(2026, 1, 1, 9, 30);
        let now = dt(2026, 1, 1, 9, 45);
        assert_eq!(
            next_run_after(ScheduleFrequency::Hourly, anchor, now, &opts()).unwrap(),
            Some(dt(2026, 1, 1, 10, 30))
        );
    }

    // The next weekly run falls on the same weekday as the anchor.
    #[test]
    fn weekly_stays_on_the_anchor_weekday() {
        let anchor = dt(2026, 1, 1, 9, 0);
        let now = dt(2026, 1, 5, 0, 0);
        let next = next_run_after(ScheduleFrequency::Weekly, anchor, now, &opts())
            .unwrap()
            .unwrap();
        assert_eq!(next, dt(2026, 1, 8, 9, 0));
        assert_eq!(next.weekday(), anchor.weekday());
    }

    // A day-31 anchor is clamped to the last day of February.
    #[test]
    fn monthly_clamps_short_months() {
        let anchor = dt(2026, 1, 31, 9, 0);
        let now = dt(2026, 2, 1, 0, 0);
        assert_eq!(
            next_run_after(ScheduleFrequency::Monthly, anchor, now, &opts()).unwrap(),
            Some(dt(2026, 2, 28, 9, 0))
        );
    }

    // This year's date has passed, so the next yearly run is one year later.
    #[test]
    fn yearly_advances_one_year() {
        let anchor = dt(2026, 3, 15, 9, 0);
        let now = dt(2026, 6, 1, 0, 0);
        assert_eq!(
            next_run_after(ScheduleFrequency::Yearly, anchor, now, &opts()).unwrap(),
            Some(dt(2027, 3, 15, 9, 0))
        );
    }

    // The cron rendering uses the anchor's minute, hour, day and month.
    #[test]
    fn cron_projection_matches_cadence() {
        let anchor = dt(2026, 3, 15, 9, 30);
        assert_eq!(cron_expr(ScheduleFrequency::Hourly, anchor, None, None), "30 * * * *");
        assert_eq!(cron_expr(ScheduleFrequency::Daily, anchor, None, None), "30 9 * * *");
        assert_eq!(cron_expr(ScheduleFrequency::Monthly, anchor, None, None), "30 9 15 * *");
        assert_eq!(cron_expr(ScheduleFrequency::Yearly, anchor, None, None), "30 9 15 3 *");
    }

    // A one-shot schedule yields its anchor before it, and nothing after it.
    #[test]
    fn once_only_fires_in_the_future() {
        let anchor = dt(2026, 1, 1, 9, 0);
        assert_eq!(
            next_run_after(ScheduleFrequency::Once, anchor, dt(2026, 1, 1, 8, 0), &opts()).unwrap(),
            Some(anchor)
        );
        assert_eq!(
            next_run_after(ScheduleFrequency::Once, anchor, dt(2026, 1, 1, 10, 0), &opts()).unwrap(),
            None
        );
    }

    // A 15-minute interval from 09:00, queried at 09:10, next fires at 09:15.
    #[test]
    fn interval_uses_every_minutes() {
        let anchor = dt(2026, 1, 1, 9, 0);
        let options = ScheduleOptions {
            every_minutes: Some(15),
            ..Default::default()
        };
        assert_eq!(
            next_run_after(
                ScheduleFrequency::EveryNMinutes,
                anchor,
                dt(2026, 1, 1, 9, 10),
                &options
            )
            .unwrap(),
            Some(dt(2026, 1, 1, 9, 15))
        );
    }

    // Once run_count has reached max_runs there is no further run.
    #[test]
    fn max_runs_stops_schedule() {
        let anchor = dt(2026, 1, 1, 9, 0);
        let options = ScheduleOptions {
            max_runs: Some(3),
            run_count: 3,
            ..Default::default()
        };
        assert_eq!(
            next_run_after(ScheduleFrequency::Daily, anchor, dt(2026, 1, 2, 0, 0), &options)
                .unwrap(),
            None
        );
    }

    // An every-five-minutes cron expression, queried at 09:02, next fires at 09:05.
    #[test]
    fn cron_expression_finds_next_run() {
        let anchor = dt(2026, 1, 1, 0, 0);
        let options = ScheduleOptions {
            cron: Some("*/5 * * * *".into()),
            ..Default::default()
        };
        assert_eq!(
            next_run_after(
                ScheduleFrequency::Cron,
                anchor,
                dt(2026, 1, 1, 9, 2),
                &options
            )
            .unwrap(),
            Some(dt(2026, 1, 1, 9, 5))
        );
    }
}