clavitor/clavis/clavis-telemetry/main.go

437 lines
14 KiB
Go

//go:build commercial

// Command clavis-telemetry is the POP telemetry ingestion service, secured with mTLS.
// It accepts telemetry only from authenticated POPs that present a valid client
// certificate, and tarpits all unauthorized requests.
//
// Log retention: tarpit logs contain scanner IPs for security analysis.
// Rotate or delete them per your organization's retention policy (recommended: 30 days).
package main
import (
"crypto/tls"
"crypto/x509"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
_ "github.com/mattn/go-sqlite3"
)
// Package-level state shared by main and the HTTP handlers in this file.
var (
	// db is the handle to the SQLite operations database. It is nil until
	// main opens it.
	db *sql.DB

	// caPool holds the CA certificates used to authenticate POP client
	// certificates. It is nil until loadCA succeeds; handlers treat a nil
	// pool as "mTLS checks disabled".
	caPool *x509.CertPool

	// processStartTime is the Unix time (seconds) captured when the package
	// variables are initialised. updateSpan uses it to tell how long this
	// server process has been running.
	processStartTime = time.Now().Unix()
)
// main starts the telemetry service: it opens the SQLite operations database,
// loads the CA chain used for mTLS, ensures the schema exists, starts the Kuma
// push goroutine and then serves HTTPS on port 1986 until the listener fails.
//
// Every startup failure is fatal. log.Fatalf exits the process without running
// deferred calls, so the deferred db.Close only runs if main returns normally.
func main() {
	// Hardcoded relative path per KISS principle. "./data" is resolved against
	// the process working directory (not the executable's location); use
	// symlinks if a different layout is needed.
	dataDir := "./data"
	dbPath := dataDir + "/operations.db"
	caChainPath := dataDir + "/ca-chain.crt"

	var err error
	db, err = sql.Open("sqlite3", dbPath)
	if err != nil {
		log.Fatalf("ERR-TELEMETRY-001: Failed to open operations.db at %s - %v. Check permissions and disk space.", dbPath, err)
	}
	defer db.Close()

	// Load CA chain for mTLS - mandatory, no fallback
	if err := loadCA(caChainPath); err != nil {
		log.Fatalf("ERR-TELEMETRY-002: Failed to load CA chain from %s - %v. Ensure CA certificate exists and is valid PEM.", caChainPath, err)
	}

	// Ensure telemetry table exists
	ensureTables()

	// Start Kuma push goroutine
	go kumaPush()

	// Create server with mTLS (mandatory)
	tlsConfig := setupTLS()
	server := &http.Server{
		Addr:      ":1986",
		TLSConfig: tlsConfig,
		Handler:   http.HandlerFunc(routeHandler),

		// Bound how long a peer may take to send its request headers so that
		// slow or stalled clients cannot pin connections forever. No
		// WriteTimeout is set on purpose: tarpit deliberately keeps responses
		// open for about 30 seconds.
		ReadHeaderTimeout: 10 * time.Second,
	}

	log.Printf("clavis-telemetry starting on port 1986")
	log.Printf("Database: %s", dbPath)
	log.Printf("mTLS: enabled")

	// NOTE(review): ListenAndServeTLS is given empty cert/key paths, and the
	// TLS config built by setupTLS in this file sets neither Certificates nor
	// GetCertificate. net/http requires one of those sources for the server
	// certificate - confirm where the server key pair is meant to come from.
	log.Fatal(server.ListenAndServeTLS("", ""))
}
// loadCA reads the PEM-encoded CA chain at caChainPath and installs it as the
// package-level caPool used to authenticate POP client certificates.
//
// It returns the underlying read error if the file cannot be read, or an error
// if no certificate could be parsed from it. caPool is replaced only on
// success, so a failed load never leaves an empty, non-nil pool behind (which
// would switch on the "caPool != nil" checks elsewhere while trusting nothing).
func loadCA(caChainPath string) error {
	pemBytes, err := os.ReadFile(caChainPath)
	if err != nil {
		return err
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		return fmt.Errorf("failed to parse CA chain")
	}

	caPool = pool
	return nil
}
// setupTLS builds the server-side TLS configuration from the package-level
// caPool. It returns nil when no CA pool has been loaded.
//
// The returned configuration requires every client to present a certificate
// that verifies against caPool and refuses anything older than TLS 1.3.
//
// NOTE(review): no server certificate (Certificates / GetCertificate) is set
// here - confirm where the listener's own key pair is configured.
func setupTLS() *tls.Config {
	if pool := caPool; pool != nil {
		return &tls.Config{
			MinVersion: tls.VersionTLS13,
			ClientAuth: tls.RequireAndVerifyClientCert,
			ClientCAs:  pool,
		}
	}
	return nil
}
// routeHandler is the single entry point for every request.
//
// When a CA pool is loaded, requests without a client certificate are sent to
// tarpit, and the certificate's Common Name is stored in the request's
// X-POP-ID header (overwriting any client-supplied value). Requests are then
// dispatched by exact path: /telemetry and /health are served, everything
// else is tarpitted.
func routeHandler(w http.ResponseWriter, r *http.Request) {
	if caPool != nil {
		// r.TLS is nil for a request that did not arrive over TLS; treat that
		// like a missing certificate instead of dereferencing a nil pointer.
		if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
			tarpit(w, r)
			return
		}
		// Extract POP identity from certificate CN
		r.Header.Set("X-POP-ID", r.TLS.PeerCertificates[0].Subject.CommonName)
	}

	switch r.URL.Path {
	case "/telemetry":
		handleTelemetry(w, r)
	case "/health":
		handleHealth(w, r)
	default:
		tarpit(w, r)
	}
}
// tarpit wastes scanner resources: it logs the request, answers 200 with a
// text/plain body and then drips one space per second for up to 30 seconds.
//
// The drip stops early when a write fails or when the request context is
// cancelled, so a client that has gone away does not keep the handler
// goroutine alive for the rest of the 30 seconds. If the ResponseWriter does
// not implement http.Flusher the tarpit cannot stream, so it logs
// ERR-TELEMETRY-040 and returns right after sending the status line.
//
// NOTE(review): the logged address prefers the first X-Forwarded-For entry,
// which is client-supplied. Unless a trusted proxy sits in front of this
// listener, scanners can forge the IP that ends up in the tarpit log.
func tarpit(w http.ResponseWriter, r *http.Request) {
	realIP := r.RemoteAddr
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		first := xff
		if comma := strings.IndexByte(xff, ','); comma >= 0 {
			first = xff[:comma]
		}
		realIP = strings.TrimSpace(first)
	}
	log.Printf("tarpit: %s %s from %s", r.Method, r.URL.Path, realIP)

	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(http.StatusOK)

	// Verify flusher is available - otherwise tarpit is ineffective
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Printf("ERR-TELEMETRY-040: tarpit called with ResponseWriter that does not implement http.Flusher - aborting")
		return
	}
	flusher.Flush()

	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	gone := r.Context().Done()

	// Drip one byte per second for 30 seconds
	for i := 0; i < 30; i++ {
		if _, err := w.Write([]byte(" ")); err != nil {
			return // Client disconnected
		}
		// Flush has no return value per the http.Flusher interface; the
		// Write error above and the request context are the disconnect signals.
		flusher.Flush()

		select {
		case <-gone:
			return // Request cancelled
		case <-tick.C:
		}
	}
}
// ensureTables creates the telemetry, uptime_spans and maintenance tables and
// their indexes if they do not exist yet.
//
// The statements run in a fixed order against the package-level db. The first
// statement that fails terminates the process through log.Fatalf with that
// statement's own ERR-TELEMETRY code.
func ensureTables() {
	steps := []struct {
		stmt    string // DDL to execute
		failure string // log.Fatalf format used when stmt fails
	}{
		// Telemetry table
		{`CREATE TABLE IF NOT EXISTS telemetry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id TEXT NOT NULL,
received_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
version TEXT NOT NULL DEFAULT '',
hostname TEXT NOT NULL DEFAULT '',
uptime_seconds INTEGER NOT NULL DEFAULT 0,
cpu_percent REAL NOT NULL DEFAULT 0,
memory_total_mb INTEGER NOT NULL DEFAULT 0,
memory_used_mb INTEGER NOT NULL DEFAULT 0,
disk_total_mb INTEGER NOT NULL DEFAULT 0,
disk_used_mb INTEGER NOT NULL DEFAULT 0,
load_1m REAL NOT NULL DEFAULT 0,
vault_count INTEGER NOT NULL DEFAULT 0,
vault_size_mb REAL NOT NULL DEFAULT 0,
vault_entries INTEGER NOT NULL DEFAULT 0,
mode TEXT NOT NULL DEFAULT ''
)`, "ERR-TELEMETRY-005: Failed to create telemetry table - %v"},
		{`CREATE INDEX IF NOT EXISTS idx_telemetry_node_id ON telemetry(node_id)`,
			"ERR-TELEMETRY-006: Failed to create telemetry node_id index - %v"},
		{`CREATE INDEX IF NOT EXISTS idx_telemetry_node_latest ON telemetry(node_id, id DESC)`,
			"ERR-TELEMETRY-007: Failed to create telemetry node_latest index - %v"},

		// Uptime spans table
		{`CREATE TABLE IF NOT EXISTS uptime_spans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id TEXT NOT NULL,
start_at INTEGER NOT NULL,
end_at INTEGER NOT NULL
)`, "ERR-TELEMETRY-008: Failed to create uptime_spans table - %v"},
		{`CREATE INDEX IF NOT EXISTS idx_spans_node_end ON uptime_spans(node_id, end_at DESC)`,
			"ERR-TELEMETRY-009: Failed to create uptime_spans index - %v"},

		// Maintenance table
		{`CREATE TABLE IF NOT EXISTS maintenance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
start_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
end_at INTEGER,
reason TEXT NOT NULL DEFAULT '',
started_by TEXT NOT NULL DEFAULT '',
ended_by TEXT NOT NULL DEFAULT ''
)`, "ERR-TELEMETRY-010: Failed to create maintenance table - %v"},
	}

	for _, step := range steps {
		if _, err := db.Exec(step.stmt); err != nil {
			log.Fatalf(step.failure, err)
		}
	}
}
// handleHealth reports service health as JSON.
//
// It first probes database connectivity with "SELECT 1" (a read probe, not a
// write check) and answers 503 with {"status":"error","db":"unavailable"} if
// that fails. Otherwise it answers 200 with the newest received_at value in
// the telemetry table as last_telemetry, from any node.
//
// last_telemetry is 0 when the table is empty (MAX yields NULL) or when the
// timestamp query itself fails; in the latter case the error is logged rather
// than silently dropped, and the endpoint still reports "ok".
func handleHealth(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	// Check DB reachable
	var one int
	if err := db.QueryRow("SELECT 1").Scan(&one); err != nil {
		// Written by hand instead of http.Error, which would relabel the JSON
		// body as text/plain.
		w.Header().Set("X-Content-Type-Options", "nosniff")
		w.WriteHeader(http.StatusServiceUnavailable)
		fmt.Fprintln(w, `{"status":"error","db":"unavailable"}`)
		return
	}

	// Check recent telemetry (any source). NullInt64 keeps the NULL produced
	// by MAX() on an empty table from being reported as a scan error.
	var lastBeat sql.NullInt64
	if err := db.QueryRow(`SELECT MAX(received_at) FROM telemetry`).Scan(&lastBeat); err != nil {
		log.Printf("health: failed to read latest telemetry timestamp - %v", err)
	}

	fmt.Fprintf(w, `{"status":"ok","db":"ok","last_telemetry":%d}`, lastBeat.Int64)
}
// handleTelemetry ingests one telemetry report from a POP.
//
// Only POST is accepted (405 otherwise). When a CA pool is loaded, the request
// must carry a client certificate that verifies against it for client
// authentication; anything else is handed to tarpit. The JSON body (capped at
// 1 MiB) may supply metrics either as top-level fields or nested under
// "system" / "vaults"; a nested value is used only where the top-level one is
// zero. The merged row is inserted into the telemetry table and the node's
// uptime span is updated.
//
// Responses: 200 on success, 400 for an unreadable payload or missing node
// identity, 405 for non-POST, 500 when the insert fails.
func handleTelemetry(w http.ResponseWriter, r *http.Request) {
	const maxBodyBytes = 1 << 20 // upper bound for one telemetry payload

	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Verify mTLS certificate if enabled
	var popID string
	if caPool != nil {
		// r.TLS is nil for a request that did not arrive over TLS.
		if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 {
			tarpit(w, r)
			return
		}
		cert := r.TLS.PeerCertificates[0]
		popID = cert.Subject.CommonName

		// Re-verify the leaf as a *client* certificate. With KeyUsages left
		// empty, x509 checks for server-auth usage and rejects certificates
		// issued for client authentication only. Intermediates presented by
		// the peer are offered so chains are built as in the TLS handshake.
		intermediates := x509.NewCertPool()
		for _, c := range r.TLS.PeerCertificates[1:] {
			intermediates.AddCert(c)
		}
		opts := x509.VerifyOptions{
			Roots:         caPool,
			Intermediates: intermediates,
			KeyUsages:     []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
		}
		if _, err := cert.Verify(opts); err != nil {
			log.Printf("ERR-TELEMETRY-003: Invalid or expired certificate from %s - %v", popID, err)
			tarpit(w, r)
			return
		}
	}

	var t struct {
		NodeID        string  `json:"node_id"`
		Version       string  `json:"version"`
		Hostname      string  `json:"hostname"`
		UptimeSeconds int64   `json:"uptime_seconds"`
		CPUPercent    float64 `json:"cpu_percent"`
		MemTotalMB    int64   `json:"memory_total_mb"`
		MemUsedMB     int64   `json:"memory_used_mb"`
		DiskTotalMB   int64   `json:"disk_total_mb"`
		DiskUsedMB    int64   `json:"disk_used_mb"`
		Load1m        float64 `json:"load_1m"`
		VaultCount    int     `json:"vault_count"`
		VaultSizeMB   float64 `json:"vault_size_mb"`
		VaultEntries  int     `json:"vault_entries"`
		Mode          string  `json:"mode"`
		System        struct {
			OS          string  `json:"os"`
			Arch        string  `json:"arch"`
			CPUs        int     `json:"cpus"`
			CPUPercent  float64 `json:"cpu_percent"`
			MemTotalMB  int64   `json:"memory_total_mb"`
			MemUsedMB   int64   `json:"memory_used_mb"`
			DiskTotalMB int64   `json:"disk_total_mb"`
			DiskUsedMB  int64   `json:"disk_used_mb"`
			Load1m      float64 `json:"load_1m"`
		} `json:"system"`
		Vaults struct {
			Count        int   `json:"count"`
			TotalSizeMB  int64 `json:"total_size_mb"`
			TotalEntries int64 `json:"total_entries"`
		} `json:"vaults"`
	}

	// Cap the body so a single request cannot make the decoder buffer an
	// arbitrarily large payload; an oversized body surfaces as a decode error.
	r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
	if err := json.NewDecoder(r.Body).Decode(&t); err != nil {
		http.Error(w, `{"error":"bad payload"}`, 400)
		return
	}

	// Use certificate CN as authoritative node_id if mTLS enabled.
	// NOTE(review): a verified certificate with an empty CN falls through to
	// the client-supplied node_id/hostname - confirm that is intended.
	if popID != "" {
		t.NodeID = popID
	} else if t.NodeID == "" {
		t.NodeID = t.Hostname
	}
	if t.NodeID == "" {
		http.Error(w, `{"error":"missing node_id or hostname"}`, 400)
		return
	}

	// Merge nested fields: a nested value fills in only where the top-level
	// value is zero.
	if t.CPUPercent == 0 {
		t.CPUPercent = t.System.CPUPercent
	}
	if t.MemTotalMB == 0 {
		t.MemTotalMB = t.System.MemTotalMB
	}
	if t.MemUsedMB == 0 {
		t.MemUsedMB = t.System.MemUsedMB
	}
	if t.DiskTotalMB == 0 {
		t.DiskTotalMB = t.System.DiskTotalMB
	}
	if t.DiskUsedMB == 0 {
		t.DiskUsedMB = t.System.DiskUsedMB
	}
	if t.Load1m == 0 {
		t.Load1m = t.System.Load1m
	}
	if t.VaultCount == 0 {
		t.VaultCount = t.Vaults.Count
	}
	if t.VaultSizeMB == 0 {
		t.VaultSizeMB = float64(t.Vaults.TotalSizeMB)
	}
	if t.VaultEntries == 0 {
		t.VaultEntries = int(t.Vaults.TotalEntries)
	}

	// Insert telemetry
	if _, err := db.Exec(`INSERT INTO telemetry (node_id, version, hostname, uptime_seconds, cpu_percent, memory_total_mb, memory_used_mb, disk_total_mb, disk_used_mb, load_1m, vault_count, vault_size_mb, vault_entries, mode) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
		t.NodeID, t.Version, t.Hostname, t.UptimeSeconds, t.CPUPercent, t.MemTotalMB, t.MemUsedMB, t.DiskTotalMB, t.DiskUsedMB, t.Load1m, t.VaultCount, t.VaultSizeMB, t.VaultEntries, t.Mode); err != nil {
		log.Printf("ERR-TELEMETRY-004: Failed to insert telemetry for node=%s - %v", t.NodeID, err)
		http.Error(w, `{"error":"database error"}`, 500)
		return
	}

	// Uptime span tracking
	updateSpan(t.NodeID, t.Hostname, t.Version, t.CPUPercent, t.MemUsedMB, t.MemTotalMB, t.DiskUsedMB, t.DiskTotalMB, t.Load1m, t.UptimeSeconds)

	w.WriteHeader(200)
}
// updateSpan records a heartbeat from nodeID in the uptime_spans table.
//
// The node's most recent span (highest end_at) is extended to "now" when the
// fleet is in maintenance (an open row in the maintenance table), when the
// previous heartbeat is at most 60 seconds old, or when this server process
// has been running for less than 60 seconds (too early to blame the node for
// a gap). Otherwise, outside maintenance, the gap is logged as an outage, a
// new span starting now is inserted and alertOutage is fired asynchronously.
// A node without any span gets its first span the same way, flagged as
// first_span. The remaining parameters are only used in the outage log line.
//
// Failures are logged and never returned. If the maintenance check fails the
// node is treated as not in maintenance. If looking up the latest span fails
// for any reason other than "no rows", nothing is written: the node's history
// is unknown, so it is not reported as a brand-new node.
//
// NOTE(review): ERR-TELEMETRY-010 below is also used for the maintenance
// table creation failure in ensureTables - consider giving it a unique code.
func updateSpan(nodeID, hostname, version string, cpuPercent float64, memUsedMB, memTotalMB, diskUsedMB, diskTotalMB int64, load1m float64, uptimeSeconds int64) {
	now := time.Now().Unix()
	serverAge := now - processStartTime

	var inMaint bool
	if err := db.QueryRow(`SELECT COUNT(*) > 0 FROM maintenance WHERE end_at IS NULL`).Scan(&inMaint); err != nil {
		log.Printf("ERR-TELEMETRY-010: Failed to check maintenance mode - %v", err)
		// Continue with inMaint=false as safe default
		inMaint = false
	}

	var spanID, spanEnd int64
	err := db.QueryRow(`SELECT id, end_at FROM uptime_spans WHERE node_id = ? ORDER BY end_at DESC LIMIT 1`, nodeID).Scan(&spanID, &spanEnd)
	if err != nil && err != sql.ErrNoRows {
		log.Printf("ERR-TELEMETRY-011: Failed to query latest uptime span for node=%s - %v", nodeID, err)
		// Without the previous span we can neither extend it nor size the
		// gap; inserting here would fake a "first span" and a new-node alert.
		return
	}
	hasSpan := err == nil

	switch {
	case hasSpan && (inMaint || now-spanEnd <= 60):
		if _, execErr := db.Exec(`UPDATE uptime_spans SET end_at = ? WHERE id = ?`, now, spanID); execErr != nil {
			log.Printf("ERR-TELEMETRY-012: Failed to extend uptime span id=%d for node=%s - %v", spanID, nodeID, execErr)
		}

	case hasSpan && serverAge < 60:
		log.Printf("SPAN EXTEND node=%s gap=%ds (server up %ds, too early to judge)", nodeID, now-spanEnd, serverAge)
		if _, execErr := db.Exec(`UPDATE uptime_spans SET end_at = ? WHERE id = ?`, now, spanID); execErr != nil {
			log.Printf("ERR-TELEMETRY-013: Failed to extend early-judgment span id=%d for node=%s - %v", spanID, nodeID, execErr)
		}

	case !inMaint:
		// spanEnd is 0 when there is no previous span; the gap is then only
		// passed along and not reported, because firstSpan is set.
		gapSeconds := now - spanEnd
		if hasSpan {
			log.Printf("OUTAGE SPAN node=%s gap=%ds last_seen=%s resumed=%s prev_span_id=%d hostname=%s version=%s cpu=%.1f%% mem=%d/%dMB disk=%d/%dMB load=%.2f uptime=%ds",
				nodeID, gapSeconds,
				time.Unix(spanEnd, 0).UTC().Format(time.RFC3339),
				time.Unix(now, 0).UTC().Format(time.RFC3339),
				spanID, hostname, version,
				cpuPercent, memUsedMB, memTotalMB,
				diskUsedMB, diskTotalMB, load1m, uptimeSeconds)
		} else {
			log.Printf("OUTAGE SPAN node=%s first_span=true hostname=%s version=%s",
				nodeID, hostname, version)
		}
		if _, execErr := db.Exec(`INSERT INTO uptime_spans (node_id, start_at, end_at) VALUES (?, ?, ?)`, nodeID, now, now); execErr != nil {
			log.Printf("ERR-TELEMETRY-014: Failed to insert new uptime span for node=%s - %v", nodeID, execErr)
			return // Don't alert if we couldn't record the span
		}
		go alertOutage(nodeID, hostname, gapSeconds, !hasSpan)
	}
}
// alertOutage sends a high-priority ntfy notification about a new uptime span.
//
// nodeID and hostname identify the node, gap is the outage length in seconds
// and firstSpan marks a node that had no previous span (reported as "new node
// online" instead of "outage recovery").
//
// The target and credentials come from the NTFY_ALERT_URL and
// NTFY_ALERT_TOKEN environment variables; if either is empty, alerting is
// disabled and the event is only logged. Failures are logged, never returned.
// The alert counts as sent only when ntfy answers with a 2xx status - a
// rejected request (bad token, server error) is logged as a send failure.
func alertOutage(nodeID, hostname string, gap int64, firstSpan bool) {
	ntfyURL := os.Getenv("NTFY_ALERT_URL")
	ntfyToken := os.Getenv("NTFY_ALERT_TOKEN")
	if ntfyURL == "" || ntfyToken == "" {
		// Alerting disabled - log only
		if firstSpan {
			log.Printf("OUTAGE SPAN node=%s first_span=true (alerting disabled)", nodeID)
		} else {
			log.Printf("OUTAGE SPAN node=%s gap=%ds (alerting disabled)", nodeID, gap)
		}
		return
	}

	title := fmt.Sprintf("Outage recovery: %s", nodeID)
	body := fmt.Sprintf("Node **%s** (%s) created new span after **%ds** gap", nodeID, hostname, gap)
	if firstSpan {
		title = fmt.Sprintf("New node online: %s", nodeID)
		body = fmt.Sprintf("Node **%s** (%s) first heartbeat - new span created", nodeID, hostname)
	}

	req, err := http.NewRequest("POST", ntfyURL, strings.NewReader(body))
	if err != nil {
		log.Printf("ERR-TELEMETRY-020: Failed to create ntfy alert request for node=%s - %v", nodeID, err)
		return
	}
	req.Header.Set("Authorization", "Bearer "+ntfyToken)
	req.Header.Set("Title", title)
	req.Header.Set("Markdown", "yes")
	req.Header.Set("Priority", "high")
	req.Header.Set("Tags", "warning")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("ERR-TELEMETRY-021: Failed to send ntfy alert for node=%s to %s - %v", nodeID, ntfyURL, err)
		return
	}
	// Capture the status before closing; the body itself is not needed.
	status, code := resp.Status, resp.StatusCode
	if err := resp.Body.Close(); err != nil {
		log.Printf("ERR-TELEMETRY-022: Failed to close ntfy response body for node=%s - %v", nodeID, err)
	}

	if code < 200 || code > 299 {
		log.Printf("ERR-TELEMETRY-021: Failed to send ntfy alert for node=%s to %s - unexpected HTTP status %s", nodeID, ntfyURL, status)
		return
	}
	log.Printf("OUTAGE SPAN ntfy alert sent for node=%s", nodeID)
}