//go:build commercial // Package api - Commercial replication handler. // This file is built ONLY when the "commercial" build tag is specified. package api import ( "encoding/json" "net/http" "time" "github.com/johanj/clavitor/edition" "github.com/johanj/clavitor/lib" ) // HandleReplicationApply is the HTTP handler for incoming replication. // Validates vault credentials (L0/L1) before accepting. func (h *Handlers) HandleReplicationApply(w http.ResponseWriter, r *http.Request) { // Only accept POST if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Extract vault credentials from headers l0Hdr := r.Header.Get("X-Clavitor-L0") l1Hdr := r.Header.Get("X-Clavitor-L1") if l0Hdr == "" || l1Hdr == "" { http.Error(w, "Missing auth headers", http.StatusUnauthorized) return } l0, err := lib.Base64URLDecode(l0Hdr) if err != nil { http.Error(w, "Invalid L0", http.StatusBadRequest) return } l1, err := lib.Base64URLDecode(l1Hdr) if err != nil { http.Error(w, "Invalid L1", http.StatusBadRequest) return } // Validate L0/L1 and get DB db, err := lib.ValidateL0L1(h.Cfg.DataDir, l0, l1) if err != nil { // Alert on auth failure sourcePOP := r.Header.Get("X-Clavitor-POP") edition.Current.AlertOperator(r.Context(), "replication_auth_failed", "Invalid L0/L1 in replication request", map[string]any{ "source_ip": r.RemoteAddr, "source_pop": sourcePOP, "error": err.Error(), }) http.Error(w, "Invalid credentials", http.StatusUnauthorized) return } defer db.Close() // Decode request var req edition.ReplicationRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "Invalid JSON", http.StatusBadRequest) return } // Validate timestamp (anti-replay: 5 minute window) now := time.Now().UnixMilli() if req.Timestamp < now-5*60*1000 || req.Timestamp > now+5*60*1000 { http.Error(w, "Request expired or future dated", http.StatusBadRequest) return } // Apply entries to local DB accepted, rejected := h.applyReplicationEntries(db, req.Entries) // Audit alert (only unusual activity) if len(rejected) > 0 { edition.Current.AlertOperator(r.Context(), "replication_received", "Replication batch received with rejections", map[string]any{ "source_pop": req.SourcePOP, "accepted": len(accepted), "rejected": len(rejected), }) } // Respond resp := edition.ReplicationResponse{ Accepted: accepted, Rejected: rejected, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(resp) } // applyReplicationEntries writes replicated entries to local DB. func (h *Handlers) applyReplicationEntries(db *lib.DB, entries []edition.ReplicatedEntry) (accepted, rejected []string) { for _, e := range entries { // Convert EntryID from hex string to int64 entryID, err := lib.HexToID(e.EntryID) if err != nil { rejected = append(rejected, e.EntryID) continue } // Convert ReplicatedEntry to Entry entry := lib.Entry{ EntryID: lib.HexID(entryID), Type: e.Type, Title: e.Title, TitleIdx: []byte(e.TitleIdx), Data: e.Data, DataLevel: e.DataLevel, Scopes: e.Scopes, CreatedAt: e.CreatedAt, UpdatedAt: e.UpdatedAt, Version: e.Version, } // Upsert (insert or update if newer version) existing, _ := lib.EntryGet(db, nil, int64(entry.EntryID)) if existing == nil { // Insert new if err := lib.EntryCreate(db, nil, &entry); err != nil { rejected = append(rejected, e.EntryID) continue } } else if entry.Version > existing.Version { // Update if newer if err := lib.EntryUpdate(db, nil, &entry); err != nil { rejected = append(rejected, e.EntryID) continue } } else { // Same or older version - skip (not an error) } // Mark as replicated (we received it) lib.EntryMarkReplicated(db, int64(entry.EntryID)) accepted = append(accepted, e.EntryID) } return }