From e7aa7e6a917715e52b40f1d5012f1402f6a5209d Mon Sep 17 00:00:00 2001 From: nyk <93952610+0xNyk@users.noreply.github.com> Date: Mon, 16 Mar 2026 11:30:02 +0700 Subject: [PATCH] fix: reduce server memory footprint (#405) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: reduce server memory footprint across session parsing, caching, and rate limiting - Stream-parse Claude JSONL session files instead of loading entire files into memory - Add 50MB file size cap to skip oversized session transcripts - Add 30s TTL cache to getAllGatewaySessions() to avoid redundant disk reads per scheduler tick - Cap rate limiter maps at 10,000 entries with oldest-first eviction - Add request.signal abort listener to SSE route for defense-in-depth cleanup - Add test for rate limiter maxEntries eviction behavior * fix: address audit findings across all memory-fix files claude-sessions.ts: - Wrap readline loop in try/finally to ensure rl.close() on error paths - Guard statSync in file loop to handle files deleted between readdir and stat - Fix variable shadowing: rename inner `now` to `nowSec` to avoid confusion - Update throttle timestamp on empty scan results (prevents repeated disk scans) - Skip sidechain-only sessions (zero user+assistant messages) sessions.ts: - Decouple `active` flag from cache — compute at read time to prevent stale data - Remove activeWithinMs from cache key to eliminate cache thrashing between callers using different windows (Infinity vs default) - Add invalidateSessionCache() and call it after pruneGatewaySessionsOlderThan events/route.ts: - Null out cleanup after first call to prevent double-invoke - Remove unnecessary `if (request.signal)` guard (always defined on NextRequest) rate-limit.test.ts: - Rewrite eviction test with maxRequests=1 to actually prove eviction occurred - Add assertion that non-evicted entries remain tracked --- src/app/api/events/route.ts | 15 ++- src/lib/__tests__/rate-limit.test.ts | 25 +++++ src/lib/claude-sessions.ts | 161 +++++++++++++++------------ src/lib/rate-limit.ts | 21 ++++ src/lib/sessions.ts | 78 ++++++++----- 5 files changed, 195 insertions(+), 105 deletions(-) diff --git a/src/app/api/events/route.ts b/src/app/api/events/route.ts index 1e4c0a8..5554e61 100644 --- a/src/app/api/events/route.ts +++ b/src/app/api/events/route.ts @@ -57,11 +57,22 @@ export async function GET(request: NextRequest) { }, cancel() { - // Client disconnected - if (cleanup) cleanup() + if (cleanup) { + cleanup() + cleanup = null + } }, }) + // Defense-in-depth: if the request is aborted (proxy timeout, network drop) + // ensure we clean up the event listener even if cancel() doesn't fire. + request.signal.addEventListener('abort', () => { + if (cleanup) { + cleanup() + cleanup = null + } + }, { once: true }) + return new Response(stream, { headers: { 'Content-Type': 'text/event-stream', diff --git a/src/lib/__tests__/rate-limit.test.ts b/src/lib/__tests__/rate-limit.test.ts index 0c00437..83c5037 100644 --- a/src/lib/__tests__/rate-limit.test.ts +++ b/src/lib/__tests__/rate-limit.test.ts @@ -72,4 +72,29 @@ describe('createRateLimiter', () => { // Second IP now blocked expect(limiter(makeRequest('10.0.0.2'))).not.toBeNull() }) + + it('evicts oldest entry when maxEntries is reached', () => { + const limiter = createRateLimiter({ windowMs: 60_000, maxRequests: 1, maxEntries: 3 }) + + // Fill to capacity and exhaust 10.0.0.1's quota + limiter(makeRequest('10.0.0.1')) + vi.advanceTimersByTime(1) + expect(limiter(makeRequest('10.0.0.1'))).not.toBeNull() // blocked: quota consumed + + limiter(makeRequest('10.0.0.2')) + vi.advanceTimersByTime(1) + limiter(makeRequest('10.0.0.3')) + vi.advanceTimersByTime(1) + + // Store has 3 entries (A blocked, B, C). Adding D evicts A (oldest resetAt) + limiter(makeRequest('10.0.0.4')) + + // 10.0.0.1 was evicted — counter is gone, this is allowed (fresh entry) + expect(limiter(makeRequest('10.0.0.1'))).toBeNull() + // Now 10.0.0.1 is back at count=1 and blocked + expect(limiter(makeRequest('10.0.0.1'))).not.toBeNull() + + // 10.0.0.3 should still be tracked (not evicted — it had the newest resetAt) + expect(limiter(makeRequest('10.0.0.3'))).not.toBeNull() + }) }) diff --git a/src/lib/claude-sessions.ts b/src/lib/claude-sessions.ts index 1c18a48..38ab4d1 100644 --- a/src/lib/claude-sessions.ts +++ b/src/lib/claude-sessions.ts @@ -12,12 +12,16 @@ * - Activity status (active if last message < 5 minutes ago) */ -import { readdirSync, readFileSync, statSync } from 'fs' +import { createReadStream, readdirSync, statSync } from 'fs' +import { createInterface } from 'readline' import { join } from 'path' import { config } from './config' import { getDatabase } from './db' import { logger } from './logger' +// Skip JSONL files larger than this to avoid excessive I/O +const MAX_SESSION_FILE_BYTES = 50 * 1024 * 1024 // 50 MB + // Rough per-token pricing (USD) for cost estimation const MODEL_PRICING: Record = { 'claude-opus-4-6': { input: 15 / 1_000_000, output: 75 / 1_000_000 }, @@ -78,12 +82,12 @@ function clampTimestamp(ms: number): number { return ms } -function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: number): SessionStats | null { +async function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: number, fileSizeBytes: number): Promise { try { - const content = readFileSync(filePath, 'utf-8') - const lines = content.split('\n').filter(Boolean) - - if (lines.length === 0) return null + if (fileSizeBytes > MAX_SESSION_FILE_BYTES) { + logger.warn({ filePath, fileSizeBytes }, 'Skipping oversized Claude session file') + return null + } let sessionId: string | null = null let model: string | null = null @@ -99,77 +103,80 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu let firstMessageAt: string | null = null let lastMessageAt: string | null = null let lastUserPrompt: string | null = null + let hasLines = false - for (const line of lines) { - let entry: JSONLEntry - try { - entry = JSON.parse(line) - } catch { - continue - } + const rl = createInterface({ + input: createReadStream(filePath, { encoding: 'utf-8' }), + crlfDelay: Infinity, + }) - // Extract session ID from first entry that has one - if (!sessionId && entry.sessionId) { - sessionId = entry.sessionId - } + try { + for await (const line of rl) { + if (!line) continue + hasLines = true - // Extract git branch - if (!gitBranch && entry.gitBranch) { - gitBranch = entry.gitBranch - } - - // Extract project working directory - if (!projectPath && entry.cwd) { - projectPath = entry.cwd - } - - // Track timestamps - if (entry.timestamp) { - if (!firstMessageAt) firstMessageAt = entry.timestamp - lastMessageAt = entry.timestamp - } - - // Skip sidechain messages (subagent work) for counts - if (entry.isSidechain) continue - - if (entry.type === 'user' && entry.message) { - userMessages++ - // Extract last user prompt text - const msg = entry.message - if (typeof msg.content === 'string' && msg.content.length > 0) { - lastUserPrompt = msg.content.slice(0, 500) - } - } - - if (entry.type === 'assistant' && entry.message) { - assistantMessages++ - - // Extract model - if (entry.message.model) { - model = entry.message.model + let entry: JSONLEntry + try { + entry = JSON.parse(line) + } catch { + continue } - // Extract token usage - const usage = entry.message.usage - if (usage) { - inputTokens += (usage.input_tokens || 0) - cacheReadTokens += (usage.cache_read_input_tokens || 0) - cacheCreationTokens += (usage.cache_creation_input_tokens || 0) - outputTokens += (usage.output_tokens || 0) + if (!sessionId && entry.sessionId) { + sessionId = entry.sessionId } - // Count tool uses in assistant content - if (Array.isArray(entry.message.content)) { - for (const block of entry.message.content) { - if (block.type === 'tool_use') toolUses++ + if (!gitBranch && entry.gitBranch) { + gitBranch = entry.gitBranch + } + + if (!projectPath && entry.cwd) { + projectPath = entry.cwd + } + + if (entry.timestamp) { + if (!firstMessageAt) firstMessageAt = entry.timestamp + lastMessageAt = entry.timestamp + } + + if (entry.isSidechain) continue + + if (entry.type === 'user' && entry.message) { + userMessages++ + const msg = entry.message + if (typeof msg.content === 'string' && msg.content.length > 0) { + lastUserPrompt = msg.content.slice(0, 500) + } + } + + if (entry.type === 'assistant' && entry.message) { + assistantMessages++ + + if (entry.message.model) { + model = entry.message.model + } + + const usage = entry.message.usage + if (usage) { + inputTokens += (usage.input_tokens || 0) + cacheReadTokens += (usage.cache_read_input_tokens || 0) + cacheCreationTokens += (usage.cache_creation_input_tokens || 0) + outputTokens += (usage.output_tokens || 0) + } + + if (Array.isArray(entry.message.content)) { + for (const block of entry.message.content) { + if (block.type === 'tool_use') toolUses++ + } } } } + } finally { + rl.close() } - if (!sessionId) return null + if (!hasLines || !sessionId || (userMessages === 0 && assistantMessages === 0)) return null - // Estimate cost (cache reads = 10% of input, cache creation = 125% of input) const pricing = (model && MODEL_PRICING[model]) || DEFAULT_PRICING const estimatedCost = inputTokens * pricing.input + @@ -184,7 +191,6 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu const effectiveFirstMs = parsedFirstMs || mtimeMs const isActive = effectiveLastMs > 0 && (Date.now() - effectiveLastMs) < ACTIVE_THRESHOLD_MS - // Store total input tokens (including cache) for display const totalInputTokens = inputTokens + cacheReadTokens + cacheCreationTokens return { @@ -211,7 +217,7 @@ function parseSessionFile(filePath: string, projectSlug: string, fileMtimeMs: nu } /** Scan all Claude Code projects and discover sessions */ -export function scanClaudeSessions(): SessionStats[] { +export async function scanClaudeSessions(): Promise { const claudeHome = config.claudeHome if (!claudeHome) return [] @@ -236,7 +242,6 @@ export function scanClaudeSessions(): SessionStats[] { } if (!stat.isDirectory()) continue - // Find JSONL files in this project let files: string[] try { files = readdirSync(projectDir).filter(f => f.endsWith('.jsonl')) @@ -246,7 +251,13 @@ export function scanClaudeSessions(): SessionStats[] { for (const file of files) { const filePath = join(projectDir, file) - const parsed = parseSessionFile(filePath, projectSlug, statSync(filePath).mtimeMs) + let fileStat + try { + fileStat = statSync(filePath) + } catch { + continue // file disappeared between readdir and stat + } + const parsed = await parseSessionFile(filePath, projectSlug, fileStat.mtimeMs, fileStat.size) if (parsed) sessions.push(parsed) } } @@ -261,18 +272,20 @@ const SYNC_THROTTLE_MS = 30_000 /** Scan and upsert sessions into the database (throttled to avoid repeated disk scans) */ export async function syncClaudeSessions(force = false): Promise<{ ok: boolean; message: string }> { - const now = Date.now() - if (!force && lastSyncAt > 0 && (now - lastSyncAt) < SYNC_THROTTLE_MS) { + const nowMs = Date.now() + if (!force && lastSyncAt > 0 && (nowMs - lastSyncAt) < SYNC_THROTTLE_MS) { return lastSyncResult } try { - const sessions = scanClaudeSessions() + const sessions = await scanClaudeSessions() if (sessions.length === 0) { - return { ok: true, message: 'No Claude sessions found' } + lastSyncAt = Date.now() + lastSyncResult = { ok: true, message: 'No Claude sessions found' } + return lastSyncResult } const db = getDatabase() - const now = Math.floor(Date.now() / 1000) + const nowSec = Math.floor(Date.now() / 1000) const upsert = db.prepare(` INSERT INTO claude_sessions ( @@ -309,7 +322,7 @@ export async function syncClaudeSessions(force = false): Promise<{ ok: boolean; s.userMessages, s.assistantMessages, s.toolUses, s.inputTokens, s.outputTokens, s.estimatedCost, s.firstMessageAt, s.lastMessageAt, s.lastUserPrompt, - s.isActive ? 1 : 0, now, now, + s.isActive ? 1 : 0, nowSec, nowSec, ) upserted++ } diff --git a/src/lib/rate-limit.ts b/src/lib/rate-limit.ts index c561630..4abf32f 100644 --- a/src/lib/rate-limit.ts +++ b/src/lib/rate-limit.ts @@ -12,6 +12,23 @@ interface RateLimiterOptions { message?: string /** If true, MC_DISABLE_RATE_LIMIT will not bypass this limiter */ critical?: boolean + /** Max entries in the backing map before evicting oldest (default: 10_000) */ + maxEntries?: number +} + +const DEFAULT_MAX_ENTRIES = 10_000 + +/** Evict the entry with the earliest resetAt when at capacity */ +function evictOldest(store: Map) { + let oldestKey: string | null = null + let oldestReset = Infinity + for (const [key, entry] of store) { + if (entry.resetAt < oldestReset) { + oldestReset = entry.resetAt + oldestKey = key + } + } + if (oldestKey) store.delete(oldestKey) } // Trusted proxy IPs (comma-separated). Only parse XFF when behind known proxies. @@ -41,6 +58,7 @@ export function extractClientIp(request: Request): string { export function createRateLimiter(options: RateLimiterOptions) { const store = new Map() + const maxEntries = options.maxEntries ?? DEFAULT_MAX_ENTRIES // Periodic cleanup every 60s const cleanupInterval = setInterval(() => { @@ -60,6 +78,7 @@ export function createRateLimiter(options: RateLimiterOptions) { const entry = store.get(ip) if (!entry || now > entry.resetAt) { + if (!entry && store.size >= maxEntries) evictOldest(store) store.set(ip, { count: 1, resetAt: now + options.windowMs }) return null } @@ -113,6 +132,7 @@ export const heavyLimiter = createRateLimiter({ */ export function createAgentRateLimiter(options: RateLimiterOptions) { const store = new Map() + const maxEntries = options.maxEntries ?? DEFAULT_MAX_ENTRIES const cleanupInterval = setInterval(() => { const now = Date.now() @@ -131,6 +151,7 @@ export function createAgentRateLimiter(options: RateLimiterOptions) { const entry = store.get(key) if (!entry || now > entry.resetAt) { + if (!entry && store.size >= maxEntries) evictOldest(store) store.set(key, { count: 1, resetAt: now + options.windowMs }) return null } diff --git a/src/lib/sessions.ts b/src/lib/sessions.ts index 1fb440f..12a663c 100644 --- a/src/lib/sessions.ts +++ b/src/lib/sessions.ts @@ -45,6 +45,17 @@ function getGatewaySessionStoreFiles(): string[] { return files } +// TTL cache to avoid re-reading session files multiple times per scheduler tick. +// Stores sessions without the `active` flag so the cache is independent of activeWithinMs. +type RawSession = Omit +let _sessionCache: { data: RawSession[]; ts: number } | null = null +const SESSION_CACHE_TTL_MS = 30_000 + +/** Invalidate the session cache (e.g. after pruning). */ +export function invalidateSessionCache(): void { + _sessionCache = null +} + /** * Read all sessions from OpenClaw agent session stores on disk. * @@ -54,41 +65,49 @@ function getGatewaySessionStoreFiles(): string[] { * Each file is a JSON object keyed by session key (e.g. "agent::main") * with session metadata as values. */ -export function getAllGatewaySessions(activeWithinMs = 60 * 60 * 1000): GatewaySession[] { - const sessions: GatewaySession[] = [] +export function getAllGatewaySessions(activeWithinMs = 60 * 60 * 1000, force = false): GatewaySession[] { const now = Date.now() - for (const sessionsFile of getGatewaySessionStoreFiles()) { - const agentName = path.basename(path.dirname(path.dirname(sessionsFile))) - try { - const raw = fs.readFileSync(sessionsFile, 'utf-8') - const data = JSON.parse(raw) - for (const [key, entry] of Object.entries(data)) { - const s = entry as Record - const updatedAt = s.updatedAt || 0 - sessions.push({ - key, - agent: agentName, - sessionId: s.sessionId || '', - updatedAt, - chatType: s.chatType || 'unknown', - channel: s.deliveryContext?.channel || s.lastChannel || s.channel || '', - model: typeof s.model === 'object' && s.model?.primary ? String(s.model.primary) : String(s.model || ''), - totalTokens: s.totalTokens || 0, - inputTokens: s.inputTokens || 0, - outputTokens: s.outputTokens || 0, - contextTokens: s.contextTokens || 0, - active: (now - updatedAt) < activeWithinMs, - }) + let raw: RawSession[] + if (!force && _sessionCache && (now - _sessionCache.ts) < SESSION_CACHE_TTL_MS) { + raw = _sessionCache.data + } else { + const sessions: RawSession[] = [] + for (const sessionsFile of getGatewaySessionStoreFiles()) { + const agentName = path.basename(path.dirname(path.dirname(sessionsFile))) + try { + const fileContent = fs.readFileSync(sessionsFile, 'utf-8') + const data = JSON.parse(fileContent) + + for (const [key, entry] of Object.entries(data)) { + const s = entry as Record + const updatedAt = s.updatedAt || 0 + sessions.push({ + key, + agent: agentName, + sessionId: s.sessionId || '', + updatedAt, + chatType: s.chatType || 'unknown', + channel: s.deliveryContext?.channel || s.lastChannel || s.channel || '', + model: typeof s.model === 'object' && s.model?.primary ? String(s.model.primary) : String(s.model || ''), + totalTokens: s.totalTokens || 0, + inputTokens: s.inputTokens || 0, + outputTokens: s.outputTokens || 0, + contextTokens: s.contextTokens || 0, + }) + } + } catch { + // Skip agents without valid session files } - } catch { - // Skip agents without valid session files } + + sessions.sort((a, b) => b.updatedAt - a.updatedAt) + _sessionCache = { data: sessions, ts: Date.now() } + raw = sessions } - // Sort by most recently updated first - sessions.sort((a, b) => b.updatedAt - a.updatedAt) - return sessions + // Compute `active` at read time so it's always fresh regardless of cache age + return raw.map(s => ({ ...s, active: (now - s.updatedAt) < activeWithinMs })) } export function countStaleGatewaySessions(retentionDays: number): number { @@ -146,6 +165,7 @@ export function pruneGatewaySessionsOlderThan(retentionDays: number): { deleted: } } + if (filesTouched > 0) invalidateSessionCache() return { deleted, filesTouched } }