Merge pull request #204 from builderz-labs/fix/188-task-queue
feat(tasks): add queue polling endpoint for agent pickup
This commit is contained in:
commit
e948a1399b
|
|
@ -217,6 +217,7 @@ All endpoints require authentication unless noted. Full reference below.
|
||||||
| `POST` | `/api/agents/message` | operator | Send message to agent |
|
| `POST` | `/api/agents/message` | operator | Send message to agent |
|
||||||
| `GET` | `/api/tasks` | viewer | List tasks (filter: `?status=`, `?assigned_to=`, `?priority=`) |
|
| `GET` | `/api/tasks` | viewer | List tasks (filter: `?status=`, `?assigned_to=`, `?priority=`) |
|
||||||
| `POST` | `/api/tasks` | operator | Create task |
|
| `POST` | `/api/tasks` | operator | Create task |
|
||||||
|
| `GET` | `/api/tasks/queue` | operator | Poll next task for an agent (`?agent=`, optional `?max_capacity=`) |
|
||||||
| `GET` | `/api/tasks/[id]` | viewer | Task details |
|
| `GET` | `/api/tasks/[id]` | viewer | Task details |
|
||||||
| `PUT` | `/api/tasks/[id]` | operator | Update task |
|
| `PUT` | `/api/tasks/[id]` | operator | Update task |
|
||||||
| `DELETE` | `/api/tasks/[id]` | admin | Delete task |
|
| `DELETE` | `/api/tasks/[id]` | admin | Delete task |
|
||||||
|
|
|
||||||
84
openapi.json
84
openapi.json
|
|
@ -5958,6 +5958,90 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"/api/tasks/queue": {
|
||||||
|
"get": {
|
||||||
|
"tags": [
|
||||||
|
"Tasks"
|
||||||
|
],
|
||||||
|
"summary": "Poll next task for an agent",
|
||||||
|
"operationId": "pollTaskQueue",
|
||||||
|
"parameters": [
|
||||||
|
{
|
||||||
|
"name": "agent",
|
||||||
|
"in": "query",
|
||||||
|
"required": false,
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"description": "Agent name. Optional when x-agent-name header is provided."
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "max_capacity",
|
||||||
|
"in": "query",
|
||||||
|
"required": false,
|
||||||
|
"schema": {
|
||||||
|
"type": "integer",
|
||||||
|
"minimum": 1,
|
||||||
|
"maximum": 20,
|
||||||
|
"default": 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "x-agent-name",
|
||||||
|
"in": "header",
|
||||||
|
"required": false,
|
||||||
|
"schema": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"description": "Agent attribution header used when `agent` query param is omitted."
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"responses": {
|
||||||
|
"200": {
|
||||||
|
"description": "Queue poll result",
|
||||||
|
"content": {
|
||||||
|
"application/json": {
|
||||||
|
"schema": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"task": {
|
||||||
|
"oneOf": [
|
||||||
|
{ "$ref": "#/components/schemas/Task" },
|
||||||
|
{ "type": "null" }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"reason": {
|
||||||
|
"type": "string",
|
||||||
|
"enum": [
|
||||||
|
"continue_current",
|
||||||
|
"assigned",
|
||||||
|
"at_capacity",
|
||||||
|
"no_tasks_available"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"agent": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"timestamp": {
|
||||||
|
"type": "integer"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"400": {
|
||||||
|
"$ref": "#/components/responses/BadRequest"
|
||||||
|
},
|
||||||
|
"401": {
|
||||||
|
"$ref": "#/components/responses/Unauthorized"
|
||||||
|
},
|
||||||
|
"403": {
|
||||||
|
"$ref": "#/components/responses/Forbidden"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"/api/tasks/{id}": {
|
"/api/tasks/{id}": {
|
||||||
"get": {
|
"get": {
|
||||||
"tags": [
|
"tags": [
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,147 @@
|
||||||
|
import { NextRequest, NextResponse } from 'next/server'
|
||||||
|
import { getDatabase } from '@/lib/db'
|
||||||
|
import { requireRole } from '@/lib/auth'
|
||||||
|
import { logger } from '@/lib/logger'
|
||||||
|
|
||||||
|
type QueueReason = 'continue_current' | 'assigned' | 'at_capacity' | 'no_tasks_available'
|
||||||
|
|
||||||
|
function safeParseJson<T>(raw: string | null | undefined, fallback: T): T {
|
||||||
|
if (!raw) return fallback
|
||||||
|
try {
|
||||||
|
return JSON.parse(raw) as T
|
||||||
|
} catch {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function mapTaskRow(task: any) {
|
||||||
|
return {
|
||||||
|
...task,
|
||||||
|
tags: safeParseJson(task.tags, [] as string[]),
|
||||||
|
metadata: safeParseJson(task.metadata, {} as Record<string, unknown>),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function priorityRankSql() {
|
||||||
|
return `
|
||||||
|
CASE priority
|
||||||
|
WHEN 'critical' THEN 0
|
||||||
|
WHEN 'high' THEN 1
|
||||||
|
WHEN 'medium' THEN 2
|
||||||
|
WHEN 'low' THEN 3
|
||||||
|
ELSE 4
|
||||||
|
END
|
||||||
|
`
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GET /api/tasks/queue - Poll next task for an agent.
|
||||||
|
*
|
||||||
|
* Query params:
|
||||||
|
* - agent: required agent name (or use x-agent-name header)
|
||||||
|
* - max_capacity: optional integer 1..20 (default 1)
|
||||||
|
*/
|
||||||
|
export async function GET(request: NextRequest) {
|
||||||
|
const auth = requireRole(request, 'operator')
|
||||||
|
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status })
|
||||||
|
|
||||||
|
try {
|
||||||
|
const db = getDatabase()
|
||||||
|
const workspaceId = auth.user.workspace_id
|
||||||
|
const { searchParams } = new URL(request.url)
|
||||||
|
|
||||||
|
const agent =
|
||||||
|
(searchParams.get('agent') || '').trim() ||
|
||||||
|
(request.headers.get('x-agent-name') || '').trim()
|
||||||
|
|
||||||
|
if (!agent) {
|
||||||
|
return NextResponse.json({ error: 'Missing agent. Provide ?agent=... or x-agent-name header.' }, { status: 400 })
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxCapacityRaw = searchParams.get('max_capacity') || '1'
|
||||||
|
if (!/^\d+$/.test(maxCapacityRaw)) {
|
||||||
|
return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 })
|
||||||
|
}
|
||||||
|
const maxCapacity = Number(maxCapacityRaw)
|
||||||
|
if (!Number.isInteger(maxCapacity) || maxCapacity < 1 || maxCapacity > 20) {
|
||||||
|
return NextResponse.json({ error: 'Invalid max_capacity. Expected integer 1..20.' }, { status: 400 })
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = Math.floor(Date.now() / 1000)
|
||||||
|
|
||||||
|
const currentTask = db.prepare(`
|
||||||
|
SELECT *
|
||||||
|
FROM tasks
|
||||||
|
WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress'
|
||||||
|
ORDER BY updated_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
`).get(workspaceId, agent) as any | undefined
|
||||||
|
|
||||||
|
if (currentTask) {
|
||||||
|
return NextResponse.json({
|
||||||
|
task: mapTaskRow(currentTask),
|
||||||
|
reason: 'continue_current' as QueueReason,
|
||||||
|
agent,
|
||||||
|
timestamp: now,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const inProgressCount = (db.prepare(`
|
||||||
|
SELECT COUNT(*) as c
|
||||||
|
FROM tasks
|
||||||
|
WHERE workspace_id = ? AND assigned_to = ? AND status = 'in_progress'
|
||||||
|
`).get(workspaceId, agent) as { c: number }).c
|
||||||
|
|
||||||
|
if (inProgressCount >= maxCapacity) {
|
||||||
|
return NextResponse.json({
|
||||||
|
task: null,
|
||||||
|
reason: 'at_capacity' as QueueReason,
|
||||||
|
agent,
|
||||||
|
timestamp: now,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Best-effort atomic pickup loop for race safety.
|
||||||
|
for (let attempt = 0; attempt < 5; attempt += 1) {
|
||||||
|
const candidate = db.prepare(`
|
||||||
|
SELECT *
|
||||||
|
FROM tasks
|
||||||
|
WHERE workspace_id = ?
|
||||||
|
AND status IN ('assigned', 'inbox')
|
||||||
|
AND (assigned_to IS NULL OR assigned_to = ?)
|
||||||
|
ORDER BY ${priorityRankSql()} ASC, due_date ASC NULLS LAST, created_at ASC
|
||||||
|
LIMIT 1
|
||||||
|
`).get(workspaceId, agent) as any | undefined
|
||||||
|
|
||||||
|
if (!candidate) break
|
||||||
|
|
||||||
|
const claimed = db.prepare(`
|
||||||
|
UPDATE tasks
|
||||||
|
SET status = 'in_progress', assigned_to = ?, updated_at = ?
|
||||||
|
WHERE id = ? AND workspace_id = ?
|
||||||
|
AND status IN ('assigned', 'inbox')
|
||||||
|
AND (assigned_to IS NULL OR assigned_to = ?)
|
||||||
|
`).run(agent, now, candidate.id, workspaceId, agent)
|
||||||
|
|
||||||
|
if (claimed.changes > 0) {
|
||||||
|
const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND workspace_id = ?').get(candidate.id, workspaceId) as any
|
||||||
|
return NextResponse.json({
|
||||||
|
task: mapTaskRow(task),
|
||||||
|
reason: 'assigned' as QueueReason,
|
||||||
|
agent,
|
||||||
|
timestamp: now,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NextResponse.json({
|
||||||
|
task: null,
|
||||||
|
reason: 'no_tasks_available' as QueueReason,
|
||||||
|
agent,
|
||||||
|
timestamp: now,
|
||||||
|
})
|
||||||
|
} catch (error) {
|
||||||
|
logger.error({ err: error }, 'GET /api/tasks/queue error')
|
||||||
|
return NextResponse.json({ error: 'Failed to poll task queue' }, { status: 500 })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,69 @@
|
||||||
|
import { expect, test } from '@playwright/test'
|
||||||
|
import { API_KEY_HEADER, createTestTask, deleteTestTask } from './helpers'
|
||||||
|
|
||||||
|
test.describe('Task Queue API', () => {
|
||||||
|
const cleanup: number[] = []
|
||||||
|
|
||||||
|
test.afterEach(async ({ request }) => {
|
||||||
|
for (const id of cleanup) {
|
||||||
|
await deleteTestTask(request, id).catch(() => {})
|
||||||
|
}
|
||||||
|
cleanup.length = 0
|
||||||
|
})
|
||||||
|
|
||||||
|
test('picks the next task and marks it in_progress for agent', async ({ request }) => {
|
||||||
|
const low = await createTestTask(request, { priority: 'low', status: 'inbox' })
|
||||||
|
const critical = await createTestTask(request, { priority: 'critical', status: 'inbox' })
|
||||||
|
cleanup.push(low.id, critical.id)
|
||||||
|
|
||||||
|
const res = await request.get('/api/tasks/queue?agent=queue-agent', { headers: API_KEY_HEADER })
|
||||||
|
expect(res.status()).toBe(200)
|
||||||
|
const body = await res.json()
|
||||||
|
|
||||||
|
expect(body.reason).toBe('assigned')
|
||||||
|
expect(body.task).toBeTruthy()
|
||||||
|
expect(body.task.id).toBe(critical.id)
|
||||||
|
expect(body.task.status).toBe('in_progress')
|
||||||
|
expect(body.task.assigned_to).toBe('queue-agent')
|
||||||
|
})
|
||||||
|
|
||||||
|
test('returns current in_progress task as continue_current', async ({ request }) => {
|
||||||
|
const task = await createTestTask(request, {
|
||||||
|
status: 'in_progress',
|
||||||
|
assigned_to: 'queue-agent-2',
|
||||||
|
priority: 'high',
|
||||||
|
})
|
||||||
|
cleanup.push(task.id)
|
||||||
|
|
||||||
|
const res = await request.get('/api/tasks/queue?agent=queue-agent-2', { headers: API_KEY_HEADER })
|
||||||
|
expect(res.status()).toBe(200)
|
||||||
|
const body = await res.json()
|
||||||
|
expect(body.reason).toBe('continue_current')
|
||||||
|
expect(body.task?.id).toBe(task.id)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('validates max_capacity input', async ({ request }) => {
|
||||||
|
const res = await request.get('/api/tasks/queue?agent=queue-agent-empty&max_capacity=999', {
|
||||||
|
headers: API_KEY_HEADER,
|
||||||
|
})
|
||||||
|
expect(res.status()).toBe(400)
|
||||||
|
})
|
||||||
|
|
||||||
|
test('uses x-agent-name header when query param is omitted', async ({ request }) => {
|
||||||
|
const task = await createTestTask(request, {
|
||||||
|
status: 'assigned',
|
||||||
|
assigned_to: 'header-agent',
|
||||||
|
priority: 'high',
|
||||||
|
})
|
||||||
|
cleanup.push(task.id)
|
||||||
|
|
||||||
|
const res = await request.get('/api/tasks/queue', {
|
||||||
|
headers: { ...API_KEY_HEADER, 'x-agent-name': 'header-agent' },
|
||||||
|
})
|
||||||
|
expect(res.status()).toBe(200)
|
||||||
|
const body = await res.json()
|
||||||
|
expect(body.reason).toBe('assigned')
|
||||||
|
expect(body.agent).toBe('header-agent')
|
||||||
|
expect(body.task?.id).toBe(task.id)
|
||||||
|
})
|
||||||
|
})
|
||||||
Loading…
Reference in New Issue