feat: scope workflows and webhooks to workspace (#132)
This commit is contained in:
parent
3662ab0fe7
commit
57dee2094a
|
|
@ -12,6 +12,7 @@ export async function GET(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const { searchParams } = new URL(request.url)
|
||||
const webhookId = searchParams.get('webhook_id')
|
||||
const limit = Math.min(parseInt(searchParams.get('limit') || '50'), 200)
|
||||
|
|
@ -20,12 +21,13 @@ export async function GET(request: NextRequest) {
|
|||
let query = `
|
||||
SELECT wd.*, w.name as webhook_name, w.url as webhook_url
|
||||
FROM webhook_deliveries wd
|
||||
JOIN webhooks w ON wd.webhook_id = w.id
|
||||
JOIN webhooks w ON wd.webhook_id = w.id AND w.workspace_id = wd.workspace_id
|
||||
WHERE wd.workspace_id = ?
|
||||
`
|
||||
const params: any[] = []
|
||||
const params: any[] = [workspaceId]
|
||||
|
||||
if (webhookId) {
|
||||
query += ' WHERE wd.webhook_id = ?'
|
||||
query += ' AND wd.webhook_id = ?'
|
||||
params.push(webhookId)
|
||||
}
|
||||
|
||||
|
|
@ -35,10 +37,10 @@ export async function GET(request: NextRequest) {
|
|||
const deliveries = db.prepare(query).all(...params)
|
||||
|
||||
// Get total count
|
||||
let countQuery = 'SELECT COUNT(*) as count FROM webhook_deliveries'
|
||||
const countParams: any[] = []
|
||||
let countQuery = 'SELECT COUNT(*) as count FROM webhook_deliveries WHERE workspace_id = ?'
|
||||
const countParams: any[] = [workspaceId]
|
||||
if (webhookId) {
|
||||
countQuery += ' WHERE webhook_id = ?'
|
||||
countQuery += ' AND webhook_id = ?'
|
||||
countParams.push(webhookId)
|
||||
}
|
||||
const { count: total } = db.prepare(countQuery).get(...countParams) as { count: number }
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const { delivery_id } = await request.json()
|
||||
|
||||
if (!delivery_id) {
|
||||
|
|
@ -21,11 +22,11 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
const delivery = db.prepare(`
|
||||
SELECT wd.*, w.id as w_id, w.name as w_name, w.url as w_url, w.secret as w_secret,
|
||||
w.events as w_events, w.enabled as w_enabled
|
||||
w.events as w_events, w.enabled as w_enabled, w.workspace_id as w_workspace_id
|
||||
FROM webhook_deliveries wd
|
||||
JOIN webhooks w ON w.id = wd.webhook_id
|
||||
WHERE wd.id = ?
|
||||
`).get(delivery_id) as any
|
||||
JOIN webhooks w ON w.id = wd.webhook_id AND w.workspace_id = wd.workspace_id
|
||||
WHERE wd.id = ? AND wd.workspace_id = ?
|
||||
`).get(delivery_id, workspaceId) as any
|
||||
|
||||
if (!delivery) {
|
||||
return NextResponse.json({ error: 'Delivery not found' }, { status: 404 })
|
||||
|
|
@ -38,6 +39,7 @@ export async function POST(request: NextRequest) {
|
|||
secret: delivery.w_secret,
|
||||
events: delivery.w_events,
|
||||
enabled: delivery.w_enabled,
|
||||
workspace_id: delivery.w_workspace_id,
|
||||
}
|
||||
|
||||
// Parse the original payload
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import { NextRequest, NextResponse } from 'next/server'
|
||||
import { getDatabase } from '@/lib/db'
|
||||
import { requireRole } from '@/lib/auth'
|
||||
import { randomBytes, createHmac } from 'crypto'
|
||||
import { randomBytes } from 'crypto'
|
||||
import { mutationLimiter } from '@/lib/rate-limit'
|
||||
import { logger } from '@/lib/logger'
|
||||
import { validateBody, createWebhookSchema } from '@/lib/validation'
|
||||
|
|
@ -15,14 +15,16 @@ export async function GET(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const webhooks = db.prepare(`
|
||||
SELECT w.*,
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id) as total_deliveries,
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id AND wd.status_code BETWEEN 200 AND 299) as successful_deliveries,
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id AND (wd.error IS NOT NULL OR wd.status_code NOT BETWEEN 200 AND 299)) as failed_deliveries
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id AND wd.workspace_id = w.workspace_id) as total_deliveries,
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id AND wd.workspace_id = w.workspace_id AND wd.status_code BETWEEN 200 AND 299) as successful_deliveries,
|
||||
(SELECT COUNT(*) FROM webhook_deliveries wd WHERE wd.webhook_id = w.id AND wd.workspace_id = w.workspace_id AND (wd.error IS NOT NULL OR wd.status_code NOT BETWEEN 200 AND 299)) as failed_deliveries
|
||||
FROM webhooks w
|
||||
WHERE w.workspace_id = ?
|
||||
ORDER BY w.created_at DESC
|
||||
`).all() as any[]
|
||||
`).all(workspaceId) as any[]
|
||||
|
||||
// Parse events JSON, mask secret, add circuit breaker status
|
||||
const maxRetries = parseInt(process.env.MC_WEBHOOK_MAX_RETRIES || '5', 10) || 5
|
||||
|
|
@ -54,6 +56,7 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const validated = await validateBody(request, createWebhookSchema)
|
||||
if ('error' in validated) return validated.error
|
||||
const body = validated.data
|
||||
|
|
@ -63,9 +66,9 @@ export async function POST(request: NextRequest) {
|
|||
const eventsJson = JSON.stringify(events || ['*'])
|
||||
|
||||
const dbResult = db.prepare(`
|
||||
INSERT INTO webhooks (name, url, secret, events, created_by)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
`).run(name, url, secret, eventsJson, auth.user.username)
|
||||
INSERT INTO webhooks (name, url, secret, events, created_by, workspace_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
`).run(name, url, secret, eventsJson, auth.user.username, workspaceId)
|
||||
|
||||
return NextResponse.json({
|
||||
id: dbResult.lastInsertRowid,
|
||||
|
|
@ -94,6 +97,7 @@ export async function PUT(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const body = await request.json()
|
||||
const { id, name, url, events, enabled, regenerate_secret, reset_circuit } = body
|
||||
|
||||
|
|
@ -101,7 +105,7 @@ export async function PUT(request: NextRequest) {
|
|||
return NextResponse.json({ error: 'Webhook ID is required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const existing = db.prepare('SELECT * FROM webhooks WHERE id = ?').get(id) as any
|
||||
const existing = db.prepare('SELECT * FROM webhooks WHERE id = ? AND workspace_id = ?').get(id, workspaceId) as any
|
||||
if (!existing) {
|
||||
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
|
||||
}
|
||||
|
|
@ -133,8 +137,8 @@ export async function PUT(request: NextRequest) {
|
|||
params.push(newSecret)
|
||||
}
|
||||
|
||||
params.push(id)
|
||||
db.prepare(`UPDATE webhooks SET ${updates.join(', ')} WHERE id = ?`).run(...params)
|
||||
params.push(id, workspaceId)
|
||||
db.prepare(`UPDATE webhooks SET ${updates.join(', ')} WHERE id = ? AND workspace_id = ?`).run(...params)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
|
|
@ -158,6 +162,7 @@ export async function DELETE(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
let body: any
|
||||
try { body = await request.json() } catch { return NextResponse.json({ error: 'Request body required' }, { status: 400 }) }
|
||||
const id = body.id
|
||||
|
|
@ -167,8 +172,8 @@ export async function DELETE(request: NextRequest) {
|
|||
}
|
||||
|
||||
// Delete deliveries first (cascade should handle it, but be explicit)
|
||||
db.prepare('DELETE FROM webhook_deliveries WHERE webhook_id = ?').run(id)
|
||||
const result = db.prepare('DELETE FROM webhooks WHERE id = ?').run(id)
|
||||
db.prepare('DELETE FROM webhook_deliveries WHERE webhook_id = ? AND workspace_id = ?').run(id, workspaceId)
|
||||
const result = db.prepare('DELETE FROM webhooks WHERE id = ? AND workspace_id = ?').run(id, workspaceId)
|
||||
|
||||
if (result.changes === 0) {
|
||||
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
|
||||
|
|
|
|||
|
|
@ -13,13 +13,14 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const { id } = await request.json()
|
||||
|
||||
if (!id) {
|
||||
return NextResponse.json({ error: 'Webhook ID is required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const webhook = db.prepare('SELECT * FROM webhooks WHERE id = ?').get(id) as any
|
||||
const webhook = db.prepare('SELECT * FROM webhooks WHERE id = ? AND workspace_id = ?').get(id, workspaceId) as any
|
||||
if (!webhook) {
|
||||
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,10 @@ export async function GET(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const templates = db.prepare('SELECT * FROM workflow_templates ORDER BY use_count DESC, updated_at DESC').all() as WorkflowTemplate[]
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const templates = db
|
||||
.prepare('SELECT * FROM workflow_templates WHERE workspace_id = ? ORDER BY use_count DESC, updated_at DESC')
|
||||
.all(workspaceId) as WorkflowTemplate[]
|
||||
|
||||
const parsed = templates.map(t => ({
|
||||
...t,
|
||||
|
|
@ -61,15 +64,36 @@ export async function POST(request: NextRequest) {
|
|||
|
||||
const db = getDatabase()
|
||||
const user = auth.user
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
|
||||
const insertResult = db.prepare(`
|
||||
INSERT INTO workflow_templates (name, description, model, task_prompt, timeout_seconds, agent_role, tags, created_by)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(name, description || null, model, task_prompt, timeout_seconds, agent_role || null, JSON.stringify(tags), user?.username || 'system')
|
||||
INSERT INTO workflow_templates (name, description, model, task_prompt, timeout_seconds, agent_role, tags, created_by, workspace_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(
|
||||
name,
|
||||
description || null,
|
||||
model,
|
||||
task_prompt,
|
||||
timeout_seconds,
|
||||
agent_role || null,
|
||||
JSON.stringify(tags),
|
||||
user?.username || 'system',
|
||||
workspaceId
|
||||
)
|
||||
|
||||
const template = db.prepare('SELECT * FROM workflow_templates WHERE id = ?').get(insertResult.lastInsertRowid) as WorkflowTemplate
|
||||
const template = db
|
||||
.prepare('SELECT * FROM workflow_templates WHERE id = ? AND workspace_id = ?')
|
||||
.get(insertResult.lastInsertRowid, workspaceId) as WorkflowTemplate
|
||||
|
||||
db_helpers.logActivity('workflow_created', 'workflow', Number(insertResult.lastInsertRowid), user?.username || 'system', `Created workflow template: ${name}`)
|
||||
db_helpers.logActivity(
|
||||
'workflow_created',
|
||||
'workflow',
|
||||
Number(insertResult.lastInsertRowid),
|
||||
user?.username || 'system',
|
||||
`Created workflow template: ${name}`,
|
||||
undefined,
|
||||
workspaceId
|
||||
)
|
||||
|
||||
return NextResponse.json({
|
||||
template: { ...template, tags: template.tags ? JSON.parse(template.tags) : [] }
|
||||
|
|
@ -89,6 +113,7 @@ export async function PUT(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
const body = await request.json()
|
||||
const { id, ...updates } = body
|
||||
|
||||
|
|
@ -96,7 +121,9 @@ export async function PUT(request: NextRequest) {
|
|||
return NextResponse.json({ error: 'Template ID is required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const existing = db.prepare('SELECT * FROM workflow_templates WHERE id = ?').get(id) as WorkflowTemplate
|
||||
const existing = db
|
||||
.prepare('SELECT * FROM workflow_templates WHERE id = ? AND workspace_id = ?')
|
||||
.get(id, workspaceId) as WorkflowTemplate
|
||||
if (!existing) {
|
||||
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
|
||||
}
|
||||
|
|
@ -121,11 +148,13 @@ export async function PUT(request: NextRequest) {
|
|||
|
||||
fields.push('updated_at = ?')
|
||||
params.push(Math.floor(Date.now() / 1000))
|
||||
params.push(id)
|
||||
params.push(id, workspaceId)
|
||||
|
||||
db.prepare(`UPDATE workflow_templates SET ${fields.join(', ')} WHERE id = ?`).run(...params)
|
||||
db.prepare(`UPDATE workflow_templates SET ${fields.join(', ')} WHERE id = ? AND workspace_id = ?`).run(...params)
|
||||
|
||||
const updated = db.prepare('SELECT * FROM workflow_templates WHERE id = ?').get(id) as WorkflowTemplate
|
||||
const updated = db
|
||||
.prepare('SELECT * FROM workflow_templates WHERE id = ? AND workspace_id = ?')
|
||||
.get(id, workspaceId) as WorkflowTemplate
|
||||
return NextResponse.json({ template: { ...updated, tags: updated.tags ? JSON.parse(updated.tags) : [] } })
|
||||
} catch (error) {
|
||||
logger.error({ err: error }, 'PUT /api/workflows error')
|
||||
|
|
@ -142,6 +171,7 @@ export async function DELETE(request: NextRequest) {
|
|||
|
||||
try {
|
||||
const db = getDatabase()
|
||||
const workspaceId = auth.user.workspace_id ?? 1
|
||||
let body: any
|
||||
try { body = await request.json() } catch { return NextResponse.json({ error: 'Request body required' }, { status: 400 }) }
|
||||
const id = body.id
|
||||
|
|
@ -150,7 +180,10 @@ export async function DELETE(request: NextRequest) {
|
|||
return NextResponse.json({ error: 'Template ID is required' }, { status: 400 })
|
||||
}
|
||||
|
||||
db.prepare('DELETE FROM workflow_templates WHERE id = ?').run(parseInt(id))
|
||||
const result = db.prepare('DELETE FROM workflow_templates WHERE id = ? AND workspace_id = ?').run(parseInt(id), workspaceId)
|
||||
if (result.changes === 0) {
|
||||
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
|
||||
}
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
logger.error({ err: error }, 'DELETE /api/workflows error')
|
||||
|
|
|
|||
|
|
@ -643,6 +643,39 @@ const migrations: Migration[] = [
|
|||
db.exec(`CREATE INDEX IF NOT EXISTS idx_workflow_pipelines_workspace_id ON workflow_pipelines(workspace_id)`)
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_pipeline_runs_workspace_id ON pipeline_runs(workspace_id)`)
|
||||
}
|
||||
},
|
||||
{
|
||||
id: '023_workspace_isolation_phase3',
|
||||
up: (db) => {
|
||||
const addWorkspaceIdColumn = (table: string) => {
|
||||
const tableExists = db
|
||||
.prepare(`SELECT 1 as ok FROM sqlite_master WHERE type = 'table' AND name = ?`)
|
||||
.get(table) as { ok?: number } | undefined
|
||||
if (!tableExists?.ok) return
|
||||
|
||||
const cols = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>
|
||||
if (!cols.some((c) => c.name === 'workspace_id')) {
|
||||
db.exec(`ALTER TABLE ${table} ADD COLUMN workspace_id INTEGER NOT NULL DEFAULT 1`)
|
||||
}
|
||||
db.exec(`UPDATE ${table} SET workspace_id = COALESCE(workspace_id, 1)`)
|
||||
}
|
||||
|
||||
const scopedTables = [
|
||||
'workflow_templates',
|
||||
'webhooks',
|
||||
'webhook_deliveries',
|
||||
'token_usage',
|
||||
]
|
||||
|
||||
for (const table of scopedTables) {
|
||||
addWorkspaceIdColumn(table)
|
||||
}
|
||||
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_workflow_templates_workspace_id ON workflow_templates(workspace_id)`)
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_webhooks_workspace_id ON webhooks(workspace_id)`)
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_workspace_id ON webhook_deliveries(workspace_id)`)
|
||||
db.exec(`CREATE INDEX IF NOT EXISTS idx_token_usage_workspace_id ON token_usage(workspace_id)`)
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ interface Webhook {
|
|||
secret: string | null
|
||||
events: string // JSON array
|
||||
enabled: number
|
||||
workspace_id?: number
|
||||
consecutive_failures?: number
|
||||
}
|
||||
|
||||
|
|
@ -103,13 +104,14 @@ export function initWebhookListener() {
|
|||
|
||||
// Also fire agent.error for error status specifically
|
||||
const isAgentError = event.type === 'agent.status_changed' && event.data?.status === 'error'
|
||||
const workspaceId = typeof event.data?.workspace_id === 'number' ? event.data.workspace_id : 1
|
||||
|
||||
fireWebhooksAsync(webhookEventType, event.data).catch((err) => {
|
||||
fireWebhooksAsync(webhookEventType, event.data, workspaceId).catch((err) => {
|
||||
logger.error({ err }, 'Webhook dispatch error')
|
||||
})
|
||||
|
||||
if (isAgentError) {
|
||||
fireWebhooksAsync('agent.error', event.data).catch((err) => {
|
||||
fireWebhooksAsync('agent.error', event.data, workspaceId).catch((err) => {
|
||||
logger.error({ err }, 'Webhook dispatch error')
|
||||
})
|
||||
}
|
||||
|
|
@ -119,21 +121,23 @@ export function initWebhookListener() {
|
|||
/**
|
||||
* Fire all matching webhooks for an event type (public for test endpoint).
|
||||
*/
|
||||
export function fireWebhooks(eventType: string, payload: Record<string, any>) {
|
||||
fireWebhooksAsync(eventType, payload).catch((err) => {
|
||||
export function fireWebhooks(eventType: string, payload: Record<string, any>, workspaceId?: number) {
|
||||
fireWebhooksAsync(eventType, payload, workspaceId).catch((err) => {
|
||||
logger.error({ err }, 'Webhook dispatch error')
|
||||
})
|
||||
}
|
||||
|
||||
async function fireWebhooksAsync(eventType: string, payload: Record<string, any>) {
|
||||
async function fireWebhooksAsync(eventType: string, payload: Record<string, any>, workspaceId?: number) {
|
||||
const resolvedWorkspaceId =
|
||||
workspaceId ?? (typeof payload?.workspace_id === 'number' ? payload.workspace_id : 1)
|
||||
let webhooks: Webhook[]
|
||||
try {
|
||||
// Lazy import to avoid circular dependency
|
||||
const { getDatabase } = await import('./db')
|
||||
const db = getDatabase()
|
||||
webhooks = db.prepare(
|
||||
'SELECT * FROM webhooks WHERE enabled = 1'
|
||||
).all() as Webhook[]
|
||||
'SELECT * FROM webhooks WHERE enabled = 1 AND workspace_id = ?'
|
||||
).all(resolvedWorkspaceId) as Webhook[]
|
||||
} catch {
|
||||
return // DB not ready or table doesn't exist yet
|
||||
}
|
||||
|
|
@ -229,8 +233,8 @@ async function deliverWebhook(
|
|||
const db = getDatabase()
|
||||
|
||||
const insertResult = db.prepare(`
|
||||
INSERT INTO webhook_deliveries (webhook_id, event_type, payload, status_code, response_body, error, duration_ms, attempt, is_retry, parent_delivery_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
INSERT INTO webhook_deliveries (webhook_id, event_type, payload, status_code, response_body, error, duration_ms, attempt, is_retry, parent_delivery_id, workspace_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(
|
||||
webhook.id,
|
||||
eventType,
|
||||
|
|
@ -241,24 +245,25 @@ async function deliverWebhook(
|
|||
durationMs,
|
||||
attempt,
|
||||
attempt > 0 ? 1 : 0,
|
||||
parentDeliveryId
|
||||
parentDeliveryId,
|
||||
webhook.workspace_id ?? 1
|
||||
)
|
||||
deliveryId = Number(insertResult.lastInsertRowid)
|
||||
|
||||
// Update webhook last_fired
|
||||
db.prepare(`
|
||||
UPDATE webhooks SET last_fired_at = unixepoch(), last_status = ?, updated_at = unixepoch()
|
||||
WHERE id = ?
|
||||
`).run(statusCode ?? -1, webhook.id)
|
||||
WHERE id = ? AND workspace_id = ?
|
||||
`).run(statusCode ?? -1, webhook.id, webhook.workspace_id ?? 1)
|
||||
|
||||
// Circuit breaker + retry scheduling (skip for test deliveries)
|
||||
if (allowRetry) {
|
||||
if (success) {
|
||||
// Reset consecutive failures on success
|
||||
db.prepare(`UPDATE webhooks SET consecutive_failures = 0 WHERE id = ?`).run(webhook.id)
|
||||
db.prepare(`UPDATE webhooks SET consecutive_failures = 0 WHERE id = ? AND workspace_id = ?`).run(webhook.id, webhook.workspace_id ?? 1)
|
||||
} else {
|
||||
// Increment consecutive failures
|
||||
db.prepare(`UPDATE webhooks SET consecutive_failures = consecutive_failures + 1 WHERE id = ?`).run(webhook.id)
|
||||
db.prepare(`UPDATE webhooks SET consecutive_failures = consecutive_failures + 1 WHERE id = ? AND workspace_id = ?`).run(webhook.id, webhook.workspace_id ?? 1)
|
||||
|
||||
if (attempt < MAX_RETRIES - 1) {
|
||||
// Schedule retry
|
||||
|
|
@ -267,9 +272,9 @@ async function deliverWebhook(
|
|||
db.prepare(`UPDATE webhook_deliveries SET next_retry_at = ? WHERE id = ?`).run(nextRetryAt, deliveryId)
|
||||
} else {
|
||||
// Exhausted retries — trip circuit breaker
|
||||
const wh = db.prepare(`SELECT consecutive_failures FROM webhooks WHERE id = ?`).get(webhook.id) as { consecutive_failures: number } | undefined
|
||||
const wh = db.prepare(`SELECT consecutive_failures FROM webhooks WHERE id = ? AND workspace_id = ?`).get(webhook.id, webhook.workspace_id ?? 1) as { consecutive_failures: number } | undefined
|
||||
if (wh && wh.consecutive_failures >= MAX_RETRIES) {
|
||||
db.prepare(`UPDATE webhooks SET enabled = 0, updated_at = unixepoch() WHERE id = ?`).run(webhook.id)
|
||||
db.prepare(`UPDATE webhooks SET enabled = 0, updated_at = unixepoch() WHERE id = ? AND workspace_id = ?`).run(webhook.id, webhook.workspace_id ?? 1)
|
||||
logger.warn({ webhookId: webhook.id, name: webhook.name }, 'Webhook circuit breaker tripped — disabled after exhausting retries')
|
||||
}
|
||||
}
|
||||
|
|
@ -279,10 +284,10 @@ async function deliverWebhook(
|
|||
// Prune old deliveries (keep last 200 per webhook)
|
||||
db.prepare(`
|
||||
DELETE FROM webhook_deliveries
|
||||
WHERE webhook_id = ? AND id NOT IN (
|
||||
SELECT id FROM webhook_deliveries WHERE webhook_id = ? ORDER BY created_at DESC LIMIT 200
|
||||
WHERE webhook_id = ? AND workspace_id = ? AND id NOT IN (
|
||||
SELECT id FROM webhook_deliveries WHERE webhook_id = ? AND workspace_id = ? ORDER BY created_at DESC LIMIT 200
|
||||
)
|
||||
`).run(webhook.id, webhook.id)
|
||||
`).run(webhook.id, webhook.workspace_id ?? 1, webhook.id, webhook.workspace_id ?? 1)
|
||||
} catch (logErr) {
|
||||
logger.error({ err: logErr, webhookId: webhook.id }, 'Webhook delivery logging/pruning failed')
|
||||
}
|
||||
|
|
@ -304,15 +309,16 @@ export async function processWebhookRetries(): Promise<{ ok: boolean; message: s
|
|||
const pendingRetries = db.prepare(`
|
||||
SELECT wd.id, wd.webhook_id, wd.event_type, wd.payload, wd.attempt,
|
||||
w.id as w_id, w.name as w_name, w.url as w_url, w.secret as w_secret,
|
||||
w.events as w_events, w.enabled as w_enabled, w.consecutive_failures as w_consecutive_failures
|
||||
w.events as w_events, w.enabled as w_enabled, w.consecutive_failures as w_consecutive_failures,
|
||||
wd.workspace_id as wd_workspace_id
|
||||
FROM webhook_deliveries wd
|
||||
JOIN webhooks w ON w.id = wd.webhook_id AND w.enabled = 1
|
||||
JOIN webhooks w ON w.id = wd.webhook_id AND w.workspace_id = wd.workspace_id AND w.enabled = 1
|
||||
WHERE wd.next_retry_at IS NOT NULL AND wd.next_retry_at <= ?
|
||||
LIMIT 50
|
||||
`).all(now) as Array<{
|
||||
id: number; webhook_id: number; event_type: string; payload: string; attempt: number
|
||||
w_id: number; w_name: string; w_url: string; w_secret: string | null
|
||||
w_events: string; w_enabled: number; w_consecutive_failures: number
|
||||
w_events: string; w_enabled: number; w_consecutive_failures: number; wd_workspace_id: number
|
||||
}>
|
||||
|
||||
if (pendingRetries.length === 0) {
|
||||
|
|
@ -320,9 +326,9 @@ export async function processWebhookRetries(): Promise<{ ok: boolean; message: s
|
|||
}
|
||||
|
||||
// Clear next_retry_at immediately to prevent double-processing
|
||||
const clearStmt = db.prepare(`UPDATE webhook_deliveries SET next_retry_at = NULL WHERE id = ?`)
|
||||
const clearStmt = db.prepare(`UPDATE webhook_deliveries SET next_retry_at = NULL WHERE id = ? AND workspace_id = ?`)
|
||||
for (const row of pendingRetries) {
|
||||
clearStmt.run(row.id)
|
||||
clearStmt.run(row.id, row.wd_workspace_id)
|
||||
}
|
||||
|
||||
// Re-deliver each
|
||||
|
|
@ -337,6 +343,7 @@ export async function processWebhookRetries(): Promise<{ ok: boolean; message: s
|
|||
events: row.w_events,
|
||||
enabled: row.w_enabled,
|
||||
consecutive_failures: row.w_consecutive_failures,
|
||||
workspace_id: row.wd_workspace_id,
|
||||
}
|
||||
|
||||
// Parse the original payload from the stored JSON body
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@ import { test, expect } from '@playwright/test'
|
|||
*/
|
||||
|
||||
test.describe('CSRF Origin Validation (Issue #20)', () => {
|
||||
const TEST_PASS = 'testpass1234!'
|
||||
const TEST_USER = process.env.AUTH_USER || 'testadmin'
|
||||
const TEST_PASS = process.env.AUTH_PASS || 'testpass1234!'
|
||||
const TEST_API_KEY = process.env.API_KEY || 'test-api-key-e2e-12345'
|
||||
|
||||
test('POST with mismatched Origin is rejected', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
|
|
@ -23,7 +25,7 @@ test.describe('CSRF Origin Validation (Issue #20)', () => {
|
|||
|
||||
test('POST with matching Origin is allowed', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: TEST_PASS },
|
||||
data: { username: TEST_USER, password: TEST_PASS },
|
||||
headers: {
|
||||
'origin': 'http://127.0.0.1:3005',
|
||||
'host': '127.0.0.1:3005'
|
||||
|
|
@ -35,7 +37,7 @@ test.describe('CSRF Origin Validation (Issue #20)', () => {
|
|||
|
||||
test('POST without Origin header is allowed (non-browser client)', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: TEST_PASS },
|
||||
data: { username: TEST_USER, password: TEST_PASS },
|
||||
})
|
||||
// No Origin = non-browser client, should be allowed through CSRF check
|
||||
expect(res.status()).not.toBe(403)
|
||||
|
|
@ -45,7 +47,7 @@ test.describe('CSRF Origin Validation (Issue #20)', () => {
|
|||
const res = await request.get('/api/agents', {
|
||||
headers: {
|
||||
'origin': 'https://evil.example.com',
|
||||
'x-api-key': 'test-api-key-e2e-12345'
|
||||
'x-api-key': TEST_API_KEY
|
||||
}
|
||||
})
|
||||
// GET is exempt from CSRF — should not be 403
|
||||
|
|
|
|||
|
|
@ -6,7 +6,25 @@ import { test, expect } from '@playwright/test'
|
|||
*/
|
||||
|
||||
test.describe('Login Flow', () => {
|
||||
const TEST_API_KEY = process.env.API_KEY || 'test-api-key-e2e-12345'
|
||||
const TEST_PASS = 'testpass1234!'
|
||||
const TEST_USER = `login-e2e-${Date.now()}`
|
||||
|
||||
test.beforeAll(async ({ request }) => {
|
||||
const createRes = await request.post('/api/auth/users', {
|
||||
data: {
|
||||
username: TEST_USER,
|
||||
password: TEST_PASS,
|
||||
display_name: 'Login E2E User',
|
||||
role: 'admin',
|
||||
},
|
||||
headers: {
|
||||
'x-api-key': TEST_API_KEY,
|
||||
},
|
||||
})
|
||||
|
||||
expect([201, 409]).toContain(createRes.status())
|
||||
})
|
||||
|
||||
test('login page loads', async ({ page }) => {
|
||||
await page.goto('/login')
|
||||
|
|
@ -20,7 +38,7 @@ test.describe('Login Flow', () => {
|
|||
|
||||
test('login API returns session cookie on success', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: TEST_PASS },
|
||||
data: { username: TEST_USER, password: TEST_PASS },
|
||||
headers: { 'x-forwarded-for': '10.88.88.1' }
|
||||
})
|
||||
expect(res.status()).toBe(200)
|
||||
|
|
@ -32,7 +50,7 @@ test.describe('Login Flow', () => {
|
|||
|
||||
test('login API rejects wrong password', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: 'wrongpassword' },
|
||||
data: { username: TEST_USER, password: 'wrongpassword' },
|
||||
headers: { 'x-forwarded-for': '10.77.77.77' }
|
||||
})
|
||||
expect(res.status()).toBe(401)
|
||||
|
|
@ -41,7 +59,7 @@ test.describe('Login Flow', () => {
|
|||
test('session cookie grants API access', async ({ request }) => {
|
||||
// Login to get a session
|
||||
const loginRes = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: TEST_PASS },
|
||||
data: { username: TEST_USER, password: TEST_PASS },
|
||||
headers: { 'x-forwarded-for': '10.88.88.2' }
|
||||
})
|
||||
expect(loginRes.status()).toBe(200)
|
||||
|
|
@ -58,6 +76,6 @@ test.describe('Login Flow', () => {
|
|||
})
|
||||
expect(meRes.status()).toBe(200)
|
||||
const body = await meRes.json()
|
||||
expect(body.user?.username).toBe('testadmin')
|
||||
expect(body.user?.username).toBe(TEST_USER)
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -6,7 +6,8 @@ import { test, expect } from '@playwright/test'
|
|||
*/
|
||||
|
||||
test.describe('Login Rate Limiting (Issue #8)', () => {
|
||||
const TEST_PASS = 'testpass1234!'
|
||||
const TEST_USER = process.env.AUTH_USER || 'testadmin'
|
||||
const TEST_PASS = process.env.AUTH_PASS || 'testpass1234!'
|
||||
|
||||
test('blocks login after 5 rapid failed attempts', async ({ request }) => {
|
||||
const results: number[] = []
|
||||
|
|
@ -14,7 +15,7 @@ test.describe('Login Rate Limiting (Issue #8)', () => {
|
|||
// Send 7 rapid login attempts with wrong password
|
||||
for (let i = 0; i < 7; i++) {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: 'wrongpassword' },
|
||||
data: { username: TEST_USER, password: 'wrongpassword' },
|
||||
headers: { 'x-real-ip': '10.99.99.99' }
|
||||
})
|
||||
results.push(res.status())
|
||||
|
|
@ -27,7 +28,7 @@ test.describe('Login Rate Limiting (Issue #8)', () => {
|
|||
|
||||
test('successful login is not blocked for fresh IP', async ({ request }) => {
|
||||
const res = await request.post('/api/auth/login', {
|
||||
data: { username: 'testadmin', password: TEST_PASS },
|
||||
data: { username: TEST_USER, password: TEST_PASS },
|
||||
headers: { 'x-real-ip': '10.88.88.88' }
|
||||
})
|
||||
// Should succeed (200) or at least not be rate limited
|
||||
|
|
|
|||
Loading…
Reference in New Issue