Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/openfang-api/static/index_body.html
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,7 @@ <h2>Scheduler</h2>
<th>Agent</th>
<th>Status</th>
<th>Last Run</th>
<th>Runs</th>
<th>Next Run</th>
<th>Actions</th>
</tr>
</thead>
Expand All @@ -1543,7 +1543,7 @@ <h2>Scheduler</h2>
<span class="badge" :class="job.enabled ? 'badge-success' : 'badge-dim'" x-text="job.enabled ? 'Active' : 'Paused'"></span>
</td>
<td class="text-xs" :title="formatTime(job.last_run)" x-text="relativeTime(job.last_run)"></td>
<td class="text-xs" x-text="job.run_count || 0"></td>
<td class="text-xs" :title="formatTime(job.next_run)" x-text="relativeTime(job.next_run)"></td>
<td>
<div class="flex gap-1">
<button class="btn btn-primary btn-sm" @click="runNow(job)" :disabled="runningJobId === job.id">
Expand Down
91 changes: 60 additions & 31 deletions crates/openfang-api/static/js/pages/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,29 @@ function schedulerPage() {
},

async loadJobs() {
var data = await OpenFangAPI.get('/api/schedules');
this.jobs = data.schedules || [];
var data = await OpenFangAPI.get('/api/cron/jobs');
var raw = data.jobs || [];
// Normalize cron API response to flat fields the UI expects
this.jobs = raw.map(function(j) {
var cron = '';
if (j.schedule) {
if (j.schedule.kind === 'cron') cron = j.schedule.expr || '';
else if (j.schedule.kind === 'every') cron = 'every ' + j.schedule.every_secs + 's';
else if (j.schedule.kind === 'at') cron = 'at ' + (j.schedule.at || '');
}
return {
id: j.id,
name: j.name,
cron: cron,
agent_id: j.agent_id,
message: j.action ? j.action.message || '' : '',
enabled: j.enabled,
last_run: j.last_run,
next_run: j.next_run,
delivery: j.delivery ? j.delivery.kind || '' : '',
created_at: j.created_at
};
});
},

async loadTriggers() {
Expand All @@ -82,25 +103,20 @@ function schedulerPage() {
async loadHistory() {
this.historyLoading = true;
try {
// Build history from jobs with run data + recent audit entries
var historyItems = [];

// Add job run info from schedule data
var jobs = this.jobs || [];
for (var i = 0; i < jobs.length; i++) {
var job = jobs[i];
if (job.last_run) {
historyItems.push({
timestamp: job.last_run,
name: job.name || job.description || '(unnamed)',
name: job.name || '(unnamed)',
type: 'schedule',
status: 'completed',
run_count: job.run_count || 0
run_count: 0
});
}
}

// Also load trigger fire counts
var triggers = this.triggers || [];
for (var j = 0; j < triggers.length; j++) {
var t = triggers[j];
Expand All @@ -114,12 +130,9 @@ function schedulerPage() {
});
}
}

// Sort by timestamp descending
historyItems.sort(function(a, b) {
return new Date(b.timestamp).getTime() - new Date(a.timestamp).getTime();
});

this.history = historyItems;
} catch(e) {
this.history = [];
Expand All @@ -141,13 +154,16 @@ function schedulerPage() {
this.creating = true;
try {
var jobName = this.newJob.name;
await OpenFangAPI.post('/api/schedules', {
name: this.newJob.name,
cron: this.newJob.cron,
// Use the real cron API format
var body = {
agent_id: this.newJob.agent_id,
message: this.newJob.message,
name: this.newJob.name,
schedule: { kind: 'cron', expr: this.newJob.cron },
action: { kind: 'agent_turn', message: this.newJob.message || 'Scheduled task: ' + this.newJob.name },
delivery: { kind: 'last_channel' },
enabled: this.newJob.enabled
});
};
await OpenFangAPI.post('/api/cron/jobs', body);
this.showCreateForm = false;
this.newJob = { name: '', cron: '', agent_id: '', message: '', enabled: true };
OpenFangToast.success('Schedule "' + jobName + '" created');
Expand All @@ -161,7 +177,7 @@ function schedulerPage() {
async toggleJob(job) {
try {
var newState = !job.enabled;
await OpenFangAPI.put('/api/schedules/' + job.id, { enabled: newState });
await OpenFangAPI.put('/api/cron/jobs/' + job.id + '/enable', { enabled: newState });
job.enabled = newState;
OpenFangToast.success('Schedule ' + (newState ? 'enabled' : 'paused'));
} catch(e) {
Expand All @@ -174,7 +190,7 @@ function schedulerPage() {
var jobName = job.name || job.id;
OpenFangToast.confirm('Delete Schedule', 'Delete "' + jobName + '"? This cannot be undone.', async function() {
try {
await OpenFangAPI.del('/api/schedules/' + job.id);
await OpenFangAPI.del('/api/cron/jobs/' + job.id);
self.jobs = self.jobs.filter(function(j) { return j.id !== job.id; });
OpenFangToast.success('Schedule "' + jobName + '" deleted');
} catch(e) {
Expand All @@ -186,22 +202,21 @@ function schedulerPage() {
async runNow(job) {
this.runningJobId = job.id;
try {
// Use the schedules run endpoint as fallback, or just toggle enable
var result = await OpenFangAPI.post('/api/schedules/' + job.id + '/run', {});
if (result.status === 'completed') {
OpenFangToast.success('Schedule "' + (job.name || 'job') + '" executed successfully');
// Update the job's last_run locally
job.last_run = new Date().toISOString();
job.run_count = (job.run_count || 0) + 1;
} else {
OpenFangToast.error('Schedule run failed: ' + (result.error || 'Unknown error'));
}
} catch(e) {
OpenFangToast.error('Failed to run schedule: ' + (e.message || e));
OpenFangToast.error('Run Now is not yet available for cron jobs. Use Telegram to trigger manually.');
}
this.runningJobId = '';
},

// ── Trigger helpers (reused from workflows page) ──
// ── Trigger helpers ──

triggerType(pattern) {
if (!pattern) return 'unknown';
Expand Down Expand Up @@ -259,13 +274,16 @@ function schedulerPage() {
for (var i = 0; i < agents.length; i++) {
if (agents[i].id === agentId) return agents[i].name;
}
// Truncate UUID
if (agentId.length > 12) return agentId.substring(0, 8) + '...';
return agentId;
},

describeCron(expr) {
if (!expr) return '';
// Handle non-cron schedule descriptions
if (expr.indexOf('every ') === 0) return expr;
if (expr.indexOf('at ') === 0) return 'One-time: ' + expr.substring(3);

var map = {
'* * * * *': 'Every minute',
'*/2 * * * *': 'Every 2 minutes',
Expand All @@ -291,7 +309,6 @@ function schedulerPage() {
};
if (map[expr]) return map[expr];

// Try to parse common patterns
var parts = expr.split(' ');
if (parts.length !== 5) return expr;

Expand All @@ -301,22 +318,27 @@ function schedulerPage() {
var mon = parts[3];
var dow = parts[4];

// "*/N * * * *" patterns
if (min.indexOf('*/') === 0 && hour === '*' && dom === '*' && mon === '*' && dow === '*') {
return 'Every ' + min.substring(2) + ' minutes';
}
// "0 */N * * *" patterns
if (min === '0' && hour.indexOf('*/') === 0 && dom === '*' && mon === '*' && dow === '*') {
return 'Every ' + hour.substring(2) + ' hours';
}
// "M H * * *" — daily at specific time
if (dom === '*' && mon === '*' && dow === '*' && min.match(/^\d+$/) && hour.match(/^\d+$/)) {

// Day-of-week names for friendly display
var dowNames = { '0': 'Sun', '1': 'Mon', '2': 'Tue', '3': 'Wed', '4': 'Thu', '5': 'Fri', '6': 'Sat', '7': 'Sun',
'1-5': 'Weekdays', '0,6': 'Weekends', '6,0': 'Weekends' };

if (dom === '*' && mon === '*' && min.match(/^\d+$/) && hour.match(/^\d+$/)) {
var h = parseInt(hour, 10);
var m = parseInt(min, 10);
var ampm = h >= 12 ? 'PM' : 'AM';
var h12 = h === 0 ? 12 : (h > 12 ? h - 12 : h);
var mStr = m < 10 ? '0' + m : '' + m;
return 'Daily at ' + h12 + ':' + mStr + ' ' + ampm;
var timeStr = h12 + ':' + mStr + ' ' + ampm;
if (dow === '*') return 'Daily at ' + timeStr;
var dowLabel = dowNames[dow] || ('DoW ' + dow);
return dowLabel + ' at ' + timeStr;
}

return expr;
Expand All @@ -340,7 +362,14 @@ function schedulerPage() {
try {
var diff = Date.now() - new Date(ts).getTime();
if (isNaN(diff)) return 'never';
if (diff < 0) return 'just now';
if (diff < 0) {
// Future time
var absDiff = Math.abs(diff);
if (absDiff < 60000) return 'in <1m';
if (absDiff < 3600000) return 'in ' + Math.floor(absDiff / 60000) + 'm';
if (absDiff < 86400000) return 'in ' + Math.floor(absDiff / 3600000) + 'h';
return 'in ' + Math.floor(absDiff / 86400000) + 'd';
}
if (diff < 60000) return 'just now';
if (diff < 3600000) return Math.floor(diff / 60000) + 'm ago';
if (diff < 86400000) return Math.floor(diff / 3600000) + 'h ago';
Expand Down
1 change: 1 addition & 0 deletions crates/openfang-kernel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ subtle = { workspace = true }
rand = { workspace = true }
hex = { workspace = true }
reqwest = { workspace = true }
cron = "0.15"

[target.'cfg(unix)'.dependencies]
libc = "0.2"
Expand Down
65 changes: 53 additions & 12 deletions crates/openfang-kernel/src/cron.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,36 @@ impl CronScheduler {
///
/// - `At { at }` — returns `at` directly.
/// - `Every { every_secs }` — returns `now + every_secs`.
/// - `Cron { .. }` — returns 60 seconds from now (placeholder until a cron
/// expression parser is added).
/// - `Cron { expr, tz }` — parses the cron expression and computes the next
/// matching time. Supports standard 5-field (`min hour dom month dow`) and
/// 6-field (`sec min hour dom month dow`) formats by converting to the
/// 7-field format required by the `cron` crate.
pub fn compute_next_run(schedule: &CronSchedule) -> chrono::DateTime<Utc> {
match schedule {
CronSchedule::At { at } => *at,
CronSchedule::Every { every_secs } => Utc::now() + Duration::seconds(*every_secs as i64),
CronSchedule::Cron { .. } => {
// Placeholder: real cron parsing will be added when the `cron`
// crate is brought in. For now, fire 60 seconds from now.
Utc::now() + Duration::seconds(60)
CronSchedule::Cron { expr, tz: _ } => {
// Convert standard 5/6-field cron to 7-field for the `cron` crate.
// Standard 5-field: min hour dom month dow
// 6-field: sec min hour dom month dow
// cron crate: sec min hour dom month dow year
let fields: Vec<&str> = expr.trim().split_whitespace().collect();
let seven_field = match fields.len() {
5 => format!("0 {} *", expr.trim()),
6 => format!("{} *", expr.trim()),
_ => expr.clone(),
};

match seven_field.parse::<cron::Schedule>() {
Ok(sched) => sched
.upcoming(Utc)
.next()
.unwrap_or_else(|| Utc::now() + Duration::hours(1)),
Err(e) => {
warn!("Failed to parse cron expression '{}': {}", expr, e);
Utc::now() + Duration::hours(1)
}
}
}
}
}
Expand Down Expand Up @@ -655,18 +675,39 @@ mod tests {
}

#[test]
fn test_compute_next_run_cron_placeholder() {
let before = Utc::now();
fn test_compute_next_run_cron_daily() {
let now = Utc::now();
let schedule = CronSchedule::Cron {
expr: "0 9 * * *".into(),
tz: None,
};
let next = compute_next_run(&schedule);
let after = Utc::now();

// Placeholder returns ~60s from now
assert!(next >= before + Duration::seconds(59));
assert!(next <= after + Duration::seconds(61));
// Should be within the next 24 hours (next 09:00 UTC)
assert!(next > now);
assert!(next <= now + Duration::hours(24));
// Should fire at minute 0 of hour 9
assert_eq!(next.format("%M").to_string(), "00");
assert_eq!(next.format("%H").to_string(), "09");
}

#[test]
fn test_compute_next_run_cron_weekday() {
let now = Utc::now();
let schedule = CronSchedule::Cron {
expr: "30 14 * * 1-5".into(),
tz: None,
};
let next = compute_next_run(&schedule);

// Should be within the next 7 days
assert!(next > now);
assert!(next <= now + Duration::days(7));
// Should fire at 14:30
assert_eq!(next.format("%H:%M").to_string(), "14:30");
// Should be a weekday (Mon=1 .. Fri=5 in chrono)
let weekday = next.weekday().num_days_from_monday(); // 0=Mon, 4=Fri
assert!(weekday <= 4, "Expected weekday, got {}", next.weekday());
}

// -- error message truncation in record_failure -------------------------
Expand Down
4 changes: 3 additions & 1 deletion crates/openfang-kernel/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ impl AgentScheduler {
// Reset the window if an hour has passed
tracker.reset_if_expired();

if tracker.total_tokens > quota.max_llm_tokens_per_hour {
if quota.max_llm_tokens_per_hour > 0
&& tracker.total_tokens > quota.max_llm_tokens_per_hour
{
return Err(OpenFangError::QuotaExceeded(format!(
"Token limit exceeded: {} / {}",
tracker.total_tokens, quota.max_llm_tokens_per_hour
Expand Down