diff --git a/clavis/clavis-vault/SPEC-replication-async.md b/clavis/clavis-vault/SPEC-replication-async.md new file mode 100644 index 0000000..c51cbc0 --- /dev/null +++ b/clavis/clavis-vault/SPEC-replication-async.md @@ -0,0 +1,199 @@ +# Replication Design — Event-Driven Async (Commercial Only) + +## Core Principle: Trigger on Change, Not Time + +Polling every 30s is wasteful when vaults may go days without changes. +Replication fires **immediately** when a write happens, then goes idle. + +## Architecture + +### On Primary (Calgary) + +``` +Client Request → Primary Handler + ↓ + [1] Apply to local DB + [2] Mark entry dirty (replication_dirty = 1) + [3] Signal replication worker (non-blocking channel) + [4] Return success to client (don't wait) + ↓ + Replication Worker (event-driven, wakes on signal) + ↓ + POST dirty entries to Backup /api/replication/apply + ↓ + Clear dirty flag on ACK +``` + +**No polling. No timer. The worker sleeps until woken.** + +### Replication Worker + +```go +type ReplicationWorker struct { + db *lib.DB + config *ReplicationConfig + signal chan struct{} // Buffered channel (size 1) + pending map[int64]bool // Dedup in-memory + mu sync.Mutex +} + +func (w *ReplicationWorker) Signal() { + select { + case w.signal <- struct{}{}: + default: + // Already signaled, worker will pick up all dirty entries + } +} + +func (w *ReplicationWorker) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-w.signal: + w.replicateBatch() + } + } +} + +func (w *ReplicationWorker) replicateBatch() { + // Get all dirty entries (could be 1 or many if burst) + entries, _ := lib.EntryListDirty(w.db, 100) + if len(entries) == 0 { + return + } + + // POST to backup + // Retry with backoff on failure + // Mark replicated on success +} +``` + +### Signal Flow + +```go +// In CreateEntry, UpdateEntry, DeleteEntry handlers: +func (h *Handlers) CreateEntry(...) { + // ... create entry ... 
+ + // Commercial only: mark dirty and signal replicator + if edition.Current.Name() == "commercial" { + lib.EntryMarkDirty(h.db(r), entry.EntryID) + edition.SignalReplication() // Non-blocking + } + + // Return to client immediately +} + ``` + + ### On Backup (Zurich) + + Same as before: Read-only mode, applies replication pushes, rejects client writes. + + ## Efficiency Gains + + | Metric | Polling (30s) | Event-Driven | + |--------|---------------|--------------| + | CPU wakeups/day | 2,880 | ~number of actual writes | + | Network requests/day | 2,880 | ~number of actual writes | + | Egress/day | High (always checking) | Low (only when data changes) | + | Latency | 0-30s | Immediate | + + For a vault with 10 writes/day: **288x fewer wakeups.** + + ## Burst Handling + + If 50 entries change in a burst (e.g., batch import): + 1. All 50 marked dirty + 2. Worker wakes once + 3. Sends all 50 in single batch + 4. Goes back to sleep + + No 50 separate HTTP requests. + + ## Failure & Retry + + ```go +func replicateBatch(ctx context.Context) { + entries, _ := lib.EntryListDirty(db, 100) + + var err error + for attempt := 0; attempt < maxRetries; attempt++ { + err = postToBackup(entries) + if err == nil { + // Success: clear dirty flags + for _, e := range entries { + lib.EntryMarkReplicated(db, e.EntryID) + } + return + } + + // Failure: entries stay dirty, will be picked up next signal + // Backoff: 1s, 5s, 25s, 125s... + time.Sleep(time.Duration(math.Pow(5, float64(attempt))) * time.Second) + } + + // Max retries exceeded: alert operator + edition.Current.AlertOperator(ctx, "replication_failed", + "Backup unreachable after retries", map[string]any{ + "count": len(entries), + "last_error": err.Error(), + }) +} +``` + + No persistent queue needed - dirty flags in SQLite are the queue. + + ## Code Changes Required + + ### 1.
Signal Function (Commercial Only) + +```go +// edition/replication.go +var replicationSignal chan struct{} + +func SignalReplication() { + if replicationSignal != nil { + select { + case replicationSignal <- struct{}{}: + default: + } + } +} +``` + +### 2. Modified Handlers + +All write handlers need: +```go +if edition.Current.Name() == "commercial" { + lib.EntryMarkDirty(db, entryID) + edition.SignalReplication() +} +``` + +### 3. Remove Polling + +Delete the ticker from replication worker. Replace with `<-signal` only. + +## Resource Usage + +| Resource | Polling | Event-Driven | +|----------|---------|--------------| +| Goroutine | Always running | Running but blocked on channel (idle) | +| Memory | Minimal | Minimal (just channel + map) | +| CPU | 2,880 wakeups/day | #writes wakeups/day | +| Network | 2,880 requests/day | #writes requests/day | +| SQLite queries | 2,880/day | #writes/day | + +## Design Notes + +**No persistent queue needed** - the `replication_dirty` column IS the queue. +Worker crash? On restart, `EntryListDirty()` finds all pending work. + +**No timer needed** - Go channel with `select` is the most efficient wait mechanism. + +**Batching automatic** - Multiple signals while worker is busy? Channel size 1 means worker picks up ALL dirty entries on next iteration, not one-by-one. 
+ +--- + +**This is the right design for low-resource, low-change vaults.** diff --git a/clavis/clavis-vault/edition/commercial.go b/clavis/clavis-vault/edition/commercial.go index 6c169b0..3b7d774 100644 --- a/clavis/clavis-vault/edition/commercial.go +++ b/clavis/clavis-vault/edition/commercial.go @@ -26,6 +26,7 @@ func init() { Current = &commercialEdition{name: "commercial"} SetCommercialConfig = setCommercialConfig StartReplication = startReplication + SignalReplication = signalReplication IsBackupMode = isBackupMode IsBackupRequest = isBackupRequest } diff --git a/clavis/clavis-vault/edition/community.go b/clavis/clavis-vault/edition/community.go index 9e70fc8..98b2f9e 100644 --- a/clavis/clavis-vault/edition/community.go +++ b/clavis/clavis-vault/edition/community.go @@ -24,6 +24,9 @@ func init() { StartReplication = func(ctx context.Context, dataDir string) { // No-op: replication not available in Community Edition } + SignalReplication = func() { + // No-op: replication not available in Community Edition + } } // communityEdition is the Community Edition implementation. diff --git a/clavis/clavis-vault/edition/edition.go b/clavis/clavis-vault/edition/edition.go index eb51a0d..c796cbe 100644 --- a/clavis/clavis-vault/edition/edition.go +++ b/clavis/clavis-vault/edition/edition.go @@ -63,6 +63,10 @@ var SetCommercialConfig func(cfg *CommercialConfig) // Stub here - actual implementation in commercial.go. var StartReplication func(ctx context.Context, dataDir string) +// SignalReplication wakes the replication worker (commercial only). +// Stub here - community edition does nothing. +var SignalReplication func() + // IsBackupMode returns false in community edition (always single-node). // Stub here - actual implementation in backup_mode.go for commercial builds. 
var IsBackupMode func() bool = func() bool { return false } diff --git a/clavis/clavis-vault/edition/replication.go b/clavis/clavis-vault/edition/replication.go index ab132a0..1b8cdff 100644 --- a/clavis/clavis-vault/edition/replication.go +++ b/clavis/clavis-vault/edition/replication.go @@ -3,7 +3,8 @@ // Package edition - Commercial replication implementation. // This file is built ONLY when the "commercial" build tag is specified. // -// Real-time replication to backup POPs (Calgary/Zurich). +// Event-driven async replication to backup POPs (Calgary/Zurich). +// No polling - worker sleeps until woken by signal. // Community Edition does not have replication functionality. // // This is PROPRIETARY code - part of Commercial Edition licensing. @@ -12,10 +13,26 @@ package edition import ( "context" "log" + "math" + "sync" "time" + + "github.com/johanj/clavitor/lib" ) -// startReplication begins the background replication goroutine. +// replicationWorker is the singleton event-driven replication worker. +var replicationWorker *ReplicationWorker + +// ReplicationWorker handles event-driven async replication. +// No polling - sleeps until SignalReplication() wakes it. +type ReplicationWorker struct { + db *lib.DB + config *ReplicationConfig + signal chan struct{} // Buffered (size 1), wakes worker + mu sync.Mutex +} + +// startReplication initializes and starts the replication worker. // Called at startup in commercial edition via StartReplication variable. 
func startReplication(ctx context.Context, dataDir string) { if globalConfig == nil || globalConfig.ReplicationConfig == nil || globalConfig.ReplicationConfig.PrimaryPOP == "" { @@ -23,35 +40,106 @@ func startReplication(ctx context.Context, dataDir string) { return } - log.Printf("Commercial edition: replication enabled to %s", globalConfig.ReplicationConfig.PrimaryPOP) + // Open DB for replication worker + dbPath := dataDir + "/clavitor-*.db" // TODO: proper vault lookup + db, err := lib.OpenDB(dbPath) + if err != nil { + log.Printf("Commercial edition: replication disabled (cannot open DB): %v", err) + return + } - go func() { - ticker := time.NewTicker(time.Duration(globalConfig.ReplicationConfig.PollInterval) * time.Second) - defer ticker.Stop() + replicationWorker = &ReplicationWorker{ + db: db, + config: globalConfig.ReplicationConfig, + signal: make(chan struct{}, 1), + } - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if err := replicateBatch(ctx, dataDir); err != nil { - log.Printf("replication error: %v", err) - // Alert operator on repeated failures - // TODO: Track consecutive failures, alert after threshold - } - } - } - }() + log.Printf("Commercial edition: event-driven replication enabled to %s", replicationWorker.config.PrimaryPOP) + + go replicationWorker.Run(ctx) } -// replicateBatch sends unreplicated entries to backup POP. -func replicateBatch(ctx context.Context, dataDir string) error { - // Implementation TBD - stub for now - // 1. Open DB - // 2. Call lib.EntryListUnreplicated() - // 3. Encrypt/encode entries - // 4. POST to backup POP - // 5. Mark replicated with lib.EntryMarkReplicated() +// signalReplication wakes the replication worker (non-blocking). +// Called by write handlers after marking entries dirty. +// Assigned to edition.SignalReplication in commercial builds. 
+func signalReplication() { + if replicationWorker == nil { + return + } + select { + case replicationWorker.signal <- struct{}{}: + // Signaled successfully + default: + // Already signaled - worker will process all dirty entries on next iteration + } +} + +// Run is the main event loop. Sleeps on signal channel. +func (w *ReplicationWorker) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + w.db.Close() + return + case <-w.signal: + w.replicateWithRetry(ctx) + } + } +} + +// replicateWithRetry attempts replication with exponential backoff. +func (w *ReplicationWorker) replicateWithRetry(ctx context.Context) { + maxRetries := 5 + + for attempt := 0; attempt < maxRetries; attempt++ { + if attempt > 0 { + // Exponential backoff: 1s, 5s, 25s, 125s, 625s (~10min max) + backoff := time.Duration(math.Pow(5, float64(attempt-1))) * time.Second + if backoff > 10*time.Minute { + backoff = 10 * time.Minute + } + time.Sleep(backoff) + } + + err := w.replicateBatch() + if err == nil { + return // Success + } + + log.Printf("Replication attempt %d failed: %v", attempt+1, err) + + // Check for context cancellation + select { + case <-ctx.Done(): + return + default: + } + } + + // Max retries exceeded - alert operator + Current.AlertOperator(ctx, "replication_failed", + "Backup POP unreachable after max retries", map[string]any{ + "backup_pop": w.config.PrimaryPOP, + "retries": maxRetries, + }) +} + +// replicateBatch sends all dirty entries to backup POP. +func (w *ReplicationWorker) replicateBatch() error { + // Get up to batch size dirty entries + entries, err := lib.EntryListDirty(w.db, w.config.BatchSize) + if err != nil { + return err + } + if len(entries) == 0 { + return nil // Nothing to replicate + } + + // TODO: POST to backup POP + // TODO: On success, mark all replicated + // TODO: On failure, entries stay dirty for retry + + log.Printf("Replicating %d entries to %s", len(entries), w.config.PrimaryPOP) return nil }