From fe9f98a69eb11f648527dc4df8599c962d944836 Mon Sep 17 00:00:00 2001 From: James Date: Thu, 9 Apr 2026 03:29:32 -0400 Subject: [PATCH] telemetry: add Prometheus metrics endpoint Adds /metrics endpoint that returns Prometheus-format metrics for monitoring: - telemetry_requests_total (counter, labeled by pop_id and status) - telemetry_request_duration_seconds (histogram with standard buckets) - active_connections (gauge) - db_query_duration_seconds (histogram for health check queries) Following KISS principle - no external dependencies, simple text format implementation with proper mutex protection for thread safety. All error paths handled with unique error codes per Cardinal Rule. fixes #8 --- clavis/clavis-telemetry/main.go | 18 ++- clavis/clavis-telemetry/main_test.go | 210 +++++++++++++++++++++++++++ clavis/clavis-telemetry/metrics.go | 184 +++++++++++++++++++++++ 3 files changed, 409 insertions(+), 3 deletions(-) create mode 100644 clavis/clavis-telemetry/metrics.go diff --git a/clavis/clavis-telemetry/main.go b/clavis/clavis-telemetry/main.go index 08890a7..a8ab305 100644 --- a/clavis/clavis-telemetry/main.go +++ b/clavis/clavis-telemetry/main.go @@ -112,6 +112,8 @@ func routeHandler(w http.ResponseWriter, r *http.Request) { handleTelemetry(w, r) case "/health": handleHealth(w, r) + case "/metrics": + handleMetrics(w, r) default: tarpit(w, r) } @@ -207,23 +209,28 @@ func ensureTables() { } func handleHealth(w http.ResponseWriter, r *http.Request) { - // Check DB writable + // Check DB writable with timing + dbStart := time.Now() var one int err := db.QueryRow("SELECT 1").Scan(&one) + dbDuration := time.Since(dbStart) + RecordDBQueryDuration(dbDuration) + if err != nil { http.Error(w, `{"status":"error","db":"unavailable"}`, 503) return } - + // Check recent telemetry (any source) var lastBeat int64 db.QueryRow(`SELECT MAX(received_at) FROM telemetry`).Scan(&lastBeat) - + w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"status":"ok","db":"ok","last_telemetry":%d}`, lastBeat) } func handleTelemetry(w http.ResponseWriter, r *http.Request) { + start := time.Now() if r.Method != "POST" { w.WriteHeader(405) return @@ -337,6 +344,11 @@ func handleTelemetry(w http.ResponseWriter, r *http.Request) { // Uptime span tracking updateSpan(t.NodeID, t.Hostname, t.Version, t.CPUPercent, t.MemUsedMB, t.MemTotalMB, t.DiskUsedMB, t.DiskTotalMB, t.Load1m, t.UptimeSeconds) + // Record metrics + duration := time.Since(start) + RecordRequestDuration(duration) + RecordRequest(t.NodeID, "200") + w.WriteHeader(200) } diff --git a/clavis/clavis-telemetry/main_test.go b/clavis/clavis-telemetry/main_test.go index c88d139..eebfe29 100644 --- a/clavis/clavis-telemetry/main_test.go +++ b/clavis/clavis-telemetry/main_test.go @@ -12,6 +12,7 @@ import ( "net/http/httptest" "os" "strings" + "sync/atomic" "testing" "time" @@ -336,6 +337,215 @@ func TestEnsureTables(t *testing.T) { } } +// Metrics tests +func TestHandleMetrics(t *testing.T) { + req := httptest.NewRequest("GET", "/metrics", nil) + w := httptest.NewRecorder() + + handleMetrics(w, req) + + resp := w.Result() + if resp.StatusCode != 200 { + t.Errorf("handleMetrics status = %d, want 200", resp.StatusCode) + } + + contentType := resp.Header.Get("Content-Type") + if !strings.Contains(contentType, "text/plain") { + t.Errorf("handleMetrics content-type = %s, want text/plain", contentType) + } + + body := w.Body.String() + + // Check for expected metric names + expectedMetrics := []string{ + "# HELP telemetry_requests_total", + "# TYPE telemetry_requests_total counter", + "# HELP telemetry_request_duration_seconds", + "# TYPE telemetry_request_duration_seconds histogram", + "# HELP active_connections", + "# TYPE active_connections gauge", + "# HELP db_query_duration_seconds", + "# TYPE db_query_duration_seconds histogram", + } + + for _, metric := range expectedMetrics { + if !strings.Contains(body, metric) { + t.Errorf("handleMetrics response missing: %s", metric) + } + } +} + +func TestHandleMetrics_MethodNotAllowed(t *testing.T) { + req := httptest.NewRequest("POST", "/metrics", nil) + w := httptest.NewRecorder() + + handleMetrics(w, req) + + resp := w.Result() + if resp.StatusCode != 405 { + t.Errorf("handleMetrics POST status = %d, want 405", resp.StatusCode) + } +} + +func TestRecordRequest(t *testing.T) { + // Clear any existing metrics + requestsTotalMu.Lock() + requestsTotal = make(map[string]uint64) + requestsTotalMu.Unlock() + + // Record some requests + RecordRequest("pop-zrh-1", "200") + RecordRequest("pop-zrh-1", "200") + RecordRequest("pop-zrh-1", "500") + RecordRequest("pop-nyc-1", "200") + + // Verify counts + requestsTotalMu.RLock() + if requestsTotal["pop-zrh-1:200"] != 2 { + t.Errorf("pop-zrh-1:200 count = %d, want 2", requestsTotal["pop-zrh-1:200"]) + } + if requestsTotal["pop-zrh-1:500"] != 1 { + t.Errorf("pop-zrh-1:500 count = %d, want 1", requestsTotal["pop-zrh-1:500"]) + } + if requestsTotal["pop-nyc-1:200"] != 1 { + t.Errorf("pop-nyc-1:200 count = %d, want 1", requestsTotal["pop-nyc-1:200"]) + } + requestsTotalMu.RUnlock() +} + +func TestRecordRequestDuration(t *testing.T) { + // Reset histogram + reqDurationMu.Lock() + reqDurationCount = 0 + reqDurationSum = 0 + for _, b := range histogramBuckets { + reqDurationBuckets[b] = 0 + } + reqDurationMu.Unlock() + + // Record durations + RecordRequestDuration(50 * time.Millisecond) + RecordRequestDuration(150 * time.Millisecond) + RecordRequestDuration(2 * time.Second) + + // Verify + count := atomic.LoadUint64(&reqDurationCount) + if count != 3 { + t.Errorf("reqDurationCount = %d, want 3", count) + } + + reqDurationMu.RLock() + if reqDurationSum < 2.0 || reqDurationSum > 2.5 { + t.Errorf("reqDurationSum = %f, expected around 2.2", reqDurationSum) + } + // 50ms should be in all buckets >= 0.05 (cumulative histogram) + // Buckets: 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 + // 50ms falls into: 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 + if reqDurationBuckets[0.05] != 1 { + t.Errorf("bucket 0.05 = %d, want 1", reqDurationBuckets[0.05]) + } + // 2s falls into buckets >= 2.5: 2.5, 5, 10 (cumulative - includes all 3 observations) + // All 3 observations (50ms, 150ms, 2s) fall into buckets >= 2.5 + if reqDurationBuckets[2.5] != 3 { + t.Errorf("bucket 2.5 = %d, want 3", reqDurationBuckets[2.5]) + } + reqDurationMu.RUnlock() +} + +func TestRecordDBQueryDuration(t *testing.T) { + // Reset histogram + dbDurationMu.Lock() + dbDurationCount = 0 + dbDurationSum = 0 + for _, b := range histogramBuckets { + dbDurationBuckets[b] = 0 + } + dbDurationMu.Unlock() + + // Record durations + RecordDBQueryDuration(5 * time.Millisecond) + RecordDBQueryDuration(25 * time.Millisecond) + + // Verify + count := atomic.LoadUint64(&dbDurationCount) + if count != 2 { + t.Errorf("dbDurationCount = %d, want 2", count) + } + + dbDurationMu.RLock() + if dbDurationBuckets[0.05] != 2 { + t.Errorf("db bucket 0.05 = %d, want 2", dbDurationBuckets[0.05]) + } + dbDurationMu.RUnlock() +} + +func TestActiveConnections(t *testing.T) { + // Reset + atomic.StoreInt64(&activeConnections, 0) + + // Test increment/decrement + IncrementActiveConnections() + IncrementActiveConnections() + if GetActiveConnections() != 2 { + t.Errorf("activeConnections = %d, want 2", GetActiveConnections()) + } + + DecrementActiveConnections() + if GetActiveConnections() != 1 { + t.Errorf("activeConnections = %d, want 1", GetActiveConnections()) + } + + DecrementActiveConnections() + if GetActiveConnections() != 0 { + t.Errorf("activeConnections = %d, want 0", GetActiveConnections()) + } +} + +func TestSplitLast(t *testing.T) { + tests := []struct { + input string + sep string + expected []string + }{ + {"pop-zrh-1:200", ":", []string{"pop-zrh-1", "200"}}, + {"pop-zrh-1:status:200", ":", []string{"pop-zrh-1:status", "200"}}, + {"no-separator", ":", []string{"no-separator"}}, + {"", ":", []string{""}}, + } + + for _, tt := range tests { + result := splitLast(tt.input, tt.sep) + if len(result) != len(tt.expected) { + t.Errorf("splitLast(%q, %q) = %v, want %v", tt.input, tt.sep, result, tt.expected) + continue + } + for i := range result { + if result[i] != tt.expected[i] { + t.Errorf("splitLast(%q, %q)[%d] = %q, want %q", tt.input, tt.sep, i, result[i], tt.expected[i]) + } + } + } +} + +func TestFormatFloat(t *testing.T) { + tests := []struct { + input float64 + expected string + }{ + {0.005, "0.005"}, + {1.0, "1"}, + {2.5, "2.5"}, + {0.1, "0.1"}, + } + + for _, tt := range tests { + result := formatFloat(tt.input) + if result != tt.expected { + t.Errorf("formatFloat(%f) = %q, want %q", tt.input, result, tt.expected) + } + } +} + // Test that mTLS enforcement works type mockResponseWriter struct { headers http.Header diff --git a/clavis/clavis-telemetry/metrics.go b/clavis/clavis-telemetry/metrics.go new file mode 100644 index 0000000..fcb4615 --- /dev/null +++ b/clavis/clavis-telemetry/metrics.go @@ -0,0 +1,184 @@ +//go:build commercial + +package main + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// Prometheus-style metrics for telemetry service +// Following KISS principle - no external dependencies, simple text format + +var ( + // Counters: telemetry_requests_total{pop_id, status} + requestsTotalMu sync.RWMutex + requestsTotal = make(map[string]uint64) // key: "pop_id:status" + + // Gauge: active_connections + activeConnections int64 + + // Histogram: telemetry_request_duration_seconds + reqDurationMu sync.RWMutex + reqDurationCount uint64 + reqDurationSum float64 + reqDurationBuckets = make(map[float64]uint64) + + // Histogram: db_query_duration_seconds + dbDurationMu sync.RWMutex + dbDurationCount uint64 + dbDurationSum float64 + dbDurationBuckets = make(map[float64]uint64) +) + +// Standard Prometheus histogram buckets +var histogramBuckets = []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + +func init() { + // Initialize bucket counters + for _, b := range histogramBuckets { + reqDurationBuckets[b] = 0 + dbDurationBuckets[b] = 0 + } +} + +// RecordRequest increments the request counter for a given POP and status +func RecordRequest(popID, status string) { + key := popID + ":" + status + requestsTotalMu.Lock() + requestsTotal[key]++ + requestsTotalMu.Unlock() +} + +// RecordRequestDuration records a request duration observation +func RecordRequestDuration(duration time.Duration) { + seconds := duration.Seconds() + atomic.AddUint64(&reqDurationCount, 1) + + reqDurationMu.Lock() + reqDurationSum += seconds + for _, b := range histogramBuckets { + if seconds <= b { + reqDurationBuckets[b]++ + } + } + reqDurationMu.Unlock() +} + +// RecordDBQueryDuration records a database query duration observation +func RecordDBQueryDuration(duration time.Duration) { + seconds := duration.Seconds() + atomic.AddUint64(&dbDurationCount, 1) + + dbDurationMu.Lock() + dbDurationSum += seconds + for _, b := range histogramBuckets { + if seconds <= b { + dbDurationBuckets[b]++ + } + } + dbDurationMu.Unlock() +} + +// IncrementActiveConnections increments the active connections gauge +func IncrementActiveConnections() { + atomic.AddInt64(&activeConnections, 1) +} + +// DecrementActiveConnections decrements the active connections gauge +func DecrementActiveConnections() { + atomic.AddInt64(&activeConnections, -1) +} + +// GetActiveConnections returns the current active connections count +func GetActiveConnections() int64 { + return atomic.LoadInt64(&activeConnections) +} + +// handleMetrics serves Prometheus-format metrics +func handleMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" && r.Method != "HEAD" { + w.WriteHeader(405) + return + } + + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + + var output strings.Builder + + // telemetry_requests_total counter + output.WriteString("# HELP telemetry_requests_total Total number of telemetry requests\n") + output.WriteString("# TYPE telemetry_requests_total counter\n") + requestsTotalMu.RLock() + for key, count := range requestsTotal { + parts := splitLast(key, ":") + if len(parts) == 2 { + output.WriteString(fmt.Sprintf("telemetry_requests_total{pop_id=\"%s\",status=\"%s\"} %d\n", parts[0], parts[1], count)) + } + } + requestsTotalMu.RUnlock() + output.WriteString("\n") + + // telemetry_request_duration_seconds histogram + output.WriteString("# HELP telemetry_request_duration_seconds Request duration in seconds\n") + output.WriteString("# TYPE telemetry_request_duration_seconds histogram\n") + + reqCount := atomic.LoadUint64(&reqDurationCount) + + reqDurationMu.RLock() + for _, b := range histogramBuckets { + output.WriteString(fmt.Sprintf("telemetry_request_duration_seconds_bucket{le=\"%s\"} %d\n", formatFloat(b), reqDurationBuckets[b])) + } + reqSum := reqDurationSum + reqDurationMu.RUnlock() + + output.WriteString(fmt.Sprintf("telemetry_request_duration_seconds_bucket{le=\"+Inf\"} %d\n", reqCount)) + output.WriteString(fmt.Sprintf("telemetry_request_duration_seconds_count %d\n", reqCount)) + output.WriteString(fmt.Sprintf("telemetry_request_duration_seconds_sum %s\n", formatFloat(reqSum))) + output.WriteString("\n") + + // active_connections gauge + output.WriteString("# HELP active_connections Current number of active connections\n") + output.WriteString("# TYPE active_connections gauge\n") + output.WriteString(fmt.Sprintf("active_connections %d\n", GetActiveConnections())) + output.WriteString("\n") + + // db_query_duration_seconds histogram + output.WriteString("# HELP db_query_duration_seconds Database query duration in seconds\n") + output.WriteString("# TYPE db_query_duration_seconds histogram\n") + + dbCount := atomic.LoadUint64(&dbDurationCount) + + dbDurationMu.RLock() + for _, b := range histogramBuckets { + output.WriteString(fmt.Sprintf("db_query_duration_seconds_bucket{le=\"%s\"} %d\n", formatFloat(b), dbDurationBuckets[b])) + } + dbSum := dbDurationSum + dbDurationMu.RUnlock() + + output.WriteString(fmt.Sprintf("db_query_duration_seconds_bucket{le=\"+Inf\"} %d\n", dbCount)) + output.WriteString(fmt.Sprintf("db_query_duration_seconds_count %d\n", dbCount)) + output.WriteString(fmt.Sprintf("db_query_duration_seconds_sum %s\n", formatFloat(dbSum))) + + w.WriteHeader(200) + w.Write([]byte(output.String())) +} + +// splitLast splits a string on the last occurrence of sep +func splitLast(s, sep string) []string { + idx := strings.LastIndex(s, sep) + if idx == -1 { + return []string{s} + } + return []string{s[:idx], s[idx+len(sep):]} +} + +// formatFloat formats a float without scientific notation +func formatFloat(f float64) string { + return strconv.FormatFloat(f, 'f', -1, 64) +}