diff --git a/README.md b/README.md index 7354f9a..abaa5c5 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,7 @@ All endpoints require authentication unless noted. Full reference below. | `POST` | `/api/agents/message` | operator | Send message to agent | | `GET` | `/api/tasks` | viewer | List tasks (filter: `?status=`, `?assigned_to=`, `?priority=`) | | `POST` | `/api/tasks` | operator | Create task | +| `GET` | `/api/tasks/queue` | operator | Poll next task for an agent (`?agent=`, optional `?max_capacity=`) | | `GET` | `/api/tasks/[id]` | viewer | Task details | | `PUT` | `/api/tasks/[id]` | operator | Update task | | `DELETE` | `/api/tasks/[id]` | admin | Delete task | diff --git a/openapi.json b/openapi.json index 1b8e4d5..dfe02b0 100644 --- a/openapi.json +++ b/openapi.json @@ -5958,6 +5958,90 @@ } } }, + "/api/tasks/queue": { + "get": { + "tags": [ + "Tasks" + ], + "summary": "Poll next task for an agent", + "operationId": "pollTaskQueue", + "parameters": [ + { + "name": "agent", + "in": "query", + "required": false, + "schema": { + "type": "string" + }, + "description": "Agent name. Optional when x-agent-name header is provided." + }, + { + "name": "max_capacity", + "in": "query", + "required": false, + "schema": { + "type": "integer", + "minimum": 1, + "maximum": 20, + "default": 1 + } + }, + { + "name": "x-agent-name", + "in": "header", + "required": false, + "schema": { + "type": "string" + }, + "description": "Agent attribution header used when `agent` query param is omitted." + } + ], + "responses": { + "200": { + "description": "Queue poll result", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "task": { + "oneOf": [ + { "$ref": "#/components/schemas/Task" }, + { "type": "null" } + ] + }, + "reason": { + "type": "string", + "enum": [ + "continue_current", + "assigned", + "at_capacity", + "no_tasks_available" + ] + }, + "agent": { + "type": "string" + }, + "timestamp": { + "type": "integer" + } + } + } + } + } + }, + "400": { + "$ref": "#/components/responses/BadRequest" + }, + "401": { + "$ref": "#/components/responses/Unauthorized" + }, + "403": { + "$ref": "#/components/responses/Forbidden" + } + } + } + }, "/api/tasks/{id}": { "get": { "tags": [ diff --git a/src/app/api/tasks/queue/route.ts b/src/app/api/tasks/queue/route.ts new file mode 100644 index 0000000..f0aa68c --- /dev/null +++ b/src/app/api/tasks/queue/route.ts @@ -0,0 +1,147 @@ +import { NextRequest, NextResponse } from 'next/server' +import { getDatabase } from '@/lib/db' +import { requireRole } from '@/lib/auth' +import { logger } from '@/lib/logger' + +type QueueReason = 'continue_current' | 'assigned' | 'at_capacity' | 'no_tasks_available' + +function safeParseJson(raw: string | null | undefined, fallback: T): T { + if (!raw) return fallback + try { + return JSON.parse(raw) as T + } catch { + return fallback + } +} + +function mapTaskRow(task: any) { + return { + ...task, + tags: safeParseJson(task.tags, [] as string[]), + metadata: safeParseJson(task.metadata, {} as Record), + } +} + +function priorityRankSql() { + return ` + CASE priority + WHEN 'critical' THEN 0 + WHEN 'high' THEN 1 + WHEN 'medium' THEN 2 + WHEN 'low' THEN 3 + ELSE 4 + END + ` +} + +/** + * GET /api/tasks/queue - Poll next task for an agent. + * + * Query params: + * - agent: required agent name (or use x-agent-name header) + * - max_capacity: optional integer 1..20 (default 1) + */ +export async function GET(request: NextRequest) { + const auth = requireRole(request, 'operator') + if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status }) + + try { + const db = getDatabase() + const workspaceId = auth.user.workspace_id + const { searchParams } = new URL(request.url) + + const agent = + (searchParams.get('agent') || '').trim() || + (request.headers.get('x-agent-name') || '').trim() + + if (!agent) { + return NextResponse.json({ error: 'Missing agent. Provide ?agent=... or x-agent-name header.' }, { status: 400 }) + } + + const maxCapacityRaw = searchParams.get('max_capacity') || '1' + if (!/^\d+$/.test(maxCapacityRaw)) { + return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 }) + } + const maxCapacity = Number(maxCapacityRaw) + if (!Number.isInteger(maxCapacity) || maxCapacity < 1 || maxCapacity > 20) { + return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 }) + } + + const now = Math.floor(Date.now() / 1000) + + const currentTask = db.prepare(` + SELECT * + FROM tasks + WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress' + ORDER BY updated_at DESC + LIMIT 1 + `).get(workspaceId, agent) as any | undefined + + if (currentTask) { + return NextResponse.json({ + task: mapTaskRow(currentTask), + reason: 'continue_current' as QueueReason, + agent, + timestamp: now, + }) + } + + const inProgressCount = (db.prepare(` + SELECT COUNT(*) as c + FROM tasks + WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress' + `).get(workspaceId, agent) as { c: number }).c + + if (inProgressCount >= maxCapacity) { + return NextResponse.json({ + task: null, + reason: 'at_capacity' as QueueReason, + agent, + timestamp: now, + }) + } + + // Best-effort atomic pickup loop for race safety. + for (let attempt = 0; attempt < 5; attempt += 1) { + const candidate = db.prepare(` + SELECT * + FROM tasks + WHERE workspace_id = ? + AND status IN ('assigned', 'inbox') + AND (assigned_to IS NULL OR assigned_to = ?) + ORDER BY ${priorityRankSql()} ASC, due_date ASC NULLS LAST, created_at ASC + LIMIT 1 + `).get(workspaceId, agent) as any | undefined + + if (!candidate) break + + const claimed = db.prepare(` + UPDATE tasks + SET status = 'in_progress', assigned_to = ?, updated_at = ? + WHERE id = ? AND workspace_id = ? + AND status IN ('assigned', 'inbox') + AND (assigned_to IS NULL OR assigned_to = ?) + `).run(agent, now, candidate.id, workspaceId, agent) + + if (claimed.changes > 0) { + const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND workspace_id = ?').get(candidate.id, workspaceId) as any + return NextResponse.json({ + task: mapTaskRow(task), + reason: 'assigned' as QueueReason, + agent, + timestamp: now, + }) + } + } + + return NextResponse.json({ + task: null, + reason: 'no_tasks_available' as QueueReason, + agent, + timestamp: now, + }) + } catch (error) { + logger.error({ err: error }, 'GET /api/tasks/queue error') + return NextResponse.json({ error: 'Failed to poll task queue' }, { status: 500 }) + } +} diff --git a/tests/task-queue.spec.ts b/tests/task-queue.spec.ts new file mode 100644 index 0000000..63b7966 --- /dev/null +++ b/tests/task-queue.spec.ts @@ -0,0 +1,69 @@ +import { expect, test } from '@playwright/test' +import { API_KEY_HEADER, createTestTask, deleteTestTask } from './helpers' + +test.describe('Task Queue API', () => { + const cleanup: number[] = [] + + test.afterEach(async ({ request }) => { + for (const id of cleanup) { + await deleteTestTask(request, id).catch(() => {}) + } + cleanup.length = 0 + }) + + test('picks the next task and marks it in_progress for agent', async ({ request }) => { + const low = await createTestTask(request, { priority: 'low', status: 'inbox' }) + const critical = await createTestTask(request, { priority: 'critical', status: 'inbox' }) + cleanup.push(low.id, critical.id) + + const res = await request.get('/api/tasks/queue?agent=queue-agent', { headers: API_KEY_HEADER }) + expect(res.status()).toBe(200) + const body = await res.json() + + expect(body.reason).toBe('assigned') + expect(body.task).toBeTruthy() + expect(body.task.id).toBe(critical.id) + expect(body.task.status).toBe('in_progress') + expect(body.task.assigned_to).toBe('queue-agent') + }) + + test('returns current in_progress task as continue_current', async ({ request }) => { + const task = await createTestTask(request, { + status: 'in_progress', + assigned_to: 'queue-agent-2', + priority: 'high', + }) + cleanup.push(task.id) + + const res = await request.get('/api/tasks/queue?agent=queue-agent-2', { headers: API_KEY_HEADER }) + expect(res.status()).toBe(200) + const body = await res.json() + expect(body.reason).toBe('continue_current') + expect(body.task?.id).toBe(task.id) + }) + + test('validates max_capacity input', async ({ request }) => { + const res = await request.get('/api/tasks/queue?agent=queue-agent-empty&max_capacity=999', { + headers: API_KEY_HEADER, + }) + expect(res.status()).toBe(400) + }) + + test('uses x-agent-name header when query param is omitted', async ({ request }) => { + const task = await createTestTask(request, { + status: 'assigned', + assigned_to: 'header-agent', + priority: 'high', + }) + cleanup.push(task.id) + + const res = await request.get('/api/tasks/queue', { + headers: { ...API_KEY_HEADER, 'x-agent-name': 'header-agent' }, + }) + expect(res.status()).toBe(200) + const body = await res.json() + expect(body.reason).toBe('assigned') + expect(body.agent).toBe('header-agent') + expect(body.task?.id).toBe(task.id) + }) +})