diff --git a/docs/deployment.md b/docs/deployment.md index 1e63ada..ca29669 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -115,6 +115,82 @@ See `.env.example` for the full list. Key variables: | `OPENCLAW_HOME` | No | - | Path to OpenClaw installation | | `MC_ALLOWED_HOSTS` | No | `localhost,127.0.0.1` | Allowed hosts in production | +## Kubernetes Sidecar Deployment + +When running Mission Control alongside a gateway as containers in the same pod (sidecar pattern), agents are not discovered via the filesystem. Instead, use the gateway's agent registration API. + +### Architecture + +``` +┌──────────────── Pod ────────────────┐ +│ ┌─────────┐ ┌───────────────┐ │ +│ │ MC │◄───►│ Gateway │ │ +│ │ :3000 │ │ :18789 │ │ +│ └─────────┘ └───────────────┘ │ +│ ▲ ▲ │ +│ │ localhost │ │ +│ └──────────────────┘ │ +└─────────────────────────────────────┘ +``` + +### Required Configuration + +**Environment variables** for the MC container: + +```bash +AUTH_USER=admin +AUTH_PASS=REPLACE_WITH_PASSWORD +API_KEY=REPLACE_WITH_API_KEY +OPENCLAW_GATEWAY_HOST=127.0.0.1 +NEXT_PUBLIC_GATEWAY_PORT=18789 +``` + +### Agent Registration + +The gateway must register its agents with MC on startup. Include the `agents` array in the gateway registration request: + +```bash +curl -X POST http://localhost:3000/api/gateways \ + -H "Authorization: Bearer $API_KEY" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "sidecar-gateway", + "host": "127.0.0.1", + "port": 18789, + "is_primary": true, + "agents": [ + { "name": "developer-1", "role": "developer" }, + { "name": "researcher-1", "role": "researcher" } + ] + }' +``` + +To update the agent list on reconnect, use `PUT /api/gateways` with the same `agents` field. 
+ +Alternatively, each agent can register itself via the direct connection endpoint: + +```bash +curl -X POST http://localhost:3000/api/connect \ + -H "Authorization: Bearer $API_KEY" \ + -H "Content-Type: application/json" \ + -d '{ + "tool_name": "openclaw-gateway", + "agent_name": "developer-1", + "agent_role": "developer" + }' +``` + +### Health Checks + +Agents must send heartbeats to stay visible: + +```bash +curl http://localhost:3000/api/agents/$AGENT_ID/heartbeat \ + -H "Authorization: Bearer $API_KEY" +``` + +Without heartbeats, agents will be marked offline after 10 minutes (configurable via `general.agent_timeout_minutes` setting). + ## Troubleshooting ### "Module not found: better-sqlite3" diff --git a/src/app/api/gateways/route.ts b/src/app/api/gateways/route.ts index b2d7cd3..d87f8df 100644 --- a/src/app/api/gateways/route.ts +++ b/src/app/api/gateways/route.ts @@ -80,7 +80,7 @@ export async function POST(request: NextRequest) { ensureTable(db) const body = await request.json() - const { name, host, port, token, is_primary } = body + const { name, host, port, token, is_primary, agents } = body if (!name || !host || !port) { return NextResponse.json({ error: 'name, host, and port are required' }, { status: 400 }) @@ -96,14 +96,37 @@ export async function POST(request: NextRequest) { INSERT INTO gateways (name, host, port, token, is_primary) VALUES (?, ?, ?, ?, ?) `).run(name, host, port, token || '', is_primary ? 1 : 0) + // Auto-register agents reported by the gateway (k8s sidecar support) + let agentsRegistered = 0 + if (Array.isArray(agents) && agents.length > 0) { + const workspaceId = auth.user?.workspace_id ?? 1 + const now = Math.floor(Date.now() / 1000) + const upsertAgent = db.prepare(` + INSERT INTO agents (name, role, status, last_seen, source, workspace_id, updated_at) + VALUES (?, ?, 'idle', ?, 'gateway', ?, ?) 
+ ON CONFLICT DO UPDATE SET + status = 'idle', + last_seen = excluded.last_seen, + source = 'gateway', + updated_at = excluded.updated_at + `) // No conflict target: the upsert matches whichever UNIQUE constraint the agents table defines (needs SQLite 3.35+, as does RETURNING) + for (const agent of agents.slice(0, 50)) { + if (typeof agent?.name !== 'string' || !agent.name.trim()) continue + const agentName = agent.name.trim().substring(0, 100) + const agentRole = (typeof agent?.role === 'string' && agent.role.trim().substring(0, 100)) || 'agent' + upsertAgent.run(agentName, agentRole, now, workspaceId, now) + agentsRegistered++ + } + } + try { db.prepare('INSERT INTO audit_log (action, actor, detail) VALUES (?, ?, ?)').run( - 'gateway_added', auth.user?.username || 'system', `Added gateway: ${name} (${host}:${port})` + 'gateway_added', auth.user?.username || 'system', `Added gateway: ${name} (${host}:${port})${agentsRegistered ? `, registered ${agentsRegistered} agent(s)` : ''}` ) } catch { /* audit might not exist */ } const gw = db.prepare('SELECT * FROM gateways WHERE id = ?').get(result.lastInsertRowid) as GatewayEntry - return NextResponse.json({ gateway: redactToken(gw) }, { status: 201 }) + return NextResponse.json({ gateway: redactToken(gw), agents_registered: agentsRegistered }, { status: 201 }) } catch (err: any) { if (err.message?.includes('UNIQUE')) { return NextResponse.json({ error: 'A gateway with that name already exists' }, { status: 409 }) @@ -145,15 +168,39 @@ export async function PUT(request: NextRequest) { } } - if (sets.length === 0) return NextResponse.json({ error: 'No valid fields to update' }, { status: 400 }) + if (sets.length === 0 && !Array.isArray(updates.agents)) return NextResponse.json({ error: 'No valid fields to update' }, { status: 400 }) - sets.push('updated_at = (unixepoch())') - values.push(id) + if (sets.length > 0) { + sets.push('updated_at = (unixepoch())') + values.push(id) + db.prepare(`UPDATE gateways SET ${sets.join(', ')} WHERE id = ?`).run(...values) + } - db.prepare(`UPDATE gateways SET ${sets.join(', ')} WHERE id = ?`).run(...values) + // 
Auto-register agents reported by the gateway (k8s sidecar support); the upsert has no conflict target so it matches whichever UNIQUE constraint the agents table defines (needs SQLite 3.35+, as does RETURNING) + let agentsRegistered = 0 + if (Array.isArray(updates.agents) && updates.agents.length > 0) { + const workspaceId = auth.user?.workspace_id ?? 1 + const now = Math.floor(Date.now() / 1000) + const upsertAgent = db.prepare(` + INSERT INTO agents (name, role, status, last_seen, source, workspace_id, updated_at) + VALUES (?, ?, 'idle', ?, 'gateway', ?, ?) + ON CONFLICT DO UPDATE SET + status = 'idle', + last_seen = excluded.last_seen, + source = 'gateway', + updated_at = excluded.updated_at + `) + for (const agent of updates.agents.slice(0, 50)) { + if (typeof agent?.name !== 'string' || !agent.name.trim()) continue + const agentName = agent.name.trim().substring(0, 100) + const agentRole = (typeof agent?.role === 'string' && agent.role.trim().substring(0, 100)) || 'agent' + upsertAgent.run(agentName, agentRole, now, workspaceId, now) + agentsRegistered++ + } + } const updated = db.prepare('SELECT * FROM gateways WHERE id = ?').get(id) as GatewayEntry - return NextResponse.json({ gateway: redactToken(updated) }) + return NextResponse.json({ gateway: redactToken(updated), agents_registered: agentsRegistered }) } /** diff --git a/src/app/api/tasks/queue/route.ts b/src/app/api/tasks/queue/route.ts index 707b9cd..d70f4a3 100644 --- a/src/app/api/tasks/queue/route.ts +++ b/src/app/api/tasks/queue/route.ts @@ -105,37 +105,28 @@ export async function GET(request: NextRequest) { }) } - // Best-effort atomic pickup loop for race safety. - for (let attempt = 0; attempt < 5; attempt += 1) { - const candidate = db.prepare(` - SELECT * - FROM tasks + // Atomic claim: single UPDATE with subquery to eliminate SELECT-UPDATE race condition. + const claimed = db.prepare(` + UPDATE tasks + SET status = 'in_progress', assigned_to = ?, updated_at = ? + WHERE id = ( + SELECT id FROM tasks WHERE workspace_id = ? AND status IN ('assigned', 'inbox') AND (assigned_to IS NULL OR assigned_to = ?) 
ORDER BY ${priorityRankSql()} ASC, due_date ASC NULLS LAST, created_at ASC LIMIT 1 - `).get(workspaceId, agent) as any | undefined + ) + RETURNING * + `).get(agent, now, workspaceId, agent) as any | undefined - if (!candidate) break - - const claimed = db.prepare(` - UPDATE tasks - SET status = 'in_progress', assigned_to = ?, updated_at = ? - WHERE id = ? AND workspace_id = ? - AND status IN ('assigned', 'inbox') - AND (assigned_to IS NULL OR assigned_to = ?) - `).run(agent, now, candidate.id, workspaceId, agent) - - if (claimed.changes > 0) { - const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND workspace_id = ?').get(candidate.id, workspaceId) as any - return NextResponse.json({ - task: mapTaskRow(task), - reason: 'assigned' as QueueReason, - agent, - timestamp: now, - }) - } + if (claimed) { + return NextResponse.json({ + task: mapTaskRow(claimed), + reason: 'assigned' as QueueReason, + agent, + timestamp: now, + }) } return NextResponse.json({ diff --git a/src/lib/migrations.ts b/src/lib/migrations.ts index 07b97d8..51b16c0 100644 --- a/src/lib/migrations.ts +++ b/src/lib/migrations.ts @@ -1294,6 +1294,16 @@ const migrations: Migration[] = [ db.exec(`CREATE INDEX IF NOT EXISTS idx_spawn_history_created ON spawn_history(created_at)`) db.exec(`CREATE INDEX IF NOT EXISTS idx_spawn_history_status ON spawn_history(status)`) } + }, + { + id: '044_task_dispatch_attempts', + up(db: Database.Database) { + const cols = db.prepare(`PRAGMA table_info(tasks)`).all() as Array<{ name: string }> + if (!cols.some(c => c.name === 'dispatch_attempts')) { + db.exec(`ALTER TABLE tasks ADD COLUMN dispatch_attempts INTEGER NOT NULL DEFAULT 0`) + } + db.exec(`CREATE INDEX IF NOT EXISTS idx_tasks_stale_inprogress ON tasks(status, updated_at) WHERE status = 'in_progress'`) + } } ] diff --git a/src/lib/scheduler.ts b/src/lib/scheduler.ts index c81e68a..e8b9090 100644 --- a/src/lib/scheduler.ts +++ b/src/lib/scheduler.ts @@ -10,7 +10,7 @@ import { pruneGatewaySessionsOlderThan, 
getAgentLiveStatuses } from './sessions' import { eventBus } from './event-bus' import { syncSkillsFromDisk } from './skill-sync' import { syncLocalAgents } from './local-agent-sync' -import { dispatchAssignedTasks, runAegisReviews } from './task-dispatch' +import { dispatchAssignedTasks, runAegisReviews, requeueStaleTasks } from './task-dispatch' import { spawnRecurringTasks } from './recurring-tasks' const BACKUP_DIR = join(dirname(config.dbPath), 'backups') @@ -389,6 +389,15 @@ export function initScheduler() { running: false, }) + tasks.set('stale_task_requeue', { + name: 'Stale Task Requeue', + intervalMs: TICK_MS, // Every 60s — check for stale in_progress tasks + lastRun: null, + nextRun: now + 25_000, // First check 25s after startup + enabled: true, + running: false, + }) + // Start the tick loop tickInterval = setInterval(tick, TICK_MS) logger.info('Scheduler initialized - backup at ~3AM, cleanup at ~4AM, heartbeat every 5m, webhook/claude/skill/local-agent/gateway-agent sync every 60s') @@ -423,8 +432,9 @@ async function tick() { : id === 'task_dispatch' ? 'general.task_dispatch' : id === 'aegis_review' ? 'general.aegis_review' : id === 'recurring_task_spawn' ? 'general.recurring_task_spawn' + : id === 'stale_task_requeue' ? 
'general.stale_task_requeue' : 'general.agent_heartbeat' - const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' + const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' || id === 'stale_task_requeue' if (!isSettingEnabled(settingKey, defaultEnabled)) continue task.running = true @@ -442,6 +452,7 @@ async function tick() { : id === 'task_dispatch' ? await dispatchAssignedTasks() : id === 'aegis_review' ? await runAegisReviews() : id === 'recurring_task_spawn' ? await spawnRecurringTasks() + : id === 'stale_task_requeue' ? await requeueStaleTasks() : await runCleanup() task.lastResult = { ...result, timestamp: now } } catch (err: any) { @@ -477,8 +488,9 @@ export function getSchedulerStatus() { : id === 'task_dispatch' ? 'general.task_dispatch' : id === 'aegis_review' ? 'general.aegis_review' : id === 'recurring_task_spawn' ? 'general.recurring_task_spawn' + : id === 'stale_task_requeue' ? 
'general.stale_task_requeue' : 'general.agent_heartbeat' - const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' + const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' || id === 'stale_task_requeue' result.push({ id, name: task.name, @@ -506,6 +518,7 @@ export async function triggerTask(taskId: string): Promise<{ ok: boolean; messag if (taskId === 'task_dispatch') return dispatchAssignedTasks() if (taskId === 'aegis_review') return runAegisReviews() if (taskId === 'recurring_task_spawn') return spawnRecurringTasks() + if (taskId === 'stale_task_requeue') return requeueStaleTasks() return { ok: false, message: `Unknown task: ${taskId}` } } diff --git a/src/lib/task-dispatch.ts b/src/lib/task-dispatch.ts index f3875be..3b7e9bb 100644 --- a/src/lib/task-dispatch.ts +++ b/src/lib/task-dispatch.ts @@ -306,21 +306,43 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string previous_status: 'quality_review', }) } else { - // Rejected: push back to in_progress with feedback - db.prepare('UPDATE tasks SET status = ?, error_message = ?, updated_at = ? WHERE id = ?') - .run('in_progress', `Aegis rejected: ${verdict.notes}`, Math.floor(Date.now() / 1000), task.id) + // Rejected: check dispatch_attempts to decide next status + const now = Math.floor(Date.now() / 1000) + const currentAttempts = (db.prepare('SELECT dispatch_attempts FROM tasks WHERE id = ?').get(task.id) as { dispatch_attempts: number } | undefined)?.dispatch_attempts ?? 
0 + const newAttempts = currentAttempts + 1 + const maxAegisRetries = 3 - eventBus.broadcast('task.status_changed', { - id: task.id, - status: 'in_progress', - previous_status: 'quality_review', - }) + if (newAttempts >= maxAegisRetries) { + // Too many rejections — move to failed + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?') + .run('failed', `Aegis rejected ${newAttempts} times. Last: ${verdict.notes}`, newAttempts, now, task.id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'failed', + previous_status: 'quality_review', + error_message: `Aegis rejected ${newAttempts} times`, + reason: 'max_aegis_retries_exceeded', + }) + } else { + // Requeue to assigned for re-dispatch with feedback + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?') + .run('assigned', `Aegis rejected: ${verdict.notes}`, newAttempts, now, task.id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'assigned', + previous_status: 'quality_review', + error_message: `Aegis rejected: ${verdict.notes}`, + reason: 'aegis_rejection', + }) + } // Add rejection as a comment so the agent sees it on next dispatch db.prepare(` INSERT INTO comments (task_id, author, content, created_at, workspace_id) VALUES (?, 'aegis', ?, ?, ?) - `).run(task.id, `Quality Review Rejected:\n${verdict.notes}`, Math.floor(Date.now() / 1000), task.workspace_id) + `).run(task.id, `Quality Review Rejected (attempt ${newAttempts}/${maxAegisRetries}):\n${verdict.notes}`, now, task.workspace_id) } db_helpers.logActivity( @@ -363,6 +385,86 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string } } +/** + * Requeue stale tasks stuck in 'in_progress' whose assigned agent is offline. + * Prevents tasks from being permanently stuck when agents crash or disconnect. 
+ */ +export async function requeueStaleTasks(): Promise<{ ok: boolean; message: string }> { + const db = getDatabase() + const now = Math.floor(Date.now() / 1000) + const staleThreshold = now - 10 * 60 // 10 minutes + const maxDispatchRetries = 5 + + const staleTasks = db.prepare(` + SELECT t.id, t.title, t.assigned_to, t.dispatch_attempts, t.workspace_id, + a.status as agent_status, a.last_seen as agent_last_seen + FROM tasks t + LEFT JOIN agents a ON a.name = t.assigned_to AND a.workspace_id = t.workspace_id + WHERE t.status = 'in_progress' + AND t.updated_at < ? + `).all(staleThreshold) as Array<{ + id: number; title: string; assigned_to: string | null; dispatch_attempts: number + workspace_id: number; agent_status: string | null; agent_last_seen: number | null + }> + + if (staleTasks.length === 0) { + return { ok: true, message: 'No stale tasks found' } + } + + let requeued = 0 + let failed = 0 + + for (const task of staleTasks) { + // Only requeue if the agent is offline or unknown + const agentOffline = !task.agent_status || task.agent_status === 'offline' + if (!agentOffline) continue + + const newAttempts = (task.dispatch_attempts ?? 0) + 1 + + if (newAttempts >= maxDispatchRetries) { + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?') + .run('failed', `Task stuck in_progress ${newAttempts} times — agent "${task.assigned_to}" offline. Moved to failed.`, newAttempts, now, task.id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'failed', + previous_status: 'in_progress', + error_message: `Stale task — agent offline after ${newAttempts} attempts`, + reason: 'stale_task_max_retries', + }) + + failed++ + } else { + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? 
WHERE id = ?') + .run('assigned', `Requeued: agent "${task.assigned_to}" went offline while task was in_progress`, newAttempts, now, task.id) + + // Add a comment explaining the requeue + db.prepare(` + INSERT INTO comments (task_id, author, content, created_at, workspace_id) + VALUES (?, 'scheduler', ?, ?, ?) + `).run(task.id, `Task requeued (attempt ${newAttempts}/${maxDispatchRetries}): agent "${task.assigned_to}" went offline while task was in_progress.`, now, task.workspace_id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'assigned', + previous_status: 'in_progress', + error_message: `Agent "${task.assigned_to}" went offline`, + reason: 'stale_task_requeue', + }) + + requeued++ + } + } + + const total = requeued + failed + return { + ok: true, + message: total === 0 + ? `Found ${staleTasks.length} stale task(s) but agents still online` + : `Requeued ${requeued}, failed ${failed} of ${staleTasks.length} stale task(s)`, + } +} + export async function dispatchAssignedTasks(): Promise<{ ok: boolean; message: string }> { const db = getDatabase() @@ -559,15 +661,36 @@ export async function dispatchAssignedTasks(): Promise<{ ok: boolean; message: s const errorMsg = err.message || 'Unknown error' logger.error({ taskId: task.id, agent: task.agent_name, err }, 'Task dispatch failed') - // Revert to assigned so it can be retried on the next tick - db.prepare('UPDATE tasks SET status = ?, error_message = ?, updated_at = ? WHERE id = ?') - .run('assigned', errorMsg.substring(0, 5000), Math.floor(Date.now() / 1000), task.id) + // Increment dispatch_attempts and decide next status + const currentAttempts = (db.prepare('SELECT dispatch_attempts FROM tasks WHERE id = ?').get(task.id) as { dispatch_attempts: number } | undefined)?.dispatch_attempts ?? 
0 + const newAttempts = currentAttempts + 1 + const maxDispatchRetries = 5 - eventBus.broadcast('task.status_changed', { - id: task.id, - status: 'assigned', - previous_status: 'in_progress', - }) + if (newAttempts >= maxDispatchRetries) { + // Too many failures — move to failed + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?') + .run('failed', `Dispatch failed ${newAttempts} times. Last: ${errorMsg.substring(0, 5000)}`, newAttempts, Math.floor(Date.now() / 1000), task.id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'failed', + previous_status: 'in_progress', + error_message: `Dispatch failed ${newAttempts} times`, + reason: 'max_dispatch_retries_exceeded', + }) + } else { + // Revert to assigned so it can be retried on the next tick + db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?') + .run('assigned', errorMsg.substring(0, 5000), newAttempts, Math.floor(Date.now() / 1000), task.id) + + eventBus.broadcast('task.status_changed', { + id: task.id, + status: 'assigned', + previous_status: 'in_progress', + error_message: errorMsg.substring(0, 500), + reason: 'dispatch_failed', + }) + } db_helpers.logActivity( 'task_dispatch_failed',