fix: task routing stuck issues + k8s agent visibility

- Add stale task watchdog (requeueStaleTasks) to scheduler — detects
  in_progress tasks with offline agents and requeues or fails them
- Fix Aegis rejection loop: rejected tasks now requeue to 'assigned'
  instead of staying in 'in_progress', with max 3 retries before failing
- Track dispatch_attempts on tasks (migration 044) to prevent infinite
  retry loops — tasks fail after 5 dispatch attempts
- Include error_message and reason in SSE event broadcasts so UI can
  show why a task reverted
- Atomic task queue claim: replace SELECT-then-UPDATE race with single
  UPDATE...RETURNING statement
- Gateway agent auto-registration: POST/PUT /api/gateways accepts
  optional 'agents' array to upsert agents (k8s sidecar support)
- Document k8s sidecar deployment in docs/deployment.md

Fixes: tasks stuck in assigned, Aegis rejection loops, agents invisible
in k8s sidecar deployments
This commit is contained in:
Nyk 2026-03-21 22:21:33 +07:00
parent dd7d663a36
commit 2d171ad464
6 changed files with 313 additions and 53 deletions

View File

@ -115,6 +115,82 @@ See `.env.example` for the full list. Key variables:
| `OPENCLAW_HOME` | No | - | Path to OpenClaw installation |
| `MC_ALLOWED_HOSTS` | No | `localhost,127.0.0.1` | Allowed hosts in production |
## Kubernetes Sidecar Deployment
When running Mission Control alongside a gateway as containers in the same pod (sidecar pattern), agents are not discovered via the filesystem. Instead, use the gateway's agent registration API.
### Architecture
```
┌──────────────── Pod ────────────────┐
│ ┌─────────┐ ┌───────────────┐ │
│ │ MC │◄───►│ Gateway │ │
│ │ :3000 │ │ :18789 │ │
│ └─────────┘ └───────────────┘ │
│ ▲ ▲ │
│ │ localhost │ │
│ └──────────────────┘ │
└─────────────────────────────────────┘
```
### Required Configuration
**Environment variables** for the MC container:
```bash
AUTH_USER=admin
AUTH_PASS=<secure-password>
API_KEY=<your-api-key>
OPENCLAW_GATEWAY_HOST=127.0.0.1
NEXT_PUBLIC_GATEWAY_PORT=18789
```
### Agent Registration
The gateway must register its agents with MC on startup. Include the `agents` array in the gateway registration request:
```bash
curl -X POST http://localhost:3000/api/gateways \
-H "Authorization: Bearer <API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"name": "sidecar-gateway",
"host": "127.0.0.1",
"port": 18789,
"is_primary": true,
"agents": [
{ "name": "developer-1", "role": "developer" },
{ "name": "researcher-1", "role": "researcher" }
]
}'
```
To update the agent list on reconnect, send `PUT /api/gateways` with the gateway's `id` and the same `agents` field.
Alternatively, each agent can register itself via the direct connection endpoint:
```bash
curl -X POST http://localhost:3000/api/connect \
-H "Authorization: Bearer <API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"tool_name": "openclaw-gateway",
"agent_name": "developer-1",
"agent_role": "developer"
}'
```
### Health Checks
Agents must send heartbeats to stay visible:
```bash
curl http://localhost:3000/api/agents/<agent-id>/heartbeat \
-H "Authorization: Bearer <API_KEY>"
```
Without heartbeats, agents will be marked offline after 10 minutes (configurable via `general.agent_timeout_minutes` setting).
## Troubleshooting
### "Module not found: better-sqlite3"

View File

@ -80,7 +80,7 @@ export async function POST(request: NextRequest) {
ensureTable(db)
const body = await request.json()
const { name, host, port, token, is_primary } = body
const { name, host, port, token, is_primary, agents } = body
if (!name || !host || !port) {
return NextResponse.json({ error: 'name, host, and port are required' }, { status: 400 })
@ -96,14 +96,37 @@ export async function POST(request: NextRequest) {
INSERT INTO gateways (name, host, port, token, is_primary) VALUES (?, ?, ?, ?, ?)
`).run(name, host, port, token || '', is_primary ? 1 : 0)
// Auto-register agents reported by the gateway (k8s sidecar support).
// Each entry is validated (name must be a non-empty string), sanitized
// (trimmed, capped at 100 chars), and the list is capped at 50 agents.
// NOTE(review): this upsert uses ON CONFLICT(name), while the PUT handler's
// otherwise-identical upsert uses ON CONFLICT(name, workspace_id). SQLite
// requires the conflict target to match an existing UNIQUE constraint, so
// one of the two must be wrong at runtime — confirm the agents table's
// unique index and align both handlers.
let agentsRegistered = 0
if (Array.isArray(agents) && agents.length > 0) {
const workspaceId = auth.user?.workspace_id ?? 1
const now = Math.floor(Date.now() / 1000)
const upsertAgent = db.prepare(`
INSERT INTO agents (name, role, status, last_seen, source, workspace_id, updated_at)
VALUES (?, ?, 'idle', ?, 'gateway', ?, ?)
ON CONFLICT(name) DO UPDATE SET
status = 'idle',
last_seen = excluded.last_seen,
source = 'gateway',
updated_at = excluded.updated_at
`)
// Cap at 50 agents per request to bound work; skip entries without a usable name.
for (const agent of agents.slice(0, 50)) {
if (typeof agent?.name !== 'string' || !agent.name.trim()) continue
const agentName = agent.name.trim().substring(0, 100)
const agentRole = typeof agent?.role === 'string' ? agent.role.trim().substring(0, 100) : 'agent'
upsertAgent.run(agentName, agentRole, now, workspaceId, now)
agentsRegistered++
}
}
try {
db.prepare('INSERT INTO audit_log (action, actor, detail) VALUES (?, ?, ?)').run(
'gateway_added', auth.user?.username || 'system', `Added gateway: ${name} (${host}:${port})`
'gateway_added', auth.user?.username || 'system', `Added gateway: ${name} (${host}:${port})${agentsRegistered ? `, registered ${agentsRegistered} agent(s)` : ''}`
)
} catch { /* audit might not exist */ }
const gw = db.prepare('SELECT * FROM gateways WHERE id = ?').get(result.lastInsertRowid) as GatewayEntry
return NextResponse.json({ gateway: redactToken(gw) }, { status: 201 })
return NextResponse.json({ gateway: redactToken(gw), agents_registered: agentsRegistered }, { status: 201 })
} catch (err: any) {
if (err.message?.includes('UNIQUE')) {
return NextResponse.json({ error: 'A gateway with that name already exists' }, { status: 409 })
@ -145,15 +168,39 @@ export async function PUT(request: NextRequest) {
}
}
if (sets.length === 0) return NextResponse.json({ error: 'No valid fields to update' }, { status: 400 })
if (sets.length === 0 && !Array.isArray(updates.agents)) return NextResponse.json({ error: 'No valid fields to update' }, { status: 400 })
sets.push('updated_at = (unixepoch())')
values.push(id)
if (sets.length > 0) {
sets.push('updated_at = (unixepoch())')
values.push(id)
db.prepare(`UPDATE gateways SET ${sets.join(', ')} WHERE id = ?`).run(...values)
}
db.prepare(`UPDATE gateways SET ${sets.join(', ')} WHERE id = ?`).run(...values)
// Auto-register agents reported by the gateway (k8s sidecar support)
let agentsRegistered = 0
if (Array.isArray(updates.agents) && updates.agents.length > 0) {
const workspaceId = auth.user?.workspace_id ?? 1
const now = Math.floor(Date.now() / 1000)
const upsertAgent = db.prepare(`
INSERT INTO agents (name, role, status, last_seen, source, workspace_id, updated_at)
VALUES (?, ?, 'idle', ?, 'gateway', ?, ?)
ON CONFLICT(name, workspace_id) DO UPDATE SET
status = 'idle',
last_seen = excluded.last_seen,
source = 'gateway',
updated_at = excluded.updated_at
`)
for (const agent of updates.agents.slice(0, 50)) {
if (typeof agent?.name !== 'string' || !agent.name.trim()) continue
const agentName = agent.name.trim().substring(0, 100)
const agentRole = typeof agent?.role === 'string' ? agent.role.trim().substring(0, 100) : 'agent'
upsertAgent.run(agentName, agentRole, now, workspaceId, now)
agentsRegistered++
}
}
const updated = db.prepare('SELECT * FROM gateways WHERE id = ?').get(id) as GatewayEntry
return NextResponse.json({ gateway: redactToken(updated) })
return NextResponse.json({ gateway: redactToken(updated), agents_registered: agentsRegistered })
}
/**

View File

@ -105,37 +105,28 @@ export async function GET(request: NextRequest) {
})
}
// Best-effort atomic pickup loop for race safety.
for (let attempt = 0; attempt < 5; attempt += 1) {
const candidate = db.prepare(`
SELECT *
FROM tasks
// Atomic claim: single UPDATE with subquery to eliminate SELECT-UPDATE race condition.
const claimed = db.prepare(`
UPDATE tasks
SET status = 'in_progress', assigned_to = ?, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE workspace_id = ?
AND status IN ('assigned', 'inbox')
AND (assigned_to IS NULL OR assigned_to = ?)
ORDER BY ${priorityRankSql()} ASC, due_date ASC NULLS LAST, created_at ASC
LIMIT 1
`).get(workspaceId, agent) as any | undefined
)
RETURNING *
`).get(agent, now, workspaceId, agent) as any | undefined
if (!candidate) break
const claimed = db.prepare(`
UPDATE tasks
SET status = 'in_progress', assigned_to = ?, updated_at = ?
WHERE id = ? AND workspace_id = ?
AND status IN ('assigned', 'inbox')
AND (assigned_to IS NULL OR assigned_to = ?)
`).run(agent, now, candidate.id, workspaceId, agent)
if (claimed.changes > 0) {
const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND workspace_id = ?').get(candidate.id, workspaceId) as any
return NextResponse.json({
task: mapTaskRow(task),
reason: 'assigned' as QueueReason,
agent,
timestamp: now,
})
}
if (claimed) {
return NextResponse.json({
task: mapTaskRow(claimed),
reason: 'assigned' as QueueReason,
agent,
timestamp: now,
})
}
return NextResponse.json({

View File

@ -1294,6 +1294,16 @@ const migrations: Migration[] = [
db.exec(`CREATE INDEX IF NOT EXISTS idx_spawn_history_created ON spawn_history(created_at)`)
db.exec(`CREATE INDEX IF NOT EXISTS idx_spawn_history_status ON spawn_history(status)`)
}
},
{
// Adds tasks.dispatch_attempts so the dispatcher / Aegis review / stale-task
// watchdog can cap retries instead of looping forever.
id: '044_task_dispatch_attempts',
up(db: Database.Database) {
// ALTER TABLE ADD COLUMN fails if the column already exists, so check
// PRAGMA table_info first to keep this migration idempotent on re-run.
const cols = db.prepare(`PRAGMA table_info(tasks)`).all() as Array<{ name: string }>
if (!cols.some(c => c.name === 'dispatch_attempts')) {
db.exec(`ALTER TABLE tasks ADD COLUMN dispatch_attempts INTEGER NOT NULL DEFAULT 0`)
}
// Partial index covering the stale-task watchdog's scan
// (WHERE status = 'in_progress' AND updated_at < ?).
db.exec(`CREATE INDEX IF NOT EXISTS idx_tasks_stale_inprogress ON tasks(status, updated_at) WHERE status = 'in_progress'`)
}
}
]

View File

@ -10,7 +10,7 @@ import { pruneGatewaySessionsOlderThan, getAgentLiveStatuses } from './sessions'
import { eventBus } from './event-bus'
import { syncSkillsFromDisk } from './skill-sync'
import { syncLocalAgents } from './local-agent-sync'
import { dispatchAssignedTasks, runAegisReviews } from './task-dispatch'
import { dispatchAssignedTasks, runAegisReviews, requeueStaleTasks } from './task-dispatch'
import { spawnRecurringTasks } from './recurring-tasks'
const BACKUP_DIR = join(dirname(config.dbPath), 'backups')
@ -389,6 +389,15 @@ export function initScheduler() {
running: false,
})
tasks.set('stale_task_requeue', {
name: 'Stale Task Requeue',
intervalMs: TICK_MS, // Every 60s — check for stale in_progress tasks
lastRun: null,
nextRun: now + 25_000, // First check 25s after startup
enabled: true,
running: false,
})
// Start the tick loop
tickInterval = setInterval(tick, TICK_MS)
logger.info('Scheduler initialized - backup at ~3AM, cleanup at ~4AM, heartbeat every 5m, webhook/claude/skill/local-agent/gateway-agent sync every 60s')
@ -423,8 +432,9 @@ async function tick() {
: id === 'task_dispatch' ? 'general.task_dispatch'
: id === 'aegis_review' ? 'general.aegis_review'
: id === 'recurring_task_spawn' ? 'general.recurring_task_spawn'
: id === 'stale_task_requeue' ? 'general.stale_task_requeue'
: 'general.agent_heartbeat'
const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn'
const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' || id === 'stale_task_requeue'
if (!isSettingEnabled(settingKey, defaultEnabled)) continue
task.running = true
@ -442,6 +452,7 @@ async function tick() {
: id === 'task_dispatch' ? await dispatchAssignedTasks()
: id === 'aegis_review' ? await runAegisReviews()
: id === 'recurring_task_spawn' ? await spawnRecurringTasks()
: id === 'stale_task_requeue' ? await requeueStaleTasks()
: await runCleanup()
task.lastResult = { ...result, timestamp: now }
} catch (err: any) {
@ -477,8 +488,9 @@ export function getSchedulerStatus() {
: id === 'task_dispatch' ? 'general.task_dispatch'
: id === 'aegis_review' ? 'general.aegis_review'
: id === 'recurring_task_spawn' ? 'general.recurring_task_spawn'
: id === 'stale_task_requeue' ? 'general.stale_task_requeue'
: 'general.agent_heartbeat'
const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn'
const defaultEnabled = id === 'agent_heartbeat' || id === 'webhook_retry' || id === 'claude_session_scan' || id === 'skill_sync' || id === 'local_agent_sync' || id === 'gateway_agent_sync' || id === 'task_dispatch' || id === 'aegis_review' || id === 'recurring_task_spawn' || id === 'stale_task_requeue'
result.push({
id,
name: task.name,
@ -506,6 +518,7 @@ export async function triggerTask(taskId: string): Promise<{ ok: boolean; messag
if (taskId === 'task_dispatch') return dispatchAssignedTasks()
if (taskId === 'aegis_review') return runAegisReviews()
if (taskId === 'recurring_task_spawn') return spawnRecurringTasks()
if (taskId === 'stale_task_requeue') return requeueStaleTasks()
return { ok: false, message: `Unknown task: ${taskId}` }
}

View File

@ -306,21 +306,43 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string
previous_status: 'quality_review',
})
} else {
// Rejected: push back to in_progress with feedback
db.prepare('UPDATE tasks SET status = ?, error_message = ?, updated_at = ? WHERE id = ?')
.run('in_progress', `Aegis rejected: ${verdict.notes}`, Math.floor(Date.now() / 1000), task.id)
// Rejected: check dispatch_attempts to decide next status
const now = Math.floor(Date.now() / 1000)
const currentAttempts = (db.prepare('SELECT dispatch_attempts FROM tasks WHERE id = ?').get(task.id) as { dispatch_attempts: number } | undefined)?.dispatch_attempts ?? 0
const newAttempts = currentAttempts + 1
const maxAegisRetries = 3
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'in_progress',
previous_status: 'quality_review',
})
if (newAttempts >= maxAegisRetries) {
// Too many rejections — move to failed
db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?')
.run('failed', `Aegis rejected ${newAttempts} times. Last: ${verdict.notes}`, newAttempts, now, task.id)
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'failed',
previous_status: 'quality_review',
error_message: `Aegis rejected ${newAttempts} times`,
reason: 'max_aegis_retries_exceeded',
})
} else {
// Requeue to assigned for re-dispatch with feedback
db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?')
.run('assigned', `Aegis rejected: ${verdict.notes}`, newAttempts, now, task.id)
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'assigned',
previous_status: 'quality_review',
error_message: `Aegis rejected: ${verdict.notes}`,
reason: 'aegis_rejection',
})
}
// Add rejection as a comment so the agent sees it on next dispatch
db.prepare(`
INSERT INTO comments (task_id, author, content, created_at, workspace_id)
VALUES (?, 'aegis', ?, ?, ?)
`).run(task.id, `Quality Review Rejected:\n${verdict.notes}`, Math.floor(Date.now() / 1000), task.workspace_id)
`).run(task.id, `Quality Review Rejected (attempt ${newAttempts}/${maxAegisRetries}):\n${verdict.notes}`, now, task.workspace_id)
}
db_helpers.logActivity(
@ -363,6 +385,86 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string
}
}
/**
* Requeue stale tasks stuck in 'in_progress' whose assigned agent is offline.
* Prevents tasks from being permanently stuck when agents crash or disconnect.
*/
/**
 * Requeue stale tasks stuck in 'in_progress' whose assigned agent is offline.
 * Prevents tasks from being permanently stuck when agents crash or disconnect.
 *
 * Tasks untouched for 10+ minutes whose agent is offline (or unknown) are
 * moved back to 'assigned' for re-dispatch, with an explanatory comment.
 * After 5 total dispatch attempts a task is moved to 'failed' instead.
 * A task.status_changed event (with reason + error_message) is broadcast
 * for every transition so the UI can show why the task reverted.
 *
 * @returns summary of how many tasks were requeued / failed this pass
 */
export async function requeueStaleTasks(): Promise<{ ok: boolean; message: string }> {
  const db = getDatabase()
  const now = Math.floor(Date.now() / 1000)
  const staleThreshold = now - 10 * 60 // 10 minutes
  const maxDispatchRetries = 5

  const staleTasks = db.prepare(`
    SELECT t.id, t.title, t.assigned_to, t.dispatch_attempts, t.workspace_id,
           a.status as agent_status, a.last_seen as agent_last_seen
    FROM tasks t
    LEFT JOIN agents a ON a.name = t.assigned_to AND a.workspace_id = t.workspace_id
    WHERE t.status = 'in_progress'
      AND t.updated_at < ?
  `).all(staleThreshold) as Array<{
    id: number; title: string; assigned_to: string | null; dispatch_attempts: number
    workspace_id: number; agent_status: string | null; agent_last_seen: number | null
  }>

  if (staleTasks.length === 0) {
    return { ok: true, message: 'No stale tasks found' }
  }

  let requeued = 0
  let failed = 0

  for (const task of staleTasks) {
    // Only requeue if the agent is offline or unknown (no agents row matched).
    const agentOffline = !task.agent_status || task.agent_status === 'offline'
    if (!agentOffline) continue

    // dispatch_attempts may be NULL for rows created before migration 044.
    const newAttempts = (task.dispatch_attempts ?? 0) + 1

    if (newAttempts >= maxDispatchRetries) {
      // Re-check status in the WHERE clause: the agent may have completed or
      // resumed the task between our SELECT and this UPDATE — the same
      // SELECT-then-UPDATE race the atomic queue claim eliminates. If the row
      // no longer matches, skip it entirely (no event, no counter bump).
      const res = db.prepare(`UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ? AND status = 'in_progress'`)
        .run('failed', `Task stuck in_progress ${newAttempts} times — agent "${task.assigned_to}" offline. Moved to failed.`, newAttempts, now, task.id)
      if (res.changes === 0) continue
      eventBus.broadcast('task.status_changed', {
        id: task.id,
        status: 'failed',
        previous_status: 'in_progress',
        error_message: `Stale task — agent offline after ${newAttempts} attempts`,
        reason: 'stale_task_max_retries',
      })
      failed++
    } else {
      // Same status guard as above: only requeue if still in_progress.
      const res = db.prepare(`UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ? AND status = 'in_progress'`)
        .run('assigned', `Requeued: agent "${task.assigned_to}" went offline while task was in_progress`, newAttempts, now, task.id)
      if (res.changes === 0) continue
      // Add a comment explaining the requeue so the next agent has context.
      db.prepare(`
        INSERT INTO comments (task_id, author, content, created_at, workspace_id)
        VALUES (?, 'scheduler', ?, ?, ?)
      `).run(task.id, `Task requeued (attempt ${newAttempts}/${maxDispatchRetries}): agent "${task.assigned_to}" went offline while task was in_progress.`, now, task.workspace_id)
      eventBus.broadcast('task.status_changed', {
        id: task.id,
        status: 'assigned',
        previous_status: 'in_progress',
        error_message: `Agent "${task.assigned_to}" went offline`,
        reason: 'stale_task_requeue',
      })
      requeued++
    }
  }

  const total = requeued + failed
  return {
    ok: true,
    message: total === 0
      ? `Found ${staleTasks.length} stale task(s) but agents still online`
      : `Requeued ${requeued}, failed ${failed} of ${staleTasks.length} stale task(s)`,
  }
}
export async function dispatchAssignedTasks(): Promise<{ ok: boolean; message: string }> {
const db = getDatabase()
@ -559,15 +661,36 @@ export async function dispatchAssignedTasks(): Promise<{ ok: boolean; message: s
const errorMsg = err.message || 'Unknown error'
logger.error({ taskId: task.id, agent: task.agent_name, err }, 'Task dispatch failed')
// Revert to assigned so it can be retried on the next tick
db.prepare('UPDATE tasks SET status = ?, error_message = ?, updated_at = ? WHERE id = ?')
.run('assigned', errorMsg.substring(0, 5000), Math.floor(Date.now() / 1000), task.id)
// Increment dispatch_attempts and decide next status
const currentAttempts = (db.prepare('SELECT dispatch_attempts FROM tasks WHERE id = ?').get(task.id) as { dispatch_attempts: number } | undefined)?.dispatch_attempts ?? 0
const newAttempts = currentAttempts + 1
const maxDispatchRetries = 5
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'assigned',
previous_status: 'in_progress',
})
if (newAttempts >= maxDispatchRetries) {
// Too many failures — move to failed
db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?')
.run('failed', `Dispatch failed ${newAttempts} times. Last: ${errorMsg.substring(0, 5000)}`, newAttempts, Math.floor(Date.now() / 1000), task.id)
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'failed',
previous_status: 'in_progress',
error_message: `Dispatch failed ${newAttempts} times`,
reason: 'max_dispatch_retries_exceeded',
})
} else {
// Revert to assigned so it can be retried on the next tick
db.prepare('UPDATE tasks SET status = ?, error_message = ?, dispatch_attempts = ?, updated_at = ? WHERE id = ?')
.run('assigned', errorMsg.substring(0, 5000), newAttempts, Math.floor(Date.now() / 1000), task.id)
eventBus.broadcast('task.status_changed', {
id: task.id,
status: 'assigned',
previous_status: 'in_progress',
error_message: errorMsg.substring(0, 500),
reason: 'dispatch_failed',
})
}
db_helpers.logActivity(
'task_dispatch_failed',