feat(#160): add Real-Time Workload Signals API endpoint

New GET /api/workload endpoint providing system-wide capacity metrics
and throttle recommendations for agent load awareness.

Response sections:
- capacity: active tasks, recent task/error rates, completion throughput
- queue: pending task depth by status/priority, estimated wait time
- agents: online/busy/idle counts, busy ratio, per-agent load distribution
- recommendation: actionable signal (normal|throttle|shed|pause)
  with reasons, submit_ok flag, and suggested delay
- thresholds: current threshold configuration for transparency

Recommendation engine factors:
- Queue depth (20/50/100 thresholds)
- Agent saturation ratio (80%/95%)
- Error rate in last 5 minutes (10%/25%)
- Agent availability (pause if none online)

Fixes #160
This commit is contained in:
Bhavikprit 2026-03-04 23:12:45 +04:00
parent 4cb86bc80b
commit b38ad43272
1 changed files with 294 additions and 0 deletions

View File

@ -0,0 +1,294 @@
import { NextRequest, NextResponse } from 'next/server';
import { getDatabase } from '@/lib/db';
import { requireRole } from '@/lib/auth';
import { logger } from '@/lib/logger';
/**
* GET /api/workload - Real-Time Workload Signals
*
* Provides system-wide capacity metrics and throttle recommendations
* so agents can make informed decisions about work submission.
*
* Response:
* capacity - Current system capacity metrics
* queue - Task queue depth and breakdown
* agents - Agent availability and load distribution
* recommendation - Actionable signal: normal | throttle | shed | pause
* thresholds - Current threshold configuration
*
* Agents should call this before submitting new work to avoid
* cascading failures and SLO breaches.
*/
export async function GET(request: NextRequest) {
const auth = requireRole(request, 'viewer');
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status });
try {
const db = getDatabase();
const workspaceId = auth.user.workspace_id ?? 1;
const now = Math.floor(Date.now() / 1000);
// --- Capacity metrics ---
const capacity = buildCapacityMetrics(db, workspaceId, now);
// --- Queue depth ---
const queue = buildQueueMetrics(db, workspaceId);
// --- Agent availability ---
const agents = buildAgentMetrics(db, workspaceId, now);
// --- Recommendation ---
const recommendation = computeRecommendation(capacity, queue, agents);
return NextResponse.json({
timestamp: now,
workspace_id: workspaceId,
capacity,
queue,
agents,
recommendation,
thresholds: THRESHOLDS,
});
} catch (error) {
logger.error({ err: error }, 'GET /api/workload error');
return NextResponse.json({ error: 'Failed to fetch workload signals' }, { status: 500 });
}
}
// Configurable thresholds for recommendation engine
const THRESHOLDS = {
queue_depth_normal: 20,
queue_depth_throttle: 50,
queue_depth_shed: 100,
busy_agent_ratio_throttle: 0.8,
busy_agent_ratio_shed: 0.95,
error_rate_throttle: 0.1,
error_rate_shed: 0.25,
recent_window_seconds: 300, // 5 minutes for recent activity
};
interface CapacityMetrics {
active_tasks: number;
tasks_last_5m: number;
errors_last_5m: number;
error_rate_5m: number;
completions_last_hour: number;
avg_completion_rate_per_hour: number;
}
interface QueueMetrics {
total_pending: number;
by_status: Record<string, number>;
by_priority: Record<string, number>;
oldest_pending_age_seconds: number | null;
estimated_wait_seconds: number | null;
}
interface AgentMetrics {
total: number;
online: number;
busy: number;
idle: number;
offline: number;
busy_ratio: number;
load_distribution: Array<{ agent: string; assigned: number; in_progress: number }>;
}
function buildCapacityMetrics(db: any, workspaceId: number, now: number): CapacityMetrics {
const recentWindow = now - THRESHOLDS.recent_window_seconds;
const hourAgo = now - 3600;
const activeTasks = (db.prepare(
`SELECT COUNT(*) as c FROM tasks WHERE workspace_id = ? AND status IN ('assigned', 'in_progress', 'review', 'quality_review')`
).get(workspaceId) as any).c;
const tasksLast5m = (db.prepare(
`SELECT COUNT(*) as c FROM activities WHERE workspace_id = ? AND created_at >= ? AND type IN ('task_created', 'task_assigned')`
).get(workspaceId, recentWindow) as any).c;
const errorsLast5m = (db.prepare(
`SELECT COUNT(*) as c FROM activities WHERE workspace_id = ? AND created_at >= ? AND (type LIKE '%error%' OR type LIKE '%fail%')`
).get(workspaceId, recentWindow) as any).c;
const totalLast5m = (db.prepare(
`SELECT COUNT(*) as c FROM activities WHERE workspace_id = ? AND created_at >= ?`
).get(workspaceId, recentWindow) as any).c;
const completionsLastHour = (db.prepare(
`SELECT COUNT(*) as c FROM tasks WHERE workspace_id = ? AND status = 'done' AND updated_at >= ?`
).get(workspaceId, hourAgo) as any).c;
// Average completion rate over last 24h
const dayAgo = now - 86400;
const completionsLastDay = (db.prepare(
`SELECT COUNT(*) as c FROM tasks WHERE workspace_id = ? AND status = 'done' AND updated_at >= ?`
).get(workspaceId, dayAgo) as any).c;
return {
active_tasks: activeTasks,
tasks_last_5m: tasksLast5m,
errors_last_5m: errorsLast5m,
error_rate_5m: totalLast5m > 0 ? Math.round((errorsLast5m / totalLast5m) * 10000) / 10000 : 0,
completions_last_hour: completionsLastHour,
avg_completion_rate_per_hour: Math.round((completionsLastDay / 24) * 100) / 100,
};
}
function buildQueueMetrics(db: any, workspaceId: number): QueueMetrics {
const now = Math.floor(Date.now() / 1000);
const pendingStatuses = ['inbox', 'assigned', 'in_progress', 'review', 'quality_review'];
const byStatus = db.prepare(
`SELECT status, COUNT(*) as count FROM tasks WHERE workspace_id = ? AND status IN (${pendingStatuses.map(() => '?').join(',')}) GROUP BY status`
).all(workspaceId, ...pendingStatuses) as Array<{ status: string; count: number }>;
const byPriority = db.prepare(
`SELECT priority, COUNT(*) as count FROM tasks WHERE workspace_id = ? AND status IN (${pendingStatuses.map(() => '?').join(',')}) GROUP BY priority`
).all(workspaceId, ...pendingStatuses) as Array<{ priority: string; count: number }>;
const totalPending = byStatus.reduce((sum, r) => sum + r.count, 0);
const oldest = db.prepare(
`SELECT MIN(created_at) as oldest FROM tasks WHERE workspace_id = ? AND status IN ('inbox', 'assigned')`
).get(workspaceId) as any;
const oldestAge = oldest?.oldest ? now - oldest.oldest : null;
// Estimate wait: pending tasks / completion rate per hour * 3600
const hourAgo = now - 3600;
const completionsLastHour = (db.prepare(
`SELECT COUNT(*) as c FROM tasks WHERE workspace_id = ? AND status = 'done' AND updated_at >= ?`
).get(workspaceId, hourAgo) as any).c;
const estimatedWait = completionsLastHour > 0
? Math.round((totalPending / completionsLastHour) * 3600)
: null;
return {
total_pending: totalPending,
by_status: Object.fromEntries(byStatus.map(r => [r.status, r.count])),
by_priority: Object.fromEntries(byPriority.map(r => [r.priority, r.count])),
oldest_pending_age_seconds: oldestAge,
estimated_wait_seconds: estimatedWait,
};
}
function buildAgentMetrics(db: any, workspaceId: number, now: number): AgentMetrics {
const agentStatuses = db.prepare(
`SELECT status, COUNT(*) as count FROM agents WHERE workspace_id = ? GROUP BY status`
).all(workspaceId) as Array<{ status: string; count: number }>;
const statusMap: Record<string, number> = {};
let total = 0;
for (const row of agentStatuses) {
statusMap[row.status] = row.count;
total += row.count;
}
const online = (statusMap['idle'] || 0) + (statusMap['busy'] || 0);
const busy = statusMap['busy'] || 0;
const idle = statusMap['idle'] || 0;
const offline = statusMap['offline'] || 0;
// Load distribution per agent
const loadDist = db.prepare(`
SELECT a.name as agent,
SUM(CASE WHEN t.status = 'assigned' THEN 1 ELSE 0 END) as assigned,
SUM(CASE WHEN t.status = 'in_progress' THEN 1 ELSE 0 END) as in_progress
FROM agents a
LEFT JOIN tasks t ON t.assigned_to = a.name AND t.workspace_id = a.workspace_id AND t.status IN ('assigned', 'in_progress')
WHERE a.workspace_id = ? AND a.status != 'offline'
GROUP BY a.name
ORDER BY (assigned + in_progress) DESC
`).all(workspaceId) as Array<{ agent: string; assigned: number; in_progress: number }>;
return {
total,
online,
busy,
idle,
offline,
busy_ratio: online > 0 ? Math.round((busy / online) * 100) / 100 : 0,
load_distribution: loadDist,
};
}
type RecommendationLevel = 'normal' | 'throttle' | 'shed' | 'pause';
interface Recommendation {
action: RecommendationLevel;
reason: string;
details: string[];
submit_ok: boolean;
suggested_delay_ms: number;
}
function computeRecommendation(
capacity: CapacityMetrics,
queue: QueueMetrics,
agents: AgentMetrics
): Recommendation {
const reasons: string[] = [];
let level: RecommendationLevel = 'normal';
// Check error rate
if (capacity.error_rate_5m >= THRESHOLDS.error_rate_shed) {
level = escalate(level, 'shed');
reasons.push(`High error rate: ${(capacity.error_rate_5m * 100).toFixed(1)}%`);
} else if (capacity.error_rate_5m >= THRESHOLDS.error_rate_throttle) {
level = escalate(level, 'throttle');
reasons.push(`Elevated error rate: ${(capacity.error_rate_5m * 100).toFixed(1)}%`);
}
// Check queue depth
if (queue.total_pending >= THRESHOLDS.queue_depth_shed) {
level = escalate(level, 'shed');
reasons.push(`Queue depth critical: ${queue.total_pending} pending tasks`);
} else if (queue.total_pending >= THRESHOLDS.queue_depth_throttle) {
level = escalate(level, 'throttle');
reasons.push(`Queue depth high: ${queue.total_pending} pending tasks`);
}
// Check agent saturation
if (agents.busy_ratio >= THRESHOLDS.busy_agent_ratio_shed) {
level = escalate(level, 'shed');
reasons.push(`Agent saturation critical: ${(agents.busy_ratio * 100).toFixed(0)}% busy`);
} else if (agents.busy_ratio >= THRESHOLDS.busy_agent_ratio_throttle) {
level = escalate(level, 'throttle');
reasons.push(`Agent saturation high: ${(agents.busy_ratio * 100).toFixed(0)}% busy`);
}
// No online agents = pause
if (agents.online === 0 && agents.total > 0) {
level = 'pause';
reasons.push('No agents online');
}
const delayMap: Record<RecommendationLevel, number> = {
normal: 0,
throttle: 2000,
shed: 10000,
pause: 30000,
};
const actionDescriptions: Record<RecommendationLevel, string> = {
normal: 'System healthy — submit work freely',
throttle: 'System under load — reduce submission rate and defer non-critical work',
shed: 'System overloaded — submit only critical/high-priority work, defer everything else',
pause: 'System unavailable — hold all submissions until capacity returns',
};
return {
action: level,
reason: actionDescriptions[level],
details: reasons.length > 0 ? reasons : ['All metrics within normal bounds'],
submit_ok: level === 'normal' || level === 'throttle',
suggested_delay_ms: delayMap[level],
};
}
function escalate(current: RecommendationLevel, proposed: RecommendationLevel): RecommendationLevel {
const order: RecommendationLevel[] = ['normal', 'throttle', 'shed', 'pause'];
return order.indexOf(proposed) > order.indexOf(current) ? proposed : current;
}