clavitor/clavis/clavis-vault/edition/replication.go

209 lines
5.9 KiB
Go

//go:build commercial

// Package edition - Commercial replication implementation.
// This file is built ONLY when the "commercial" build tag is specified.
//
// Event-driven async replication to backup POPs (Calgary/Zurich).
// No polling: the worker sleeps until it is woken by a signal.
// The Community Edition does not include replication functionality.
//
// This is PROPRIETARY code - part of Commercial Edition licensing.
package edition
import (
	"context"
	"log"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/johanj/clavitor/lib"
)
// replicationWorker is the singleton event-driven replication worker.
// It stays nil on primary-only POPs and when startup fails (startReplication
// returns before assigning it); signalReplication treats nil as "replication
// disabled".
var replicationWorker *ReplicationWorker

// ReplicationWorker handles event-driven async replication.
// No polling - sleeps until SignalReplication() wakes it.
type ReplicationWorker struct {
	db     *lib.DB            // DB opened by startReplication; closed by Run when ctx is cancelled
	config *ReplicationConfig // source of the peer list and batch size
	signal chan struct{}      // Buffered (size 1), wakes worker
	// NOTE(review): mu is not referenced anywhere in this file - confirm whether it is still needed.
	mu sync.Mutex
}
// startReplication initializes and starts the replication worker.
// Called at startup in commercial edition via StartReplication variable.
//
// Primary-only POPs (no ReplicationConfig, or an empty peers list) skip the
// replication worker. Otherwise the vault database in dataDir is located and
// opened, the package-level replicationWorker is set, and a single worker
// goroutine is started. That goroutine runs until ctx is cancelled and closes
// the database when it exits.
//
// Any failure is logged and leaves replication disabled (replicationWorker
// stays nil); startup of the rest of the process is not affected.
func startReplication(ctx context.Context, dataDir string) {
	if globalConfig == nil || globalConfig.ReplicationConfig == nil {
		log.Printf("Commercial edition: primary-only POP (no replication config)")
		return
	}
	cfg := globalConfig.ReplicationConfig
	if len(cfg.Replication.Peers) == 0 {
		log.Printf("Commercial edition: primary-only POP (no replication peers)")
		return
	}

	// Locate the vault DB. Previously the glob pattern itself was passed to
	// lib.OpenDB as a file name. Unless OpenDB expands wildcards itself
	// (NOTE(review): nothing here suggests it does - confirm), that opened
	// a file literally named "clavitor-*.db". The pattern is expanded here
	// instead. Only base names are matched, so glob metacharacters inside
	// dataDir are not interpreted. Exactly one match is required, so the
	// worker never replicates from the wrong vault.
	// TODO: proper vault lookup for data dirs holding more than one vault.
	const vaultDBPattern = "clavitor-*.db"
	dirEntries, err := os.ReadDir(dataDir)
	if err != nil {
		log.Printf("Commercial edition: replication disabled (cannot read data dir %q): %v", dataDir, err)
		return
	}
	var candidates []string
	for _, de := range dirEntries {
		if de.IsDir() {
			continue
		}
		// vaultDBPattern is a well-formed constant, so Match cannot return ErrBadPattern.
		if ok, _ := filepath.Match(vaultDBPattern, de.Name()); ok {
			candidates = append(candidates, filepath.Join(dataDir, de.Name()))
		}
	}
	if len(candidates) != 1 {
		log.Printf("Commercial edition: replication disabled (want exactly one %s in %q, found %d)",
			vaultDBPattern, dataDir, len(candidates))
		return
	}

	db, err := lib.OpenDB(candidates[0])
	if err != nil {
		log.Printf("Commercial edition: replication disabled (cannot open DB): %v", err)
		return
	}

	replicationWorker = &ReplicationWorker{
		db:     db,
		config: cfg,
		// Capacity 1: signalReplication never blocks, and bursts of
		// signals collapse into a single pending wake-up.
		signal: make(chan struct{}, 1),
	}

	log.Printf("Commercial edition: event-driven replication enabled to %d peers", len(cfg.Replication.Peers))
	for _, peer := range cfg.Replication.Peers {
		log.Printf(" - %s: %s", peer.ID, peer.URL)
	}

	go replicationWorker.Run(ctx)
}
// signalReplication asks the replication worker for a pass without ever
// blocking the caller.
//
// Write handlers call it after marking entries dirty. In commercial builds it
// is assigned to edition.SignalReplication. It does nothing when no worker is
// running. The worker's signal channel buffers at most one wake-up, so calls
// made while a wake-up is already pending are dropped. The pending wake-up
// triggers a pass that reads the dirty entries at that time (up to the
// configured batch size).
func signalReplication() {
	if w := replicationWorker; w != nil {
		select {
		case w.signal <- struct{}{}:
			// Wake-up queued.
		default:
			// A wake-up is already pending; nothing more to do.
		}
	}
}
// Run is the worker's event loop. It never polls; it blocks on two channels.
// Each wake-up on w.signal triggers one replicateWithRetry pass. When ctx is
// cancelled, Run closes the worker's DB handle and returns. A cancellation
// that arrives during a pass is only observed after replicateWithRetry returns.
func (w *ReplicationWorker) Run(ctx context.Context) {
	stop := ctx.Done()
	for {
		select {
		case <-w.signal:
			w.replicateWithRetry(ctx)
		case <-stop:
			// The Close error is discarded. NOTE(review): consider logging it.
			w.db.Close()
			return
		}
	}
}
// replicateWithRetry runs replicateBatch, retrying failures with exponential
// backoff.
//
// Up to maxRetries (5) attempts are made. The waits between them grow by a
// factor of 5 starting at 1s: 1s, 5s, 25s, 125s. The 10-minute cap only
// matters if maxRetries is raised. Every wait is interruptible: if ctx is
// cancelled while waiting, or after a failed attempt, the function returns
// immediately without alerting. If every attempt fails, the operator is
// alerted through Current.AlertOperator with the configured peer IDs.
func (w *ReplicationWorker) replicateWithRetry(ctx context.Context) {
	const maxRetries = 5
	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			backoff := time.Duration(math.Pow(5, float64(attempt-1))) * time.Second
			if backoff > 10*time.Minute {
				backoff = 10 * time.Minute
			}
			// time.Sleep ignored cancellation and could hold up shutdown for
			// minutes. Wait on the timer and ctx together instead.
			timer := time.NewTimer(backoff)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
			}
		}

		err := w.replicateBatch()
		if err == nil {
			return // Success
		}
		log.Printf("Replication attempt %d failed: %v", attempt+1, err)

		// Check for context cancellation
		select {
		case <-ctx.Done():
			return
		default:
		}
	}

	// Max retries exceeded - alert operator
	peerIDs := make([]string, len(w.config.Replication.Peers))
	for i, p := range w.config.Replication.Peers {
		peerIDs[i] = p.ID
	}
	Current.AlertOperator(ctx, "replication_failed",
		"Replication peers unreachable after max retries", map[string]any{
			"peers":   peerIDs,
			"retries": maxRetries,
		})
}
// replicateBatch pushes the current batch of dirty entries to every peer.
//
// It loads up to w.config.Replication.BatchSize dirty entries via
// lib.EntryListDirty and converts them to ReplicatedEntry wire records.
// NOTE(review): the network push is still a stub. The peer loop below only
// logs one line per peer; nothing is sent and no entry is marked replicated.
// Yet nil is returned, so replicateWithRetry always treats the pass as a success.
//
// Returns the error from lib.EntryListDirty, or nil.
func (w *ReplicationWorker) replicateBatch() error {
	dirty, err := lib.EntryListDirty(w.db, w.config.Replication.BatchSize)
	switch {
	case err != nil:
		return err
	case len(dirty) == 0:
		return nil // Nothing to replicate
	}

	// Wire-format copies of the dirty entries (not sent yet; see TODOs below).
	batch := make([]ReplicatedEntry, 0, len(dirty))
	for _, e := range dirty {
		batch = append(batch, ReplicatedEntry{
			EntryID:   lib.IDToHex(int64(e.EntryID)),
			Type:      e.Type,
			Title:     e.Title,
			TitleIdx:  string(e.TitleIdx),
			Data:      e.Data,
			DataLevel: e.DataLevel,
			Scopes:    e.Scopes,
			CreatedAt: e.CreatedAt,
			UpdatedAt: e.UpdatedAt,
			Version:   e.Version,
		})
	}

	// TODO: Replicate to all peers
	// TODO: POST to each peer's /api/replication/apply
	// TODO: On success for all, mark all replicated
	// TODO: On partial failure, mark only successfully replicated
	peers := w.config.Replication.Peers
	for i := range peers {
		log.Printf("Replicating %d entries to %s (%s)", len(batch), peers[i].ID, peers[i].URL)
		// client.replicateToPeer(ctx, peers[i].URL, l0, l1, batch)
	}
	return nil
}
// ReplicatedEntry represents an entry being sent to backup POP.
//
// replicateBatch fills it from a dirty lib entry. EntryID is the entry's
// numeric ID rendered with lib.IDToHex. Data is a []byte, so encoding/json
// transmits it as base64.
//
// NOTE(review): TitleIdx is filled with string(e.TitleIdx). If the index holds
// raw binary bytes, encoding/json replaces invalid UTF-8 with U+FFFD and the
// peer receives a corrupted index. Confirm the source type; consider []byte
// (base64) or hex on the wire. Timestamp units of CreatedAt/UpdatedAt are not
// established here - confirm.
type ReplicatedEntry struct {
	EntryID   string `json:"entry_id"`
	Type      string `json:"type"`
	Title     string `json:"title"`
	TitleIdx  string `json:"title_idx"`
	Data      []byte `json:"data"` // Encrypted blob
	DataLevel int    `json:"data_level"`
	Scopes    string `json:"scopes"`
	CreatedAt int64  `json:"created_at"`
	UpdatedAt int64  `json:"updated_at"`
	Version   int    `json:"version"`
}
// ReplicationRequest is sent to backup POP.
// It carries a batch of ReplicatedEntry records tagged with the sending POP.
// NOTE(review): not yet constructed in this file (the peer push in
// replicateBatch is still a TODO). The units of Timestamp are not established
// here - confirm before implementing the sender.
type ReplicationRequest struct {
	SourcePOP string            `json:"source_pop"`
	Entries   []ReplicatedEntry `json:"entries"`
	Timestamp int64             `json:"timestamp"`
}
// ReplicationResponse from backup POP.
// It reports, by EntryID, the outcome for the entries that were submitted.
// NOTE(review): not yet consumed in this file. Per the TODOs in
// replicateBatch, presumably only Accepted IDs should be marked replicated -
// confirm once the sender exists.
type ReplicationResponse struct {
	Accepted  []string `json:"accepted"`  // EntryIDs successfully stored
	Rejected  []string `json:"rejected"`  // EntryIDs failed validation
	Duplicate []string `json:"duplicate"` // EntryIDs already present (version conflict)
}