clavitor/clavis/clavis-vault/api/replication.go

144 lines
3.9 KiB
Go

//go:build commercial
// Package api - Commercial replication handler.
// This file is built ONLY when the "commercial" build tag is specified.
package api
import (
"encoding/json"
"net/http"
"time"
"github.com/johanj/clavitor/edition"
"github.com/johanj/clavitor/lib"
)
// HandleReplicationApply is the HTTP handler for incoming replication.
// Validates vault credentials (L0/L1) before accepting.
func (h *Handlers) HandleReplicationApply(w http.ResponseWriter, r *http.Request) {
// Only accept POST
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Extract vault credentials from headers
l0Hdr := r.Header.Get("X-Clavitor-L0")
l1Hdr := r.Header.Get("X-Clavitor-L1")
if l0Hdr == "" || l1Hdr == "" {
http.Error(w, "Missing auth headers", http.StatusUnauthorized)
return
}
l0, err := lib.Base64URLDecode(l0Hdr)
if err != nil {
http.Error(w, "Invalid L0", http.StatusBadRequest)
return
}
l1, err := lib.Base64URLDecode(l1Hdr)
if err != nil {
http.Error(w, "Invalid L1", http.StatusBadRequest)
return
}
// Validate L0/L1 and get DB
db, err := lib.ValidateL0L1(h.Cfg.DataDir, l0, l1)
if err != nil {
// Alert on auth failure
sourcePOP := r.Header.Get("X-Clavitor-POP")
edition.Current.AlertOperator(r.Context(), "replication_auth_failed",
"Invalid L0/L1 in replication request", map[string]any{
"source_ip": r.RemoteAddr,
"source_pop": sourcePOP,
"error": err.Error(),
})
http.Error(w, "Invalid credentials", http.StatusUnauthorized)
return
}
defer db.Close()
// Decode request
var req edition.ReplicationRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// Validate timestamp (anti-replay: 5 minute window)
now := time.Now().UnixMilli()
if req.Timestamp < now-5*60*1000 || req.Timestamp > now+5*60*1000 {
http.Error(w, "Request expired or future dated", http.StatusBadRequest)
return
}
// Apply entries to local DB
accepted, rejected := h.applyReplicationEntries(db, req.Entries)
// Audit alert (only unusual activity)
if len(rejected) > 0 {
edition.Current.AlertOperator(r.Context(), "replication_received",
"Replication batch received with rejections", map[string]any{
"source_pop": req.SourcePOP,
"accepted": len(accepted),
"rejected": len(rejected),
})
}
// Respond
resp := edition.ReplicationResponse{
Accepted: accepted,
Rejected: rejected,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
// applyReplicationEntries writes replicated entries to local DB.
func (h *Handlers) applyReplicationEntries(db *lib.DB, entries []edition.ReplicatedEntry) (accepted, rejected []string) {
for _, e := range entries {
// Convert EntryID from hex string to int64
entryID, err := lib.HexToID(e.EntryID)
if err != nil {
rejected = append(rejected, e.EntryID)
continue
}
// Convert ReplicatedEntry to Entry
entry := lib.Entry{
EntryID: lib.HexID(entryID),
Type: e.Type,
Title: e.Title,
TitleIdx: []byte(e.TitleIdx),
Data: e.Data,
DataLevel: e.DataLevel,
Scopes: e.Scopes,
CreatedAt: e.CreatedAt,
UpdatedAt: e.UpdatedAt,
Version: e.Version,
}
// Upsert (insert or update if newer version)
existing, _ := lib.EntryGet(db, nil, int64(entry.EntryID))
if existing == nil {
// Insert new
if err := lib.EntryCreate(db, nil, &entry); err != nil {
rejected = append(rejected, e.EntryID)
continue
}
} else if entry.Version > existing.Version {
// Update if newer
if err := lib.EntryUpdate(db, nil, &entry); err != nil {
rejected = append(rejected, e.EntryID)
continue
}
} else {
// Same or older version - skip (not an error)
}
// Mark as replicated (we received it)
lib.EntryMarkReplicated(db, int64(entry.EntryID))
accepted = append(accepted, e.EntryID)
}
return
}