feat: aggregate token usage from db with stable agent grouping
This commit is contained in:
parent
33f28d6877
commit
e4594c7854
|
|
@ -5,6 +5,7 @@ import { config, ensureDirExists } from '@/lib/config'
|
|||
import { requireRole } from '@/lib/auth'
|
||||
import { getAllGatewaySessions } from '@/lib/sessions'
|
||||
import { logger } from '@/lib/logger'
|
||||
import { getDatabase } from '@/lib/db'
|
||||
|
||||
const DATA_PATH = config.tokensPath
|
||||
|
||||
|
|
@ -12,6 +13,7 @@ interface TokenUsageRecord {
|
|||
id: string
|
||||
model: string
|
||||
sessionId: string
|
||||
agentName: string
|
||||
timestamp: number
|
||||
inputTokens: number
|
||||
outputTokens: number
|
||||
|
|
@ -53,6 +55,13 @@ const MODEL_PRICING: Record<string, number> = {
|
|||
'ollama/qwen2.5-coder:14b': 0.0,
|
||||
}
|
||||
|
||||
function extractAgentName(sessionId: string): string {
|
||||
const trimmed = sessionId.trim()
|
||||
if (!trimmed) return 'unknown'
|
||||
const [agent] = trimmed.split(':')
|
||||
return agent?.trim() || 'unknown'
|
||||
}
|
||||
|
||||
function getModelCost(modelName: string): number {
|
||||
if (MODEL_PRICING[modelName] !== undefined) return MODEL_PRICING[modelName]
|
||||
for (const [model, cost] of Object.entries(MODEL_PRICING)) {
|
||||
|
|
@ -61,24 +70,119 @@ function getModelCost(modelName: string): number {
|
|||
return 1.0
|
||||
}
|
||||
|
||||
/**
|
||||
* Load token data from persistent file, falling back to deriving from session stores.
|
||||
*/
|
||||
async function loadTokenData(): Promise<TokenUsageRecord[]> {
|
||||
// First try loading from persistent token file
|
||||
interface DbTokenUsageRow {
|
||||
id: number
|
||||
model: string
|
||||
session_id: string
|
||||
input_tokens: number
|
||||
output_tokens: number
|
||||
created_at: number
|
||||
}
|
||||
|
||||
function loadTokenDataFromDb(): TokenUsageRecord[] {
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const rows = db.prepare(`
|
||||
SELECT id, model, session_id, input_tokens, output_tokens, created_at
|
||||
FROM token_usage
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT 10000
|
||||
`).all() as DbTokenUsageRow[]
|
||||
|
||||
return rows.map((row) => {
|
||||
const totalTokens = row.input_tokens + row.output_tokens
|
||||
const costPer1k = getModelCost(row.model)
|
||||
return {
|
||||
id: `db-${row.id}`,
|
||||
model: row.model,
|
||||
sessionId: row.session_id,
|
||||
agentName: extractAgentName(row.session_id),
|
||||
timestamp: row.created_at * 1000,
|
||||
inputTokens: row.input_tokens,
|
||||
outputTokens: row.output_tokens,
|
||||
totalTokens,
|
||||
cost: (totalTokens / 1000) * costPer1k,
|
||||
operation: 'heartbeat',
|
||||
}
|
||||
})
|
||||
} catch (error) {
|
||||
logger.warn({ err: error }, 'Failed to load token usage from database')
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeTokenRecord(record: Partial<TokenUsageRecord>): TokenUsageRecord | null {
|
||||
if (!record.model || !record.sessionId) return null
|
||||
const inputTokens = Number(record.inputTokens ?? 0)
|
||||
const outputTokens = Number(record.outputTokens ?? 0)
|
||||
const totalTokens = Number(record.totalTokens ?? inputTokens + outputTokens)
|
||||
const model = String(record.model)
|
||||
return {
|
||||
id: String(record.id ?? `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`),
|
||||
model,
|
||||
sessionId: String(record.sessionId),
|
||||
agentName: String(record.agentName ?? extractAgentName(String(record.sessionId))),
|
||||
timestamp: Number(record.timestamp ?? Date.now()),
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
totalTokens,
|
||||
cost: Number(record.cost ?? (totalTokens / 1000) * getModelCost(model)),
|
||||
operation: String(record.operation ?? 'chat_completion'),
|
||||
duration: record.duration,
|
||||
}
|
||||
}
|
||||
|
||||
function dedupeTokenRecords(records: TokenUsageRecord[]): TokenUsageRecord[] {
|
||||
const seen = new Set<string>()
|
||||
const deduped: TokenUsageRecord[] = []
|
||||
|
||||
for (const record of records) {
|
||||
const key = [
|
||||
record.sessionId,
|
||||
record.model,
|
||||
record.timestamp,
|
||||
record.inputTokens,
|
||||
record.outputTokens,
|
||||
record.totalTokens,
|
||||
record.operation,
|
||||
record.duration ?? '',
|
||||
].join('|')
|
||||
if (seen.has(key)) continue
|
||||
seen.add(key)
|
||||
deduped.push(record)
|
||||
}
|
||||
|
||||
return deduped
|
||||
}
|
||||
|
||||
async function loadTokenDataFromFile(): Promise<TokenUsageRecord[]> {
|
||||
try {
|
||||
ensureDirExists(dirname(DATA_PATH))
|
||||
await access(DATA_PATH)
|
||||
const data = await readFile(DATA_PATH, 'utf-8')
|
||||
const records = JSON.parse(data)
|
||||
if (Array.isArray(records) && records.length > 0) {
|
||||
return records
|
||||
}
|
||||
const parsed = JSON.parse(data)
|
||||
if (!Array.isArray(parsed)) return []
|
||||
|
||||
return parsed
|
||||
.map((record: Partial<TokenUsageRecord>) => normalizeTokenRecord(record))
|
||||
.filter((record): record is TokenUsageRecord => record !== null)
|
||||
} catch {
|
||||
// File doesn't exist or is empty — derive from sessions
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load token data from persistent file, falling back to deriving from session stores.
|
||||
*/
|
||||
async function loadTokenData(): Promise<TokenUsageRecord[]> {
|
||||
const dbRecords = loadTokenDataFromDb()
|
||||
const fileRecords = await loadTokenDataFromFile()
|
||||
const combined = dedupeTokenRecords([...dbRecords, ...fileRecords]).sort((a, b) => b.timestamp - a.timestamp)
|
||||
if (combined.length > 0) {
|
||||
return combined
|
||||
}
|
||||
|
||||
// Derive token usage from session stores
|
||||
// Final fallback: derive from in-memory sessions
|
||||
return deriveFromSessions()
|
||||
}
|
||||
|
||||
|
|
@ -103,6 +207,7 @@ function deriveFromSessions(): TokenUsageRecord[] {
|
|||
id: `session-${session.agent}-${session.key}`,
|
||||
model: session.model || 'unknown',
|
||||
sessionId: `${session.agent}:${session.chatType}`,
|
||||
agentName: session.agent || 'unknown',
|
||||
timestamp: session.updatedAt,
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
|
|
@ -218,7 +323,7 @@ export async function GET(request: NextRequest) {
|
|||
|
||||
// Agent aggregation: extract agent name from sessionId (format: "agentName:chatType")
|
||||
const agentGroups = filteredData.reduce((acc, record) => {
|
||||
const agent = record.sessionId.split(':')[0] || 'unknown'
|
||||
const agent = record.agentName || extractAgentName(record.sessionId)
|
||||
if (!acc[agent]) acc[agent] = []
|
||||
acc[agent].push(record)
|
||||
return acc
|
||||
|
|
@ -241,7 +346,7 @@ export async function GET(request: NextRequest) {
|
|||
|
||||
if (action === 'agent-costs') {
|
||||
const agentGroups = filteredData.reduce((acc, record) => {
|
||||
const agent = record.sessionId.split(':')[0] || 'unknown'
|
||||
const agent = record.agentName || extractAgentName(record.sessionId)
|
||||
if (!acc[agent]) acc[agent] = []
|
||||
acc[agent].push(record)
|
||||
return acc
|
||||
|
|
@ -327,12 +432,13 @@ export async function GET(request: NextRequest) {
|
|||
}
|
||||
|
||||
if (format === 'csv') {
|
||||
const headers = ['timestamp', 'model', 'sessionId', 'operation', 'inputTokens', 'outputTokens', 'totalTokens', 'cost', 'duration']
|
||||
const headers = ['timestamp', 'agentName', 'model', 'sessionId', 'operation', 'inputTokens', 'outputTokens', 'totalTokens', 'cost', 'duration']
|
||||
const csvRows = [headers.join(',')]
|
||||
|
||||
filteredData.forEach(record => {
|
||||
csvRows.push([
|
||||
new Date(record.timestamp).toISOString(),
|
||||
record.agentName,
|
||||
record.model,
|
||||
record.sessionId,
|
||||
record.operation,
|
||||
|
|
@ -411,6 +517,7 @@ export async function POST(request: NextRequest) {
|
|||
id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
|
||||
model,
|
||||
sessionId,
|
||||
agentName: extractAgentName(sessionId),
|
||||
timestamp: Date.now(),
|
||||
inputTokens,
|
||||
outputTokens,
|
||||
|
|
@ -420,7 +527,8 @@ export async function POST(request: NextRequest) {
|
|||
duration,
|
||||
}
|
||||
|
||||
const existingData = await loadTokenData()
|
||||
// Persist only manually posted usage records in the JSON file.
|
||||
const existingData = await loadTokenDataFromFile()
|
||||
existingData.unshift(record)
|
||||
|
||||
if (existingData.length > 10000) {
|
||||
|
|
|
|||
|
|
@ -108,6 +108,14 @@ test.describe('Direct CLI Integration', () => {
|
|||
const hbBody = await hbRes.json()
|
||||
expect(hbBody.token_recorded).toBe(true)
|
||||
expect(hbBody.agent).toBe(agentName)
|
||||
|
||||
const costsRes = await request.get('/api/tokens?action=agent-costs&timeframe=hour', {
|
||||
headers: API_KEY_HEADER,
|
||||
})
|
||||
expect(costsRes.status()).toBe(200)
|
||||
const costsBody = await costsRes.json()
|
||||
expect(costsBody.agents).toHaveProperty(agentName)
|
||||
expect(costsBody.agents[agentName].stats.totalTokens).toBeGreaterThanOrEqual(1500)
|
||||
})
|
||||
|
||||
test('DELETE /api/connect disconnects and sets agent offline', async ({ request }) => {
|
||||
|
|
|
|||
Loading…
Reference in New Issue