feat: direct Claude API task dispatch (gateway-free) (#473)
* feat: direct Claude API task dispatch (gateway-free) Add a built-in task executor that calls the Anthropic Messages API directly when no OpenClaw gateway is available. This makes the full task lifecycle work out of the box — tasks are actually executed by Claude, not just tracked as metadata. How it works: - Scheduler checks: is a gateway registered? If yes, use gateway dispatch (existing path). If no, check for ANTHROPIC_API_KEY. - When dispatching via direct API: builds prompt from task + agent SOUL, selects model by complexity (Opus/Sonnet/Haiku), calls Claude Messages API, stores response as resolution. - Aegis reviews also work via direct API — same fallback logic. - Token usage is recorded in the token_usage table. - After dispatch, task moves to 'review' for Aegis quality check. Setup: add ANTHROPIC_API_KEY=sk-ant-... to .env.local No gateway, no OpenClaw, no extra dependencies needed. * fix(tui): add missing ansi.blue color function
This commit is contained in:
parent
32447a4b08
commit
78b472a63a
|
|
@ -88,6 +88,7 @@ const ansi = {
|
|||
yellow: (s) => `${ESC}33m${s}${ESC}0m`,
|
||||
red: (s) => `${ESC}31m${s}${ESC}0m`,
|
||||
cyan: (s) => `${ESC}36m${s}${ESC}0m`,
|
||||
blue: (s) => `${ESC}34m${s}${ESC}0m`,
|
||||
magenta: (s) => `${ESC}35m${s}${ESC}0m`,
|
||||
bgBlue: (s) => `${ESC}48;5;17m${ESC}97m${s}${ESC}0m`,
|
||||
bgCyan: (s) => `${ESC}46m${ESC}30m${s}${ESC}0m`,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { runOpenClaw } from './command'
|
|||
import { callOpenClawGateway } from './openclaw-gateway'
|
||||
import { eventBus } from './event-bus'
|
||||
import { logger } from './logger'
|
||||
import { config } from './config'
|
||||
|
||||
interface DispatchableTask {
|
||||
id: number
|
||||
|
|
@ -157,6 +158,150 @@ function parseAgentResponse(stdout: string): AgentResponseParsed {
|
|||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Direct Claude API dispatch (gateway-free)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function getAnthropicApiKey(): string | null {
|
||||
return (process.env.ANTHROPIC_API_KEY || '').trim() || null
|
||||
}
|
||||
|
||||
function isGatewayAvailable(): boolean {
|
||||
// Gateway is available if OpenClaw is installed OR a gateway is registered in the DB
|
||||
if (config.openclawHome) return true
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const row = db.prepare('SELECT COUNT(*) as c FROM gateways').get() as { c: number } | undefined
|
||||
return (row?.c ?? 0) > 0
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function classifyDirectModel(task: DispatchableTask): string {
|
||||
// Check per-agent config override first
|
||||
if (task.agent_config) {
|
||||
try {
|
||||
const cfg = JSON.parse(task.agent_config)
|
||||
if (typeof cfg.dispatchModel === 'string' && cfg.dispatchModel) {
|
||||
// Strip gateway prefixes like "9router/cc/" to get bare model ID
|
||||
return cfg.dispatchModel.replace(/^.*\//, '')
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
}
|
||||
|
||||
const text = `${task.title} ${task.description ?? ''}`.toLowerCase()
|
||||
const priority = task.priority?.toLowerCase() ?? ''
|
||||
|
||||
// Complex → Opus
|
||||
const complexSignals = [
|
||||
'debug', 'diagnos', 'architect', 'design system', 'security audit',
|
||||
'root cause', 'investigate', 'incident', 'refactor', 'migration',
|
||||
]
|
||||
if (priority === 'critical' || complexSignals.some(s => text.includes(s))) {
|
||||
return 'claude-opus-4-6'
|
||||
}
|
||||
|
||||
// Routine → Haiku
|
||||
const routineSignals = [
|
||||
'status check', 'health check', 'format', 'rename', 'summarize',
|
||||
'translate', 'quick ', 'simple ', 'routine ', 'minor ',
|
||||
]
|
||||
if (routineSignals.some(s => text.includes(s)) && priority !== 'high' && priority !== 'critical') {
|
||||
return 'claude-haiku-4-5-20251001'
|
||||
}
|
||||
|
||||
// Default → Sonnet
|
||||
return 'claude-sonnet-4-6'
|
||||
}
|
||||
|
||||
function getAgentSoulContent(task: DispatchableTask): string | null {
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const row = db.prepare(
|
||||
'SELECT soul_content FROM agents WHERE id = ? AND workspace_id = ?'
|
||||
).get(task.agent_id, task.workspace_id) as { soul_content: string | null } | undefined
|
||||
return row?.soul_content || null
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
async function callClaudeDirectly(
|
||||
task: DispatchableTask,
|
||||
prompt: string,
|
||||
): Promise<AgentResponseParsed> {
|
||||
const apiKey = getAnthropicApiKey()
|
||||
if (!apiKey) throw new Error('ANTHROPIC_API_KEY not set — cannot dispatch without gateway')
|
||||
|
||||
const model = classifyDirectModel(task)
|
||||
const soul = getAgentSoulContent(task)
|
||||
|
||||
const messages: Array<{ role: string; content: string }> = [
|
||||
{ role: 'user', content: prompt },
|
||||
]
|
||||
|
||||
const body: Record<string, unknown> = {
|
||||
model,
|
||||
max_tokens: 4096,
|
||||
messages,
|
||||
}
|
||||
|
||||
if (soul) {
|
||||
body.system = soul
|
||||
}
|
||||
|
||||
logger.info({ taskId: task.id, model, agent: task.agent_name }, 'Dispatching task via direct Claude API')
|
||||
|
||||
const res = await fetch('https://api.anthropic.com/v1/messages', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'x-api-key': apiKey,
|
||||
'anthropic-version': '2023-06-01',
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
})
|
||||
|
||||
if (!res.ok) {
|
||||
const errorBody = await res.text().catch(() => '')
|
||||
throw new Error(`Claude API ${res.status}: ${errorBody.substring(0, 500)}`)
|
||||
}
|
||||
|
||||
const data = await res.json() as {
|
||||
content: Array<{ type: string; text?: string }>
|
||||
usage?: { input_tokens?: number; output_tokens?: number }
|
||||
}
|
||||
|
||||
const text = data.content
|
||||
?.filter((b: { type: string }) => b.type === 'text')
|
||||
.map((b: { text?: string }) => b.text || '')
|
||||
.join('\n') || null
|
||||
|
||||
// Record token usage
|
||||
if (data.usage) {
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const now = Math.floor(Date.now() / 1000)
|
||||
db.prepare(`
|
||||
INSERT INTO token_usage (model, session_id, input_tokens, output_tokens, total_tokens, cost, created_at, workspace_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(
|
||||
model,
|
||||
`task-${task.id}`,
|
||||
data.usage.input_tokens || 0,
|
||||
data.usage.output_tokens || 0,
|
||||
(data.usage.input_tokens || 0) + (data.usage.output_tokens || 0),
|
||||
0, // cost calculated separately
|
||||
now,
|
||||
task.workspace_id,
|
||||
)
|
||||
} catch { /* non-fatal */ }
|
||||
}
|
||||
|
||||
return { text, sessionId: null }
|
||||
}
|
||||
|
||||
interface ReviewableTask {
|
||||
id: number
|
||||
title: string
|
||||
|
|
@ -262,6 +407,19 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string
|
|||
|
||||
try {
|
||||
const prompt = buildReviewPrompt(task)
|
||||
let agentResponse: AgentResponseParsed
|
||||
|
||||
if (!isGatewayAvailable() && getAnthropicApiKey()) {
|
||||
// Direct Claude API review — no gateway needed
|
||||
const reviewTask: DispatchableTask = {
|
||||
id: task.id, title: task.title, description: task.description,
|
||||
status: 'quality_review', priority: 'high', assigned_to: 'aegis',
|
||||
workspace_id: task.workspace_id, agent_name: 'aegis', agent_id: 0,
|
||||
agent_config: null, ticket_prefix: task.ticket_prefix,
|
||||
project_ticket_no: task.project_ticket_no, project_id: null,
|
||||
}
|
||||
agentResponse = await callClaudeDirectly(reviewTask, prompt)
|
||||
} else {
|
||||
// Resolve the gateway agent ID from config, falling back to assigned_to or default
|
||||
const reviewAgent = resolveGatewayAgentIdForReview(task)
|
||||
|
||||
|
|
@ -271,19 +429,17 @@ export async function runAegisReviews(): Promise<{ ok: boolean; message: string
|
|||
idempotencyKey: `aegis-review-${task.id}-${Date.now()}`,
|
||||
deliver: false,
|
||||
}
|
||||
// Use --expect-final to block until the agent completes and returns the full
|
||||
// response payload (payloads[0].text). The two-step agent → agent.wait pattern
|
||||
// only returns lifecycle metadata (runId/status/timestamps) and never includes
|
||||
// the agent's actual text, so Aegis could never parse a verdict.
|
||||
const finalResult = await runOpenClaw(
|
||||
['gateway', 'call', 'agent', '--expect-final', '--timeout', '120000', '--params', JSON.stringify(invokeParams), '--json'],
|
||||
{ timeoutMs: 125_000 }
|
||||
)
|
||||
const finalPayload = parseGatewayJson(finalResult.stdout)
|
||||
?? parseGatewayJson(String((finalResult as any)?.stderr || ''))
|
||||
const agentResponse = parseAgentResponse(
|
||||
agentResponse = parseAgentResponse(
|
||||
finalPayload?.result ? JSON.stringify(finalPayload.result) : finalResult.stdout
|
||||
)
|
||||
}
|
||||
|
||||
if (!agentResponse.text) {
|
||||
throw new Error('Aegis review returned empty response')
|
||||
}
|
||||
|
|
@ -540,8 +696,12 @@ export async function dispatchAssignedTasks(): Promise<{ ok: boolean; message: s
|
|||
: null
|
||||
|
||||
let agentResponse: AgentResponseParsed
|
||||
const useDirectApi = !isGatewayAvailable() && getAnthropicApiKey()
|
||||
|
||||
if (targetSession) {
|
||||
if (useDirectApi && !targetSession) {
|
||||
// Direct Claude API dispatch — no gateway needed
|
||||
agentResponse = await callClaudeDirectly(task, prompt)
|
||||
} else if (targetSession) {
|
||||
// Dispatch to a specific existing session via chat.send
|
||||
logger.info({ taskId: task.id, targetSession, agent: task.agent_name }, 'Dispatching task to targeted session')
|
||||
const sendResult = await callOpenClawGateway<any>(
|
||||
|
|
|
|||
Loading…
Reference in New Issue