clavitor/clavis/clavis-vault/lib/telemetry.go

352 lines
8.6 KiB
Go

package lib
import (
	"bytes"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"syscall"
	"time"
)
// TelemetryConfig controls the optional telemetry reporter.
// All fields zero/empty = telemetry disabled.
type TelemetryConfig struct {
FreqSeconds int // interval between POSTs in seconds (0 = telemetry disabled)
Host string // telemetry endpoint base URL, e.g. https://hq.clavitor.com/telemetry (empty = disabled)
Token string // Bearer token for auth; omitted from requests when empty
DataDir string // vault data directory (scanned for *.db files and disk usage)
Version string // build version string, echoed back in every payload
}
// TelemetryPayload is the JSON body posted to the telemetry endpoint.
type TelemetryPayload struct {
Version string `json:"version"` // build version from TelemetryConfig.Version
Hostname string `json:"hostname"` // os.Hostname(); empty if the lookup failed
UptimeSeconds int64 `json:"uptime_seconds"` // whole seconds since StartTelemetry was called
Timestamp string `json:"timestamp"` // RFC 3339, UTC, time of collection
System SystemMetrics `json:"system"` // host-level CPU/memory/disk/load metrics
Vaults VaultMetrics `json:"vaults"` // aggregate vault database metrics
}
// SystemMetrics describes the host machine at collection time.
// On non-Linux hosts the /proc-derived fields fall back or zero out
// (see readMemInfo, readCPUPercent, readLoadAvg).
type SystemMetrics struct {
OS string `json:"os"` // runtime.GOOS
Arch string `json:"arch"` // runtime.GOARCH
CPUs int `json:"cpus"` // runtime.NumCPU()
CPUPercent float64 `json:"cpu_percent"` // busy percentage over a 500ms /proc/stat sample
MemTotalMB int64 `json:"memory_total_mb"` // system total; process-only on non-Linux fallback
MemUsedMB int64 `json:"memory_used_mb"` // total - available; process-only on non-Linux fallback
DiskTotalMB int64 `json:"disk_total_mb"` // filesystem containing DataDir
DiskUsedMB int64 `json:"disk_used_mb"` // total - available (includes root-reserved blocks)
Load1m float64 `json:"load_1m"` // 1-minute load average from /proc/loadavg; 0 if unavailable
}
// VaultMetrics aggregates all vault databases found in the data directory.
type VaultMetrics struct {
Count int // number of *.db files (excluding node.db)
TotalSizeMB int64 `json:"total_size_mb"` // sum of per-file sizes, each truncated to whole MiB
TotalEntries int64 `json:"total_entries"` // sum of live (non-deleted) entries across vaults
}
// AlertOperator sends critical operational alerts.
// Community: logs to stderr only.
// Commercial (telemetry enabled): also POSTs to alert endpoint.
//
// Delivery is strictly best-effort: marshal, request-construction, and
// transport failures are logged and swallowed so alerting can never
// crash or block the caller beyond the 5s HTTP timeout.
func AlertOperator(cfg TelemetryConfig, alertType, message string, details map[string]any) {
	// Always log locally, regardless of configuration.
	if details != nil {
		log.Printf("OPERATOR ALERT [%s]: %s - %+v", alertType, message, details)
	} else {
		log.Printf("OPERATOR ALERT [%s]: %s", alertType, message)
	}
	// Commercial: POST to telemetry alert endpoint if configured
	if cfg.Host == "" {
		return
	}
	hostname, _ := os.Hostname() // best-effort; empty string on failure
	alert := map[string]any{
		"type":      alertType,
		"message":   message,
		"details":   details,
		"hostname":  hostname,
		"version":   cfg.Version,
		"timestamp": time.Now().UTC().Format(time.RFC3339),
	}
	body, err := json.Marshal(alert)
	if err != nil {
		// Shouldn't happen for map[string]any of printable values, but
		// don't POST a nil body if it does.
		log.Printf("telemetry alert failed: %v", err)
		return
	}
	req, err := http.NewRequest("POST", cfg.Host+"/alerts", bytes.NewReader(body))
	if err != nil {
		// Previously ignored: a malformed cfg.Host returned a nil req and
		// the req.Header.Set below dereferenced a nil pointer.
		log.Printf("telemetry alert failed: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	if cfg.Token != "" {
		req.Header.Set("Authorization", "Bearer "+cfg.Token)
	}
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("telemetry alert failed: %v", err)
		return
	}
	resp.Body.Close()
}
// StartTelemetry launches a background goroutine that periodically
// collects metrics and POSTs them to cfg.Host. Does nothing if
// FreqSeconds <= 0 or Host is empty.
//
// The goroutine runs for the life of the process; there is no stop
// channel by design (telemetry settings are fixed at startup).
func StartTelemetry(cfg TelemetryConfig) {
	if cfg.Host == "" || cfg.FreqSeconds <= 0 {
		return
	}
	var (
		launched = time.Now()
		normal   = time.Duration(cfg.FreqSeconds) * time.Second
		retry    = 10 * time.Second // fast cadence until the first ACK
		client   = &http.Client{Timeout: 10 * time.Second}
	)
	log.Printf("Telemetry enabled: posting every %ds to %s", cfg.FreqSeconds, cfg.Host)
	go func() {
		// First report goes out immediately. While the server has not
		// ACK'd we retry every 10s; once ACK'd we sleep the normal
		// interval between posts.
		for {
			delay := normal
			if !postTelemetry(client, cfg.Host, cfg.Token, CollectPayload(cfg, launched)) {
				log.Printf("telemetry: no ACK, retrying in %s", retry)
				delay = retry
			}
			time.Sleep(delay)
		}
	}()
}
// CollectPayload gathers system and vault metrics into a TelemetryPayload.
// startTime is the moment telemetry (i.e. the process) started; it is
// used to report uptime in whole seconds.
func CollectPayload(cfg TelemetryConfig, startTime time.Time) TelemetryPayload {
	host, _ := os.Hostname() // best-effort; empty string on failure
	p := TelemetryPayload{
		Version:       cfg.Version,
		Hostname:      host,
		UptimeSeconds: int64(time.Since(startTime).Seconds()),
		Timestamp:     time.Now().UTC().Format(time.RFC3339),
	}
	p.System = collectSystemMetrics(cfg.DataDir)
	p.Vaults = collectVaultMetrics(cfg.DataDir)
	return p
}
// postTelemetry marshals payload and POSTs it to host, returning true
// only when the server answers with a 2xx status and a JSON body of
// {"ok": true}. Every failure path is logged and reported as false so
// the caller's retry loop kicks in.
func postTelemetry(client *http.Client, host, token string, payload TelemetryPayload) bool {
	body, err := json.Marshal(payload)
	if err != nil {
		log.Printf("telemetry: marshal error: %v", err)
		return false
	}
	req, err := http.NewRequest("POST", host, bytes.NewReader(body))
	if err != nil {
		log.Printf("telemetry: request error: %v", err)
		return false
	}
	req.Header.Set("Content-Type", "application/json")
	if token != "" {
		req.Header.Set("Authorization", "Bearer "+token)
	}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("telemetry: post error: %v", err)
		return false
	}
	defer resp.Body.Close()
	// Drain whatever we don't decode so the transport can reuse the
	// underlying connection (runs before Close via LIFO defer order).
	defer io.Copy(io.Discard, resp.Body)
	if resp.StatusCode >= 300 {
		log.Printf("telemetry: unexpected status %d", resp.StatusCode)
		return false
	}
	var ack struct {
		OK bool `json:"ok"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&ack); err != nil {
		// Previously ignored: a malformed ACK body was indistinguishable
		// from an explicit {"ok": false}.
		log.Printf("telemetry: decode error: %v", err)
		return false
	}
	return ack.OK
}
// collectSystemMetrics snapshots host-level metrics (CPU, memory, disk,
// load) for the telemetry payload. dataDir selects which filesystem's
// usage is reported. Note: this call blocks ~500ms for the CPU sample.
func collectSystemMetrics(dataDir string) SystemMetrics {
	var m SystemMetrics
	m.OS = runtime.GOOS
	m.Arch = runtime.GOARCH
	m.CPUs = runtime.NumCPU()
	m.CPUPercent = readCPUPercent()
	m.MemTotalMB, m.MemUsedMB = readMemInfo()
	m.DiskTotalMB, m.DiskUsedMB = readDiskUsage(dataDir)
	m.Load1m = readLoadAvg()
	return m
}
// readCPUPercent samples /proc/stat twice 500ms apart and returns the
// busy percentage of total CPU time over that window, or 0 when the
// counters are unavailable or did not advance.
func readCPUPercent() float64 {
	before := readCPUStat()
	time.Sleep(500 * time.Millisecond)
	after := readCPUStat()
	window := sumUint64(after) - sumUint64(before)
	if window == 0 {
		return 0
	}
	// Field index 3 of the aggregate "cpu" line is idle jiffies.
	idle := after[3] - before[3]
	return 100 * float64(window-idle) / float64(window)
}
// readCPUStat returns the jiffy counters from the aggregate "cpu " line
// of /proc/stat. On any failure (non-Linux host, unreadable file, no
// matching line) it returns ten zeroed counters so callers can still
// index the idle field without a bounds panic.
func readCPUStat() []uint64 {
	fallback := make([]uint64, 10)
	raw, err := os.ReadFile("/proc/stat")
	if err != nil {
		return fallback
	}
	for _, line := range strings.Split(string(raw), "\n") {
		if !strings.HasPrefix(line, "cpu ") {
			continue
		}
		cols := strings.Fields(line)[1:] // drop the "cpu" label
		counters := make([]uint64, len(cols))
		for i, col := range cols {
			counters[i], _ = strconv.ParseUint(col, 10, 64)
		}
		return counters
	}
	return fallback
}
// sumUint64 returns the sum of vals (wrapping modulo 2^64 on overflow,
// per Go's unsigned arithmetic).
func sumUint64(vals []uint64) uint64 {
	var total uint64
	for i := range vals {
		total += vals[i]
	}
	return total
}
// readMemInfo parses /proc/meminfo and returns system memory totals in
// whole MiB, with "used" computed as MemTotal - MemAvailable. On hosts
// without /proc/meminfo it falls back to Go runtime stats, which cover
// this process only, not the whole system.
func readMemInfo() (totalMB, usedMB int64) {
	raw, err := os.ReadFile("/proc/meminfo")
	if err != nil {
		// Fallback path: process-level numbers are better than zeros.
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		return int64(ms.Sys / 1024 / 1024), int64(ms.Alloc / 1024 / 1024)
	}
	var totalKB, availKB int64
	for _, line := range strings.Split(string(raw), "\n") {
		parts := strings.Fields(line)
		if len(parts) < 2 {
			continue
		}
		kb, _ := strconv.ParseInt(parts[1], 10, 64)
		switch parts[0] {
		case "MemTotal:":
			totalKB = kb
		case "MemAvailable:":
			availKB = kb
		}
	}
	return totalKB / 1024, (totalKB - availKB) / 1024
}
// readDiskUsage returns total and used disk space, in whole MiB, for
// the filesystem containing path. An empty path means the current
// directory; any Statfs failure yields (0, 0).
func readDiskUsage(path string) (totalMB, usedMB int64) {
	if path == "" {
		path = "."
	}
	var fs syscall.Statfs_t
	if syscall.Statfs(path, &fs) != nil {
		return 0, 0
	}
	blockSize := uint64(fs.Bsize)
	total := fs.Blocks * blockSize
	avail := fs.Bavail * blockSize
	// "used" is total minus unprivileged-available, so root-reserved
	// blocks count as used — the same convention df shows normal users.
	return int64(total / 1024 / 1024), int64((total - avail) / 1024 / 1024)
}
// readLoadAvg returns the 1-minute load average from /proc/loadavg,
// or 0 on any read/parse failure (e.g. non-Linux hosts).
func readLoadAvg() float64 {
	raw, err := os.ReadFile("/proc/loadavg")
	if err != nil {
		return 0
	}
	cols := strings.Fields(string(raw))
	if len(cols) == 0 {
		return 0
	}
	oneMinute, _ := strconv.ParseFloat(cols[0], 64)
	return oneMinute
}
// collectVaultMetrics scans dataDir for *.db vault files and aggregates
// their count, on-disk size, and live entry counts. node.db is skipped
// as infrastructure, and files that vanish mid-scan are ignored.
func collectVaultMetrics(dataDir string) VaultMetrics {
	if dataDir == "" {
		dataDir = "."
	}
	var agg VaultMetrics
	paths, err := filepath.Glob(filepath.Join(dataDir, "*.db"))
	if err != nil {
		// Glob only errors on a malformed pattern; report empty metrics.
		return agg
	}
	for _, p := range paths {
		// Skip non-vault databases.
		if filepath.Base(p) == "node.db" {
			continue
		}
		fi, statErr := os.Stat(p)
		if statErr != nil {
			continue
		}
		agg.Count++
		// NOTE(review): integer division truncates — a vault smaller
		// than 1 MiB contributes 0 to TotalSizeMB.
		agg.TotalSizeMB += fi.Size() / 1024 / 1024
		agg.TotalEntries += countEntries(p)
	}
	return agg
}
// countEntries opens the vault database read-only and returns the
// number of live (non-soft-deleted) rows in the entries table, or 0 on
// any error (driver missing, file unreadable, query failure).
func countEntries(dbPath string) int64 {
	dsn := fmt.Sprintf("file:%s?mode=ro&_journal_mode=WAL", dbPath)
	handle, err := sql.Open("sqlite3", dsn)
	if err != nil {
		return 0
	}
	defer handle.Close()
	var live int64
	row := handle.QueryRow("SELECT COUNT(*) FROM entries WHERE deleted_at IS NULL")
	if row.Scan(&live) != nil {
		return 0
	}
	return live
}