fix: reduce server memory footprint (#405)
* fix: reduce server memory footprint across session parsing, caching, and rate limiting

  - Stream-parse Claude JSONL session files instead of loading entire files into memory
  - Add 50MB file size cap to skip oversized session transcripts
  - Add 30s TTL cache to getAllGatewaySessions() to avoid redundant disk reads per scheduler tick
  - Cap rate limiter maps at 10,000 entries with oldest-first eviction
  - Add request.signal abort listener to SSE route for defense-in-depth cleanup
  - Add test for rate limiter maxEntries eviction behavior

* fix: address audit findings across all memory-fix files

  claude-sessions.ts:
  - Wrap readline loop in try/finally to ensure rl.close() on error paths
  - Guard statSync in file loop to handle files deleted between readdir and stat
  - Fix variable shadowing: rename inner `now` to `nowSec` to avoid confusion
  - Update throttle timestamp on empty scan results (prevents repeated disk scans)
  - Skip sidechain-only sessions (zero user+assistant messages)

  sessions.ts:
  - Decouple `active` flag from cache — compute at read time to prevent stale data
  - Remove activeWithinMs from cache key to eliminate cache thrashing between callers using different windows (Infinity vs default)
  - Add invalidateSessionCache() and call it after pruneGatewaySessionsOlderThan

  events/route.ts:
  - Null out cleanup after first call to prevent double-invoke
  - Remove unnecessary `if (request.signal)` guard (always defined on NextRequest)

  rate-limit.test.ts:
  - Rewrite eviction test with maxRequests=1 to actually prove eviction occurred
  - Add assertion that non-evicted entries remain tracked
This commit is contained in:
parent
b6717b8bf8
commit
e7aa7e6a91
|
|
@ -57,11 +57,22 @@ export async function GET(request: NextRequest) {
|
|||
},
|
||||
|
||||
cancel() {
|
||||
// Client disconnected
|
||||
if (cleanup) cleanup()
|
||||
if (cleanup) {
|
||||
cleanup()
|
||||
cleanup = null
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
// Defense-in-depth: if the request is aborted (proxy timeout, network drop)
|
||||
// ensure we clean up the event listener even if cancel() doesn't fire.
|
||||
request.signal.addEventListener('abort', () => {
|
||||
if (cleanup) {
|
||||
cleanup()
|
||||
cleanup = null
|
||||
}
|
||||
}, { once: true })
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
|
|
|
|||
|
|
@ -72,4 +72,29 @@ describe('createRateLimiter', () => {
|
|||
// Second IP now blocked
|
||||
expect(limiter(makeRequest('10.0.0.2'))).not.toBeNull()
|
||||
})
|
||||
|
||||
it('evicts oldest entry when maxEntries is reached', () => {
|
||||
const limiter = createRateLimiter({ windowMs: 60_000, maxRequests: 1, maxEntries: 3 })
|
||||

||||
// Fill to capacity and exhaust 10.0.0.1's quota.
// The clock is advanced 1ms between inserts so every entry gets a distinct resetAt
// and 10.0.0.1 is unambiguously the oldest.
// NOTE(review): vi.advanceTimersByTime presumably relies on fake timers enabled by the
// surrounding suite setup, which is not visible here — confirm.
|
||||
limiter(makeRequest('10.0.0.1'))
|
||||
vi.advanceTimersByTime(1)
|
||||
expect(limiter(makeRequest('10.0.0.1'))).not.toBeNull() // blocked: quota consumed
|
||||

||||
limiter(makeRequest('10.0.0.2'))
|
||||
vi.advanceTimersByTime(1)
|
||||
limiter(makeRequest('10.0.0.3'))
|
||||
vi.advanceTimersByTime(1)
|
||||

||||
// Store has 3 entries (10.0.0.1 blocked, 10.0.0.2, 10.0.0.3).
// Adding 10.0.0.4 evicts 10.0.0.1, which has the earliest resetAt.
|
||||
limiter(makeRequest('10.0.0.4'))
|
||||

||||
// 10.0.0.1 was evicted — its counter is gone, so this request is allowed (fresh entry).
// The store is full again at this point, so re-inserting 10.0.0.1 evicts the next
// oldest entry, 10.0.0.2.
|
||||
expect(limiter(makeRequest('10.0.0.1'))).toBeNull()
|
||||
// Now 10.0.0.1 is back at count=1 and blocked
|
||||
expect(limiter(makeRequest('10.0.0.1'))).not.toBeNull()
|
||||

||||
// 10.0.0.3 should still be tracked: it survived both evictions because each time an
// entry with an earlier resetAt existed. Its quota of 1 is already used, so a second
// request is blocked rather than treated as a fresh entry.
|
||||
expect(limiter(makeRequest('10.0.0.3'))).not.toBeNull()
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -12,12 +12,16 @@
|
|||
* - Activity status (active if last message < 5 minutes ago)
|
||||
*/
|
||||
|
||||
import { readdirSync, readFileSync, statSync } from 'fs'
|
||||
import { createReadStream, readdirSync, statSync } from 'fs'
|
||||
import { createInterface } from 'readline'
|
||||
import { join } from 'path'
|
||||
import { config } from './config'
|
||||
import { getDatabase } from './db'
|
||||
import { logger } from './logger'
|
||||
|
||||
// Skip JSONL files larger than this to avoid excessive I/O
|
||||
const MAX_SESSION_FILE_BYTES = 50 * 1024 * 1024 // 50 MB
|
||||
|
||||
// Rough per-token pricing (USD) for cost estimation
|
||||
const MODEL_PRICING: Record<string, { input: number; output: number }> = {
|
||||
'claude-opus-4-6': { input: 15 / 1_000_000, output: 75 / 1_000_000 },
|
||||
|
|
@ -78,12 +82,12 @@ function clampTimestamp(ms: number): number {
|
|||
return ms
|
||||
}
|
||||
|
||||
function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: number): SessionStats | null {
|
||||
async function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: number, fileSizeBytes: number): Promise<SessionStats | null> {
|
||||
try {
|
||||
const content = readFileSync(filePath, 'utf-8')
|
||||
const lines = content.split('\n').filter(Boolean)
|
||||
|
||||
if (lines.length === 0) return null
|
||||
if (fileSizeBytes > MAX_SESSION_FILE_BYTES) {
|
||||
logger.warn({ filePath, fileSizeBytes }, 'Skipping oversized Claude session file')
|
||||
return null
|
||||
}
|
||||
|
||||
let sessionId: string | null = null
|
||||
let model: string | null = null
|
||||
|
|
@ -99,77 +103,80 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu
|
|||
let firstMessageAt: string | null = null
|
||||
let lastMessageAt: string | null = null
|
||||
let lastUserPrompt: string | null = null
|
||||
let hasLines = false
|
||||
|
||||
for (const line of lines) {
|
||||
let entry: JSONLEntry
|
||||
try {
|
||||
entry = JSON.parse(line)
|
||||
} catch {
|
||||
continue
|
||||
}
|
||||
const rl = createInterface({
|
||||
input: createReadStream(filePath, { encoding: 'utf-8' }),
|
||||
crlfDelay: Infinity,
|
||||
})
|
||||
|
||||
// Extract session ID from first entry that has one
|
||||
if (!sessionId && entry.sessionId) {
|
||||
sessionId = entry.sessionId
|
||||
}
|
||||
try {
|
||||
for await (const line of rl) {
|
||||
if (!line) continue
|
||||
hasLines = true
|
||||
|
||||
// Extract git branch
|
||||
if (!gitBranch && entry.gitBranch) {
|
||||
gitBranch = entry.gitBranch
|
||||
}
|
||||
|
||||
// Extract project working directory
|
||||
if (!projectPath && entry.cwd) {
|
||||
projectPath = entry.cwd
|
||||
}
|
||||
|
||||
// Track timestamps
|
||||
if (entry.timestamp) {
|
||||
if (!firstMessageAt) firstMessageAt = entry.timestamp
|
||||
lastMessageAt = entry.timestamp
|
||||
}
|
||||
|
||||
// Skip sidechain messages (subagent work) for counts
|
||||
if (entry.isSidechain) continue
|
||||
|
||||
if (entry.type === 'user' && entry.message) {
|
||||
userMessages++
|
||||
// Extract last user prompt text
|
||||
const msg = entry.message
|
||||
if (typeof msg.content === 'string' && msg.content.length > 0) {
|
||||
lastUserPrompt = msg.content.slice(0, 500)
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'assistant' && entry.message) {
|
||||
assistantMessages++
|
||||
|
||||
// Extract model
|
||||
if (entry.message.model) {
|
||||
model = entry.message.model
|
||||
let entry: JSONLEntry
|
||||
try {
|
||||
entry = JSON.parse(line)
|
||||
} catch {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract token usage
|
||||
const usage = entry.message.usage
|
||||
if (usage) {
|
||||
inputTokens += (usage.input_tokens || 0)
|
||||
cacheReadTokens += (usage.cache_read_input_tokens || 0)
|
||||
cacheCreationTokens += (usage.cache_creation_input_tokens || 0)
|
||||
outputTokens += (usage.output_tokens || 0)
|
||||
if (!sessionId && entry.sessionId) {
|
||||
sessionId = entry.sessionId
|
||||
}
|
||||
|
||||
// Count tool uses in assistant content
|
||||
if (Array.isArray(entry.message.content)) {
|
||||
for (const block of entry.message.content) {
|
||||
if (block.type === 'tool_use') toolUses++
|
||||
if (!gitBranch && entry.gitBranch) {
|
||||
gitBranch = entry.gitBranch
|
||||
}
|
||||
|
||||
if (!projectPath && entry.cwd) {
|
||||
projectPath = entry.cwd
|
||||
}
|
||||
|
||||
if (entry.timestamp) {
|
||||
if (!firstMessageAt) firstMessageAt = entry.timestamp
|
||||
lastMessageAt = entry.timestamp
|
||||
}
|
||||
|
||||
if (entry.isSidechain) continue
|
||||
|
||||
if (entry.type === 'user' && entry.message) {
|
||||
userMessages++
|
||||
const msg = entry.message
|
||||
if (typeof msg.content === 'string' && msg.content.length > 0) {
|
||||
lastUserPrompt = msg.content.slice(0, 500)
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.type === 'assistant' && entry.message) {
|
||||
assistantMessages++
|
||||
|
||||
if (entry.message.model) {
|
||||
model = entry.message.model
|
||||
}
|
||||
|
||||
const usage = entry.message.usage
|
||||
if (usage) {
|
||||
inputTokens += (usage.input_tokens || 0)
|
||||
cacheReadTokens += (usage.cache_read_input_tokens || 0)
|
||||
cacheCreationTokens += (usage.cache_creation_input_tokens || 0)
|
||||
outputTokens += (usage.output_tokens || 0)
|
||||
}
|
||||
|
||||
if (Array.isArray(entry.message.content)) {
|
||||
for (const block of entry.message.content) {
|
||||
if (block.type === 'tool_use') toolUses++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rl.close()
|
||||
}
|
||||
|
||||
if (!sessionId) return null
|
||||
if (!hasLines || !sessionId || (userMessages === 0 && assistantMessages === 0)) return null
|
||||
|
||||
// Estimate cost (cache reads = 10% of input, cache creation = 125% of input)
|
||||
const pricing = (model && MODEL_PRICING[model]) || DEFAULT_PRICING
|
||||
const estimatedCost =
|
||||
inputTokens * pricing.input +
|
||||
|
|
@ -184,7 +191,6 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu
|
|||
const effectiveFirstMs = parsedFirstMs || mtimeMs
|
||||
const isActive = effectiveLastMs > 0 && (Date.now() - effectiveLastMs) < ACTIVE_THRESHOLD_MS
|
||||
|
||||
// Store total input tokens (including cache) for display
|
||||
const totalInputTokens = inputTokens + cacheReadTokens + cacheCreationTokens
|
||||
|
||||
return {
|
||||
|
|
@ -211,7 +217,7 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu
|
|||
}
|
||||
|
||||
/** Scan all Claude Code projects and discover sessions */
|
||||
export function scanClaudeSessions(): SessionStats[] {
|
||||
export async function scanClaudeSessions(): Promise<SessionStats[]> {
|
||||
const claudeHome = config.claudeHome
|
||||
if (!claudeHome) return []
|
||||
|
||||
|
|
@ -236,7 +242,6 @@ export function scanClaudeSessions(): SessionStats[] {
|
|||
}
|
||||
if (!stat.isDirectory()) continue
|
||||
|
||||
// Find JSONL files in this project
|
||||
let files: string[]
|
||||
try {
|
||||
files = readdirSync(projectDir).filter(f => f.endsWith('.jsonl'))
|
||||
|
|
@ -246,7 +251,13 @@ export function scanClaudeSessions(): SessionStats[] {
|
|||
|
||||
for (const file of files) {
|
||||
const filePath = join(projectDir, file)
|
||||
const parsed = parseSessionFile(filePath, projectSlug, statSync(filePath).mtimeMs)
|
||||
let fileStat
|
||||
try {
|
||||
fileStat = statSync(filePath)
|
||||
} catch {
|
||||
continue // file disappeared between readdir and stat
|
||||
}
|
||||
const parsed = await parseSessionFile(filePath, projectSlug, fileStat.mtimeMs, fileStat.size)
|
||||
if (parsed) sessions.push(parsed)
|
||||
}
|
||||
}
|
||||
|
|
@ -261,18 +272,20 @@ const SYNC_THROTTLE_MS = 30_000
|
|||
|
||||
/** Scan and upsert sessions into the database (throttled to avoid repeated disk scans) */
|
||||
export async function syncClaudeSessions(force = false): Promise<{ ok: boolean; message: string }> {
|
||||
const now = Date.now()
|
||||
if (!force && lastSyncAt > 0 && (now - lastSyncAt) < SYNC_THROTTLE_MS) {
|
||||
const nowMs = Date.now()
|
||||
if (!force && lastSyncAt > 0 && (nowMs - lastSyncAt) < SYNC_THROTTLE_MS) {
|
||||
return lastSyncResult
|
||||
}
|
||||
try {
|
||||
const sessions = scanClaudeSessions()
|
||||
const sessions = await scanClaudeSessions()
|
||||
if (sessions.length === 0) {
|
||||
return { ok: true, message: 'No Claude sessions found' }
|
||||
lastSyncAt = Date.now()
|
||||
lastSyncResult = { ok: true, message: 'No Claude sessions found' }
|
||||
return lastSyncResult
|
||||
}
|
||||
|
||||
const db = getDatabase()
|
||||
const now = Math.floor(Date.now() / 1000)
|
||||
const nowSec = Math.floor(Date.now() / 1000)
|
||||
|
||||
const upsert = db.prepare(`
|
||||
INSERT INTO claude_sessions (
|
||||
|
|
@ -309,7 +322,7 @@ export async function syncClaudeSessions(force = false): Promise<{ ok: boolean;
|
|||
s.userMessages, s.assistantMessages, s.toolUses,
|
||||
s.inputTokens, s.outputTokens, s.estimatedCost,
|
||||
s.firstMessageAt, s.lastMessageAt, s.lastUserPrompt,
|
||||
s.isActive ? 1 : 0, now, now,
|
||||
s.isActive ? 1 : 0, nowSec, nowSec,
|
||||
)
|
||||
upserted++
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,23 @@ interface RateLimiterOptions {
|
|||
message?: string
|
||||
/** If true, MC_DISABLE_RATE_LIMIT will not bypass this limiter */
|
||||
critical?: boolean
|
||||
/** Max entries in the backing map before evicting oldest (default: 10_000) */
|
||||
maxEntries?: number
|
||||
}
|
||||
|
||||
const DEFAULT_MAX_ENTRIES = 10_000
|
||||
|
||||
/**
 * Remove the single entry with the earliest `resetAt` from `store`.
 *
 * Intended to be called right before inserting a new key into a store that is
 * already at capacity, so the map never grows past its configured maximum.
 *
 * - An empty store is left untouched.
 * - Ties are broken in favour of the entry that was inserted first (Map
 *   iteration order).
 * - A non-empty store always loses exactly one entry, even when the key is the
 *   empty string or when no entry has a finite `resetAt`; otherwise the size
 *   cap could be bypassed.
 *
 * @param store Backing map of a rate limiter, mutated in place.
 */
function evictOldest(store: Map<string, RateLimitEntry>): void {
  // `undefined` (not a falsy check) marks "nothing selected yet", so that ''
  // remains a perfectly valid key to evict.
  let oldestKey: string | undefined
  let oldestReset = Infinity
  for (const [key, entry] of store) {
    // Always accept the first entry as the initial candidate; this guarantees
    // an eviction even if every resetAt is Infinity or NaN.
    if (oldestKey === undefined || entry.resetAt < oldestReset) {
      oldestReset = entry.resetAt
      oldestKey = key
    }
  }
  if (oldestKey !== undefined) store.delete(oldestKey)
}
|
||||
|
||||
// Trusted proxy IPs (comma-separated). Only parse XFF when behind known proxies.
|
||||
|
|
@ -41,6 +58,7 @@ export function extractClientIp(request: Request): string {
|
|||
|
||||
export function createRateLimiter(options: RateLimiterOptions) {
|
||||
const store = new Map<string, RateLimitEntry>()
|
||||
const maxEntries = options.maxEntries ?? DEFAULT_MAX_ENTRIES
|
||||
|
||||
// Periodic cleanup every 60s
|
||||
const cleanupInterval = setInterval(() => {
|
||||
|
|
@ -60,6 +78,7 @@ export function createRateLimiter(options: RateLimiterOptions) {
|
|||
const entry = store.get(ip)
|
||||
|
||||
if (!entry || now > entry.resetAt) {
|
||||
if (!entry && store.size >= maxEntries) evictOldest(store)
|
||||
store.set(ip, { count: 1, resetAt: now + options.windowMs })
|
||||
return null
|
||||
}
|
||||
|
|
@ -113,6 +132,7 @@ export const heavyLimiter = createRateLimiter({
|
|||
*/
|
||||
export function createAgentRateLimiter(options: RateLimiterOptions) {
|
||||
const store = new Map<string, RateLimitEntry>()
|
||||
const maxEntries = options.maxEntries ?? DEFAULT_MAX_ENTRIES
|
||||
|
||||
const cleanupInterval = setInterval(() => {
|
||||
const now = Date.now()
|
||||
|
|
@ -131,6 +151,7 @@ export function createAgentRateLimiter(options: RateLimiterOptions) {
|
|||
const entry = store.get(key)
|
||||
|
||||
if (!entry || now > entry.resetAt) {
|
||||
if (!entry && store.size >= maxEntries) evictOldest(store)
|
||||
store.set(key, { count: 1, resetAt: now + options.windowMs })
|
||||
return null
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,17 @@ function getGatewaySessionStoreFiles(): string[] {
|
|||
return files
|
||||
}
|
||||
|
||||
// TTL cache to avoid re-reading session files multiple times per scheduler tick.
|
||||
// Stores sessions without the `active` flag so the cache is independent of activeWithinMs.
|
||||
type RawSession = Omit<GatewaySession, 'active'>
|
||||
let _sessionCache: { data: RawSession[]; ts: number } | null = null
|
||||
const SESSION_CACHE_TTL_MS = 30_000
|
||||
|
||||
/**
 * Invalidate the session cache (e.g. after pruning).
 *
 * Clears the module-level `_sessionCache`, so the next `getAllGatewaySessions()`
 * call finds no cached data and re-reads the session store files from disk
 * instead of serving results up to `SESSION_CACHE_TTL_MS` old.
 */
|
||||
export function invalidateSessionCache(): void {
|
||||
_sessionCache = null
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all sessions from OpenClaw agent session stores on disk.
|
||||
*
|
||||
|
|
@ -54,41 +65,49 @@ function getGatewaySessionStoreFiles(): string[] {
|
|||
* Each file is a JSON object keyed by session key (e.g. "agent:<agent>:main")
|
||||
* with session metadata as values.
|
||||
*/
|
||||
export function getAllGatewaySessions(activeWithinMs = 60 * 60 * 1000): GatewaySession[] {
|
||||
const sessions: GatewaySession[] = []
|
||||
export function getAllGatewaySessions(activeWithinMs = 60 * 60 * 1000, force = false): GatewaySession[] {
|
||||
const now = Date.now()
|
||||
for (const sessionsFile of getGatewaySessionStoreFiles()) {
|
||||
const agentName = path.basename(path.dirname(path.dirname(sessionsFile)))
|
||||
try {
|
||||
const raw = fs.readFileSync(sessionsFile, 'utf-8')
|
||||
const data = JSON.parse(raw)
|
||||
|
||||
for (const [key, entry] of Object.entries(data)) {
|
||||
const s = entry as Record<string, any>
|
||||
const updatedAt = s.updatedAt || 0
|
||||
sessions.push({
|
||||
key,
|
||||
agent: agentName,
|
||||
sessionId: s.sessionId || '',
|
||||
updatedAt,
|
||||
chatType: s.chatType || 'unknown',
|
||||
channel: s.deliveryContext?.channel || s.lastChannel || s.channel || '',
|
||||
model: typeof s.model === 'object' && s.model?.primary ? String(s.model.primary) : String(s.model || ''),
|
||||
totalTokens: s.totalTokens || 0,
|
||||
inputTokens: s.inputTokens || 0,
|
||||
outputTokens: s.outputTokens || 0,
|
||||
contextTokens: s.contextTokens || 0,
|
||||
active: (now - updatedAt) < activeWithinMs,
|
||||
})
|
||||
let raw: RawSession[]
|
||||
if (!force && _sessionCache && (now - _sessionCache.ts) < SESSION_CACHE_TTL_MS) {
|
||||
raw = _sessionCache.data
|
||||
} else {
|
||||
const sessions: RawSession[] = []
|
||||
for (const sessionsFile of getGatewaySessionStoreFiles()) {
|
||||
const agentName = path.basename(path.dirname(path.dirname(sessionsFile)))
|
||||
try {
|
||||
const fileContent = fs.readFileSync(sessionsFile, 'utf-8')
|
||||
const data = JSON.parse(fileContent)
|
||||
|
||||
for (const [key, entry] of Object.entries(data)) {
|
||||
const s = entry as Record<string, any>
|
||||
const updatedAt = s.updatedAt || 0
|
||||
sessions.push({
|
||||
key,
|
||||
agent: agentName,
|
||||
sessionId: s.sessionId || '',
|
||||
updatedAt,
|
||||
chatType: s.chatType || 'unknown',
|
||||
channel: s.deliveryContext?.channel || s.lastChannel || s.channel || '',
|
||||
model: typeof s.model === 'object' && s.model?.primary ? String(s.model.primary) : String(s.model || ''),
|
||||
totalTokens: s.totalTokens || 0,
|
||||
inputTokens: s.inputTokens || 0,
|
||||
outputTokens: s.outputTokens || 0,
|
||||
contextTokens: s.contextTokens || 0,
|
||||
})
|
||||
}
|
||||
} catch {
|
||||
// Skip agents without valid session files
|
||||
}
|
||||
} catch {
|
||||
// Skip agents without valid session files
|
||||
}
|
||||
|
||||
sessions.sort((a, b) => b.updatedAt - a.updatedAt)
|
||||
_sessionCache = { data: sessions, ts: Date.now() }
|
||||
raw = sessions
|
||||
}
|
||||
|
||||
// Sort by most recently updated first
|
||||
sessions.sort((a, b) => b.updatedAt - a.updatedAt)
|
||||
return sessions
|
||||
// Compute `active` at read time so it's always fresh regardless of cache age
|
||||
return raw.map(s => ({ ...s, active: (now - s.updatedAt) < activeWithinMs }))
|
||||
}
|
||||
|
||||
export function countStaleGatewaySessions(retentionDays: number): number {
|
||||
|
|
@ -146,6 +165,7 @@ export function pruneGatewaySessionsOlderThan(retentionDays: number): { deleted:
|
|||
}
|
||||
}
|
||||
|
||||
if (filesTouched > 0) invalidateSessionCache()
|
||||
return { deleted, filesTouched }
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue