437 lines
14 KiB
Go
437 lines
14 KiB
Go
//go:build commercial
|
|
|
|
// clavis-telemetry - POP telemetry ingestion service with mTLS
|
|
// Receives telemetry only from authenticated POPs with valid certificates
|
|
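// Clients without a valid certificate are rejected during the TLS handshake;
// requests that fail later certificate checks or hit unknown paths are tarpitted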
|
|
//
|
|
// Log retention: Tarpit logs contain scanner IPs for security analysis.
|
|
// Rotate/delete per your organization's retention policy (recommended: 30 days).
|
|
|
|
package main
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// Process-wide state shared by the handlers and helpers in this file.
var (
	// db is the SQLite handle; main assigns it from sql.Open.
	db *sql.DB

	// processStartTime is the Unix time (seconds) captured when the package
	// was initialised. updateSpan compares against it so that heartbeat gaps
	// are not judged as outages right after this process restarts.
	processStartTime = time.Now().Unix()

	// caPool holds the CA certificates used to verify POP client
	// certificates. It stays nil until loadCA allocates it.
	caPool *x509.CertPool
)
|
|
|
|
func main() {
|
|
// Hardcoded relative path per KISS principle - data directory relative to executable
|
|
// Use symlinks if different layout needed
|
|
dataDir := "./data"
|
|
dbPath := dataDir + "/operations.db"
|
|
caChainPath := dataDir + "/ca-chain.crt"
|
|
|
|
var err error
|
|
db, err = sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-001: Failed to open operations.db at %s - %v. Check permissions and disk space.", dbPath, err)
|
|
}
|
|
defer db.Close()
|
|
|
|
// Load CA chain for mTLS - mandatory, no fallback
|
|
if err := loadCA(caChainPath); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-002: Failed to load CA chain from %s - %v. Ensure CA certificate exists and is valid PEM.", caChainPath, err)
|
|
}
|
|
|
|
// Ensure telemetry table exists
|
|
ensureTables()
|
|
|
|
// Start Kuma push goroutine
|
|
go kumaPush()
|
|
|
|
// Create server with mTLS (mandatory)
|
|
tlsConfig := setupTLS()
|
|
|
|
server := &http.Server{
|
|
Addr: ":1986",
|
|
TLSConfig: tlsConfig,
|
|
Handler: http.HandlerFunc(routeHandler),
|
|
}
|
|
|
|
log.Printf("clavis-telemetry starting on port 1986")
|
|
log.Printf("Database: %s", dbPath)
|
|
log.Printf("mTLS: enabled")
|
|
|
|
log.Fatal(server.ListenAndServeTLS("", ""))
|
|
}
|
|
|
|
func loadCA(caChainPath string) error {
|
|
data, err := os.ReadFile(caChainPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
caPool = x509.NewCertPool()
|
|
if !caPool.AppendCertsFromPEM(data) {
|
|
return fmt.Errorf("failed to parse CA chain")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func setupTLS() *tls.Config {
|
|
if caPool == nil {
|
|
return nil
|
|
}
|
|
|
|
return &tls.Config{
|
|
ClientCAs: caPool,
|
|
ClientAuth: tls.RequireAndVerifyClientCert,
|
|
MinVersion: tls.VersionTLS13,
|
|
}
|
|
}
|
|
|
|
func routeHandler(w http.ResponseWriter, r *http.Request) {
|
|
// Check if request has valid client certificate
|
|
if caPool != nil {
|
|
if len(r.TLS.PeerCertificates) == 0 {
|
|
tarpit(w, r)
|
|
return
|
|
}
|
|
|
|
// Extract POP identity from certificate CN
|
|
cert := r.TLS.PeerCertificates[0]
|
|
r.Header.Set("X-POP-ID", cert.Subject.CommonName)
|
|
}
|
|
|
|
switch r.URL.Path {
|
|
case "/telemetry":
|
|
handleTelemetry(w, r)
|
|
case "/health":
|
|
handleHealth(w, r)
|
|
default:
|
|
tarpit(w, r)
|
|
}
|
|
}
|
|
|
|
// tarpit wastes scanner resources. It answers 200 text/plain, then writes one
// space per second for about 30 seconds, stopping early if the client goes
// away.
//
// The logged source address is r.RemoteAddr. This server terminates TLS
// itself, so X-Forwarded-For is entirely client-controlled. It is logged
// separately (quoted) as a claimed value and never used as the source IP.
func tarpit(w http.ResponseWriter, r *http.Request) {
	// %q on the path: r.URL.Path is percent-decoded and may contain newlines
	// or control characters that would otherwise forge extra log lines.
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		log.Printf("tarpit: %s %q from %s xff=%q", r.Method, r.URL.Path, r.RemoteAddr, xff)
	} else {
		log.Printf("tarpit: %s %q from %s", r.Method, r.URL.Path, r.RemoteAddr)
	}

	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(200)

	// Verify flusher is available - otherwise tarpit is ineffective
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Printf("ERR-TELEMETRY-040: tarpit called with ResponseWriter that does not implement http.Flusher - aborting")
		return
	}
	flusher.Flush()

	// Drip one byte per second for 30 seconds. A buffered Write can keep
	// succeeding after the peer has gone, so also watch the request context,
	// which net/http cancels when the client connection closes.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < 30; i++ {
		if _, err := w.Write([]byte(" ")); err != nil {
			return // Client disconnected
		}
		flusher.Flush()
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
		}
	}
}
|
|
|
|
func ensureTables() {
|
|
// Telemetry table
|
|
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS telemetry (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
node_id TEXT NOT NULL,
|
|
received_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
|
|
version TEXT NOT NULL DEFAULT '',
|
|
hostname TEXT NOT NULL DEFAULT '',
|
|
uptime_seconds INTEGER NOT NULL DEFAULT 0,
|
|
cpu_percent REAL NOT NULL DEFAULT 0,
|
|
memory_total_mb INTEGER NOT NULL DEFAULT 0,
|
|
memory_used_mb INTEGER NOT NULL DEFAULT 0,
|
|
disk_total_mb INTEGER NOT NULL DEFAULT 0,
|
|
disk_used_mb INTEGER NOT NULL DEFAULT 0,
|
|
load_1m REAL NOT NULL DEFAULT 0,
|
|
vault_count INTEGER NOT NULL DEFAULT 0,
|
|
vault_size_mb REAL NOT NULL DEFAULT 0,
|
|
vault_entries INTEGER NOT NULL DEFAULT 0,
|
|
mode TEXT NOT NULL DEFAULT ''
|
|
)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-005: Failed to create telemetry table - %v", err)
|
|
}
|
|
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_telemetry_node_id ON telemetry(node_id)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-006: Failed to create telemetry node_id index - %v", err)
|
|
}
|
|
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_telemetry_node_latest ON telemetry(node_id, id DESC)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-007: Failed to create telemetry node_latest index - %v", err)
|
|
}
|
|
|
|
// Uptime spans table
|
|
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS uptime_spans (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
node_id TEXT NOT NULL,
|
|
start_at INTEGER NOT NULL,
|
|
end_at INTEGER NOT NULL
|
|
)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-008: Failed to create uptime_spans table - %v", err)
|
|
}
|
|
if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_spans_node_end ON uptime_spans(node_id, end_at DESC)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-009: Failed to create uptime_spans index - %v", err)
|
|
}
|
|
|
|
// Maintenance table
|
|
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS maintenance (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
start_at INTEGER NOT NULL DEFAULT (strftime('%s','now')),
|
|
end_at INTEGER,
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
started_by TEXT NOT NULL DEFAULT '',
|
|
ended_by TEXT NOT NULL DEFAULT ''
|
|
)`); err != nil {
|
|
log.Fatalf("ERR-TELEMETRY-010: Failed to create maintenance table - %v", err)
|
|
}
|
|
}
|
|
|
|
func handleHealth(w http.ResponseWriter, r *http.Request) {
|
|
// Check DB writable
|
|
var one int
|
|
err := db.QueryRow("SELECT 1").Scan(&one)
|
|
if err != nil {
|
|
http.Error(w, `{"status":"error","db":"unavailable"}`, 503)
|
|
return
|
|
}
|
|
|
|
// Check recent telemetry (any source)
|
|
var lastBeat int64
|
|
db.QueryRow(`SELECT MAX(received_at) FROM telemetry`).Scan(&lastBeat)
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
fmt.Fprintf(w, `{"status":"ok","db":"ok","last_telemetry":%d}`, lastBeat)
|
|
}
|
|
|
|
func handleTelemetry(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "POST" {
|
|
w.WriteHeader(405)
|
|
return
|
|
}
|
|
|
|
// Verify mTLS certificate if enabled
|
|
var popID string
|
|
if caPool != nil {
|
|
if len(r.TLS.PeerCertificates) == 0 {
|
|
tarpit(w, r)
|
|
return
|
|
}
|
|
cert := r.TLS.PeerCertificates[0]
|
|
popID = cert.Subject.CommonName
|
|
|
|
// Verify certificate is valid and not expired
|
|
if _, err := cert.Verify(x509.VerifyOptions{Roots: caPool}); err != nil {
|
|
log.Printf("ERR-TELEMETRY-003: Invalid or expired certificate from %s - %v", popID, err)
|
|
tarpit(w, r)
|
|
return
|
|
}
|
|
}
|
|
|
|
var t struct {
|
|
NodeID string `json:"node_id"`
|
|
Version string `json:"version"`
|
|
Hostname string `json:"hostname"`
|
|
UptimeSeconds int64 `json:"uptime_seconds"`
|
|
CPUPercent float64 `json:"cpu_percent"`
|
|
MemTotalMB int64 `json:"memory_total_mb"`
|
|
MemUsedMB int64 `json:"memory_used_mb"`
|
|
DiskTotalMB int64 `json:"disk_total_mb"`
|
|
DiskUsedMB int64 `json:"disk_used_mb"`
|
|
Load1m float64 `json:"load_1m"`
|
|
VaultCount int `json:"vault_count"`
|
|
VaultSizeMB float64 `json:"vault_size_mb"`
|
|
VaultEntries int `json:"vault_entries"`
|
|
Mode string `json:"mode"`
|
|
System struct {
|
|
OS string `json:"os"`
|
|
Arch string `json:"arch"`
|
|
CPUs int `json:"cpus"`
|
|
CPUPercent float64 `json:"cpu_percent"`
|
|
MemTotalMB int64 `json:"memory_total_mb"`
|
|
MemUsedMB int64 `json:"memory_used_mb"`
|
|
DiskTotalMB int64 `json:"disk_total_mb"`
|
|
DiskUsedMB int64 `json:"disk_used_mb"`
|
|
Load1m float64 `json:"load_1m"`
|
|
} `json:"system"`
|
|
Vaults struct {
|
|
Count int `json:"count"`
|
|
TotalSizeMB int64 `json:"total_size_mb"`
|
|
TotalEntries int64 `json:"total_entries"`
|
|
} `json:"vaults"`
|
|
}
|
|
|
|
if err := json.NewDecoder(r.Body).Decode(&t); err != nil {
|
|
http.Error(w, `{"error":"bad payload"}`, 400)
|
|
return
|
|
}
|
|
|
|
// Use certificate CN as authoritative node_id if mTLS enabled
|
|
if popID != "" {
|
|
t.NodeID = popID
|
|
} else if t.NodeID == "" {
|
|
t.NodeID = t.Hostname
|
|
}
|
|
|
|
if t.NodeID == "" {
|
|
http.Error(w, `{"error":"missing node_id or hostname"}`, 400)
|
|
return
|
|
}
|
|
|
|
// Merge nested fields
|
|
if t.CPUPercent == 0 && t.System.CPUPercent != 0 {
|
|
t.CPUPercent = t.System.CPUPercent
|
|
}
|
|
if t.MemTotalMB == 0 {
|
|
t.MemTotalMB = t.System.MemTotalMB
|
|
}
|
|
if t.MemUsedMB == 0 {
|
|
t.MemUsedMB = t.System.MemUsedMB
|
|
}
|
|
if t.DiskTotalMB == 0 {
|
|
t.DiskTotalMB = t.System.DiskTotalMB
|
|
}
|
|
if t.DiskUsedMB == 0 {
|
|
t.DiskUsedMB = t.System.DiskUsedMB
|
|
}
|
|
if t.Load1m == 0 {
|
|
t.Load1m = t.System.Load1m
|
|
}
|
|
if t.VaultCount == 0 {
|
|
t.VaultCount = int(t.Vaults.Count)
|
|
}
|
|
if t.VaultSizeMB == 0 {
|
|
t.VaultSizeMB = float64(t.Vaults.TotalSizeMB)
|
|
}
|
|
if t.VaultEntries == 0 {
|
|
t.VaultEntries = int(t.Vaults.TotalEntries)
|
|
}
|
|
|
|
// Insert telemetry
|
|
if _, err := db.Exec(`INSERT INTO telemetry (node_id, version, hostname, uptime_seconds, cpu_percent, memory_total_mb, memory_used_mb, disk_total_mb, disk_used_mb, load_1m, vault_count, vault_size_mb, vault_entries, mode) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
|
|
t.NodeID, t.Version, t.Hostname, t.UptimeSeconds, t.CPUPercent, t.MemTotalMB, t.MemUsedMB, t.DiskTotalMB, t.DiskUsedMB, t.Load1m, t.VaultCount, t.VaultSizeMB, t.VaultEntries, t.Mode); err != nil {
|
|
log.Printf("ERR-TELEMETRY-004: Failed to insert telemetry for node=%s - %v", t.NodeID, err)
|
|
http.Error(w, `{"error":"database error"}`, 500)
|
|
return
|
|
}
|
|
|
|
// Uptime span tracking
|
|
updateSpan(t.NodeID, t.Hostname, t.Version, t.CPUPercent, t.MemUsedMB, t.MemTotalMB, t.DiskUsedMB, t.DiskTotalMB, t.Load1m, t.UptimeSeconds)
|
|
|
|
w.WriteHeader(200)
|
|
}
|
|
|
|
func updateSpan(nodeID, hostname, version string, cpuPercent float64, memUsedMB, memTotalMB, diskUsedMB, diskTotalMB int64, load1m float64, uptimeSeconds int64) {
|
|
now := time.Now().Unix()
|
|
serverAge := now - processStartTime
|
|
|
|
var inMaint bool
|
|
if err := db.QueryRow(`SELECT COUNT(*) > 0 FROM maintenance WHERE end_at IS NULL`).Scan(&inMaint); err != nil {
|
|
log.Printf("ERR-TELEMETRY-010: Failed to check maintenance mode - %v", err)
|
|
// Continue with inMaint=false as safe default
|
|
inMaint = false
|
|
}
|
|
|
|
var spanID int64
|
|
var spanEnd int64
|
|
err := db.QueryRow(`SELECT id, end_at FROM uptime_spans WHERE node_id = ? ORDER BY end_at DESC LIMIT 1`, nodeID).Scan(&spanID, &spanEnd)
|
|
if err != nil && err != sql.ErrNoRows {
|
|
log.Printf("ERR-TELEMETRY-011: Failed to query latest uptime span for node=%s - %v", nodeID, err)
|
|
}
|
|
|
|
if err == nil && (inMaint || (now-spanEnd) <= 60) {
|
|
if _, execErr := db.Exec(`UPDATE uptime_spans SET end_at = ? WHERE id = ?`, now, spanID); execErr != nil {
|
|
log.Printf("ERR-TELEMETRY-012: Failed to extend uptime span id=%d for node=%s - %v", spanID, nodeID, execErr)
|
|
}
|
|
} else if err == nil && serverAge < 60 {
|
|
log.Printf("SPAN EXTEND node=%s gap=%ds (server up %ds, too early to judge)", nodeID, now-spanEnd, serverAge)
|
|
if _, execErr := db.Exec(`UPDATE uptime_spans SET end_at = ? WHERE id = ?`, now, spanID); execErr != nil {
|
|
log.Printf("ERR-TELEMETRY-013: Failed to extend early-judgment span id=%d for node=%s - %v", spanID, nodeID, execErr)
|
|
}
|
|
} else if !inMaint {
|
|
gapSeconds := now - spanEnd
|
|
if err == nil {
|
|
log.Printf("OUTAGE SPAN node=%s gap=%ds last_seen=%s resumed=%s prev_span_id=%d hostname=%s version=%s cpu=%.1f%% mem=%d/%dMB disk=%d/%dMB load=%.2f uptime=%ds",
|
|
nodeID, gapSeconds,
|
|
time.Unix(spanEnd, 0).UTC().Format(time.RFC3339),
|
|
time.Unix(now, 0).UTC().Format(time.RFC3339),
|
|
spanID, hostname, version,
|
|
cpuPercent, memUsedMB, memTotalMB,
|
|
diskUsedMB, diskTotalMB, load1m, uptimeSeconds)
|
|
} else {
|
|
log.Printf("OUTAGE SPAN node=%s first_span=true hostname=%s version=%s",
|
|
nodeID, hostname, version)
|
|
}
|
|
|
|
if _, execErr := db.Exec(`INSERT INTO uptime_spans (node_id, start_at, end_at) VALUES (?, ?, ?)`, nodeID, now, now); execErr != nil {
|
|
log.Printf("ERR-TELEMETRY-014: Failed to insert new uptime span for node=%s - %v", nodeID, execErr)
|
|
return // Don't alert if we couldn't record the span
|
|
}
|
|
|
|
go alertOutage(nodeID, hostname, gapSeconds, err != nil)
|
|
}
|
|
}
|
|
|
|
// alertOutage sends an ntfy notification that nodeID started a new uptime
// span. The message reports either recovery after a gap of gap seconds or,
// when firstSpan is true, the node's first heartbeat.
//
// The endpoint and bearer token come from NTFY_ALERT_URL and NTFY_ALERT_TOKEN.
// If either is empty the alert is only logged. Delivery is best-effort: every
// failure, including a non-2xx response from ntfy, is logged, never returned.
func alertOutage(nodeID, hostname string, gap int64, firstSpan bool) {
	ntfyURL := os.Getenv("NTFY_ALERT_URL")
	ntfyToken := os.Getenv("NTFY_ALERT_TOKEN")

	if ntfyURL == "" || ntfyToken == "" {
		// Alerting disabled - log only
		if firstSpan {
			log.Printf("OUTAGE SPAN node=%s first_span=true (alerting disabled)", nodeID)
		} else {
			log.Printf("OUTAGE SPAN node=%s gap=%ds (alerting disabled)", nodeID, gap)
		}
		return
	}

	title := fmt.Sprintf("Outage recovery: %s", nodeID)
	body := fmt.Sprintf("Node **%s** (%s) created new span after **%ds** gap", nodeID, hostname, gap)
	if firstSpan {
		title = fmt.Sprintf("New node online: %s", nodeID)
		body = fmt.Sprintf("Node **%s** (%s) first heartbeat - new span created", nodeID, hostname)
	}

	req, err := http.NewRequest(http.MethodPost, ntfyURL, strings.NewReader(body))
	if err != nil {
		log.Printf("ERR-TELEMETRY-020: Failed to create ntfy alert request for node=%s - %v", nodeID, err)
		return
	}
	req.Header.Set("Authorization", "Bearer "+ntfyToken)
	req.Header.Set("Title", title)
	req.Header.Set("Markdown", "yes")
	req.Header.Set("Priority", "high")
	req.Header.Set("Tags", "warning")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("ERR-TELEMETRY-021: Failed to send ntfy alert for node=%s to %s - %v", nodeID, ntfyURL, err)
		return
	}
	if err := resp.Body.Close(); err != nil {
		log.Printf("ERR-TELEMETRY-022: Failed to close ntfy response body for node=%s - %v", nodeID, err)
	}

	// client.Do only fails on transport errors. An HTTP error status (e.g. a
	// rejected token) means ntfy did not accept the alert, so it must not be
	// logged as sent.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		log.Printf("ERR-TELEMETRY-021: Failed to send ntfy alert for node=%s to %s - HTTP %s", nodeID, ntfyURL, resp.Status)
		return
	}
	log.Printf("OUTAGE SPAN ntfy alert sent for node=%s", nodeID)
}
|