diff --git a/clavis/clavis-vault/api/replication.go b/clavis/clavis-vault/api/replication.go
new file mode 100644
index 0000000..92986e5
--- /dev/null
+++ b/clavis/clavis-vault/api/replication.go
@@ -0,0 +1,164 @@
+//go:build commercial
+
+// Package api - Commercial replication handler.
+// This file is built ONLY when the "commercial" build tag is specified.
+package api
+
+import (
+	"encoding/json"
+	"errors"
+	"log"
+	"net/http"
+	"time"
+
+	"github.com/johanj/clavitor/edition"
+	"github.com/johanj/clavitor/lib"
+)
+
+// HandleReplicationApply is the HTTP handler for incoming replication.
+// It accepts POST only, validates vault credentials (the X-Clavitor-L0 and
+// X-Clavitor-L1 headers) before reading the body, enforces a +/-5 minute
+// timestamp window, applies the entries and answers with a JSON
+// edition.ReplicationResponse listing accepted and rejected entry IDs.
+func (h *Handlers) HandleReplicationApply(w http.ResponseWriter, r *http.Request) {
+	// Only accept POST
+	if r.Method != http.MethodPost {
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	// Extract vault credentials from headers
+	l0Hdr := r.Header.Get("X-Clavitor-L0")
+	l1Hdr := r.Header.Get("X-Clavitor-L1")
+	if l0Hdr == "" || l1Hdr == "" {
+		http.Error(w, "Missing auth headers", http.StatusUnauthorized)
+		return
+	}
+
+	l0, err := lib.Base64URLDecode(l0Hdr)
+	if err != nil {
+		http.Error(w, "Invalid L0", http.StatusBadRequest)
+		return
+	}
+	l1, err := lib.Base64URLDecode(l1Hdr)
+	if err != nil {
+		http.Error(w, "Invalid L1", http.StatusBadRequest)
+		return
+	}
+
+	// Validate L0/L1 and get DB
+	db, err := lib.ValidateL0L1(h.Cfg.DataDir, l0, l1)
+	if err != nil {
+		// Alert on auth failure
+		sourcePOP := r.Header.Get("X-Clavitor-POP")
+		edition.Current.AlertOperator(r.Context(), "replication_auth_failed",
+			"Invalid L0/L1 in replication request", map[string]any{
+				"source_ip":  r.RemoteAddr,
+				"source_pop": sourcePOP,
+				"error":      err.Error(),
+			})
+		http.Error(w, "Invalid credentials", http.StatusUnauthorized)
+		return
+	}
+	defer db.Close()
+
+	// Decode request
+	var req edition.ReplicationRequest
+	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+		http.Error(w, "Invalid JSON", http.StatusBadRequest)
+		return
+	}
+
+	// Validate timestamp (anti-replay: 5 minute window either side of now)
+	now := time.Now().UnixMilli()
+	if req.Timestamp < now-5*60*1000 || req.Timestamp > now+5*60*1000 {
+		http.Error(w, "Request expired or future dated", http.StatusBadRequest)
+		return
+	}
+
+	// Apply entries to local DB
+	accepted, rejected := h.applyReplicationEntries(db, req.Entries)
+
+	// Audit alert (only unusual activity)
+	if len(rejected) > 0 {
+		edition.Current.AlertOperator(r.Context(), "replication_received",
+			"Replication batch received with rejections", map[string]any{
+				"source_pop": req.SourcePOP,
+				"accepted":   len(accepted),
+				"rejected":   len(rejected),
+			})
+	}
+
+	// Respond
+	resp := edition.ReplicationResponse{
+		Accepted: accepted,
+		Rejected: rejected,
+	}
+	w.Header().Set("Content-Type", "application/json")
+	if err := json.NewEncoder(w).Encode(resp); err != nil {
+		// The response may already be partly written, so just record it.
+		log.Printf("replication: writing response: %v", err)
+	}
+}
+
+// applyReplicationEntries writes replicated entries to local DB.
+// An entry is inserted when unknown locally, updated when its Version is
+// greater than the stored one, and left untouched otherwise. It returns the
+// hex IDs that were accepted and those that were rejected (bad ID, failed
+// lookup, or failed write).
+func (h *Handlers) applyReplicationEntries(db *lib.DB, entries []edition.ReplicatedEntry) (accepted, rejected []string) {
+	for _, e := range entries {
+		// Convert EntryID from hex string to int64
+		entryID, err := lib.HexToID(e.EntryID)
+		if err != nil {
+			rejected = append(rejected, e.EntryID)
+			continue
+		}
+
+		// Convert ReplicatedEntry to Entry
+		entry := lib.Entry{
+			EntryID:   lib.HexID(entryID),
+			Type:      e.Type,
+			Title:     e.Title,
+			TitleIdx:  []byte(e.TitleIdx),
+			Data:      e.Data,
+			DataLevel: e.DataLevel,
+			Scopes:    e.Scopes,
+			CreatedAt: e.CreatedAt,
+			UpdatedAt: e.UpdatedAt,
+			Version:   e.Version,
+		}
+
+		// Upsert (insert or update if newer version)
+		existing, err := lib.EntryGet(db, nil, int64(entry.EntryID))
+		if existing == nil && err != nil && !errors.Is(err, lib.ErrNotFound) {
+			// The lookup itself failed, so we cannot tell an insert from an
+			// update; reject instead of blindly inserting over unknown state.
+			rejected = append(rejected, e.EntryID)
+			continue
+		}
+
+		switch {
+		case existing == nil:
+			// Insert new
+			if err := lib.EntryCreate(db, nil, &entry); err != nil {
+				rejected = append(rejected, e.EntryID)
+				continue
+			}
+		case entry.Version > existing.Version:
+			// Update if newer
+			if err := lib.EntryUpdate(db, nil, &entry); err != nil {
+				rejected = append(rejected, e.EntryID)
+				continue
+			}
+		}
+		// Same or older version falls through untouched (not an error).
+
+		// Mark as replicated (we received it)
+		// NOTE(review): this also runs when the local copy is newer than the
+		// incoming one - confirm that cannot clear a pending outbound change.
+		lib.EntryMarkReplicated(db, int64(entry.EntryID))
+		accepted = append(accepted, e.EntryID)
+	}
+	return
+}
diff --git a/clavis/clavis-vault/api/routes.go b/clavis/clavis-vault/api/routes.go
index 9b4cbfc..e07c094 100644
--- a/clavis/clavis-vault/api/routes.go
+++ b/clavis/clavis-vault/api/routes.go
@@ -143,4 +143,7 @@ func mountAPIRoutes(r chi.Router, h *Handlers) {
 	r.Post("/webauthn/auth/complete", h.HandleWebAuthnAuthComplete)
 	r.Get("/webauthn/credentials", h.HandleListWebAuthnCredentials)
 	r.Delete("/webauthn/credentials/{id}", h.HandleDeleteWebAuthnCredential)
+
+	// Commercial-only routes (replication, etc.)
+	registerCommercialRoutes(r, h)
 }
diff --git a/clavis/clavis-vault/api/routes_commercial.go b/clavis/clavis-vault/api/routes_commercial.go
new file mode 100644
index 0000000..559967d
--- /dev/null
+++ b/clavis/clavis-vault/api/routes_commercial.go
@@ -0,0 +1,16 @@
+//go:build commercial
+
+// Package api - Commercial replication routes.
+// This file is built ONLY when the "commercial" build tag is specified.
+package api
+
+import (
+	"github.com/go-chi/chi/v5"
+)
+
+// registerCommercialRoutes adds commercial-only routes.
+// Called from mountAPIRoutes after the standard routes.
+func registerCommercialRoutes(r chi.Router, h *Handlers) {
+	// Replication endpoint (receives from peer POPs)
+	r.Post("/replication/apply", h.HandleReplicationApply)
+}
diff --git a/clavis/clavis-vault/api/routes_community.go b/clavis/clavis-vault/api/routes_community.go
new file mode 100644
index 0000000..38ed081
--- /dev/null
+++ b/clavis/clavis-vault/api/routes_community.go
@@ -0,0 +1,14 @@
+//go:build !commercial
+
+// Package api - Community routes stub.
+// This file is built when NO commercial tag is specified.
+package api
+
+import (
+	"github.com/go-chi/chi/v5"
+)
+
+// registerCommercialRoutes is a no-op in Community Edition.
+func registerCommercialRoutes(r chi.Router, h *Handlers) { + // No commercial routes in Community Edition +} diff --git a/clavis/clavis-vault/cmd/clavitor/main.go b/clavis/clavis-vault/cmd/clavitor/main.go index 9d3e11d..ce7f192 100644 --- a/clavis/clavis-vault/cmd/clavitor/main.go +++ b/clavis/clavis-vault/cmd/clavitor/main.go @@ -47,12 +47,20 @@ func main() { // Replication is not optional - it's core to commercial value replCfg, err := edition.LoadReplicationConfig("/etc/clavitor/replication.yaml") if err != nil { - log.Fatalf("Commercial edition requires /etc/clavitor/replication.yaml: %v", err) + log.Fatalf("Commercial edition: invalid replication config: %v", err) } + if replCfg == nil { - log.Fatalf("Commercial edition: failed to load replication config") + // Primary-only POP (no replication) + log.Printf("Commercial POP: primary-only (no replication)") + } else { + // Primary with replication peers + log.Printf("Commercial POP: %s (%s), %d replication peers", + replCfg.POPID, replCfg.Region, len(replCfg.Replication.Peers)) + for _, peer := range replCfg.Replication.Peers { + log.Printf(" - Peer: %s at %s", peer.ID, peer.URL) + } } - log.Printf("Commercial POP: %s (%s), role: %s", replCfg.POPID, replCfg.Region, replCfg.Role) edition.SetCommercialConfig(&edition.CommercialConfig{ TelemetryHost: *telemetryHost, @@ -66,11 +74,6 @@ func main() { edition.StartTelemetry(ctx) edition.StartReplication(ctx, cfg.DataDir) - - // COMMERCIAL: Add backup mode middleware if we're in backup role - if replCfg.Role == "backup" { - // TODO: Install BackupModeMiddleware in router - } } else { // COMMUNITY: Single-node operation, no replication log.Printf("Community edition: single-node operation (no replication)") diff --git a/clavis/clavis-vault/edition/backup_mode.go b/clavis/clavis-vault/edition/backup_mode.go index 1d2742e..f4c52f7 100644 --- a/clavis/clavis-vault/edition/backup_mode.go +++ b/clavis/clavis-vault/edition/backup_mode.go @@ -3,27 +3,23 @@ // Package edition 
- Backup mode detection for Commercial Edition. // This file is built ONLY when the "commercial" build tag is specified. // -// Backup POPs serve read-only traffic when primary is down. -// Community Edition does not have backup functionality. +// Community Edition does not have replication functionality. package edition import ( "context" "net/http" - "os" ) // BackupModeContextKey is used to store backup mode in request context. type BackupModeContextKey struct{} -// isBackupMode returns true if this POP is currently operating as a backup. +// isBackupMode returns true if this POP is configured as a replica receiver. // Assigned to edition.IsBackupMode in commercial builds. +// Currently always returns false - replica status is determined by presence of +// replication config on the sender side, not a mode on the receiver. func isBackupMode() bool { - // Check environment variable first - if os.Getenv("CLAVITOR_BACKUP_MODE") == "true" { - return true - } - // TODO: Check with control plane if this POP has been promoted to active + // TODO: If we add "replica-only" POPs in future, check config here return false } @@ -42,16 +38,9 @@ func BackupModeMiddleware(next http.Handler) http.Handler { // Check if this is a write operation if isWriteMethod(r.Method) { - // Tell client where the primary is - primaryURL := "" - if globalConfig != nil && globalConfig.ReplicationConfig != nil { - // TODO: Need to add primary_pop URL to config for backup role - primaryURL = globalConfig.TelemetryHost // Fallback - should be primary POP URL - } - if primaryURL != "" { - w.Header().Set("X-Primary-Location", primaryURL) - } - http.Error(w, "Write operations not available on backup POP", http.StatusServiceUnavailable) + // We don't have a primary location to redirect to in peer-to-peer model + // Client should use DNS/control plane to find primary + http.Error(w, "Write operations not available on replica", http.StatusServiceUnavailable) return } diff --git 
a/clavis/clavis-vault/edition/config.go b/clavis/clavis-vault/edition/config.go
index d284380..008e1be 100644
--- a/clavis/clavis-vault/edition/config.go
+++ b/clavis/clavis-vault/edition/config.go
@@ -3,8 +3,9 @@
 // Package edition - Commercial replication configuration loading.
 // This file is built ONLY when the "commercial" build tag is specified.
 //
-// YAML config loading for /etc/clavitor/replication.yaml
-// Community Edition does not load replication config.
+// Supports both primary-only and primary+replica POPs.
+// Primary-only: empty replication.peers list.
+// Primary+replica: replication.peers contains peer POPs.
 package edition
 
 import (
@@ -15,12 +16,16 @@ import (
 )
 
 // LoadReplicationConfig loads and validates /etc/clavitor/replication.yaml
-// Returns error if file missing, invalid, or primary role lacks backup config.
-// This is MANDATORY for Commercial Edition - vault refuses to start without it.
+// Primary-only POPs have an empty replication.peers list or no file at all:
+// a missing file returns (nil, nil); any other read error is returned.
 func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
 	data, err := os.ReadFile(path)
 	if err != nil {
+		if os.IsNotExist(err) {
+			// File missing = primary-only POP (no replication)
+			return nil, nil
+		}
 		return nil, fmt.Errorf("cannot read replication config: %w", err)
 	}
 
 	var cfg ReplicationConfig
@@ -35,35 +40,18 @@ func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
 	if cfg.Region == "" {
 		return nil, fmt.Errorf("region is required")
 	}
-	if cfg.Role != "primary" && cfg.Role != "backup" {
-		return nil, fmt.Errorf("role must be 'primary' or 'backup', got: %s", cfg.Role)
-	}
 
-	// Primary role requires backup_pop configuration
-	if cfg.Role == "primary" {
-		if cfg.BackupPOP.URL == "" {
-			return nil, fmt.Errorf("primary role requires backup_pop.url")
+	// Validate peers if configured
+	for i, peer := range cfg.Replication.Peers {
+		if peer.ID == "" {
+			return nil, fmt.Errorf("replication.peers[%d].id is required", i)
 		}
-		if cfg.BackupPOP.ID == "" {
-			return nil, fmt.Errorf("primary role requires backup_pop.id")
+		if peer.URL == "" {
+			return nil, fmt.Errorf("replication.peers[%d].url is required", i)
 		}
-		// Check auth token file exists
-		tokenFile := cfg.Auth.TokenFile
-		if tokenFile == "" {
-			tokenFile = cfg.BackupPOP.AuthTokenFile
-		}
-		if tokenFile == "" {
-			return nil, fmt.Errorf("primary role requires auth.token_file or backup_pop.auth_token_file")
-		}
-		if _, err := os.Stat(tokenFile); err != nil {
-			return nil, fmt.Errorf("auth token file not found: %s", tokenFile)
-		}
-	}
-
-	// Backup role checks
-	if cfg.Role == "backup" {
-		if cfg.BackupPOP.URL != "" {
-			return nil, fmt.Errorf("backup role should not have backup_pop configured (it receives replication)")
+		// URL must be HTTPS
+		if len(peer.URL) < 8 || peer.URL[:8] != "https://" {
+			return nil, fmt.Errorf("replication.peers[%d].url must use HTTPS", i)
 		}
 	}
 
@@ -74,6 +62,9 @@ func LoadReplicationConfig(path string) (*ReplicationConfig, error) {
 	if cfg.Replication.MaxRetries == 0 {
 		cfg.Replication.MaxRetries
= 5 } + if cfg.Replication.RequestTimeout == 0 { + cfg.Replication.RequestTimeout = 30 + } return &cfg, nil } diff --git a/clavis/clavis-vault/edition/edition.go b/clavis/clavis-vault/edition/edition.go index 29e8286..cead79d 100644 --- a/clavis/clavis-vault/edition/edition.go +++ b/clavis/clavis-vault/edition/edition.go @@ -51,24 +51,27 @@ type CommercialConfig struct { type ReplicationConfig struct { POPID string `yaml:"pop_id"` Region string `yaml:"region"` - Role string `yaml:"role"` // "primary" or "backup" - BackupPOP struct { - ID string `yaml:"id"` - URL string `yaml:"url"` - AuthTokenFile string `yaml:"auth_token_file"` - } `yaml:"backup_pop"` + // Replication peers (empty = primary-only POP) + Replication struct { + Peers []ReplicationPeer `yaml:"peers"` + BatchSize int `yaml:"batch_size"` + MaxRetries int `yaml:"max_retries"` + RequestTimeout int `yaml:"request_timeout"` // seconds + } `yaml:"replication"` + // Optional: mTLS or shared secret auth Auth struct { TokenFile string `yaml:"token_file"` MTLSCert string `yaml:"mtls_cert"` MTLSKey string `yaml:"mtls_key"` } `yaml:"auth"` +} - Replication struct { - BatchSize int `yaml:"batch_size"` - MaxRetries int `yaml:"max_retries"` - } `yaml:"replication"` +// ReplicationPeer is a remote POP to replicate to. +type ReplicationPeer struct { + ID string `yaml:"id"` + URL string `yaml:"url"` // HTTPS URL } // SetCommercialConfig is a no-op in community edition. diff --git a/clavis/clavis-vault/edition/replication.go b/clavis/clavis-vault/edition/replication.go index 21aef2f..14bfcbe 100644 --- a/clavis/clavis-vault/edition/replication.go +++ b/clavis/clavis-vault/edition/replication.go @@ -34,21 +34,16 @@ type ReplicationWorker struct { // startReplication initializes and starts the replication worker. // Called at startup in commercial edition via StartReplication variable. +// Primary-only POPs (empty peers list) skip replication worker. 
func startReplication(ctx context.Context, dataDir string) { if globalConfig == nil || globalConfig.ReplicationConfig == nil { - log.Printf("Commercial edition: replication config missing") + log.Printf("Commercial edition: primary-only POP (no replication config)") return } cfg := globalConfig.ReplicationConfig - if cfg.Role != "primary" { - // Backup role doesn't replicate out (it receives) - log.Printf("Commercial edition: backup POP - replication receiver only") - return - } - - if cfg.BackupPOP.URL == "" { - log.Printf("Commercial edition: primary role but no backup_pop configured") + if len(cfg.Replication.Peers) == 0 { + log.Printf("Commercial edition: primary-only POP (no replication peers)") return } @@ -66,7 +61,10 @@ func startReplication(ctx context.Context, dataDir string) { signal: make(chan struct{}, 1), } - log.Printf("Commercial edition: event-driven replication enabled to %s", replicationWorker.config.BackupPOP.URL) + log.Printf("Commercial edition: event-driven replication enabled to %d peers", len(cfg.Replication.Peers)) + for _, peer := range cfg.Replication.Peers { + log.Printf(" - %s: %s", peer.ID, peer.URL) + } go replicationWorker.Run(ctx) } @@ -129,14 +127,18 @@ func (w *ReplicationWorker) replicateWithRetry(ctx context.Context) { } // Max retries exceeded - alert operator + peerIDs := make([]string, len(w.config.Replication.Peers)) + for i, p := range w.config.Replication.Peers { + peerIDs[i] = p.ID + } Current.AlertOperator(ctx, "replication_failed", - "Backup POP unreachable after max retries", map[string]any{ - "backup_pop": w.config.BackupPOP.URL, - "retries": maxRetries, + "Replication peers unreachable after max retries", map[string]any{ + "peers": peerIDs, + "retries": maxRetries, }) } -// replicateBatch sends all dirty entries to backup POP. +// replicateBatch sends all dirty entries to all replication peers. 
func (w *ReplicationWorker) replicateBatch() error { // Get up to batch size dirty entries entries, err := lib.EntryListDirty(w.db, w.config.Replication.BatchSize) @@ -147,11 +149,33 @@ func (w *ReplicationWorker) replicateBatch() error { return nil // Nothing to replicate } - // TODO: POST to backup POP - // TODO: On success, mark all replicated - // TODO: On failure, entries stay dirty for retry + // Convert to ReplicatedEntry + replEntries := make([]ReplicatedEntry, len(entries)) + for i, e := range entries { + replEntries[i] = ReplicatedEntry{ + EntryID: lib.IDToHex(int64(e.EntryID)), + Type: e.Type, + Title: e.Title, + TitleIdx: string(e.TitleIdx), + Data: e.Data, + DataLevel: e.DataLevel, + Scopes: e.Scopes, + CreatedAt: e.CreatedAt, + UpdatedAt: e.UpdatedAt, + Version: e.Version, + } + } + + // TODO: Replicate to all peers + // TODO: POST to each peer's /api/replication/apply + // TODO: On success for all, mark all replicated + // TODO: On partial failure, mark only successfully replicated + + for _, peer := range w.config.Replication.Peers { + log.Printf("Replicating %d entries to %s (%s)", len(entries), peer.ID, peer.URL) + // client.replicateToPeer(ctx, peer.URL, l0, l1, replEntries) + } - log.Printf("Replicating %d entries to %s", len(entries), w.config.BackupPOP.URL) return nil } diff --git a/clavis/clavis-vault/lib/auth.go b/clavis/clavis-vault/lib/auth.go new file mode 100644 index 0000000..2165cb0 --- /dev/null +++ b/clavis/clavis-vault/lib/auth.go @@ -0,0 +1,42 @@ +package lib + +import ( + "fmt" +) + +// ValidateL0L1 validates that L0 and L1 are valid vault credentials. +// L0 is the 4-byte vault identifier (first 4 bytes of PRF). +// L1 is the 8-byte vault encryption key (bytes 4-11 of PRF). +// Returns the vault DB handle if valid, or error if invalid. +// The validation is done by attempting to open the vault DB with L1. 
+func ValidateL0L1(dataDir string, l0, l1 []byte) (*DB, error) {
+	// Validate lengths
+	if len(l0) != 4 {
+		return nil, fmt.Errorf("L0 must be 4 bytes, got %d", len(l0))
+	}
+	if len(l1) != 8 {
+		return nil, fmt.Errorf("L1 must be 8 bytes, got %d", len(l1))
+	}
+
+	// Derive vault prefix from L0
+	vaultPrefix := Base64URLEncode(l0)
+	dbPath := dataDir + "/clavitor-" + vaultPrefix
+
+	// Open DB
+	db, err := OpenDB(dbPath)
+	if err != nil {
+		return nil, fmt.Errorf("cannot open vault: %w", err)
+	}
+
+	// Validate L1 by attempting a read with the key derived from it.
+	// NOTE(review): entry 0 is expected to be absent - confirm a wrong L1 still fails here.
+	l1Key := NormalizeKey(l1)
+	_, err = EntryGet(db, l1Key, 0) // Entry 0 doesn't exist, but decryption will be attempted
+	// We expect "not found" error, not decryption error
+	if err != nil && err != ErrNotFound {
+		db.Close()
+		return nil, fmt.Errorf("L1 validation failed: %w", err)
+	}
+
+	return db, nil
+}
diff --git a/clavis/clavis-vault/lib/base64.go b/clavis/clavis-vault/lib/base64.go
new file mode 100644
index 0000000..73d53c3
--- /dev/null
+++ b/clavis/clavis-vault/lib/base64.go
@@ -0,0 +1,18 @@
+package lib
+
+import (
+	"encoding/base64"
+	"strings"
+)
+
+// Base64URLEncode encodes bytes as base64url without padding.
+func Base64URLEncode(b []byte) string {
+	return base64.RawURLEncoding.EncodeToString(b)
+}
+
+// Base64URLDecode decodes a base64url string (with or without padding).
+// Trailing "=" padding is stripped before decoding because RawURLEncoding
+// rejects it; unpadded input, which Base64URLEncode produces, decodes as-is.
+func Base64URLDecode(s string) ([]byte, error) {
+	return base64.RawURLEncoding.DecodeString(strings.TrimRight(s, "="))
+}