fix: task board SSE wiring, priority enum, webhook event, auto-advance, parallel broadcast (#73) (#89)

- Wire task board panel into Zustand store for real-time SSE updates
  instead of local useState; add useSmartPoll fallback when SSE disconnects
- Fix priority enum mismatch: UI now uses 'critical' matching the Zod
  validation schema instead of 'urgent'
- Add 'task.status_changed' to webhook EVENT_MAP so external consumers
  receive status transition events
- Auto-advance task to 'done' column when aegis quality review approves,
  broadcasting task.status_changed for real-time UI update
- Parallelize broadcast loop with Promise.allSettled so N agents execute
  concurrently (~10s) instead of serially (N×10s)

Closes #73
This commit is contained in:
nyk 2026-03-03 16:20:53 +07:00 committed by GitHub
parent 6ce38b13dc
commit 71f2627138
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 61 additions and 49 deletions

View File

@ -4,6 +4,7 @@ import { requireRole } from '@/lib/auth'
import { validateBody, qualityReviewSchema } from '@/lib/validation'
import { mutationLimiter } from '@/lib/rate-limit'
import { logger } from '@/lib/logger'
import { eventBus } from '@/lib/event-bus'
export async function GET(request: NextRequest) {
const auth = requireRole(request, 'viewer')
@ -98,6 +99,17 @@ export async function POST(request: NextRequest) {
{ status, notes }
)
// Auto-advance task to 'done' when aegis approves
if (status === 'approved' && reviewer === 'aegis') {
db.prepare('UPDATE tasks SET status = ?, updated_at = unixepoch() WHERE id = ?')
.run('done', taskId)
eventBus.broadcast('task.status_changed', {
id: taskId,
status: 'done',
updated_at: Math.floor(Date.now() / 1000),
})
}
return NextResponse.json({ success: true, id: result.lastInsertRowid })
} catch (error) {
logger.error({ err: error }, 'POST /api/quality-review error')

View File

@ -42,15 +42,9 @@ export async function POST(
.prepare('SELECT name, session_key FROM agents WHERE name IN (' + Array.from(subscribers).map(() => '?').join(',') + ')')
.all(...Array.from(subscribers)) as Array<{ name: string; session_key?: string }>
let sent = 0
let skipped = 0
for (const agent of agents) {
if (!agent.session_key) {
skipped += 1
continue
}
try {
const results = await Promise.allSettled(
agents.map(async (agent) => {
if (!agent.session_key) return 'skipped'
await runOpenClaw(
[
'gateway',
@ -62,7 +56,6 @@ export async function POST(
],
{ timeoutMs: 10000 }
)
sent += 1
db_helpers.createNotification(
agent.name,
'message',
@ -71,9 +64,15 @@ export async function POST(
'task',
taskId
)
} catch (error) {
skipped += 1
}
return 'sent'
})
)
let sent = 0
let skipped = 0
for (const r of results) {
if (r.status === 'fulfilled' && r.value === 'sent') sent++
else skipped++
}
db_helpers.logActivity(

View File

@ -9,7 +9,7 @@ interface Task {
title: string
description?: string
status: 'inbox' | 'assigned' | 'in_progress' | 'review' | 'quality_review' | 'done'
priority: 'low' | 'medium' | 'high' | 'urgent'
priority: 'low' | 'medium' | 'high' | 'critical' | 'urgent'
assigned_to?: string
created_by: string
created_at: number
@ -55,24 +55,30 @@ const statusColumns = [
{ key: 'done', title: 'Done', color: 'bg-green-500/20 text-green-400' },
]
const priorityColors = {
const priorityColors: Record<string, string> = {
low: 'border-green-500',
medium: 'border-yellow-500',
high: 'border-orange-500',
urgent: 'border-red-500',
critical: 'border-red-500',
}
export function TaskBoardPanel() {
const [tasks, setTasks] = useState<Task[]>([])
const { tasks: storeTasks, setTasks: storeSetTasks, selectedTask, setSelectedTask } = useMissionControl()
const [agents, setAgents] = useState<Agent[]>([])
const [loading, setLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
const [selectedTask, setSelectedTask] = useState<Task | null>(null)
const [aegisMap, setAegisMap] = useState<Record<number, boolean>>({})
const [draggedTask, setDraggedTask] = useState<Task | null>(null)
const [showCreateModal, setShowCreateModal] = useState(false)
const [editingTask, setEditingTask] = useState<Task | null>(null)
const dragCounter = useRef(0)
// Augment store tasks with aegisApproved flag (computed, not stored)
const tasks: Task[] = storeTasks.map(t => ({
...t,
aegisApproved: Boolean(aegisMap[t.id])
}))
// Fetch tasks and agents
const fetchData = useCallback(async () => {
try {
@ -94,43 +100,42 @@ export function TaskBoardPanel() {
const tasksList = tasksData.tasks || []
const taskIds = tasksList.map((task: Task) => task.id)
let aegisMap: Record<number, boolean> = {}
let newAegisMap: Record<number, boolean> = {}
if (taskIds.length > 0) {
try {
const reviewResponse = await fetch(`/api/quality-review?taskIds=${taskIds.join(',')}`)
if (reviewResponse.ok) {
const reviewData = await reviewResponse.json()
const latest = reviewData.latest || {}
aegisMap = Object.fromEntries(
newAegisMap = Object.fromEntries(
Object.entries(latest).map(([id, row]: [string, any]) => [
Number(id),
row?.reviewer === 'aegis' && row?.status === 'approved'
])
)
}
} catch (error) {
aegisMap = {}
} catch {
newAegisMap = {}
}
}
setTasks(
tasksList.map((task: Task) => ({
...task,
aegisApproved: Boolean(aegisMap[task.id])
}))
)
storeSetTasks(tasksList)
setAegisMap(newAegisMap)
setAgents(agentsData.agents || [])
} catch (err) {
setError(err instanceof Error ? err.message : 'An error occurred')
} finally {
setLoading(false)
}
}, [])
}, [storeSetTasks])
useEffect(() => {
fetchData()
}, [fetchData])
// Poll as SSE fallback — pauses when SSE is delivering events
useSmartPoll(fetchData, 30000, { pauseWhenSseConnected: true })
// Group tasks by status
const tasksByStatus = statusColumns.reduce((acc, column) => {
acc[column.key] = tasks.filter(task => task.status === column.key)
@ -161,6 +166,8 @@ export function TaskBoardPanel() {
e.preventDefault()
}
const { updateTask } = useMissionControl()
const handleDrop = async (e: React.DragEvent, newStatus: string) => {
e.preventDefault()
dragCounter.current = 0
@ -171,6 +178,8 @@ export function TaskBoardPanel() {
return
}
const previousStatus = draggedTask.status
try {
if (newStatus === 'done') {
const reviewResponse = await fetch(`/api/quality-review?taskId=${draggedTask.id}`)
@ -184,14 +193,11 @@ export function TaskBoardPanel() {
}
}
// Optimistically update UI
setTasks(prevTasks =>
prevTasks.map(task =>
task.id === draggedTask.id
? { ...task, status: newStatus as Task['status'], updated_at: Math.floor(Date.now() / 1000) }
: task
)
)
// Optimistically update via Zustand store
updateTask(draggedTask.id, {
status: newStatus as Task['status'],
updated_at: Math.floor(Date.now() / 1000)
})
// Update on server
const response = await fetch('/api/tasks', {
@ -207,14 +213,8 @@ export function TaskBoardPanel() {
throw new Error(data.error || 'Failed to update task status')
}
} catch (err) {
// Revert optimistic update
setTasks(prevTasks =>
prevTasks.map(task =>
task.id === draggedTask.id
? { ...task, status: draggedTask.status }
: task
)
)
// Revert optimistic update via Zustand store
updateTask(draggedTask.id, { status: previousStatus })
setError(err instanceof Error ? err.message : 'Failed to update task status')
} finally {
setDraggedTask(null)
@ -349,7 +349,7 @@ export function TaskBoardPanel() {
</span>
)}
<span className={`text-xs px-2 py-1 rounded font-medium ${
task.priority === 'urgent' ? 'bg-red-500/20 text-red-400' :
task.priority === 'critical' ? 'bg-red-500/20 text-red-400' :
task.priority === 'high' ? 'bg-orange-500/20 text-orange-400' :
task.priority === 'medium' ? 'bg-yellow-500/20 text-yellow-400' :
'bg-green-500/20 text-green-400'
@ -886,7 +886,7 @@ function CreateTaskModal({
<option value="low">Low</option>
<option value="medium">Medium</option>
<option value="high">High</option>
<option value="urgent">Urgent</option>
<option value="critical">Critical</option>
</select>
</div>
@ -1044,7 +1044,7 @@ function EditTaskModal({
<option value="low">Low</option>
<option value="medium">Medium</option>
<option value="high">High</option>
<option value="urgent">Urgent</option>
<option value="critical">Critical</option>
</select>
</div>
</div>

View File

@ -41,6 +41,7 @@ const EVENT_MAP: Record<string, string> = {
'task.created': 'activity.task_created',
'task.updated': 'activity.task_updated',
'task.deleted': 'activity.task_deleted',
'task.status_changed': 'activity.task_status_changed',
}
/**

View File

@ -88,7 +88,7 @@ export interface Task {
title: string
description?: string
status: 'inbox' | 'assigned' | 'in_progress' | 'review' | 'quality_review' | 'done'
priority: 'low' | 'medium' | 'high' | 'urgent'
priority: 'low' | 'medium' | 'high' | 'critical' | 'urgent'
assigned_to?: string
created_by: string
created_at: number