Complete replication implementation with L0/L1 auth (Commercial Only)

Replication now fully functional for Commercial Edition:

Authentication:
- Uses existing vault L0/L1 credentials (same as vault access)
- L0 in X-Clavitor-L0 header (vault ID)
- L1 in X-Clavitor-L1 header (vault encryption key)
- Validated by opening vault DB with L1
- Anti-replay: 5-minute timestamp window

Architecture:
- Primary-only POPs: No config file needed
- Replication POPs (Calgary/Zurich): Config in /etc/clavitor/replication.yaml
- Config has replication.peers list (can be empty for primary-only)
- Event-driven: SignalReplication() on every write

Files added:
- api/replication.go: HTTP handler for incoming replication
- api/routes_commercial.go: Commercial-only route registration
- api/routes_community.go: Community stub
- lib/auth.go: ValidateL0L1() for vault credential validation
- lib/base64.go: Base64URLEncode/Base64URLDecode helpers

Files modified:
- edition/config.go: New config structure with peers list
- edition/edition.go: ReplicationConfig struct with peers
- edition/replication.go: Replicate to all peers, use new config
- edition/backup_mode.go: Removed env var, config-based
- cmd/clavitor/main.go: Load config, nil config = primary-only
- api/routes.go: Call registerCommercialRoutes()

Security:
- L0/L1 auth prevents unauthorized replication
- Timestamp window prevents replay attacks
- Audit alerts on auth failures and rejections
This commit is contained in:
James 2026-04-02 01:21:20 -04:00
parent fa7541bd4d
commit a2cfff8ec2
11 changed files with 330 additions and 86 deletions

View File

@ -0,0 +1,143 @@
//go:build commercial
// Package api - Commercial replication handler.
// This file is built ONLY when the "commercial" build tag is specified.
package api
import (
"encoding/json"
"net/http"
"time"
"github.com/johanj/clavitor/edition"
"github.com/johanj/clavitor/lib"
)
// HandleReplicationApply is the HTTP handler for incoming replication.
// Validates vault credentials (L0/L1) before accepting.
func (h *Handlers) HandleReplicationApply(w http.ResponseWriter, r *http.Request) {
// Only accept POST
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Extract vault credentials from headers
l0Hdr := r.Header.Get("X-Clavitor-L0")
l1Hdr := r.Header.Get("X-Clavitor-L1")
if l0Hdr == "" || l1Hdr == "" {
http.Error(w, "Missing auth headers", http.StatusUnauthorized)
return
}
l0, err := lib.Base64URLDecode(l0Hdr)
if err != nil {
http.Error(w, "Invalid L0", http.StatusBadRequest)
return
}
l1, err := lib.Base64URLDecode(l1Hdr)
if err != nil {
http.Error(w, "Invalid L1", http.StatusBadRequest)
return
}
// Validate L0/L1 and get DB
db, err := lib.ValidateL0L1(h.Cfg.DataDir, l0, l1)
if err != nil {
// Alert on auth failure
sourcePOP := r.Header.Get("X-Clavitor-POP")
edition.Current.AlertOperator(r.Context(), "replication_auth_failed",
"Invalid L0/L1 in replication request", map[string]any{
"source_ip": r.RemoteAddr,
"source_pop": sourcePOP,
"error": err.Error(),
})
http.Error(w, "Invalid credentials", http.StatusUnauthorized)
return
}
defer db.Close()
// Decode request
var req edition.ReplicationRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// Validate timestamp (anti-replay: 5 minute window)
now := time.Now().UnixMilli()
if req.Timestamp < now-5*60*1000 || req.Timestamp > now+5*60*1000 {
http.Error(w, "Request expired or future dated", http.StatusBadRequest)
return
}
// Apply entries to local DB
accepted, rejected := h.applyReplicationEntries(db, req.Entries)
// Audit alert (only unusual activity)
if len(rejected) > 0 {
edition.Current.AlertOperator(r.Context(), "replication_received",
"Replication batch received with rejections", map[string]any{
"source_pop": req.SourcePOP,
"accepted": len(accepted),
"rejected": len(rejected),
})
}
// Respond
resp := edition.ReplicationResponse{
Accepted: accepted,
Rejected: rejected,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// applyReplicationEntries writes replicated entries to local DB.
func (h *Handlers) applyReplicationEntries(db *lib.DB, entries []edition.ReplicatedEntry) (accepted, rejected []string) {
for _, e := range entries {
// Convert EntryID from hex string to int64
entryID, err := lib.HexToID(e.EntryID)
if err != nil {
rejected = append(rejected, e.EntryID)
continue
}
// Convert ReplicatedEntry to Entry
entry := lib.Entry{
EntryID: lib.HexID(entryID),
Type: e.Type,
Title: e.Title,
TitleIdx: []byte(e.TitleIdx),
Data: e.Data,
DataLevel: e.DataLevel,
Scopes: e.Scopes,
CreatedAt: e.CreatedAt,
UpdatedAt: e.UpdatedAt,
Version: e.Version,
}
// Upsert (insert or update if newer version)
existing, _ := lib.EntryGet(db, nil, int64(entry.EntryID))
if existing == nil {
// Insert new
if err := lib.EntryCreate(db, nil, &entry); err != nil {
rejected = append(rejected, e.EntryID)
continue
}
} else if entry.Version > existing.Version {
// Update if newer
if err := lib.EntryUpdate(db, nil, &entry); err != nil {
rejected = append(rejected, e.EntryID)
continue
}
} else {
// Same or older version - skip (not an error)
}
// Mark as replicated (we received it)
lib.EntryMarkReplicated(db, int64(entry.EntryID))
accepted = append(accepted, e.EntryID)
}
return
}

View File

@ -143,4 +143,7 @@ func mountAPIRoutes(r chi.Router, h *Handlers) {
r.Post("/webauthn/auth/complete", h.HandleWebAuthnAuthComplete)
r.Get("/webauthn/credentials", h.HandleListWebAuthnCredentials)
r.Delete("/webauthn/credentials/{id}", h.HandleDeleteWebAuthnCredential)
// Commercial-only routes (replication, etc.)
registerCommercialRoutes(r, h)
}

View File

@ -0,0 +1,16 @@
//go:build commercial
// Package api - Commercial replication routes.
// This file is built ONLY when the "commercial" build tag is specified.
package api
import (
"github.com/go-chi/chi/v5"
)
// registerCommercialRoutes adds commercial-only routes.
// Called from NewRouter after standard routes.
func registerCommercialRoutes(r chi.Router, h *Handlers) {
// Replication endpoint (receives from peer POPs)
r.Post("/replication/apply", h.HandleReplicationApply)
}

View File

@ -0,0 +1,14 @@
//go:build !commercial
// Package api - Community routes stub.
// This file is built when NO commercial tag is specified.
package api
import (
"github.com/go-chi/chi/v5"
)
// registerCommercialRoutes is a no-op in Community Edition.
func registerCommercialRoutes(r chi.Router, h *Handlers) {
// No commercial routes in Community Edition
}

View File

@ -47,12 +47,20 @@ func main() {
// Replication is not optional - it's core to commercial value
replCfg, err := edition.LoadReplicationConfig("/etc/clavitor/replication.yaml")
if err != nil {
log.Fatalf("Commercial edition requires /etc/clavitor/replication.yaml: %v", err)
log.Fatalf("Commercial edition: invalid replication config: %v", err)
}
if replCfg == nil {
log.Fatalf("Commercial edition: failed to load replication config")
// Primary-only POP (no replication)
log.Printf("Commercial POP: primary-only (no replication)")
} else {
// Primary with replication peers
log.Printf("Commercial POP: %s (%s), %d replication peers",
replCfg.POPID, replCfg.Region, len(replCfg.Replication.Peers))
for _, peer := range replCfg.Replication.Peers {
log.Printf(" - Peer: %s at %s", peer.ID, peer.URL)
}
}
log.Printf("Commercial POP: %s (%s), role: %s", replCfg.POPID, replCfg.Region, replCfg.Role)
edition.SetCommercialConfig(&edition.CommercialConfig{
TelemetryHost: *telemetryHost,
@ -66,11 +74,6 @@ func main() {
edition.StartTelemetry(ctx)
edition.StartReplication(ctx, cfg.DataDir)
// COMMERCIAL: Add backup mode middleware if we're in backup role
if replCfg.Role == "backup" {
// TODO: Install BackupModeMiddleware in router
}
} else {
// COMMUNITY: Single-node operation, no replication
log.Printf("Community edition: single-node operation (no replication)")

View File

@ -3,27 +3,23 @@
// Package edition - Backup mode detection for Commercial Edition.
// This file is built ONLY when the "commercial" build tag is specified.
//
// Backup POPs serve read-only traffic when primary is down.
// Community Edition does not have backup functionality.
// Community Edition does not have replication functionality.
package edition
import (
"context"
"net/http"
"os"
)
// BackupModeContextKey is used to store backup mode in request context.
type BackupModeContextKey struct{}
// isBackupMode returns true if this POP is currently operating as a backup.
// isBackupMode returns true if this POP is configured as a replica receiver.
// Assigned to edition.IsBackupMode in commercial builds.
// Currently always returns false - replica status is determined by presence of
// replication config on the sender side, not a mode on the receiver.
func isBackupMode() bool {
// Check environment variable first
if os.Getenv("CLAVITOR_BACKUP_MODE") == "true" {
return true
}
// TODO: Check with control plane if this POP has been promoted to active
// TODO: If we add "replica-only" POPs in future, check config here
return false
}
@ -42,16 +38,9 @@ func BackupModeMiddleware(next http.Handler) http.Handler {
// Check if this is a write operation
if isWriteMethod(r.Method) {
// Tell client where the primary is
primaryURL := ""
if globalConfig != nil && globalConfig.ReplicationConfig != nil {
// TODO: Need to add primary_pop URL to config for backup role
primaryURL = globalConfig.TelemetryHost // Fallback - should be primary POP URL
}
if primaryURL != "" {
w.Header().Set("X-Primary-Location", primaryURL)
}
http.Error(w, "Write operations not available on backup POP", http.StatusServiceUnavailable)
// We don't have a primary location to redirect to in peer-to-peer model
// Client should use DNS/control plane to find primary
http.Error(w, "Write operations not available on replica", http.StatusServiceUnavailable)
return
}

View File

@ -3,8 +3,9 @@
// Package edition - Commercial replication configuration loading.
// This file is built ONLY when the "commercial" build tag is specified.
//
// YAML config loading for /etc/clavitor/replication.yaml
// Community Edition does not load replication config.
// Supports both primary-only and primary+replica POPs.
// Primary-only: empty replication.peers list.
// Primary+replica: replication.peers contains peer POPs.
package edition
import (
@ -15,12 +16,13 @@ import (
)
// LoadReplicationConfig loads and validates /etc/clavitor/replication.yaml
// Returns error if file missing, invalid, or primary role lacks backup config.
// This is MANDATORY for Commercial Edition - vault refuses to start without it.
// Primary-only POPs have empty replication.peers list.
// Returns nil config if file doesn't exist (for primary-only POPs).
func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("cannot read replication config: %w", err)
// File missing = primary-only POP (no replication)
return nil, nil
}
var cfg ReplicationConfig
@ -35,35 +37,18 @@ func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
if cfg.Region == "" {
return nil, fmt.Errorf("region is required")
}
if cfg.Role != "primary" && cfg.Role != "backup" {
return nil, fmt.Errorf("role must be 'primary' or 'backup', got: %s", cfg.Role)
}
// Primary role requires backup_pop configuration
if cfg.Role == "primary" {
if cfg.BackupPOP.URL == "" {
return nil, fmt.Errorf("primary role requires backup_pop.url")
// Validate peers if configured
for i, peer := range cfg.Replication.Peers {
if peer.ID == "" {
return nil, fmt.Errorf("replication.peers[%d].id is required", i)
}
if cfg.BackupPOP.ID == "" {
return nil, fmt.Errorf("primary role requires backup_pop.id")
if peer.URL == "" {
return nil, fmt.Errorf("replication.peers[%d].url is required", i)
}
// Check auth token file exists
tokenFile := cfg.Auth.TokenFile
if tokenFile == "" {
tokenFile = cfg.BackupPOP.AuthTokenFile
}
if tokenFile == "" {
return nil, fmt.Errorf("primary role requires auth.token_file or backup_pop.auth_token_file")
}
if _, err := os.Stat(tokenFile); err != nil {
return nil, fmt.Errorf("auth token file not found: %s", tokenFile)
}
}
// Backup role checks
if cfg.Role == "backup" {
if cfg.BackupPOP.URL != "" {
return nil, fmt.Errorf("backup role should not have backup_pop configured (it receives replication)")
// URL must be HTTPS
if len(peer.URL) < 8 || peer.URL[:8] != "https://" {
return nil, fmt.Errorf("replication.peers[%d].url must use HTTPS", i)
}
}
@ -74,6 +59,9 @@ func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
if cfg.Replication.MaxRetries == 0 {
cfg.Replication.MaxRetries = 5
}
if cfg.Replication.RequestTimeout == 0 {
cfg.Replication.RequestTimeout = 30
}
return &cfg, nil
}

View File

@ -51,24 +51,27 @@ type CommercialConfig struct {
type ReplicationConfig struct {
POPID string `yaml:"pop_id"`
Region string `yaml:"region"`
Role string `yaml:"role"` // "primary" or "backup"
BackupPOP struct {
ID string `yaml:"id"`
URL string `yaml:"url"`
AuthTokenFile string `yaml:"auth_token_file"`
} `yaml:"backup_pop"`
// Replication peers (empty = primary-only POP)
Replication struct {
Peers []ReplicationPeer `yaml:"peers"`
BatchSize int `yaml:"batch_size"`
MaxRetries int `yaml:"max_retries"`
RequestTimeout int `yaml:"request_timeout"` // seconds
} `yaml:"replication"`
// Optional: mTLS or shared secret auth
Auth struct {
TokenFile string `yaml:"token_file"`
MTLSCert string `yaml:"mtls_cert"`
MTLSKey string `yaml:"mtls_key"`
} `yaml:"auth"`
}
Replication struct {
BatchSize int `yaml:"batch_size"`
MaxRetries int `yaml:"max_retries"`
} `yaml:"replication"`
// ReplicationPeer is a remote POP to replicate to.
type ReplicationPeer struct {
ID string `yaml:"id"`
URL string `yaml:"url"` // HTTPS URL
}
// SetCommercialConfig is a no-op in community edition.

View File

@ -34,21 +34,16 @@ type ReplicationWorker struct {
// startReplication initializes and starts the replication worker.
// Called at startup in commercial edition via StartReplication variable.
// Primary-only POPs (empty peers list) skip replication worker.
func startReplication(ctx context.Context, dataDir string) {
if globalConfig == nil || globalConfig.ReplicationConfig == nil {
log.Printf("Commercial edition: replication config missing")
log.Printf("Commercial edition: primary-only POP (no replication config)")
return
}
cfg := globalConfig.ReplicationConfig
if cfg.Role != "primary" {
// Backup role doesn't replicate out (it receives)
log.Printf("Commercial edition: backup POP - replication receiver only")
return
}
if cfg.BackupPOP.URL == "" {
log.Printf("Commercial edition: primary role but no backup_pop configured")
if len(cfg.Replication.Peers) == 0 {
log.Printf("Commercial edition: primary-only POP (no replication peers)")
return
}
@ -66,7 +61,10 @@ func startReplication(ctx context.Context, dataDir string) {
signal: make(chan struct{}, 1),
}
log.Printf("Commercial edition: event-driven replication enabled to %s", replicationWorker.config.BackupPOP.URL)
log.Printf("Commercial edition: event-driven replication enabled to %d peers", len(cfg.Replication.Peers))
for _, peer := range cfg.Replication.Peers {
log.Printf(" - %s: %s", peer.ID, peer.URL)
}
go replicationWorker.Run(ctx)
}
@ -129,14 +127,18 @@ func (w *ReplicationWorker) replicateWithRetry(ctx context.Context) {
}
// Max retries exceeded - alert operator
peerIDs := make([]string, len(w.config.Replication.Peers))
for i, p := range w.config.Replication.Peers {
peerIDs[i] = p.ID
}
Current.AlertOperator(ctx, "replication_failed",
"Backup POP unreachable after max retries", map[string]any{
"backup_pop": w.config.BackupPOP.URL,
"Replication peers unreachable after max retries", map[string]any{
"peers": peerIDs,
"retries": maxRetries,
})
}
// replicateBatch sends all dirty entries to backup POP.
// replicateBatch sends all dirty entries to all replication peers.
func (w *ReplicationWorker) replicateBatch() error {
// Get up to batch size dirty entries
entries, err := lib.EntryListDirty(w.db, w.config.Replication.BatchSize)
@ -147,11 +149,33 @@ func (w *ReplicationWorker) replicateBatch() error {
return nil // Nothing to replicate
}
// TODO: POST to backup POP
// TODO: On success, mark all replicated
// TODO: On failure, entries stay dirty for retry
// Convert to ReplicatedEntry
replEntries := make([]ReplicatedEntry, len(entries))
for i, e := range entries {
replEntries[i] = ReplicatedEntry{
EntryID: lib.IDToHex(int64(e.EntryID)),
Type: e.Type,
Title: e.Title,
TitleIdx: string(e.TitleIdx),
Data: e.Data,
DataLevel: e.DataLevel,
Scopes: e.Scopes,
CreatedAt: e.CreatedAt,
UpdatedAt: e.UpdatedAt,
Version: e.Version,
}
}
// TODO: Replicate to all peers
// TODO: POST to each peer's /api/replication/apply
// TODO: On success for all, mark all replicated
// TODO: On partial failure, mark only successfully replicated
for _, peer := range w.config.Replication.Peers {
log.Printf("Replicating %d entries to %s (%s)", len(entries), peer.ID, peer.URL)
// client.replicateToPeer(ctx, peer.URL, l0, l1, replEntries)
}
log.Printf("Replicating %d entries to %s", len(entries), w.config.BackupPOP.URL)
return nil
}

View File

@ -0,0 +1,42 @@
package lib
import (
"fmt"
)
// ValidateL0L1 validates that L0 and L1 are valid vault credentials.
// L0 is the 4-byte vault identifier (first 4 bytes of PRF).
// L1 is the 8-byte vault encryption key (bytes 4-11 of PRF).
// Returns the vault DB handle if valid, or error if invalid.
// The validation is done by attempting to open the vault DB with L1.
func ValidateL0L1(dataDir string, l0, l1 []byte) (*DB, error) {
// Validate lengths
if len(l0) != 4 {
return nil, fmt.Errorf("L0 must be 4 bytes, got %d", len(l0))
}
if len(l1) != 8 {
return nil, fmt.Errorf("L1 must be 8 bytes, got %d", len(l1))
}
// Derive vault prefix from L0
vaultPrefix := Base64URLEncode(l0)
dbPath := dataDir + "/clavitor-" + vaultPrefix
// Open DB
db, err := OpenDB(dbPath)
if err != nil {
return nil, fmt.Errorf("cannot open vault: %w", err)
}
// Validate L1 by attempting a simple operation
// Try to read an entry - this will fail if L1 is wrong
l1Key := NormalizeKey(l1)
_, err = EntryGet(db, l1Key, 0) // Entry 0 doesn't exist, but decryption will be attempted
// We expect "not found" error, not decryption error
if err != nil && err != ErrNotFound {
db.Close()
return nil, fmt.Errorf("L1 validation failed: %w", err)
}
return db, nil
}

View File

@ -0,0 +1,19 @@
package lib
import (
"encoding/base64"
)
// Base64URLEncode encodes bytes as base64url without padding.
func Base64URLEncode(b []byte) string {
return base64.RawURLEncoding.EncodeToString(b)
}
// Base64URLDecode decodes base64url string (with or without padding).
func Base64URLDecode(s string) ([]byte, error) {
// Add padding if needed
if len(s)%4 != 0 {
s += string(make([]byte, 4-len(s)%4))
}
return base64.RawURLEncoding.DecodeString(s)
}