feat(tasks): add agent queue polling endpoint

This commit is contained in:
Nyk 2026-03-05 13:22:12 +07:00
parent fce3b78706
commit c024731764
4 changed files with 301 additions and 0 deletions

View File

@ -217,6 +217,7 @@ All endpoints require authentication unless noted. Full reference below.
| `POST` | `/api/agents/message` | operator | Send message to agent | | `POST` | `/api/agents/message` | operator | Send message to agent |
| `GET` | `/api/tasks` | viewer | List tasks (filter: `?status=`, `?assigned_to=`, `?priority=`) | | `GET` | `/api/tasks` | viewer | List tasks (filter: `?status=`, `?assigned_to=`, `?priority=`) |
| `POST` | `/api/tasks` | operator | Create task | | `POST` | `/api/tasks` | operator | Create task |
| `GET` | `/api/tasks/queue` | operator | Poll next task for an agent (`?agent=`, optional `?max_capacity=`) |
| `GET` | `/api/tasks/[id]` | viewer | Task details | | `GET` | `/api/tasks/[id]` | viewer | Task details |
| `PUT` | `/api/tasks/[id]` | operator | Update task | | `PUT` | `/api/tasks/[id]` | operator | Update task |
| `DELETE` | `/api/tasks/[id]` | admin | Delete task | | `DELETE` | `/api/tasks/[id]` | admin | Delete task |

View File

@ -5814,6 +5814,90 @@
} }
} }
}, },
"/api/tasks/queue": {
"get": {
"tags": [
"Tasks"
],
"summary": "Poll next task for an agent",
"operationId": "pollTaskQueue",
"parameters": [
{
"name": "agent",
"in": "query",
"required": false,
"schema": {
"type": "string"
},
"description": "Agent name. Optional when x-agent-name header is provided."
},
{
"name": "max_capacity",
"in": "query",
"required": false,
"schema": {
"type": "integer",
"minimum": 1,
"maximum": 20,
"default": 1
}
},
{
"name": "x-agent-name",
"in": "header",
"required": false,
"schema": {
"type": "string"
},
"description": "Agent attribution header used when `agent` query param is omitted."
}
],
"responses": {
"200": {
"description": "Queue poll result",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"task": {
"oneOf": [
{ "$ref": "#/components/schemas/Task" },
{ "type": "null" }
]
},
"reason": {
"type": "string",
"enum": [
"continue_current",
"assigned",
"at_capacity",
"no_tasks_available"
]
},
"agent": {
"type": "string"
},
"timestamp": {
"type": "integer"
}
}
}
}
}
},
"400": {
"$ref": "#/components/responses/BadRequest"
},
"401": {
"$ref": "#/components/responses/Unauthorized"
},
"403": {
"$ref": "#/components/responses/Forbidden"
}
}
}
},
"/api/tasks/{id}": { "/api/tasks/{id}": {
"get": { "get": {
"tags": [ "tags": [

View File

@ -0,0 +1,147 @@
import { NextRequest, NextResponse } from 'next/server'
import { getDatabase } from '@/lib/db'
import { requireRole } from '@/lib/auth'
import { logger } from '@/lib/logger'
type QueueReason = 'continue_current' | 'assigned' | 'at_capacity' | 'no_tasks_available'
function safeParseJson<T>(raw: string | null | undefined, fallback: T): T {
if (!raw) return fallback
try {
return JSON.parse(raw) as T
} catch {
return fallback
}
}
function mapTaskRow(task: any) {
return {
...task,
tags: safeParseJson(task.tags, [] as string[]),
metadata: safeParseJson(task.metadata, {} as Record<string, unknown>),
}
}
function priorityRankSql() {
return `
CASE priority
WHEN 'critical' THEN 0
WHEN 'high' THEN 1
WHEN 'medium' THEN 2
WHEN 'low' THEN 3
ELSE 4
END
`
}
/**
* GET /api/tasks/queue - Poll next task for an agent.
*
* Query params:
* - agent: required agent name (or use x-agent-name header)
* - max_capacity: optional integer 1..20 (default 1)
*/
export async function GET(request: NextRequest) {
const auth = requireRole(request, 'operator')
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status })
try {
const db = getDatabase()
const workspaceId = auth.user.workspace_id
const { searchParams } = new URL(request.url)
const agent =
(searchParams.get('agent') || '').trim() ||
(request.headers.get('x-agent-name') || '').trim()
if (!agent) {
return NextResponse.json({ error: 'Missing agent. Provide ?agent=... or x-agent-name header.' }, { status: 400 })
}
const maxCapacityRaw = searchParams.get('max_capacity') || '1'
if (!/^\d+$/.test(maxCapacityRaw)) {
return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 })
}
const maxCapacity = Number(maxCapacityRaw)
if (!Number.isInteger(maxCapacity) || maxCapacity < 1 || maxCapacity > 20) {
return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 })
}
const now = Math.floor(Date.now() / 1000)
const currentTask = db.prepare(`
SELECT *
FROM tasks
WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress'
ORDER BY updated_at DESC
LIMIT 1
`).get(workspaceId, agent) as any | undefined
if (currentTask) {
return NextResponse.json({
task: mapTaskRow(currentTask),
reason: 'continue_current' as QueueReason,
agent,
timestamp: now,
})
}
const inProgressCount = (db.prepare(`
SELECT COUNT(*) as c
FROM tasks
WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress'
`).get(workspaceId, agent) as { c: number }).c
if (inProgressCount >= maxCapacity) {
return NextResponse.json({
task: null,
reason: 'at_capacity' as QueueReason,
agent,
timestamp: now,
})
}
// Best-effort atomic pickup loop for race safety.
for (let attempt = 0; attempt < 5; attempt += 1) {
const candidate = db.prepare(`
SELECT *
FROM tasks
WHERE workspace_id = ?
AND status IN ('assigned', 'inbox')
AND (assigned_to IS NULL OR assigned_to = ?)
ORDER BY ${priorityRankSql()} ASC, due_date ASC NULLS LAST, created_at ASC
LIMIT 1
`).get(workspaceId, agent) as any | undefined
if (!candidate) break
const claimed = db.prepare(`
UPDATE tasks
SET status = 'in_progress', assigned_to = ?, updated_at = ?
WHERE id = ? AND workspace_id = ?
AND status IN ('assigned', 'inbox')
AND (assigned_to IS NULL OR assigned_to = ?)
`).run(agent, now, candidate.id, workspaceId, agent)
if (claimed.changes > 0) {
const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND workspace_id = ?').get(candidate.id, workspaceId) as any
return NextResponse.json({
task: mapTaskRow(task),
reason: 'assigned' as QueueReason,
agent,
timestamp: now,
})
}
}
return NextResponse.json({
task: null,
reason: 'no_tasks_available' as QueueReason,
agent,
timestamp: now,
})
} catch (error) {
logger.error({ err: error }, 'GET /api/tasks/queue error')
return NextResponse.json({ error: 'Failed to poll task queue' }, { status: 500 })
}
}

69
tests/task-queue.spec.ts Normal file
View File

@ -0,0 +1,69 @@
import { expect, test } from '@playwright/test'
import { API_KEY_HEADER, createTestTask, deleteTestTask } from './helpers'
test.describe('Task Queue API', () => {
const cleanup: number[] = []
test.afterEach(async ({ request }) => {
for (const id of cleanup) {
await deleteTestTask(request, id).catch(() => {})
}
cleanup.length = 0
})
test('picks the next task and marks it in_progress for agent', async ({ request }) => {
const low = await createTestTask(request, { priority: 'low', status: 'inbox' })
const critical = await createTestTask(request, { priority: 'critical', status: 'inbox' })
cleanup.push(low.id, critical.id)
const res = await request.get('/api/tasks/queue?agent=queue-agent', { headers: API_KEY_HEADER })
expect(res.status()).toBe(200)
const body = await res.json()
expect(body.reason).toBe('assigned')
expect(body.task).toBeTruthy()
expect(body.task.id).toBe(critical.id)
expect(body.task.status).toBe('in_progress')
expect(body.task.assigned_to).toBe('queue-agent')
})
test('returns current in_progress task as continue_current', async ({ request }) => {
const task = await createTestTask(request, {
status: 'in_progress',
assigned_to: 'queue-agent-2',
priority: 'high',
})
cleanup.push(task.id)
const res = await request.get('/api/tasks/queue?agent=queue-agent-2', { headers: API_KEY_HEADER })
expect(res.status()).toBe(200)
const body = await res.json()
expect(body.reason).toBe('continue_current')
expect(body.task?.id).toBe(task.id)
})
test('validates max_capacity input', async ({ request }) => {
const res = await request.get('/api/tasks/queue?agent=queue-agent-empty&max_capacity=999', {
headers: API_KEY_HEADER,
})
expect(res.status()).toBe(400)
})
test('uses x-agent-name header when query param is omitted', async ({ request }) => {
const task = await createTestTask(request, {
status: 'assigned',
assigned_to: 'header-agent',
priority: 'high',
})
cleanup.push(task.id)
const res = await request.get('/api/tasks/queue', {
headers: { ...API_KEY_HEADER, 'x-agent-name': 'header-agent' },
})
expect(res.status()).toBe(200)
const body = await res.json()
expect(body.reason).toBe('assigned')
expect(body.agent).toBe('header-agent')
expect(body.task?.id).toBe(task.id)
})
})