clavitor/clavis/clavis-vault/edition/replication.go

185 lines
5.1 KiB
Go

//go:build commercial
// Package edition - Commercial replication implementation.
// This file is built ONLY when the "commercial" build tag is specified.
//
// Event-driven async replication to backup POPs (Calgary/Zurich).
// No polling - worker sleeps until woken by signal.
// Community Edition does not have replication functionality.
//
// This is PROPRIETARY code - part of Commercial Edition licensing.
package edition
import (
"context"
"log"
"math"
"sync"
"time"
"github.com/johanj/clavitor/lib"
)
// replicationWorker is the singleton event-driven replication worker.
var replicationWorker *ReplicationWorker
// ReplicationWorker handles event-driven async replication.
// No polling - sleeps until SignalReplication() wakes it.
type ReplicationWorker struct {
db *lib.DB
config *ReplicationConfig
signal chan struct{} // Buffered (size 1), wakes worker
mu sync.Mutex
}
// startReplication initializes and starts the replication worker.
// Called at startup in commercial edition via StartReplication variable.
func startReplication(ctx context.Context, dataDir string) {
if globalConfig == nil || globalConfig.ReplicationConfig == nil {
log.Printf("Commercial edition: replication config missing")
return
}
cfg := globalConfig.ReplicationConfig
if cfg.Role != "primary" {
// Backup role doesn't replicate out (it receives)
log.Printf("Commercial edition: backup POP - replication receiver only")
return
}
if cfg.BackupPOP.URL == "" {
log.Printf("Commercial edition: primary role but no backup_pop configured")
return
}
// Open DB for replication worker
dbPath := dataDir + "/clavitor-*.db" // TODO: proper vault lookup
db, err := lib.OpenDB(dbPath)
if err != nil {
log.Printf("Commercial edition: replication disabled (cannot open DB): %v", err)
return
}
replicationWorker = &ReplicationWorker{
db: db,
config: globalConfig.ReplicationConfig,
signal: make(chan struct{}, 1),
}
log.Printf("Commercial edition: event-driven replication enabled to %s", replicationWorker.config.BackupPOP.URL)
go replicationWorker.Run(ctx)
}
// signalReplication wakes the replication worker (non-blocking).
// Called by write handlers after marking entries dirty.
// Assigned to edition.SignalReplication in commercial builds.
func signalReplication() {
if replicationWorker == nil {
return
}
select {
case replicationWorker.signal <- struct{}{}:
// Signaled successfully
default:
// Already signaled - worker will process all dirty entries on next iteration
}
}
// Run is the main event loop. Sleeps on signal channel.
func (w *ReplicationWorker) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
w.db.Close()
return
case <-w.signal:
w.replicateWithRetry(ctx)
}
}
}
// replicateWithRetry attempts replication with exponential backoff.
func (w *ReplicationWorker) replicateWithRetry(ctx context.Context) {
maxRetries := 5
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
// Exponential backoff: 1s, 5s, 25s, 125s, 625s (~10min max)
backoff := time.Duration(math.Pow(5, float64(attempt-1))) * time.Second
if backoff > 10*time.Minute {
backoff = 10 * time.Minute
}
time.Sleep(backoff)
}
err := w.replicateBatch()
if err == nil {
return // Success
}
log.Printf("Replication attempt %d failed: %v", attempt+1, err)
// Check for context cancellation
select {
case <-ctx.Done():
return
default:
}
}
// Max retries exceeded - alert operator
Current.AlertOperator(ctx, "replication_failed",
"Backup POP unreachable after max retries", map[string]any{
"backup_pop": w.config.BackupPOP.URL,
"retries": maxRetries,
})
}
// replicateBatch sends all dirty entries to backup POP.
func (w *ReplicationWorker) replicateBatch() error {
// Get up to batch size dirty entries
entries, err := lib.EntryListDirty(w.db, w.config.Replication.BatchSize)
if err != nil {
return err
}
if len(entries) == 0 {
return nil // Nothing to replicate
}
// TODO: POST to backup POP
// TODO: On success, mark all replicated
// TODO: On failure, entries stay dirty for retry
log.Printf("Replicating %d entries to %s", len(entries), w.config.BackupPOP.URL)
return nil
}
// ReplicatedEntry represents an entry being sent to backup POP.
type ReplicatedEntry struct {
EntryID string `json:"entry_id"`
Type string `json:"type"`
Title string `json:"title"`
TitleIdx string `json:"title_idx"`
Data []byte `json:"data"` // Encrypted blob
DataLevel int `json:"data_level"`
Scopes string `json:"scopes"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Version int `json:"version"`
}
// ReplicationRequest is sent to backup POP.
type ReplicationRequest struct {
SourcePOP string `json:"source_pop"`
Entries []ReplicatedEntry `json:"entries"`
Timestamp int64 `json:"timestamp"`
}
// ReplicationResponse from backup POP.
type ReplicationResponse struct {
Accepted []string `json:"accepted"` // EntryIDs successfully stored
Rejected []string `json:"rejected"` // EntryIDs failed validation
Duplicate []string `json:"duplicate"` // EntryIDs already present (version conflict)
}