fix: sync agent live statuses from gateway sessions during scheduled sync (#454)
Closes #450 Co-authored-by: Dan <github@bookkept.com.au>
This commit is contained in:
parent
f872aac504
commit
8431992866
|
|
@ -6,7 +6,8 @@ import { readdirSync, statSync, unlinkSync } from 'fs'
|
|||
import { logger } from './logger'
|
||||
import { processWebhookRetries } from './webhooks'
|
||||
import { syncClaudeSessions } from './claude-sessions'
|
||||
import { pruneGatewaySessionsOlderThan } from './sessions'
|
||||
import { pruneGatewaySessionsOlderThan, getAgentLiveStatuses } from './sessions'
|
||||
import { eventBus } from './event-bus'
|
||||
import { syncSkillsFromDisk } from './skill-sync'
|
||||
import { syncLocalAgents } from './local-agent-sync'
|
||||
import { dispatchAssignedTasks, runAegisReviews } from './task-dispatch'
|
||||
|
|
@ -212,6 +213,64 @@ async function runHeartbeatCheck(): Promise<{ ok: boolean; message: string }> {
|
|||
}
|
||||
}
|
||||
|
||||
/** Sync live agent statuses from gateway session files into the DB */
|
||||
async function syncAgentLiveStatuses(): Promise<number> {
|
||||
const liveStatuses = getAgentLiveStatuses()
|
||||
if (liveStatuses.size === 0) return 0
|
||||
|
||||
const db = getDatabase()
|
||||
const agents = db.prepare('SELECT id, name, config FROM agents').all() as Array<{
|
||||
id: number; name: string; config: string | null
|
||||
}>
|
||||
|
||||
const update = db.prepare('UPDATE agents SET status = ?, last_seen = ?, last_activity = ?, updated_at = ? WHERE id = ?')
|
||||
let refreshed = 0
|
||||
|
||||
const normalize = (s: string) => s.toLowerCase().replace(/[^a-z0-9._-]+/g, '-')
|
||||
|
||||
db.transaction(() => {
|
||||
for (const agent of agents) {
|
||||
// Match by agent name or openclawId from config
|
||||
let openclawId: string | null = null
|
||||
if (agent.config) {
|
||||
try {
|
||||
const cfg = JSON.parse(agent.config)
|
||||
if (typeof cfg.openclawId === 'string' && cfg.openclawId.trim()) {
|
||||
openclawId = cfg.openclawId.trim()
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
|
||||
const candidates = [openclawId, agent.name].filter(Boolean).map(s => normalize(s!))
|
||||
let matched: { status: 'active' | 'idle' | 'offline'; lastActivity: number; channel: string } | undefined
|
||||
|
||||
for (const [sessionAgent, info] of liveStatuses) {
|
||||
if (candidates.includes(normalize(sessionAgent))) {
|
||||
matched = info
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (!matched || matched.status === 'offline') continue
|
||||
|
||||
const now = Math.floor(Date.now() / 1000)
|
||||
const activity = `Gateway session (${matched.channel || 'unknown'})`
|
||||
update.run(matched.status, now, activity, now, agent.id)
|
||||
refreshed++
|
||||
|
||||
eventBus.broadcast('agent.status_changed', {
|
||||
id: agent.id,
|
||||
name: agent.name,
|
||||
status: matched.status,
|
||||
last_seen: now,
|
||||
last_activity: activity,
|
||||
})
|
||||
}
|
||||
})()
|
||||
|
||||
return refreshed
|
||||
}
|
||||
|
||||
const DAILY_MS = 24 * 60 * 60 * 1000
|
||||
const FIVE_MINUTES_MS = 5 * 60 * 1000
|
||||
const TICK_MS = 60 * 1000 // Check every minute
|
||||
|
|
@ -376,7 +435,10 @@ async function tick() {
|
|||
: id === 'claude_session_scan' ? await syncClaudeSessions()
|
||||
: id === 'skill_sync' ? await syncSkillsFromDisk()
|
||||
: id === 'local_agent_sync' ? await syncLocalAgents()
|
||||
: id === 'gateway_agent_sync' ? await syncAgentsFromConfig('scheduled').then(r => ({ ok: true, message: `Gateway sync: ${r.created} created, ${r.updated} updated, ${r.synced} total` }))
|
||||
: id === 'gateway_agent_sync' ? await syncAgentsFromConfig('scheduled').then(async r => {
|
||||
const refreshed = await syncAgentLiveStatuses()
|
||||
return { ok: true, message: `Gateway sync: ${r.created} created, ${r.updated} updated, ${r.synced} total | Live status: ${refreshed} refreshed` }
|
||||
})
|
||||
: id === 'task_dispatch' ? await dispatchAssignedTasks()
|
||||
: id === 'aegis_review' ? await runAegisReviews()
|
||||
: id === 'recurring_task_spawn' ? await spawnRecurringTasks()
|
||||
|
|
|
|||
Loading…
Reference in New Issue