Cherry-picks three valuable fixes from @doanbactam's WebSocket refactor PR: 1. Feed item ID collision fix — prefix log IDs with 'log-' to avoid React key collisions with activity IDs in the combined feed 2. Jittered reconnect backoff — add random jitter (0-50% of base) to WebSocket exponential backoff to prevent thundering-herd reconnects when multiple tabs reconnect after a server restart 3. Cron job deduplication + async I/O — deduplicate jobs.json entries by name (keeps latest), prevent duplicates on add, and convert sync file reads/writes to async to avoid blocking the event loop Co-authored-by: Doan Bac Tam <24356000+doanbactam@users.noreply.github.com>
This commit is contained in:
parent
55fdb45e53
commit
304a9b3194
|
|
@ -2,7 +2,7 @@ import { NextRequest, NextResponse } from 'next/server'
|
|||
import { requireRole } from '@/lib/auth'
|
||||
import { config } from '@/lib/config'
|
||||
import { logger } from '@/lib/logger'
|
||||
import fs from 'node:fs'
|
||||
import { readFile, writeFile } from 'node:fs/promises'
|
||||
import path from 'node:path'
|
||||
|
||||
interface CronJob {
|
||||
|
|
@ -72,22 +72,22 @@ function getCronFilePath(): string {
|
|||
return path.join(openclawHome, 'cron', 'jobs.json')
|
||||
}
|
||||
|
||||
function loadCronFile(): OpenClawCronFile | null {
|
||||
async function loadCronFile(): Promise<OpenClawCronFile | null> {
|
||||
const filePath = getCronFilePath()
|
||||
if (!filePath) return null
|
||||
try {
|
||||
const raw = fs.readFileSync(filePath, 'utf-8')
|
||||
const raw = await readFile(filePath, 'utf-8')
|
||||
return JSON.parse(raw)
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
function saveCronFile(data: OpenClawCronFile): boolean {
|
||||
async function saveCronFile(data: OpenClawCronFile): Promise<boolean> {
|
||||
const filePath = getCronFilePath()
|
||||
if (!filePath) return false
|
||||
try {
|
||||
fs.writeFileSync(filePath, JSON.stringify(data, null, 2))
|
||||
await writeFile(filePath, JSON.stringify(data, null, 2))
|
||||
return true
|
||||
} catch (err) {
|
||||
logger.error({ err }, 'Failed to write cron file')
|
||||
|
|
@ -95,6 +95,18 @@ function saveCronFile(data: OpenClawCronFile): boolean {
|
|||
}
|
||||
}
|
||||
|
||||
/** Deduplicate jobs by name — keep the latest (by createdAtMs) per unique name */
|
||||
function deduplicateJobs(jobs: OpenClawCronJob[]): OpenClawCronJob[] {
|
||||
const latest = new Map<string, OpenClawCronJob>()
|
||||
for (const job of jobs) {
|
||||
const existing = latest.get(job.name)
|
||||
if (!existing || (job.createdAtMs ?? 0) > (existing.createdAtMs ?? 0)) {
|
||||
latest.set(job.name, job)
|
||||
}
|
||||
}
|
||||
return [...latest.values()]
|
||||
}
|
||||
|
||||
function mapLastStatus(status?: string): 'success' | 'error' | 'running' | undefined {
|
||||
if (!status) return undefined
|
||||
const s = status.toLowerCase()
|
||||
|
|
@ -140,12 +152,12 @@ export async function GET(request: NextRequest) {
|
|||
const action = searchParams.get('action')
|
||||
|
||||
if (action === 'list') {
|
||||
const cronFile = loadCronFile()
|
||||
const cronFile = await loadCronFile()
|
||||
if (!cronFile || !cronFile.jobs) {
|
||||
return NextResponse.json({ jobs: [] })
|
||||
}
|
||||
|
||||
const jobs = cronFile.jobs.map(mapOpenClawJob)
|
||||
const jobs = deduplicateJobs(cronFile.jobs).map(mapOpenClawJob)
|
||||
return NextResponse.json({ jobs })
|
||||
}
|
||||
|
||||
|
|
@ -156,7 +168,7 @@ export async function GET(request: NextRequest) {
|
|||
}
|
||||
|
||||
// Find the job to get its state info
|
||||
const cronFile = loadCronFile()
|
||||
const cronFile = await loadCronFile()
|
||||
const job = cronFile?.jobs.find(j => j.id === jobId || j.name === jobId)
|
||||
|
||||
const logs: Array<{ timestamp: number; message: string; level: string }> = []
|
||||
|
|
@ -209,7 +221,7 @@ export async function POST(request: NextRequest) {
|
|||
return NextResponse.json({ error: 'Job ID or name required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const cronFile = loadCronFile()
|
||||
const cronFile = await loadCronFile()
|
||||
if (!cronFile) {
|
||||
return NextResponse.json({ error: 'Cron file not found' }, { status: 404 })
|
||||
}
|
||||
|
|
@ -222,7 +234,7 @@ export async function POST(request: NextRequest) {
|
|||
job.enabled = !job.enabled
|
||||
job.updatedAtMs = Date.now()
|
||||
|
||||
if (!saveCronFile(cronFile)) {
|
||||
if (!(await saveCronFile(cronFile))) {
|
||||
return NextResponse.json({ error: 'Failed to save cron file' }, { status: 500 })
|
||||
}
|
||||
|
||||
|
|
@ -242,7 +254,7 @@ export async function POST(request: NextRequest) {
|
|||
)
|
||||
}
|
||||
|
||||
const cronFile = loadCronFile()
|
||||
const cronFile = await loadCronFile()
|
||||
const job = cronFile?.jobs.find(j => j.id === id || j.name === id)
|
||||
if (!job) {
|
||||
return NextResponse.json({ error: 'Job not found' }, { status: 404 })
|
||||
|
|
@ -276,7 +288,7 @@ export async function POST(request: NextRequest) {
|
|||
return NextResponse.json({ error: 'Job ID or name required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const cronFile = loadCronFile()
|
||||
const cronFile = await loadCronFile()
|
||||
if (!cronFile) {
|
||||
return NextResponse.json({ error: 'Cron file not found' }, { status: 404 })
|
||||
}
|
||||
|
|
@ -288,7 +300,7 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
cronFile.jobs.splice(idx, 1)
|
||||
|
||||
if (!saveCronFile(cronFile)) {
|
||||
if (!(await saveCronFile(cronFile))) {
|
||||
return NextResponse.json({ error: 'Failed to save cron file' }, { status: 500 })
|
||||
}
|
||||
|
||||
|
|
@ -305,7 +317,10 @@ export async function POST(request: NextRequest) {
|
|||
)
|
||||
}
|
||||
|
||||
const cronFile = loadCronFile() || { version: 1, jobs: [] }
|
||||
const cronFile = (await loadCronFile()) || { version: 1, jobs: [] }
|
||||
|
||||
// Prevent duplicates: remove existing jobs with the same name
|
||||
cronFile.jobs = cronFile.jobs.filter(j => j.name !== name)
|
||||
|
||||
const newJob: OpenClawCronJob = {
|
||||
id: `mc-${Date.now().toString(36)}`,
|
||||
|
|
@ -330,7 +345,7 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
cronFile.jobs.push(newJob)
|
||||
|
||||
if (!saveCronFile(cronFile)) {
|
||||
if (!(await saveCronFile(cronFile))) {
|
||||
return NextResponse.json({ error: 'Failed to save cron file' }, { status: 500 })
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ export function LiveFeed() {
|
|||
// Combine logs and activities into a unified feed
|
||||
const feedItems = [
|
||||
...logs.slice(0, 30).map(log => ({
|
||||
id: log.id,
|
||||
id: `log-${log.id}`,
|
||||
type: 'log' as const,
|
||||
level: log.level,
|
||||
message: log.message,
|
||||
|
|
|
|||
|
|
@ -415,7 +415,8 @@ export function useWebSocket() {
|
|||
// Auto-reconnect logic with exponential backoff (uses ref to avoid stale closure)
|
||||
const attempts = reconnectAttemptsRef.current
|
||||
if (attempts < maxReconnectAttempts) {
|
||||
const timeout = Math.min(Math.pow(2, attempts) * 1000, 30000)
|
||||
const base = Math.min(Math.pow(2, attempts) * 1000, 30000)
|
||||
const timeout = Math.round(base + Math.random() * base * 0.5)
|
||||
console.log(`Reconnecting in ${timeout}ms... (attempt ${attempts + 1}/${maxReconnectAttempts})`)
|
||||
|
||||
reconnectAttemptsRef.current = attempts + 1
|
||||
|
|
|
|||
Loading…
Reference in New Issue