feat: add direct CLI integration for gateway-free tool connections

- Add migration 016 for direct_connections table
- Add POST/GET/DELETE /api/connect for CLI tool registration
- Enhance heartbeat POST to accept connection_id and inline token_usage
- Add connectSchema to validation
- Add connection.created/disconnected event types to event bus
- Show direct CLI connections in gateway manager panel
- Add 5 E2E tests for connection lifecycle
- Add CLI integration documentation (docs/cli-integration.md)
- Fix openapi.json brace mismatch on line 642 (Phase 2 bug)
- Add /api/connect endpoints to OpenAPI spec
This commit is contained in:
Nyk 2026-03-02 11:45:12 +07:00
parent e88942e8f8
commit f7aa1db27e
9 changed files with 6470 additions and 689 deletions

121
docs/cli-integration.md Normal file
View File

@ -0,0 +1,121 @@
# Direct CLI Integration
Connect CLI tools (Claude Code, Codex, custom agents) directly to Mission Control without a gateway.
## Quick Start
### 1. Register a connection
```bash
curl -X POST http://localhost:3000/api/connect \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-d '{
"tool_name": "claude-code",
"tool_version": "1.0.0",
"agent_name": "my-agent",
"agent_role": "developer"
}'
```
Response:
```json
{
"connection_id": "550e8400-e29b-41d4-a716-446655440000",
"agent_id": 42,
"agent_name": "my-agent",
"status": "connected",
"sse_url": "/api/events",
"heartbeat_url": "/api/agents/42/heartbeat",
"token_report_url": "/api/tokens"
}
```
- If `agent_name` doesn't exist, it's auto-created.
- Previous connections for the same agent are automatically deactivated.
### 2. Heartbeat loop
Send heartbeats to stay alive and optionally report token usage:
```bash
curl -X POST http://localhost:3000/api/agents/42/heartbeat \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-d '{
"connection_id": "550e8400-e29b-41d4-a716-446655440000",
"token_usage": {
"model": "claude-sonnet-4",
"inputTokens": 1500,
"outputTokens": 800
}
}'
```
Response includes work items (mentions, assigned tasks, notifications) plus `"token_recorded": true` if usage was reported.
Recommended heartbeat interval: **30 seconds**.
### 3. Subscribe to events (SSE)
```bash
curl -N http://localhost:3000/api/events \
-H "x-api-key: YOUR_API_KEY"
```
Receives real-time events: task assignments, mentions, agent status changes, etc.
### 4. Report token usage
For bulk token reporting (separate from heartbeat):
```bash
curl -X POST http://localhost:3000/api/tokens \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-d '{
"model": "claude-sonnet-4",
"sessionId": "my-agent:chat",
"inputTokens": 5000,
"outputTokens": 2000
}'
```
### 5. Disconnect
```bash
curl -X DELETE http://localhost:3000/api/connect \
-H "Content-Type: application/json" \
-H "x-api-key: YOUR_API_KEY" \
-d '{"connection_id": "550e8400-e29b-41d4-a716-446655440000"}'
```
Sets the agent offline if no other active connections exist.
## API Reference
| Method | Endpoint | Auth | Description |
|--------|----------|------|-------------|
| POST | `/api/connect` | operator | Register CLI connection |
| GET | `/api/connect` | viewer | List all connections |
| DELETE | `/api/connect` | operator | Disconnect by connection_id |
| POST | `/api/agents/{id}/heartbeat` | operator | Heartbeat with optional token reporting |
| GET | `/api/events` | viewer | SSE event stream |
| POST | `/api/tokens` | operator | Report token usage |
## Connection Lifecycle
```
POST /api/connect → Agent set online
Heartbeat loop (30s) → Reports tokens, receives work items
DELETE /api/connect → Agent set offline (if no other connections)
```
## Notes
- Each agent can only have one active connection at a time. A new `POST /api/connect` for the same agent deactivates the previous connection.
- The `sessionId` format for token reporting follows `{agentName}:{chatType}` convention (e.g., `my-agent:chat`, `my-agent:cli`).
- Heartbeat responses include pending work items (assigned tasks, mentions, notifications) so CLI tools can act on them.

File diff suppressed because it is too large Load Diff

View File

@ -167,8 +167,13 @@ export async function GET(
} }
/** /**
* POST /api/agents/[id]/heartbeat - Manual heartbeat trigger * POST /api/agents/[id]/heartbeat - Enhanced heartbeat
* Allows manual heartbeat checks from UI or scripts *
* Accepts optional body:
* - connection_id: update direct_connections.last_heartbeat
* - status: agent status override
* - last_activity: activity description
* - token_usage: { model, inputTokens, outputTokens } for inline token reporting
*/ */
export async function POST( export async function POST(
request: NextRequest, request: NextRequest,
@ -177,6 +182,51 @@ export async function POST(
const auth = requireRole(request, 'operator'); const auth = requireRole(request, 'operator');
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status }); if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status });
// Reuse GET logic for manual triggers let body: any = {};
return GET(request, { params }); try {
body = await request.json();
} catch {
// No body is fine — fall through to standard heartbeat
}
const { connection_id, token_usage } = body;
const db = getDatabase();
const now = Math.floor(Date.now() / 1000);
// Update direct connection heartbeat if connection_id provided
if (connection_id) {
db.prepare('UPDATE direct_connections SET last_heartbeat = ?, updated_at = ? WHERE connection_id = ? AND status = ?')
.run(now, now, connection_id, 'connected');
}
// Inline token reporting
let tokenRecorded = false;
if (token_usage && token_usage.model && token_usage.inputTokens != null && token_usage.outputTokens != null) {
const resolvedParams = await params;
const agentId = resolvedParams.id;
let agent: any;
if (isNaN(Number(agentId))) {
agent = db.prepare('SELECT * FROM agents WHERE name = ?').get(agentId);
} else {
agent = db.prepare('SELECT * FROM agents WHERE id = ?').get(Number(agentId));
}
if (agent) {
const sessionId = `${agent.name}:cli`;
db.prepare(
`INSERT INTO token_usage (model, session_id, input_tokens, output_tokens, created_at)
VALUES (?, ?, ?, ?, ?)`
).run(token_usage.model, sessionId, token_usage.inputTokens, token_usage.outputTokens, now);
tokenRecorded = true;
}
}
// Reuse GET logic for work-items check, then augment response
const getResponse = await GET(request, { params });
const getBody = await getResponse.json();
return NextResponse.json({
...getBody,
token_recorded: tokenRecorded,
});
} }

View File

@ -0,0 +1,144 @@
import { NextRequest, NextResponse } from 'next/server'
import { getDatabase, db_helpers } from '@/lib/db'
import { requireRole } from '@/lib/auth'
import { validateBody, connectSchema } from '@/lib/validation'
import { eventBus } from '@/lib/event-bus'
import { randomUUID } from 'crypto'
/**
* POST /api/connect Register a direct CLI connection
*
* Auto-creates agent if name doesn't exist, deactivates previous connections
* for the same agent, and returns connection details + helper URLs.
*/
export async function POST(request: NextRequest) {
const auth = requireRole(request, 'operator')
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status })
const validation = await validateBody(request, connectSchema)
if ('error' in validation) return validation.error
const { tool_name, tool_version, agent_name, agent_role, metadata } = validation.data
const db = getDatabase()
const now = Math.floor(Date.now() / 1000)
// Find or create agent
let agent = db.prepare('SELECT * FROM agents WHERE name = ?').get(agent_name) as any
if (!agent) {
const result = db.prepare(
`INSERT INTO agents (name, role, status, created_at, updated_at)
VALUES (?, ?, 'online', ?, ?)`
).run(agent_name, agent_role || 'cli', now, now)
agent = { id: result.lastInsertRowid, name: agent_name }
db_helpers.logActivity('agent_created', 'agent', agent.id as number, 'system',
`Auto-created agent "${agent_name}" via direct CLI connection`)
eventBus.broadcast('agent.created', { id: agent.id, name: agent_name })
} else {
// Set agent online
db.prepare('UPDATE agents SET status = ?, updated_at = ? WHERE id = ?')
.run('online', now, agent.id)
eventBus.broadcast('agent.status_changed', { id: agent.id, name: agent.name, status: 'online' })
}
// Deactivate previous connections for this agent
db.prepare(
`UPDATE direct_connections SET status = 'disconnected', updated_at = ? WHERE agent_id = ? AND status = 'connected'`
).run(now, agent.id)
// Create new connection
const connectionId = randomUUID()
db.prepare(
`INSERT INTO direct_connections (agent_id, tool_name, tool_version, connection_id, status, last_heartbeat, metadata, created_at, updated_at)
VALUES (?, ?, ?, ?, 'connected', ?, ?, ?, ?)`
).run(agent.id, tool_name, tool_version || null, connectionId, now, metadata ? JSON.stringify(metadata) : null, now, now)
db_helpers.logActivity('connection_created', 'agent', agent.id as number, agent_name,
`CLI connection established via ${tool_name}${tool_version ? ` v${tool_version}` : ''}`)
eventBus.broadcast('connection.created', {
connection_id: connectionId,
agent_id: agent.id,
agent_name,
tool_name,
})
return NextResponse.json({
connection_id: connectionId,
agent_id: agent.id,
agent_name,
status: 'connected',
sse_url: `/api/events`,
heartbeat_url: `/api/agents/${agent.id}/heartbeat`,
token_report_url: `/api/tokens`,
})
}
/**
* GET /api/connect List all direct connections
*/
export async function GET(request: NextRequest) {
const auth = requireRole(request, 'viewer')
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status })
const db = getDatabase()
const connections = db.prepare(`
SELECT dc.*, a.name as agent_name, a.status as agent_status, a.role as agent_role
FROM direct_connections dc
JOIN agents a ON dc.agent_id = a.id
ORDER BY dc.created_at DESC
`).all()
return NextResponse.json({ connections })
}
/**
* DELETE /api/connect Disconnect by connection_id
*/
export async function DELETE(request: NextRequest) {
const auth = requireRole(request, 'operator')
if ('error' in auth) return NextResponse.json({ error: auth.error }, { status: auth.status })
let body: any
try {
body = await request.json()
} catch {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}
const { connection_id } = body
if (!connection_id) {
return NextResponse.json({ error: 'connection_id is required' }, { status: 400 })
}
const db = getDatabase()
const now = Math.floor(Date.now() / 1000)
const conn = db.prepare('SELECT * FROM direct_connections WHERE connection_id = ?').get(connection_id) as any
if (!conn) {
return NextResponse.json({ error: 'Connection not found' }, { status: 404 })
}
db.prepare('UPDATE direct_connections SET status = ?, updated_at = ? WHERE connection_id = ?')
.run('disconnected', now, connection_id)
// Check if agent has other active connections; if not, set offline
const otherActive = db.prepare(
'SELECT COUNT(*) as count FROM direct_connections WHERE agent_id = ? AND status = ? AND connection_id != ?'
).get(conn.agent_id, 'connected', connection_id) as any
if (!otherActive?.count) {
db.prepare('UPDATE agents SET status = ?, updated_at = ? WHERE id = ?')
.run('offline', now, conn.agent_id)
}
const agent = db.prepare('SELECT name FROM agents WHERE id = ?').get(conn.agent_id) as any
db_helpers.logActivity('connection_disconnected', 'agent', conn.agent_id, agent?.name || 'unknown',
`CLI connection disconnected (${conn.tool_name})`)
eventBus.broadcast('connection.disconnected', {
connection_id,
agent_id: conn.agent_id,
agent_name: agent?.name,
})
return NextResponse.json({ status: 'disconnected', connection_id })
}

View File

@ -20,8 +20,24 @@ interface Gateway {
updated_at: number updated_at: number
} }
interface DirectConnection {
id: number
agent_id: number
tool_name: string
tool_version: string | null
connection_id: string
status: string
last_heartbeat: number | null
metadata: string | null
created_at: number
agent_name: string
agent_status: string
agent_role: string
}
export function MultiGatewayPanel() { export function MultiGatewayPanel() {
const [gateways, setGateways] = useState<Gateway[]>([]) const [gateways, setGateways] = useState<Gateway[]>([])
const [directConnections, setDirectConnections] = useState<DirectConnection[]>([])
const [loading, setLoading] = useState(true) const [loading, setLoading] = useState(true)
const [showAdd, setShowAdd] = useState(false) const [showAdd, setShowAdd] = useState(false)
const [probing, setProbing] = useState<number | null>(null) const [probing, setProbing] = useState<number | null>(null)
@ -37,7 +53,15 @@ export function MultiGatewayPanel() {
setLoading(false) setLoading(false)
}, []) }, [])
useEffect(() => { fetchGateways() }, [fetchGateways]) const fetchDirectConnections = useCallback(async () => {
try {
const res = await fetch('/api/connect')
const data = await res.json()
setDirectConnections(data.connections || [])
} catch { /* ignore */ }
}, [])
useEffect(() => { fetchGateways(); fetchDirectConnections() }, [fetchGateways, fetchDirectConnections])
const setPrimary = async (gw: Gateway) => { const setPrimary = async (gw: Gateway) => {
await fetch('/api/gateways', { await fetch('/api/gateways', {
@ -75,6 +99,17 @@ export function MultiGatewayPanel() {
setProbing(null) setProbing(null)
} }
const disconnectCli = async (connectionId: string) => {
try {
await fetch('/api/connect', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ connection_id: connectionId }),
})
fetchDirectConnections()
} catch { /* ignore */ }
}
return ( return (
<div className="p-4 md:p-6 max-w-5xl mx-auto space-y-6"> <div className="p-4 md:p-6 max-w-5xl mx-auto space-y-6">
{/* Header */} {/* Header */}
@ -146,6 +181,70 @@ export function MultiGatewayPanel() {
))} ))}
</div> </div>
)} )}
{/* Direct CLI Connections */}
<div>
<div className="flex items-center justify-between mb-3">
<div>
<h3 className="text-sm font-semibold text-foreground">Direct CLI Connections</h3>
<p className="text-xs text-muted-foreground mt-0.5">
CLI tools connected directly without a gateway
</p>
</div>
<button
onClick={fetchDirectConnections}
className="h-7 px-2.5 rounded-md text-2xs font-medium bg-secondary text-foreground hover:bg-secondary/80 transition-smooth"
>
Refresh
</button>
</div>
{directConnections.length === 0 ? (
<div className="text-center py-8 bg-card border border-border rounded-lg">
<p className="text-xs text-muted-foreground">No direct CLI connections</p>
<p className="text-2xs text-muted-foreground mt-1">
Use <code className="font-mono bg-secondary px-1 rounded">POST /api/connect</code> to register a CLI tool
</p>
</div>
) : (
<div className="space-y-2">
{directConnections.map(conn => (
<div key={conn.id} className="bg-card border border-border rounded-lg p-4">
<div className="flex items-start justify-between gap-3">
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2">
<span className={`w-2 h-2 rounded-full ${conn.status === 'connected' ? 'bg-green-500' : 'bg-red-500'}`} />
<span className="text-sm font-semibold text-foreground">{conn.agent_name}</span>
<span className="text-2xs px-1.5 py-0.5 rounded bg-blue-500/20 text-blue-400 border border-blue-500/30 font-medium">
{conn.tool_name}{conn.tool_version ? ` v${conn.tool_version}` : ''}
</span>
<span className={`text-2xs px-1.5 py-0.5 rounded font-medium ${
conn.status === 'connected'
? 'bg-green-500/20 text-green-400 border border-green-500/30'
: 'bg-red-500/20 text-red-400 border border-red-500/30'
}`}>
{conn.status.toUpperCase()}
</span>
</div>
<div className="flex items-center gap-4 mt-1.5 text-xs text-muted-foreground">
<span>Role: {conn.agent_role || 'cli'}</span>
<span>Heartbeat: {conn.last_heartbeat ? new Date(conn.last_heartbeat * 1000).toLocaleString() : 'Never'}</span>
<span className="font-mono text-2xs">{conn.connection_id.slice(0, 8)}...</span>
</div>
</div>
{conn.status === 'connected' && (
<button
onClick={() => disconnectCli(conn.connection_id)}
className="h-7 px-2.5 rounded-md text-2xs font-medium text-red-400 hover:bg-red-500/10 transition-smooth"
>
Disconnect
</button>
)}
</div>
</div>
))}
</div>
)}
</div>
</div> </div>
) )
} }

View File

@ -28,6 +28,8 @@ export type EventType =
| 'agent.synced' | 'agent.synced'
| 'agent.status_changed' | 'agent.status_changed'
| 'audit.security' | 'audit.security'
| 'connection.created'
| 'connection.disconnected'
class ServerEventBus extends EventEmitter { class ServerEventBus extends EventEmitter {
private static instance: ServerEventBus | null = null private static instance: ServerEventBus | null = null

View File

@ -436,6 +436,28 @@ const migrations: Migration[] = [
CREATE INDEX IF NOT EXISTS idx_messages_read_at ON messages(read_at); CREATE INDEX IF NOT EXISTS idx_messages_read_at ON messages(read_at);
`) `)
} }
},
{
id: '016_direct_connections',
up: (db) => {
db.exec(`
CREATE TABLE IF NOT EXISTS direct_connections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id INTEGER NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
tool_name TEXT NOT NULL,
tool_version TEXT,
connection_id TEXT NOT NULL UNIQUE,
status TEXT NOT NULL DEFAULT 'connected',
last_heartbeat INTEGER,
metadata TEXT,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
updated_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE INDEX IF NOT EXISTS idx_direct_connections_agent_id ON direct_connections(agent_id);
CREATE INDEX IF NOT EXISTS idx_direct_connections_connection_id ON direct_connections(connection_id);
CREATE INDEX IF NOT EXISTS idx_direct_connections_status ON direct_connections(status);
`)
}
} }
] ]

View File

@ -153,3 +153,11 @@ export const accessRequestActionSchema = z.object({
role: z.enum(['admin', 'operator', 'viewer']).default('viewer'), role: z.enum(['admin', 'operator', 'viewer']).default('viewer'),
note: z.string().optional(), note: z.string().optional(),
}) })
export const connectSchema = z.object({
tool_name: z.string().min(1, 'Tool name is required').max(100),
tool_version: z.string().max(50).optional(),
agent_name: z.string().min(1, 'Agent name is required').max(100),
agent_role: z.string().max(100).optional(),
metadata: z.record(z.string(), z.unknown()).optional(),
})

150
tests/direct-cli.spec.ts Normal file
View File

@ -0,0 +1,150 @@
import { test, expect } from '@playwright/test'
import { API_KEY_HEADER } from './helpers'
test.describe('Direct CLI Integration', () => {
const createdConnectionIds: string[] = []
const createdAgentIds: number[] = []
test.afterEach(async ({ request }) => {
// Clean up connections
for (const connId of createdConnectionIds) {
await request.delete('/api/connect', {
headers: API_KEY_HEADER,
data: { connection_id: connId },
})
}
createdConnectionIds.length = 0
// Clean up auto-created agents
for (const agentId of createdAgentIds) {
await request.delete(`/api/agents/${agentId}`, { headers: API_KEY_HEADER })
}
createdAgentIds.length = 0
})
test('POST /api/connect creates connection and auto-creates agent', async ({ request }) => {
const agentName = `e2e-cli-${Date.now()}`
const res = await request.post('/api/connect', {
headers: API_KEY_HEADER,
data: {
tool_name: 'claude-code',
tool_version: '1.0.0',
agent_name: agentName,
agent_role: 'developer',
},
})
expect(res.status()).toBe(200)
const body = await res.json()
expect(body.connection_id).toBeDefined()
expect(body.agent_id).toBeDefined()
expect(body.agent_name).toBe(agentName)
expect(body.status).toBe('connected')
expect(body.sse_url).toBe('/api/events')
expect(body.heartbeat_url).toContain('/api/agents/')
expect(body.token_report_url).toBe('/api/tokens')
createdConnectionIds.push(body.connection_id)
createdAgentIds.push(body.agent_id)
// Verify agent was created
const agentRes = await request.get(`/api/agents/${body.agent_id}`, {
headers: API_KEY_HEADER,
})
expect(agentRes.status()).toBe(200)
const agentBody = await agentRes.json()
expect(agentBody.agent.name).toBe(agentName)
expect(agentBody.agent.status).toBe('online')
})
test('GET /api/connect lists connections', async ({ request }) => {
const agentName = `e2e-cli-list-${Date.now()}`
const postRes = await request.post('/api/connect', {
headers: API_KEY_HEADER,
data: {
tool_name: 'codex',
agent_name: agentName,
},
})
const postBody = await postRes.json()
createdConnectionIds.push(postBody.connection_id)
createdAgentIds.push(postBody.agent_id)
const res = await request.get('/api/connect', { headers: API_KEY_HEADER })
expect(res.status()).toBe(200)
const body = await res.json()
expect(Array.isArray(body.connections)).toBe(true)
const found = body.connections.find((c: any) => c.connection_id === postBody.connection_id)
expect(found).toBeDefined()
expect(found.agent_name).toBe(agentName)
expect(found.tool_name).toBe('codex')
})
test('POST heartbeat with inline token_usage', async ({ request }) => {
const agentName = `e2e-cli-hb-${Date.now()}`
const postRes = await request.post('/api/connect', {
headers: API_KEY_HEADER,
data: {
tool_name: 'claude-code',
agent_name: agentName,
},
})
const postBody = await postRes.json()
createdConnectionIds.push(postBody.connection_id)
createdAgentIds.push(postBody.agent_id)
const hbRes = await request.post(`/api/agents/${postBody.agent_id}/heartbeat`, {
headers: API_KEY_HEADER,
data: {
connection_id: postBody.connection_id,
token_usage: {
model: 'claude-sonnet-4',
inputTokens: 1000,
outputTokens: 500,
},
},
})
expect(hbRes.status()).toBe(200)
const hbBody = await hbRes.json()
expect(hbBody.token_recorded).toBe(true)
expect(hbBody.agent).toBe(agentName)
})
test('DELETE /api/connect disconnects and sets agent offline', async ({ request }) => {
const agentName = `e2e-cli-del-${Date.now()}`
const postRes = await request.post('/api/connect', {
headers: API_KEY_HEADER,
data: {
tool_name: 'claude-code',
agent_name: agentName,
},
})
const postBody = await postRes.json()
createdAgentIds.push(postBody.agent_id)
const delRes = await request.delete('/api/connect', {
headers: API_KEY_HEADER,
data: { connection_id: postBody.connection_id },
})
expect(delRes.status()).toBe(200)
const delBody = await delRes.json()
expect(delBody.status).toBe('disconnected')
// Agent should be offline
const agentRes = await request.get(`/api/agents/${postBody.agent_id}`, {
headers: API_KEY_HEADER,
})
const agentBody = await agentRes.json()
expect(agentBody.agent.status).toBe('offline')
})
test('POST /api/connect requires auth', async ({ request }) => {
const res = await request.post('/api/connect', {
data: {
tool_name: 'claude-code',
agent_name: 'unauthorized-agent',
},
})
expect(res.status()).toBe(401)
})
})