From e4594c78545969c7c060d753158130337c7e9bb3 Mon Sep 17 00:00:00 2001 From: nyk <93952610+0xNyk@users.noreply.github.com> Date: Tue, 3 Mar 2026 21:06:54 +0700 Subject: [PATCH] feat: aggregate token usage from db with stable agent grouping --- src/app/api/tokens/route.ts | 138 ++++++++++++++++++++++++++++++++---- tests/direct-cli.spec.ts | 8 +++ 2 files changed, 131 insertions(+), 15 deletions(-) diff --git a/src/app/api/tokens/route.ts b/src/app/api/tokens/route.ts index 28b6cd7..24fefd5 100644 --- a/src/app/api/tokens/route.ts +++ b/src/app/api/tokens/route.ts @@ -5,6 +5,7 @@ import { config, ensureDirExists } from '@/lib/config' import { requireRole } from '@/lib/auth' import { getAllGatewaySessions } from '@/lib/sessions' import { logger } from '@/lib/logger' +import { getDatabase } from '@/lib/db' const DATA_PATH = config.tokensPath @@ -12,6 +13,7 @@ interface TokenUsageRecord { id: string model: string sessionId: string + agentName: string timestamp: number inputTokens: number outputTokens: number @@ -53,6 +55,13 @@ const MODEL_PRICING: Record = { 'ollama/qwen2.5-coder:14b': 0.0, } +function extractAgentName(sessionId: string): string { + const trimmed = sessionId.trim() + if (!trimmed) return 'unknown' + const [agent] = trimmed.split(':') + return agent?.trim() || 'unknown' +} + function getModelCost(modelName: string): number { if (MODEL_PRICING[modelName] !== undefined) return MODEL_PRICING[modelName] for (const [model, cost] of Object.entries(MODEL_PRICING)) { @@ -61,24 +70,119 @@ function getModelCost(modelName: string): number { return 1.0 } -/** - * Load token data from persistent file, falling back to deriving from session stores. 
- */ -async function loadTokenData(): Promise { - // First try loading from persistent token file +interface DbTokenUsageRow { + id: number + model: string + session_id: string + input_tokens: number + output_tokens: number + created_at: number +} + +function loadTokenDataFromDb(): TokenUsageRecord[] { + try { + const db = getDatabase() + const rows = db.prepare(` + SELECT id, model, session_id, input_tokens, output_tokens, created_at + FROM token_usage + ORDER BY created_at DESC, id DESC + LIMIT 10000 + `).all() as DbTokenUsageRow[] + + return rows.map((row) => { + const totalTokens = row.input_tokens + row.output_tokens + const costPer1k = getModelCost(row.model) + return { + id: `db-${row.id}`, + model: row.model, + sessionId: row.session_id, + agentName: extractAgentName(row.session_id), + timestamp: row.created_at * 1000, + inputTokens: row.input_tokens, + outputTokens: row.output_tokens, + totalTokens, + cost: (totalTokens / 1000) * costPer1k, + operation: 'heartbeat', + } + }) + } catch (error) { + logger.warn({ err: error }, 'Failed to load token usage from database') + return [] + } +} + +function normalizeTokenRecord(record: Partial): TokenUsageRecord | null { + if (!record.model || !record.sessionId) return null + const inputTokens = Number(record.inputTokens ?? 0) + const outputTokens = Number(record.outputTokens ?? 0) + const totalTokens = Number(record.totalTokens ?? inputTokens + outputTokens) + const model = String(record.model) + return { + id: String(record.id ?? `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`), + model, + sessionId: String(record.sessionId), + agentName: String(record.agentName ?? extractAgentName(String(record.sessionId))), + timestamp: Number(record.timestamp ?? Date.now()), + inputTokens, + outputTokens, + totalTokens, + cost: Number(record.cost ?? (totalTokens / 1000) * getModelCost(model)), + operation: String(record.operation ?? 
'chat_completion'), + duration: record.duration, + } +} + +function dedupeTokenRecords(records: TokenUsageRecord[]): TokenUsageRecord[] { + const seen = new Set() + const deduped: TokenUsageRecord[] = [] + + for (const record of records) { + const key = [ + record.sessionId, + record.model, + record.timestamp, + record.inputTokens, + record.outputTokens, + record.totalTokens, + record.operation, + record.duration ?? '', + ].join('|') + if (seen.has(key)) continue + seen.add(key) + deduped.push(record) + } + + return deduped +} + +async function loadTokenDataFromFile(): Promise { try { ensureDirExists(dirname(DATA_PATH)) await access(DATA_PATH) const data = await readFile(DATA_PATH, 'utf-8') - const records = JSON.parse(data) - if (Array.isArray(records) && records.length > 0) { - return records - } + const parsed = JSON.parse(data) + if (!Array.isArray(parsed)) return [] + + return parsed + .map((record: Partial) => normalizeTokenRecord(record)) + .filter((record): record is TokenUsageRecord => record !== null) } catch { - // File doesn't exist or is empty — derive from sessions + return [] + } +} + +/** + * Load token data by merging database rows with the persistent file (deduplicated, newest first), falling back to deriving from session stores when both are empty. 
+ */ +async function loadTokenData(): Promise { + const dbRecords = loadTokenDataFromDb() + const fileRecords = await loadTokenDataFromFile() + const combined = dedupeTokenRecords([...dbRecords, ...fileRecords]).sort((a, b) => b.timestamp - a.timestamp) + if (combined.length > 0) { + return combined } - // Derive token usage from session stores + // Final fallback: derive from in-memory sessions return deriveFromSessions() } @@ -103,6 +207,7 @@ function deriveFromSessions(): TokenUsageRecord[] { id: `session-${session.agent}-${session.key}`, model: session.model || 'unknown', sessionId: `${session.agent}:${session.chatType}`, + agentName: session.agent || 'unknown', timestamp: session.updatedAt, inputTokens, outputTokens, @@ -218,7 +323,7 @@ export async function GET(request: NextRequest) { // Agent aggregation: extract agent name from sessionId (format: "agentName:chatType") const agentGroups = filteredData.reduce((acc, record) => { - const agent = record.sessionId.split(':')[0] || 'unknown' + const agent = record.agentName || extractAgentName(record.sessionId) if (!acc[agent]) acc[agent] = [] acc[agent].push(record) return acc @@ -241,7 +346,7 @@ export async function GET(request: NextRequest) { if (action === 'agent-costs') { const agentGroups = filteredData.reduce((acc, record) => { - const agent = record.sessionId.split(':')[0] || 'unknown' + const agent = record.agentName || extractAgentName(record.sessionId) if (!acc[agent]) acc[agent] = [] acc[agent].push(record) return acc @@ -327,12 +432,13 @@ export async function GET(request: NextRequest) { } if (format === 'csv') { - const headers = ['timestamp', 'model', 'sessionId', 'operation', 'inputTokens', 'outputTokens', 'totalTokens', 'cost', 'duration'] + const headers = ['timestamp', 'agentName', 'model', 'sessionId', 'operation', 'inputTokens', 'outputTokens', 'totalTokens', 'cost', 'duration'] const csvRows = [headers.join(',')] filteredData.forEach(record => { csvRows.push([ new 
Date(record.timestamp).toISOString(), + record.agentName, record.model, record.sessionId, record.operation, @@ -411,6 +517,7 @@ export async function POST(request: NextRequest) { id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, model, sessionId, + agentName: extractAgentName(sessionId), timestamp: Date.now(), inputTokens, outputTokens, @@ -420,7 +527,8 @@ export async function POST(request: NextRequest) { duration, } - const existingData = await loadTokenData() + // Persist only manually posted usage records in the JSON file. + const existingData = await loadTokenDataFromFile() existingData.unshift(record) if (existingData.length > 10000) { diff --git a/tests/direct-cli.spec.ts b/tests/direct-cli.spec.ts index 04fdb62..42903aa 100644 --- a/tests/direct-cli.spec.ts +++ b/tests/direct-cli.spec.ts @@ -108,6 +108,14 @@ test.describe('Direct CLI Integration', () => { const hbBody = await hbRes.json() expect(hbBody.token_recorded).toBe(true) expect(hbBody.agent).toBe(agentName) + + const costsRes = await request.get('/api/tokens?action=agent-costs&timeframe=hour', { + headers: API_KEY_HEADER, + }) + expect(costsRes.status()).toBe(200) + const costsBody = await costsRes.json() + expect(costsBody.agents).toHaveProperty(agentName) + expect(costsBody.agents[agentName].stats.totalTokens).toBeGreaterThanOrEqual(1500) }) test('DELETE /api/connect disconnects and sets agent offline', async ({ request }) => {