// clavitor/operations/pop-sync/main.go — 1777 lines, 48 KiB, Go
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/ssm"
_ "modernc.org/sqlite"
)
// ---------------------------------------------------------------------------
// pop-sync — Clavitor fleet management tool
//
// Subcommands:
// sync Reconcile DNS + Tailscale to match the pops DB
// deploy Build clovis-vault for arm64, push to all live nodes, restart
// status Health-check all live nodes (Tailscale reachable, service up)
// exec Run a shell command on all (or specific) live nodes
//
// Designed for both human and agent use:
// --json Machine-readable JSON output
// Exit 0 Everything OK
// Exit 1 Partial failure (some nodes failed)
// Exit 2 Fatal / config error
// ---------------------------------------------------------------------------
// --- Types ---
// POP is one point-of-presence row loaded from the pops table.
type POP struct {
	PopID      int    `json:"pop_id"`
	City       string `json:"city"`
	Country    string `json:"country"`
	RegionName string `json:"region_name"`
	IP         string `json:"ip"`
	DNS        string `json:"dns"`
	Status     string `json:"status"`
	Provider   string `json:"provider"`
	InstanceID string `json:"instance_id"`
}

// Subdomain returns the host label of the POP's DNS name — everything before
// the first dot ("use1" for "use1.clavitor.ai"). An empty DNS yields "".
func (p POP) Subdomain() string {
	sub, _, _ := strings.Cut(p.DNS, ".")
	return sub
}

// Zone returns everything after the first dot of the POP's DNS name
// ("clavitor.ai" for "use1.clavitor.ai"), or "" when there is no dot.
func (p POP) Zone() string {
	_, zone, _ := strings.Cut(p.DNS, ".")
	return zone
}
// Config holds all runtime settings for pop-sync, populated from
// command-line flags with environment-variable and hard-coded fallbacks
// (see parseFlags for the precedence).
type Config struct {
	DBPath       string // path to the clavitor.db SQLite file
	CFToken      string // Cloudflare API token (-cf-token or CF_API_TOKEN)
	TSKey        string // Tailscale API key (-ts-key or TS_API_KEY)
	TSAuthKey    string // Tailscale auth key for joining new nodes (TS_AUTHKEY)
	HansHost     string // SSH target for the Hans/SSM relay host
	DryRun       bool   // show what would change without doing it
	JSONOut      bool   // emit machine-readable JSON results
	Zone         string // DNS zone, e.g. "clavitor.ai"
	CFZoneID     string // Cloudflare zone ID; resolved from Zone when empty
	VaultSrc     string // path to clovis-vault source
	Nodes        string // comma-separated node filter (empty = all)
	AWSKeyID     string // AWS access key ID (-aws-key or AWS_ACCESS_KEY_ID)
	AWSSecretKey string // AWS secret key (-aws-secret or AWS_SECRET_ACCESS_KEY)
}
// NodeResult records the outcome of one action against one node; it is
// serialized as JSON when -json is set (see outputResults callers).
type NodeResult struct {
	Node    string `json:"node"`              // node subdomain (or city during provisioning)
	Action  string `json:"action"`            // which subcommand produced this result
	OK      bool   `json:"ok"`                // true when the action succeeded
	Message string `json:"message,omitempty"` // success detail
	Error   string `json:"error,omitempty"`   // failure detail when OK is false
}
// --- Main ---
// main dispatches to a subcommand. Exit codes (per the file header):
// 0 = everything OK, 1 = partial failure (via exitWith), 2 = fatal/config.
func main() {
	if len(os.Args) < 2 {
		printUsage()
		os.Exit(2)
	}
	cmd := os.Args[1]
	// Shift args so flags work after subcommand
	// (parseFlags scans os.Args from index 1, so the subcommand is removed).
	os.Args = append(os.Args[:1], os.Args[2:]...)
	cfg, remaining := parseFlags()
	switch cmd {
	case "sync":
		exitWith(cmdSync(cfg))
	case "deploy":
		exitWith(cmdDeploy(cfg))
	case "status":
		exitWith(cmdStatus(cfg))
	case "exec":
		if len(remaining) == 0 {
			fatal("usage: pop-sync exec <command>")
		}
		// All positional args are joined into one remote shell command.
		exitWith(cmdExec(cfg, strings.Join(remaining, " ")))
	case "firewall":
		exitWith(cmdFirewall(cfg))
	case "provision":
		if len(remaining) == 0 {
			fatal("usage: pop-sync provision <city> [city2 ...] (e.g. pop-sync provision Tokyo Calgary)")
		}
		exitWith(cmdProvision(cfg, remaining))
	case "maintenance":
		if len(remaining) == 0 {
			fatal("usage: pop-sync maintenance <on|off> [reason]")
		}
		// cmdMaintenance prints directly and does not return NodeResults.
		cmdMaintenance(cfg, remaining)
	case "help", "--help", "-h":
		printUsage()
	default:
		fmt.Fprintf(os.Stderr, "unknown command: %s\n", cmd)
		printUsage()
		os.Exit(2)
	}
}
// printUsage writes the human-oriented help text to stdout.
// NOTE(review): the firewall and provision subcommands and the -cf-zone-id /
// -aws-key / -aws-secret flags exist in parseFlags but are not listed here —
// confirm whether the omission is intentional.
func printUsage() {
	fmt.Println(`pop-sync — Clavitor fleet management
Commands:
sync Reconcile DNS + Tailscale with pops DB (create, update, delete)
deploy Build clovis-vault (arm64), deploy to all live nodes, restart service
status Health-check all live nodes (reachable, service running, version)
exec Run a command on live nodes: pop-sync exec <command>
maintenance Toggle maintenance mode: pop-sync maintenance on "fleet deploy"
Flags:
-db Path to clavitor.db (default: ../clavitor.com/clavitor.db)
-cf-token Cloudflare API token (or CF_API_TOKEN env)
-ts-key Tailscale API key (or TS_API_KEY env)
-ts-authkey Tailscale auth key for joining new nodes (or TS_AUTHKEY env)
-hans SSH target for Hans/SSM relay (default: johan@185.218.204.47)
-vault-src Path to clovis-vault source (default: ../clavis/clavis-vault)
-zone DNS zone (default: clavitor.ai)
-nodes Comma-separated node filter, e.g. "use1,sg1" (default: all)
-dry-run Show what would change without doing it
-json Output results as JSON (agent-friendly)
Examples:
pop-sync sync --dry-run
pop-sync deploy
pop-sync status --json
pop-sync exec "systemctl status clavitor"
pop-sync exec -nodes use1,sg1 "journalctl -u clavitor -n 20"`)
}
// parseFlags scans os.Args for known flags and returns the populated Config
// plus any remaining positional arguments. Precedence per setting:
// explicit flag > environment variable > hard-coded default. Unknown flags
// are passed through as positional args rather than rejected.
func parseFlags() (Config, []string) {
	cfg := Config{}
	var remaining []string
	for i := 1; i < len(os.Args); i++ {
		arg := os.Args[i]
		if !strings.HasPrefix(arg, "-") {
			remaining = append(remaining, arg)
			continue
		}
		// next consumes the following argument as this flag's value.
		// It advances the loop index i through closure capture, so the
		// consumed value is not revisited by the outer loop.
		next := func() string {
			if i+1 < len(os.Args) {
				i++
				return os.Args[i]
			}
			fatal("missing value for %s", arg)
			return "" // unreachable: fatal exits the process
		}
		switch arg {
		case "-db":
			cfg.DBPath = next()
		case "-cf-token":
			cfg.CFToken = next()
		case "-ts-key":
			cfg.TSKey = next()
		case "-ts-authkey":
			cfg.TSAuthKey = next()
		case "-hans":
			cfg.HansHost = next()
		case "-vault-src":
			cfg.VaultSrc = next()
		case "-zone":
			cfg.Zone = next()
		case "-cf-zone-id":
			cfg.CFZoneID = next()
		case "-nodes":
			cfg.Nodes = next()
		case "-aws-key":
			cfg.AWSKeyID = next()
		case "-aws-secret":
			cfg.AWSSecretKey = next()
		case "-dry-run", "--dry-run":
			cfg.DryRun = true
		case "-json", "--json":
			cfg.JSONOut = true
		default:
			// Unknown flags become positional args (not an error).
			remaining = append(remaining, arg)
		}
	}
	// Defaults
	if cfg.DBPath == "" {
		cfg.DBPath = "../clavitor.com/clavitor.db"
	}
	if cfg.CFToken == "" {
		cfg.CFToken = os.Getenv("CF_API_TOKEN")
	}
	if cfg.TSKey == "" {
		cfg.TSKey = os.Getenv("TS_API_KEY")
	}
	if cfg.TSAuthKey == "" {
		cfg.TSAuthKey = os.Getenv("TS_AUTHKEY")
	}
	if cfg.HansHost == "" {
		cfg.HansHost = "johan@185.218.204.47"
	}
	if cfg.VaultSrc == "" {
		cfg.VaultSrc = "../clavis/clavis-vault"
	}
	if cfg.Zone == "" {
		cfg.Zone = "clavitor.ai"
	}
	if cfg.AWSKeyID == "" {
		cfg.AWSKeyID = os.Getenv("AWS_ACCESS_KEY_ID")
	}
	if cfg.AWSSecretKey == "" {
		cfg.AWSSecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
	}
	return cfg, remaining
}
// exitWith terminates the process with exit code 1 when any result in the
// batch failed; otherwise it returns and main exits normally with code 0.
func exitWith(results []NodeResult) {
	for _, res := range results {
		if !res.OK {
			os.Exit(1)
		}
	}
}
// --- Subcommand: sync ---
// cmdSync reconciles Cloudflare DNS, Tailscale, and (when AWS credentials
// are configured) EC2 security groups so they match the live POPs in the DB.
// Returns the combined per-node results of all three phases.
func cmdSync(cfg Config) []NodeResult {
	requireKeys(cfg, "cf", "ts")
	pops := loadLivePOPs(cfg)
	log(cfg, "DB: %d live POPs with DNS+IP", len(pops))
	// Resolve the Cloudflare zone ID once unless supplied via -cf-zone-id.
	if cfg.CFZoneID == "" {
		var err error
		cfg.CFZoneID, err = cfResolveZoneID(cfg.CFToken, cfg.Zone)
		if err != nil {
			fatal("resolving zone: %v", err)
		}
	}
	var results []NodeResult
	// DNS sync
	log(cfg, "\n--- Cloudflare DNS ---")
	dnsResults := syncDNS(cfg, pops)
	results = append(results, dnsResults...)
	// Tailscale sync
	log(cfg, "\n--- Tailscale ---")
	tsResults := syncTailscale(cfg, pops)
	results = append(results, tsResults...)
	// Firewall sync — only runs when AWS credentials are present.
	if cfg.AWSKeyID != "" {
		log(cfg, "\n--- AWS Firewall ---")
		for _, p := range pops {
			results = append(results, ensureFirewall(cfg, p))
		}
	}
	outputResults(cfg, results)
	return results
}
// --- Subcommand: deploy ---
// cmdDeploy builds clovis-vault for linux/arm64, stages the binary on the
// Hans relay behind a temporary HTTP file server, then installs and restarts
// the service on every (filtered) live node via SSM. One NodeResult per node.
func cmdDeploy(cfg Config) []NodeResult {
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	if len(pops) == 0 {
		fatal("no live nodes to deploy to")
	}
	// Step 0: Firewall — best-effort; results are not collected here.
	if cfg.AWSKeyID != "" {
		log(cfg, "--- Firewall ---")
		for _, p := range pops {
			ensureFirewall(cfg, p)
		}
	}
	// Step 1: Build
	log(cfg, "\n--- Build ---")
	binaryPath := buildVault(cfg)
	log(cfg, "Built: %s", binaryPath)
	// Dry-run stops after the build and reports what would happen per node.
	if cfg.DryRun {
		var results []NodeResult
		for _, p := range pops {
			results = append(results, NodeResult{Node: p.Subdomain(), Action: "deploy", OK: true, Message: "would deploy"})
		}
		outputResults(cfg, results)
		return results
	}
	// Step 2: Upload binary to Hans and serve it temporarily via HTTP
	log(cfg, "\n--- Deploy to %d nodes ---", len(pops))
	hansDir := "/tmp/clavitor-serve"
	hansFile := hansDir + "/clavitor"
	log(cfg, "Uploading binary to Hans...")
	sshExec(cfg.HansHost, "mkdir -p "+hansDir)
	if err := scpToHost(cfg.HansHost, binaryPath, hansFile); err != nil {
		fatal("scp to hans: %v", err)
	}
	// Start a temporary HTTP server on Hans (port 9876): kill any stale
	// instance, open the port in iptables, then background a python server.
	log(cfg, "Starting temporary file server on Hans:9876...")
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "sudo iptables -I INPUT -p tcp --dport 9876 -j ACCEPT")
	time.Sleep(500 * time.Millisecond)
	sshBackground(cfg.HansHost, fmt.Sprintf(
		"cd %s && exec python3 -m http.server 9876 --bind 0.0.0.0 >/dev/null 2>&1", hansDir))
	time.Sleep(2 * time.Second) // give the server a moment to come up
	// NOTE(review): duplicates the address inside cfg.HansHost's default —
	// keep in sync (or derive from the flag).
	hansPublicIP := "185.218.204.47"
	downloadURL := fmt.Sprintf("http://%s:9876/clavitor", hansPublicIP)
	// Deploy to at most 4 nodes concurrently.
	results := parallelExec(pops, 4, func(p POP) NodeResult {
		name := p.Subdomain()
		r := NodeResult{Node: name, Action: "deploy"}
		if p.InstanceID == "" {
			r.Error = "no instance_id — cannot deploy via SSM"
			return r
		}
		log(cfg, " [%s] deploying via SSM...", name)
		// NOTE(review): the env heredoc below embeds a Cloudflare API token
		// and a telemetry token in plaintext on every node — consider
		// rotating them and injecting via a secrets mechanism instead.
		installCmds := []string{
			"mkdir -p /opt/clavitor/bin /opt/clavitor/data",
			fmt.Sprintf("curl -sf -o /tmp/clavitor-new %s", downloadURL),
			"mv /tmp/clavitor-new /opt/clavitor/bin/clavitor",
			"chmod +x /opt/clavitor/bin/clavitor",
			fmt.Sprintf(`cat > /opt/clavitor/env << 'ENVEOF'
PORT=1984
VAULT_MODE=hosted
DATA_DIR=/opt/clavitor/data
TELEMETRY_FREQ=30
TELEMETRY_HOST=https://clavitor.ai/telemetry
TELEMETRY_TOKEN=clavitor-fleet-2026
TLS_DOMAIN=%s
CF_API_TOKEN=dSVz7JZtyK023q7kh4MMNmIggK1dahWdnBxVnP3O
TLS_CERT_DIR=/opt/clavitor/certs
TLS_EMAIL=ops@clavitor.ai
ENVEOF`, p.DNS),
			// Install the systemd unit only when absent (test -f guard).
			`test -f /etc/systemd/system/clavitor.service || cat > /etc/systemd/system/clavitor.service << 'UNITEOF'
[Unit]
Description=Clavitor Vault
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/opt/clavitor/bin/clavitor
EnvironmentFile=/opt/clavitor/env
WorkingDirectory=/opt/clavitor/data
Restart=always
RestartSec=5
User=root
[Install]
WantedBy=multi-user.target
UNITEOF`,
			"systemctl daemon-reload",
			"systemctl enable clavitor",
			"systemctl restart clavitor",
			"sleep 2",
			"systemctl is-active clavitor",
		}
		out, err := ssmRunCommand(cfg.HansHost, p.InstanceID, p.RegionName, installCmds)
		if err != nil {
			r.Error = fmt.Sprintf("deploy: %v\n%s", err, out)
			log(cfg, " [%s] FAIL: %v", name, err)
			return r
		}
		r.OK = true
		r.Message = strings.TrimSpace(out)
		log(cfg, " [%s] %s", name, r.Message)
		return r
	})
	// Kill the temp HTTP server, close firewall, clean up
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "sudo iptables -D INPUT -p tcp --dport 9876 -j ACCEPT 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "rm -rf "+hansDir)
	outputResults(cfg, results)
	return results
}
// buildVault cross-compiles the clovis-vault binary for linux/arm64 and
// returns the absolute path of the produced binary. Any failure is fatal
// (exit 2 via fatal).
func buildVault(cfg Config) string {
	srcDir, err := filepath.Abs(cfg.VaultSrc)
	if err != nil {
		// Previously ignored; a bad VaultSrc would surface as a confusing
		// build failure instead.
		fatal("resolving vault source %q: %v", cfg.VaultSrc, err)
	}
	outPath := filepath.Join(srcDir, "clavitor-linux-arm64")
	cmd := exec.Command("go", "build", "-o", outPath, "./cmd/clavitor")
	cmd.Dir = srcDir
	// Static cross-compile: CGO disabled, targeting linux/arm64 (t4g nodes).
	cmd.Env = append(os.Environ(),
		"GOOS=linux",
		"GOARCH=arm64",
		"CGO_ENABLED=0",
	)
	out, err := cmd.CombinedOutput()
	if err != nil {
		fatal("build failed: %v\n%s", err, string(out))
	}
	return outPath
}
// --- Subcommand: status ---
// cmdStatus health-checks every (filtered) live node in parallel: service
// state, hostname, uptime, and Tailscale IP, gathered in a single remote
// shell command so each node costs one round trip.
func cmdStatus(cfg Config) []NodeResult {
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Checking %d nodes...\n", len(pops))
	statusCmd := "echo SERVICE=$(systemctl is-active clavitor 2>/dev/null || echo stopped); echo HOST=$(hostname); uptime -p 2>/dev/null; echo TS=$(tailscale ip -4 2>/dev/null || echo none)"
	results := parallelExec(pops, 4, func(p POP) NodeResult {
		name := p.Subdomain()
		r := NodeResult{Node: name, Action: "status"}
		out, err := nodeExec(cfg, p, statusCmd)
		if err != nil {
			r.Error = fmt.Sprintf("unreachable: %v", err)
			log(cfg, " FAIL %s — %v", name, err)
			return r
		}
		r.OK = true
		r.Message = strings.TrimSpace(out)
		log(cfg, " OK %s — %s", name, oneLineStatus(out))
		return r
	})
	outputResults(cfg, results)
	return results
}
// --- Subcommand: exec ---
// cmdExec runs an arbitrary shell command on every (filtered) live node in
// parallel (4 at a time) and reports per-node output.
func cmdExec(cfg Config, command string) []NodeResult {
	targets := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Running on %d nodes: %s\n", len(targets), command)
	results := parallelExec(targets, 4, func(p POP) NodeResult {
		res := NodeResult{Node: p.Subdomain(), Action: "exec"}
		out, err := nodeExec(cfg, p, command)
		if err != nil {
			res.Error = fmt.Sprintf("%v\n%s", err, out)
			log(cfg, "--- %s (FAIL) ---\n%s\n%v", res.Node, out, err)
		} else {
			res.OK = true
			res.Message = strings.TrimSpace(out)
			log(cfg, "--- %s ---\n%s", res.Node, res.Message)
		}
		return res
	})
	outputResults(cfg, results)
	return results
}
// --- Subcommand: maintenance ---
// cmdMaintenance toggles or reports the fleet-wide maintenance window via
// the NOC HTTP API. It prints directly instead of returning NodeResults,
// so main does not route it through exitWith.
//
// NOTE(review): the API PIN is hard-coded in the URL below — consider
// moving it to a flag or environment variable.
func cmdMaintenance(cfg Config, args []string) {
	const maintURL = "https://clavitor.ai/noc/api/maintenance?pin=250365"
	action := args[0]
	reason := ""
	if len(args) > 1 {
		reason = strings.Join(args[1:], " ")
	}
	// Attribute the change to the local user when $USER is set.
	by := "pop-sync"
	if user := os.Getenv("USER"); user != "" {
		by = "pop-sync/" + user
	}
	// post sends a start/stop action; shared by the "on" and "off" branches
	// (previously duplicated verbatim).
	post := func(apiAction string) {
		body, _ := json.Marshal(map[string]string{"action": apiAction, "reason": reason, "by": by})
		req, _ := http.NewRequest("POST", maintURL, bytes.NewReader(body))
		req.Header.Set("Content-Type", "application/json")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			fatal("maintenance %s: %v", apiAction, err)
		}
		defer resp.Body.Close()
		io.ReadAll(resp.Body) // drain so the connection can be reused
	}
	switch action {
	case "on", "start":
		post("start")
		log(cfg, "Maintenance ON: %s (by %s)", reason, by)
	case "off", "stop":
		post("stop")
		log(cfg, "Maintenance OFF (by %s)", by)
	case "status":
		resp, err := http.Get(maintURL)
		if err != nil {
			fatal("maintenance status: %v", err)
		}
		defer resp.Body.Close()
		data, _ := io.ReadAll(resp.Body)
		if cfg.JSONOut {
			// Agent mode: pass the API response through untouched.
			fmt.Println(string(data))
			return
		}
		var result struct {
			Active  bool `json:"active"`
			Windows []struct {
				StartAt int64  `json:"start_at"`
				EndAt   *int64 `json:"end_at"` // nil while the window is still open
				Reason  string `json:"reason"`
				StartBy string `json:"started_by"`
				EndBy   string `json:"ended_by"`
			} `json:"windows"`
		}
		// Fail loudly on malformed responses instead of silently printing
		// "No active maintenance" (the previous behavior).
		if err := json.Unmarshal(data, &result); err != nil {
			fatal("parsing maintenance status: %v", err)
		}
		if result.Active {
			fmt.Println("MAINTENANCE ACTIVE")
		} else {
			fmt.Println("No active maintenance")
		}
		for _, w := range result.Windows {
			end := "ongoing"
			if w.EndAt != nil {
				end = time.Unix(*w.EndAt, 0).Format("2006-01-02 15:04:05")
			}
			fmt.Printf(" %s — %s %q by=%s\n",
				time.Unix(w.StartAt, 0).Format("2006-01-02 15:04:05"),
				end, w.Reason, w.StartBy)
		}
	default:
		fatal("usage: pop-sync maintenance <on|off|status> [reason]")
	}
}
// --- Subcommand: firewall ---
// cmdFirewall enforces the one-port policy (inbound TCP 1984 only, nothing
// else) on the security group of every filtered live node.
// Requires AWS credentials.
func cmdFirewall(cfg Config) []NodeResult {
	if cfg.AWSKeyID == "" || cfg.AWSSecretKey == "" {
		fatal("AWS credentials required: set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or -aws-key/-aws-secret")
	}
	targets := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Checking firewall for %d nodes...\n", len(targets))
	var results []NodeResult
	for _, pop := range targets {
		results = append(results, ensureFirewall(cfg, pop))
	}
	outputResults(cfg, results)
	return results
}
// ensureFirewall makes sure the security group attached to a POP's EC2
// instance allows inbound TCP 1984 and nothing else, honoring cfg.DryRun.
// Only the instance's first security group is inspected. Returns a per-node
// NodeResult; AWS errors are reported in the result, not fatal.
func ensureFirewall(cfg Config, pop POP) NodeResult {
	name := pop.Subdomain()
	region := pop.RegionName
	r := NodeResult{Node: name, Action: "firewall"}
	if pop.InstanceID == "" {
		r.Error = "no instance_id"
		return r
	}
	ctx := context.Background()
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(region),
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AWSKeyID, cfg.AWSSecretKey, "")),
	)
	if err != nil {
		r.Error = fmt.Sprintf("aws config: %v", err)
		return r
	}
	client := ec2.NewFromConfig(awsCfg)
	// Resolve the instance's first security group.
	descOut, err := client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
		InstanceIds: []string{pop.InstanceID},
	})
	if err != nil {
		r.Error = fmt.Sprintf("describe instance: %v", err)
		return r
	}
	if len(descOut.Reservations) == 0 || len(descOut.Reservations[0].Instances) == 0 {
		r.Error = "instance not found"
		return r
	}
	inst := descOut.Reservations[0].Instances[0]
	if len(inst.SecurityGroups) == 0 {
		r.Error = "no security groups"
		return r
	}
	sgID := *inst.SecurityGroups[0].GroupId
	// Fetch current ingress rules.
	sgOut, err := client.DescribeSecurityGroups(ctx, &ec2.DescribeSecurityGroupsInput{
		GroupIds: []string{sgID},
	})
	if err != nil {
		r.Error = fmt.Sprintf("describe sg: %v", err)
		return r
	}
	// Guard the [0] index (previously an unchecked access → possible panic).
	if len(sgOut.SecurityGroups) == 0 {
		r.Error = "security group not found"
		return r
	}
	sg := sgOut.SecurityGroups[0]
	// Partition rules: keep exactly TCP 1984/1984, revoke everything else.
	has1984 := false
	var toRevoke []ec2types.IpPermission
	for _, perm := range sg.IpPermissions {
		if perm.FromPort != nil && *perm.FromPort == 1984 &&
			perm.ToPort != nil && *perm.ToPort == 1984 &&
			perm.IpProtocol != nil && *perm.IpProtocol == "tcp" {
			has1984 = true
		} else {
			toRevoke = append(toRevoke, perm)
		}
	}
	changes := 0
	// Remove unwanted rules (dry-run only reports; real run revokes then
	// logs — same messages as before, without the duplicated loops).
	if len(toRevoke) > 0 {
		if !cfg.DryRun {
			if _, err := client.RevokeSecurityGroupIngress(ctx, &ec2.RevokeSecurityGroupIngressInput{
				GroupId:       &sgID,
				IpPermissions: toRevoke,
			}); err != nil {
				r.Error = fmt.Sprintf("revoke rules: %v", err)
				return r
			}
		}
		verb := "removed"
		if cfg.DryRun {
			verb = "would remove"
		}
		for _, p := range toRevoke {
			port := int32(0)
			if p.FromPort != nil {
				port = *p.FromPort
			}
			log(cfg, " [%s] %s port %d from %s", name, verb, port, sgID)
		}
		changes += len(toRevoke)
	}
	// Open port 1984 if it is missing.
	if !has1984 {
		if cfg.DryRun {
			log(cfg, " [%s] would open port 1984 on %s", name, sgID)
		} else {
			proto := "tcp"
			port := int32(1984)
			if _, err := client.AuthorizeSecurityGroupIngress(ctx, &ec2.AuthorizeSecurityGroupIngressInput{
				GroupId: &sgID,
				IpPermissions: []ec2types.IpPermission{{
					IpProtocol: &proto,
					FromPort:   &port,
					ToPort:     &port,
					IpRanges:   []ec2types.IpRange{{CidrIp: strPtr("0.0.0.0/0")}},
				}},
			}); err != nil {
				r.Error = fmt.Sprintf("add 1984: %v", err)
				return r
			}
			log(cfg, " [%s] opened port 1984 on %s", name, sgID)
		}
		changes++
	}
	if changes == 0 {
		log(cfg, " [%s] OK — port 1984 only (%s)", name, sgID)
	}
	r.OK = true
	r.Message = fmt.Sprintf("sg=%s port=1984", sgID)
	return r
}
// strPtr returns a pointer to a copy of s — convenience for AWS SDK fields
// that take *string.
func strPtr(s string) *string {
	return &s
}

// int32Ptr returns a pointer to a copy of i — convenience for AWS SDK fields
// that take *int32.
func int32Ptr(i int32) *int32 {
	return &i
}
// --- Subcommand: provision ---
// cmdProvision spins up t4g.nano EC2 instances for the named cities. Each
// city must match a POP row with status "planned" (case-insensitive city
// match); non-matching names are skipped with a warning. Requires AWS
// credentials. Nodes are provisioned sequentially — provisionNode blocks on
// AWS waiters and fixed sleeps, so each node takes minutes.
func cmdProvision(cfg Config, cities []string) []NodeResult {
	if cfg.AWSKeyID == "" || cfg.AWSSecretKey == "" {
		fatal("AWS credentials required: set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or -aws-key/-aws-secret")
	}
	// Load all POPs from DB
	allPOPs, err := readPOPs(cfg.DBPath)
	if err != nil {
		fatal("reading DB: %v", err)
	}
	// Match cities to planned POPs (first planned match per city wins).
	var targets []POP
	for _, city := range cities {
		found := false
		for _, p := range allPOPs {
			if strings.EqualFold(p.City, city) && p.Status == "planned" {
				targets = append(targets, p)
				found = true
				break
			}
		}
		if !found {
			log(cfg, "WARNING: %q not found or not planned, skipping", city)
		}
	}
	if len(targets) == 0 {
		fatal("no matching planned POPs found")
	}
	log(cfg, "Provisioning %d nodes...\n", len(targets))
	var results []NodeResult
	for _, p := range targets {
		r := provisionNode(cfg, p)
		results = append(results, r)
	}
	outputResults(cfg, results)
	return results
}
// provisionNode launches a t4g.nano for one planned POP and takes it all the
// way to live: EC2 instance, local-DB update, Cloudflare DNS record,
// Tailscale join (via SSM), vault deploy, and a TLS smoke test. DNS,
// Tailscale, DB-write, and TLS-verify steps are best-effort (logged, then
// the run continues); EC2 and deploy failures abort with the error recorded
// in the returned NodeResult.
func provisionNode(cfg Config, pop POP) NodeResult {
	region := pop.RegionName
	r := NodeResult{Node: pop.City, Action: "provision"}
	ctx := context.Background()
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(region),
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AWSKeyID, cfg.AWSSecretKey, "")),
	)
	if err != nil {
		r.Error = fmt.Sprintf("aws config: %v", err)
		return r
	}
	ec2Client := ec2.NewFromConfig(awsCfg)
	ssmClient := ssm.NewFromConfig(awsCfg)
	// Auto-generate a DNS subdomain (lowercased country + first free
	// ordinal) if not set.
	sub := pop.Subdomain()
	if sub == "" {
		prefix := strings.ToLower(pop.Country)
		// Find next available ordinal
		allPOPs, _ := readPOPs(cfg.DBPath)
		ordinal := 1
		for {
			candidate := fmt.Sprintf("%s%d", prefix, ordinal)
			taken := false
			for _, p := range allPOPs {
				if p.Subdomain() == candidate {
					taken = true
					break
				}
			}
			if !taken {
				sub = candidate
				break
			}
			ordinal++
		}
		// Persist to DB immediately. Errors are logged rather than silently
		// dropped (the previous behavior); the run still proceeds.
		dns := sub + "." + cfg.Zone
		pop.DNS = dns
		if localDB, err := sql.Open("sqlite", cfg.DBPath); err != nil {
			log(cfg, " [%s] WARNING: open db: %v", pop.City, err)
		} else {
			if _, err := localDB.Exec(`UPDATE pops SET dns=? WHERE pop_id=?`, dns, pop.PopID); err != nil {
				log(cfg, " [%s] WARNING: persist dns: %v", pop.City, err)
			}
			localDB.Close()
		}
		log(cfg, " [%s] auto-assigned DNS: %s", pop.City, dns)
	}
	dns := sub + "." + cfg.Zone
	if cfg.DryRun {
		r.OK = true
		r.Message = fmt.Sprintf("would provision %s (%s) as %s", pop.City, region, dns)
		log(cfg, " [%s] DRY RUN: %s", pop.City, r.Message)
		return r
	}
	// --- Step 1: Launch EC2 ---
	// Resolve the latest Amazon Linux 2023 arm64 AMI via the public SSM
	// parameter (region-specific value).
	log(cfg, " [%s] looking up AMI in %s...", pop.City, region)
	amiParam, err := ssmClient.GetParameter(ctx, &ssm.GetParameterInput{
		Name: strPtr("/aws/service/ami-amazon-linux-latest/al2023-ami-kernel-default-arm64"),
	})
	if err != nil {
		r.Error = fmt.Sprintf("AMI lookup: %v", err)
		return r
	}
	amiID := *amiParam.Parameter.Value
	sgID, err := ensureSecurityGroup(ctx, ec2Client, region, pop.City, cfg)
	if err != nil {
		r.Error = fmt.Sprintf("security group: %v", err)
		return r
	}
	log(cfg, " [%s] launching t4g.nano (AMI: %s, SG: %s)...", pop.City, amiID, sgID)
	runOut, err := ec2Client.RunInstances(ctx, &ec2.RunInstancesInput{
		ImageId:          &amiID,
		InstanceType:     ec2types.InstanceTypeT4gNano,
		MinCount:         int32Ptr(1),
		MaxCount:         int32Ptr(1),
		SecurityGroupIds: []string{sgID},
		// The instance profile grants SSM access, which is how the node is
		// managed before Tailscale is up.
		IamInstanceProfile: &ec2types.IamInstanceProfileSpecification{
			Name: strPtr("vault1984-ssm-profile"),
		},
		TagSpecifications: []ec2types.TagSpecification{{
			ResourceType: ec2types.ResourceTypeInstance,
			Tags: []ec2types.Tag{
				{Key: strPtr("Name"), Value: strPtr("clavitor-" + sub)},
				{Key: strPtr("clavitor-pop"), Value: strPtr(sub)},
			},
		}},
	})
	if err != nil {
		r.Error = fmt.Sprintf("launch: %v", err)
		return r
	}
	instanceID := *runOut.Instances[0].InstanceId
	log(cfg, " [%s] instance: %s — waiting for IP...", pop.City, instanceID)
	waiter := ec2.NewInstanceRunningWaiter(ec2Client)
	if err := waiter.Wait(ctx, &ec2.DescribeInstancesInput{InstanceIds: []string{instanceID}}, 3*time.Minute); err != nil {
		r.Error = fmt.Sprintf("wait: %v", err)
		return r
	}
	descOut, err := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{InstanceIds: []string{instanceID}})
	if err != nil || len(descOut.Reservations) == 0 || len(descOut.Reservations[0].Instances) == 0 {
		r.Error = fmt.Sprintf("describe: %v", err)
		return r
	}
	publicIP := ""
	if descOut.Reservations[0].Instances[0].PublicIpAddress != nil {
		publicIP = *descOut.Reservations[0].Instances[0].PublicIpAddress
	}
	if publicIP == "" {
		r.Error = "no public IP"
		return r
	}
	log(cfg, " [%s] IP: %s", pop.City, publicIP)
	// --- Step 2: Update local DB ---
	log(cfg, " [%s] updating DB...", pop.City)
	localDB, err := sql.Open("sqlite", cfg.DBPath)
	if err != nil {
		r.Error = fmt.Sprintf("open db: %v", err)
		return r
	}
	if _, err := localDB.Exec(`UPDATE pops SET instance_id=?, ip=?, dns=?, status='live' WHERE pop_id=?`,
		instanceID, publicIP, dns, pop.PopID); err != nil {
		// The instance is already running; record the problem but keep
		// going so the node still gets bootstrapped. (Previously ignored.)
		log(cfg, " [%s] WARNING: db update: %v", pop.City, err)
	}
	localDB.Close()
	// Reload pop with updated fields
	pop.InstanceID = instanceID
	pop.IP = publicIP
	pop.DNS = dns
	pop.Status = "live"
	// --- Step 3: DNS (best-effort; skipped without a Cloudflare token) ---
	log(cfg, " [%s] creating DNS %s → %s...", pop.City, dns, publicIP)
	if cfg.CFToken != "" {
		zoneID := cfg.CFZoneID
		if zoneID == "" {
			zoneID, _ = cfResolveZoneID(cfg.CFToken, cfg.Zone)
		}
		if zoneID != "" {
			cfCreateRecord(cfg.CFToken, zoneID, dns, publicIP)
		}
	}
	// --- Step 4: Wait for SSM agent, then bootstrap Tailscale ---
	log(cfg, " [%s] waiting for SSM agent (up to 90s)...", pop.City)
	time.Sleep(30 * time.Second) // SSM agent takes ~30s to register after boot
	if cfg.TSAuthKey != "" {
		if err := ssmBootstrapTailscale(cfg, pop); err != nil {
			log(cfg, " [%s] tailscale warning: %v", pop.City, err)
		}
	}
	// --- Step 5: Deploy vault binary (built locally, staged on Hans,
	// pulled by the node over a temporary HTTP server on port 9876) ---
	log(cfg, " [%s] deploying vault...", pop.City)
	binaryPath := buildVault(cfg)
	hansDir := "/tmp/clavitor-serve"
	hansFile := hansDir + "/clavitor"
	sshExec(cfg.HansHost, "mkdir -p "+hansDir)
	if err := scpToHost(cfg.HansHost, binaryPath, hansFile); err != nil {
		r.Error = fmt.Sprintf("scp to hans: %v", err)
		return r
	}
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "sudo iptables -I INPUT -p tcp --dport 9876 -j ACCEPT")
	time.Sleep(500 * time.Millisecond)
	sshBackground(cfg.HansHost, fmt.Sprintf("cd %s && exec python3 -m http.server 9876 --bind 0.0.0.0 >/dev/null 2>&1", hansDir))
	time.Sleep(2 * time.Second)
	// Plain string (was fmt.Sprintf with no format verbs — a go vet error).
	downloadURL := "http://185.218.204.47:9876/clavitor"
	// NOTE(review): the env heredoc embeds plaintext API/telemetry tokens —
	// consider rotating them and injecting via a secrets mechanism.
	installCmds := []string{
		"mkdir -p /opt/clavitor/bin /opt/clavitor/data /opt/clavitor/certs",
		fmt.Sprintf("curl -sf -o /tmp/clavitor-new %s", downloadURL),
		"mv /tmp/clavitor-new /opt/clavitor/bin/clavitor",
		"chmod +x /opt/clavitor/bin/clavitor",
		fmt.Sprintf(`cat > /opt/clavitor/env << 'ENVEOF'
PORT=1984
VAULT_MODE=hosted
DATA_DIR=/opt/clavitor/data
TELEMETRY_FREQ=30
TELEMETRY_HOST=https://clavitor.ai/telemetry
TELEMETRY_TOKEN=clavitor-fleet-2026
TLS_DOMAIN=%s
CF_API_TOKEN=dSVz7JZtyK023q7kh4MMNmIggK1dahWdnBxVnP3O
TLS_CERT_DIR=/opt/clavitor/certs
TLS_EMAIL=ops@clavitor.ai
ENVEOF`, dns),
		`cat > /etc/systemd/system/clavitor.service << 'UNITEOF'
[Unit]
Description=Clavitor Vault
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/opt/clavitor/bin/clavitor
EnvironmentFile=/opt/clavitor/env
WorkingDirectory=/opt/clavitor/data
Restart=always
RestartSec=5
User=root
[Install]
WantedBy=multi-user.target
UNITEOF`,
		"systemctl daemon-reload",
		"systemctl enable clavitor",
		"systemctl restart clavitor",
		"sleep 3",
		"systemctl is-active clavitor",
	}
	out, err := ssmRunCommand(cfg.HansHost, instanceID, region, installCmds)
	// Clean up Hans regardless of deploy outcome.
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "sudo iptables -D INPUT -p tcp --dport 9876 -j ACCEPT 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "rm -rf "+hansDir)
	if err != nil {
		r.Error = fmt.Sprintf("deploy: %v\n%s", err, out)
		return r
	}
	log(cfg, " [%s] service: %s", pop.City, strings.TrimSpace(out))
	// --- Step 6: Verify TLS (best-effort; cert issuance may lag) ---
	log(cfg, " [%s] verifying TLS...", pop.City)
	time.Sleep(5 * time.Second)
	verifyURL := fmt.Sprintf("https://%s:1984/ping", dns)
	resp, err := http.Get(verifyURL)
	if err == nil {
		resp.Body.Close()
		log(cfg, " [%s] TLS verified ✓", pop.City)
	} else {
		log(cfg, " [%s] TLS not ready yet (cert may take a minute): %v", pop.City, err)
	}
	r.OK = true
	r.Message = fmt.Sprintf("%s → %s → %s (%s) — live", instanceID, publicIP, dns, region)
	log(cfg, "\n [%s] DONE: %s\n", pop.City, r.Message)
	return r
}
// ensureSecurityGroup returns the ID of the "vault1984-pop" security group
// in the client's region, creating it — with a single 0.0.0.0/0 TCP 1984
// ingress rule — when it does not exist yet.
//
// NOTE(review): a failed DescribeSecurityGroups call falls through to the
// create path, which would then fail if the group actually exists — confirm
// whether describe errors should abort instead.
func ensureSecurityGroup(ctx context.Context, client *ec2.Client, region, city string, cfg Config) (string, error) {
	// Check if vault1984-pop exists in this region
	descOut, err := client.DescribeSecurityGroups(ctx, &ec2.DescribeSecurityGroupsInput{
		Filters: []ec2types.Filter{{
			Name:   strPtr("group-name"),
			Values: []string{"vault1984-pop"},
		}},
	})
	if err == nil && len(descOut.SecurityGroups) > 0 {
		return *descOut.SecurityGroups[0].GroupId, nil
	}
	// Create it (no VpcId is supplied, so AWS places it in the default VPC).
	log(cfg, " [%s] creating security group vault1984-pop in %s...", city, region)
	createOut, err := client.CreateSecurityGroup(ctx, &ec2.CreateSecurityGroupInput{
		GroupName:   strPtr("vault1984-pop"),
		Description: strPtr("Clavitor POP - port 1984 only"),
	})
	if err != nil {
		return "", fmt.Errorf("create sg: %w", err)
	}
	sgID := *createOut.GroupId
	// Add port 1984
	proto := "tcp"
	port := int32(1984)
	_, err = client.AuthorizeSecurityGroupIngress(ctx, &ec2.AuthorizeSecurityGroupIngressInput{
		GroupId: &sgID,
		IpPermissions: []ec2types.IpPermission{{
			IpProtocol: &proto,
			FromPort:   &port,
			ToPort:     &port,
			IpRanges:   []ec2types.IpRange{{CidrIp: strPtr("0.0.0.0/0")}},
		}},
	})
	if err != nil {
		// The group exists at this point, so its ID is returned with the error.
		return sgID, fmt.Errorf("add rule: %w", err)
	}
	return sgID, nil
}
// --- Node execution: Tailscale SSH with SSM fallback ---
// nodeExec runs a command on a node, trying Tailscale SSH first and falling
// back to SSM (relayed through Hans) when the node has an instance ID.
func nodeExec(cfg Config, pop POP, command string) (string, error) {
	out, err := tsSshExec(pop.Subdomain(), command)
	if err == nil {
		return out, nil
	}
	if pop.InstanceID == "" {
		// No SSM fallback possible — surface the Tailscale failure as-is.
		return out, err
	}
	log(cfg, " [%s] TS SSH failed, falling back to SSM...", pop.Subdomain())
	return ssmRunCommand(cfg.HansHost, pop.InstanceID, pop.RegionName, []string{command})
}
// nodePushFile copies a local file to a node. It tries direct SCP (over the
// Tailscale hostname) first; on failure it stages the file on the Hans
// relay and instructs the node, via SSM, to scp it down from Hans.
func nodePushFile(cfg Config, pop POP, localPath, remotePath string) error {
	err := scpToNode(pop.Subdomain(), localPath, remotePath)
	if err == nil {
		return nil
	}
	// Tailscale SCP failed — fall back: SCP to Hans, then have the node
	// pull from Hans. Requires both an instance_id (for SSM) and an IP.
	if pop.InstanceID != "" && pop.IP != "" {
		log(cfg, " [%s] TS SCP failed, falling back via Hans...", pop.Subdomain())
		hansPath := fmt.Sprintf("/tmp/clavitor-deploy-%s", pop.Subdomain())
		// Upload to Hans
		if err := scpToHost(cfg.HansHost, localPath, hansPath); err != nil {
			return fmt.Errorf("scp to hans: %w", err)
		}
		// The node runs scp against cfg.HansHost's address itself.
		// NOTE(review): this assumes the node can SSH to Hans
		// non-interactively (credentials already in place) — confirm.
		pullCmd := fmt.Sprintf("scp -o StrictHostKeyChecking=no -o ConnectTimeout=10 %s:%s %s",
			cfg.HansHost, hansPath, remotePath)
		_, err := ssmRunCommand(cfg.HansHost, pop.InstanceID, pop.RegionName, []string{pullCmd})
		// Clean up
		sshExec(cfg.HansHost, "rm -f "+hansPath)
		return err
	}
	return err
}
// tsSshExec runs command as root on hostname over SSH (Tailscale MagicDNS
// name) with a 5s connect timeout. On failure the combined stdout+stderr is
// returned alongside the error; on success only stdout.
func tsSshExec(hostname, command string) (string, error) {
	var stdout, stderr bytes.Buffer
	ssh := exec.Command("ssh",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=5",
		"-o", "UserKnownHostsFile=/dev/null",
		"-o", "LogLevel=ERROR",
		"root@"+hostname,
		command,
	)
	ssh.Stdout = &stdout
	ssh.Stderr = &stderr
	if err := ssh.Run(); err != nil {
		return stdout.String() + stderr.String(), err
	}
	return stdout.String(), nil
}
// scpToNode copies localPath to root@hostname:remotePath (Tailscale MagicDNS
// name) with a 5s connect timeout. Combined scp output is folded into the
// returned error on failure.
func scpToNode(hostname, localPath, remotePath string) error {
	scp := exec.Command("scp",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=5",
		"-o", "UserKnownHostsFile=/dev/null",
		"-o", "LogLevel=ERROR",
		localPath,
		"root@"+hostname+":"+remotePath,
	)
	if out, err := scp.CombinedOutput(); err != nil {
		return fmt.Errorf("%v: %s", err, out)
	}
	return nil
}
// scpToHost copies localPath to host:remotePath, where host may include a
// user (e.g. "johan@185.218.204.47"). 10s connect timeout; combined scp
// output is folded into the returned error on failure.
func scpToHost(host, localPath, remotePath string) error {
	scp := exec.Command("scp",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=10",
		localPath,
		host+":"+remotePath,
	)
	if out, err := scp.CombinedOutput(); err != nil {
		return fmt.Errorf("%v: %s", err, out)
	}
	return nil
}
// --- SSM (for bootstrap only, before Tailscale is available) ---
// ssmBootstrapTailscale installs Tailscale on a freshly provisioned node via
// SSM, joins it to the tailnet with cfg.TSAuthKey (SSH enabled, --reset),
// and sets the system hostname to the POP subdomain.
//
// NOTE(review): the auth key is interpolated into the remote shell command,
// so it appears in SSM command history — confirm the key is single-use or
// short-lived.
func ssmBootstrapTailscale(cfg Config, pop POP) error {
	hostname := pop.Subdomain()
	region := pop.RegionName
	log(cfg, " Installing Tailscale...")
	// First command exits 0 early when tailscale is already present, which
	// skips the install script.
	out, err := ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		"command -v tailscale >/dev/null 2>&1 && echo 'already installed' && exit 0",
		"curl -fsSL https://tailscale.com/install.sh | sh",
	})
	if err != nil {
		return fmt.Errorf("install tailscale: %w", err)
	}
	log(cfg, " %s", lastLines(out, 2))
	log(cfg, " Joining tailnet as %s...", hostname)
	out, err = ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		"systemctl enable --now tailscaled 2>/dev/null || true",
		"sleep 2",
		fmt.Sprintf("tailscale up --authkey=%s --hostname=%s --ssh --reset", cfg.TSAuthKey, hostname),
		"tailscale ip -4",
	})
	if err != nil {
		return fmt.Errorf("join tailnet: %w", err)
	}
	log(cfg, " TS IP: %s", strings.TrimSpace(out))
	log(cfg, " Setting hostname...")
	// Best-effort: errors deliberately discarded via blank identifiers.
	_, _ = ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		fmt.Sprintf("hostnamectl set-hostname %s", hostname),
		fmt.Sprintf("grep -q '%s' /etc/hosts || echo '127.0.1.1 %s %s.%s' >> /etc/hosts", hostname, hostname, hostname, cfg.Zone),
	})
	return nil
}
// ssmRunCommand runs shell commands on an EC2 instance through AWS SSM,
// with every AWS CLI call proxied over SSH to hansHost (which holds the
// AWS credentials). The command parameters are staged as a JSON file on
// hansHost, dispatched with `aws ssm send-command`, then polled every 5s
// for up to ~3 minutes. Returns the remote stdout on success.
func ssmRunCommand(hansHost, instanceID, region string, commands []string) (string, error) {
	params := map[string][]string{"commands": commands}
	paramsJSON, _ := json.Marshal(params) // map[string][]string cannot fail to marshal
	tmpFile := fmt.Sprintf("/tmp/ssm-%s-%d.json", instanceID, time.Now().UnixNano())
	// Stage the params via a quoted heredoc so shell metacharacters in the
	// commands are not expanded on hansHost.
	writeCmd := fmt.Sprintf("cat > %s << 'SSMEOF'\n%s\nSSMEOF", tmpFile, string(paramsJSON))
	if _, err := sshExec(hansHost, writeCmd); err != nil {
		return "", fmt.Errorf("writing params: %w", err)
	}
	ssmCmd := fmt.Sprintf(
		`aws ssm send-command --instance-ids %s --document-name AWS-RunShellScript --region %s --parameters file://%s --query Command.CommandId --output text && rm -f %s`,
		instanceID, region, tmpFile, tmpFile,
	)
	cmdID, err := sshExec(hansHost, ssmCmd)
	if err != nil {
		sshExec(hansHost, "rm -f "+tmpFile) // best-effort cleanup; the inline rm never ran
		return "", fmt.Errorf("send-command: %w\n%s", err, cmdID)
	}
	cmdID = strings.TrimSpace(cmdID)
	if cmdID == "" {
		return "", fmt.Errorf("empty command ID returned")
	}
	// Poll for completion: 36 attempts x 5s ≈ 3 minutes.
	for i := 0; i < 36; i++ {
		time.Sleep(5 * time.Second)
		pollCmd := fmt.Sprintf(
			`aws ssm get-command-invocation --command-id %s --instance-id %s --region %s --query '[Status,StandardOutputContent,StandardErrorContent]' --output text 2>/dev/null`,
			cmdID, instanceID, region,
		)
		result, err := sshExec(hansHost, pollCmd)
		if err != nil {
			// The invocation may not be registered yet; keep polling.
			continue
		}
		// Output is tab-separated: Status, stdout, stderr. SplitN always
		// yields at least one element, so parts[0] is safe.
		parts := strings.SplitN(strings.TrimSpace(result), "\t", 3)
		switch parts[0] {
		case "Success":
			stdout := ""
			if len(parts) >= 2 {
				stdout = parts[1]
			}
			return stdout, nil
		case "Failed", "Cancelled", "TimedOut", "Undeliverable", "Terminated":
			// All terminal non-success statuses. Previously only "Failed"
			// was recognized, so a cancelled or timed-out command spun in
			// this loop until the local timeout.
			stderr := ""
			if len(parts) >= 3 {
				stderr = parts[2]
			}
			return "", fmt.Errorf("command %s: %s", parts[0], stderr)
		}
	}
	return "", fmt.Errorf("timeout waiting for SSM command %s", cmdID)
}
// sshBackground starts a command on a remote host via SSH, forking to
// background (-f detaches ssh after authentication). Fire-and-forget:
// the error is deliberately discarded.
func sshBackground(host, command string) {
	args := []string{"-f", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", host, command}
	_ = exec.Command("ssh", args...).Run()
}
// sshExec runs a command on a remote host over SSH and returns its
// stdout. On failure the combined stdout+stderr is returned alongside
// the error so callers can surface diagnostics.
func sshExec(host, command string) (string, error) {
	var outBuf, errBuf bytes.Buffer
	cmd := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", host, command)
	cmd.Stdout = &outBuf
	cmd.Stderr = &errBuf
	if err := cmd.Run(); err != nil {
		return outBuf.String() + errBuf.String(), err
	}
	return outBuf.String(), nil
}
// --- DNS sync ---

// syncDNS reconciles Cloudflare A records against the desired POP set:
// it creates missing records, updates records whose IP drifted, and
// deletes A records with no matching POP — except the apex, www, and the
// known non-POP subdomains. In dry-run mode mutations are logged but not
// performed.
// NOTE(review): dry-run mutations leave OK=false in their NodeResult —
// confirm the caller's exit-code logic accounts for that.
func syncDNS(cfg Config, pops []POP) []NodeResult {
	records, err := cfListRecords(cfg.CFToken, cfg.CFZoneID)
	if err != nil {
		fatal("listing DNS records: %v", err)
	}
	// Index current A records by FQDN, excluding the apex and www, which
	// the reconciler must never touch.
	existing := map[string]cfDNSRecord{}
	protected := map[string]bool{cfg.Zone: true, "www." + cfg.Zone: true}
	for _, r := range records {
		if r.Type == "A" && !protected[r.Name] {
			existing[r.Name] = r
		}
	}
	// Desired state: one A record per POP whose DNS name is in this zone.
	desired := map[string]POP{}
	for _, p := range pops {
		if p.Zone() == cfg.Zone {
			desired[p.DNS] = p
		}
	}
	var results []NodeResult
	for fqdn, pop := range desired {
		r := NodeResult{Node: pop.Subdomain(), Action: "dns"}
		if rec, ok := existing[fqdn]; ok {
			if rec.Content == pop.IP {
				// Record already correct — nothing to do.
				r.OK = true
				r.Message = fmt.Sprintf("%s -> %s", fqdn, pop.IP)
				log(cfg, " OK %s -> %s", fqdn, pop.IP)
			} else {
				// IP drifted: update the record in place.
				if !cfg.DryRun {
					if err := cfUpdateRecord(cfg.CFToken, cfg.CFZoneID, rec.ID, fqdn, pop.IP); err != nil {
						r.Error = err.Error()
					} else {
						r.OK = true
					}
				}
				r.Message = fmt.Sprintf("updated %s -> %s (was %s)", fqdn, pop.IP, rec.Content)
				log(cfg, " UPD %s: %s -> %s", fqdn, rec.Content, pop.IP)
			}
			// Handled; whatever remains in `existing` afterwards is stale.
			delete(existing, fqdn)
		} else {
			// No record yet: create one.
			if !cfg.DryRun {
				if err := cfCreateRecord(cfg.CFToken, cfg.CFZoneID, fqdn, pop.IP); err != nil {
					r.Error = err.Error()
				} else {
					r.OK = true
				}
			}
			r.Message = fmt.Sprintf("created %s -> %s", fqdn, pop.IP)
			log(cfg, " ADD %s -> %s", fqdn, pop.IP)
		}
		results = append(results, r)
	}
	// Anything left in `existing` has no matching POP: delete it, unless
	// the name is outside the zone or a known non-POP service record.
	for fqdn, rec := range existing {
		sub := strings.TrimSuffix(fqdn, "."+cfg.Zone)
		if sub == fqdn || isKnownNonPOP(sub) {
			continue
		}
		r := NodeResult{Node: sub, Action: "dns-delete"}
		if !cfg.DryRun {
			if err := cfDeleteRecord(cfg.CFToken, cfg.CFZoneID, rec.ID); err != nil {
				r.Error = err.Error()
			} else {
				r.OK = true
			}
		}
		r.Message = fmt.Sprintf("deleted %s (%s)", fqdn, rec.Content)
		log(cfg, " DEL %s (%s)", fqdn, rec.Content)
		results = append(results, r)
	}
	return results
}
// isKnownNonPOP reports whether a subdomain belongs to non-POP
// infrastructure that the DNS reconciler must never delete as "stale".
func isKnownNonPOP(sub string) bool {
	// A switch over the constant set avoids allocating a lookup map on
	// every call (this runs once per leftover DNS record in syncDNS).
	switch sub {
	case "dev", "soc", "api", "mail", "mx", "ns1", "ns2", "status":
		return true
	}
	return false
}
// --- Tailscale sync ---

// syncTailscale reconciles tailnet membership against the desired POP
// set: POPs already present as devices are reported OK; missing POPs are
// bootstrapped via SSM when an instance ID and auth key are available;
// leftover infra-tagged devices with no matching POP are removed as
// stale. Dry-run mode logs intended actions without mutating anything.
func syncTailscale(cfg Config, pops []POP) []NodeResult {
	devices, err := tsListDevices(cfg.TSKey)
	if err != nil {
		fatal("listing Tailscale devices: %v", err)
	}
	// Desired state keyed by subdomain, which doubles as the TS hostname.
	desired := map[string]POP{}
	for _, p := range pops {
		if sub := p.Subdomain(); sub != "" {
			desired[sub] = p
		}
	}
	// One hostname can map to several devices (e.g. re-enrollments).
	devicesByHostname := map[string][]tsDevice{}
	for _, d := range devices {
		devicesByHostname[d.Hostname] = append(devicesByHostname[d.Hostname], d)
	}
	var results []NodeResult
	for sub, pop := range desired {
		r := NodeResult{Node: sub, Action: "tailscale"}
		if devs, ok := devicesByHostname[sub]; ok && len(devs) > 0 {
			// Present: report the first device's first address, then drop
			// the hostname from the map so only stale devices remain for
			// the sweep below.
			tsIP := ""
			if len(devs[0].Addresses) > 0 {
				tsIP = devs[0].Addresses[0]
			}
			r.OK = true
			r.Message = fmt.Sprintf("TS:%s public:%s", tsIP, pop.IP)
			log(cfg, " OK %s — %s (TS: %s, public: %s)", sub, pop.City, tsIP, pop.IP)
			delete(devicesByHostname, sub)
		} else {
			// Missing from the tailnet: bootstrap if we can, otherwise
			// report why we can't.
			if pop.InstanceID != "" && cfg.TSAuthKey != "" && !cfg.DryRun {
				log(cfg, " JOIN %s — %s via SSM (%s in %s)...", sub, pop.City, pop.InstanceID, pop.RegionName)
				if err := ssmBootstrapTailscale(cfg, pop); err != nil {
					r.Error = err.Error()
					log(cfg, " ERR: %v", err)
				} else {
					r.OK = true
					r.Message = "joined via SSM"
					log(cfg, " Joined tailnet as %s", sub)
				}
			} else if pop.InstanceID != "" && cfg.TSAuthKey != "" {
				// Dry-run: report what would happen.
				r.Message = fmt.Sprintf("would SSM bootstrap %s in %s", pop.InstanceID, pop.RegionName)
				log(cfg, " JOIN %s — %s (would bootstrap)", sub, pop.City)
			} else if pop.InstanceID != "" {
				r.Error = "no -ts-authkey provided"
				log(cfg, " MISSING %s — needs -ts-authkey", sub)
			} else {
				r.Error = "no instance_id, cannot auto-join"
				log(cfg, " MISSING %s — no instance_id", sub)
			}
		}
		results = append(results, r)
	}
	// Whatever is left has no matching POP; remove only devices that look
	// fleet-managed so personal/admin machines are never touched.
	for hostname, devs := range devicesByHostname {
		for _, d := range devs {
			if !isInfraDevice(d) {
				continue
			}
			r := NodeResult{Node: hostname, Action: "tailscale-delete"}
			if !cfg.DryRun {
				if err := tsDeleteDevice(cfg.TSKey, d.ID); err != nil {
					r.Error = err.Error()
				} else {
					r.OK = true
					r.Message = "removed stale device"
				}
			} else {
				r.Message = "would remove stale device"
			}
			// NOTE(review): this logs "removed" even in dry-run mode.
			log(cfg, " STALE %s — removed", hostname)
			results = append(results, r)
		}
	}
	return results
}
// isInfraDevice reports whether a Tailscale device looks fleet-managed,
// keyed off a "tagged-" segment in its tailnet name.
// NOTE(review): assumes every infra node's name contains "tagged-" —
// confirm against the tailnet's tagging policy before relying on this.
func isInfraDevice(d tsDevice) bool {
	const infraMarker = "tagged-"
	return strings.Contains(d.Name, infraMarker)
}
// --- Parallel execution ---

// parallelExec runs fn over every POP using at most `workers` concurrent
// goroutines (never more than there are POPs) and collects the results
// in completion order, not input order.
func parallelExec(pops []POP, workers int, fn func(POP) NodeResult) []NodeResult {
	jobs := make(chan POP, len(pops))
	for _, pop := range pops {
		jobs <- pop
	}
	close(jobs)

	var (
		mu      sync.Mutex
		results []NodeResult
		wg      sync.WaitGroup
	)
	n := workers
	if len(pops) < n {
		n = len(pops)
	}
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for pop := range jobs {
				res := fn(pop)
				mu.Lock()
				results = append(results, res)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return results
}
// --- DB ---

// loadLivePOPs reads the pops database and returns only nodes that are
// marked live and have both an IP and a DNS name. Exits fatally on DB
// errors.
func loadLivePOPs(cfg Config) []POP {
	all, err := readPOPs(cfg.DBPath)
	if err != nil {
		fatal("reading DB: %v", err)
	}
	var live []POP
	for _, pop := range all {
		if pop.Status != "live" || pop.IP == "" || pop.DNS == "" {
			continue
		}
		live = append(live, pop)
	}
	return live
}
// readPOPs opens the SQLite pops database and returns every row, ordered
// by pop_id.
func readPOPs(dbPath string) ([]POP, error) {
	db, err := sql.Open("sqlite", dbPath)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	const query = `SELECT pop_id, city, country, region_name, ip, dns, status, provider, instance_id FROM pops ORDER BY pop_id`
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var pops []POP
	for rows.Next() {
		var pop POP
		err := rows.Scan(&pop.PopID, &pop.City, &pop.Country, &pop.RegionName, &pop.IP, &pop.DNS, &pop.Status, &pop.Provider, &pop.InstanceID)
		if err != nil {
			return nil, err
		}
		pops = append(pops, pop)
	}
	return pops, rows.Err()
}
// filterNodes narrows pops to those whose subdomain appears in the
// comma-separated cfg.Nodes filter; an empty filter selects every POP.
func filterNodes(cfg Config, pops []POP) []POP {
	if cfg.Nodes == "" {
		return pops
	}
	want := map[string]struct{}{}
	for _, name := range strings.Split(cfg.Nodes, ",") {
		want[strings.TrimSpace(name)] = struct{}{}
	}
	var selected []POP
	for _, pop := range pops {
		if _, ok := want[pop.Subdomain()]; ok {
			selected = append(selected, pop)
		}
	}
	return selected
}
// --- Output ---

// log prints a formatted, newline-terminated line to stdout, unless JSON
// output mode is on (machine consumers only get the final JSON blob).
func log(cfg Config, format string, args ...interface{}) {
	if cfg.JSONOut {
		return
	}
	fmt.Printf(format+"\n", args...)
}
// outputResults emits the collected per-node results as indented JSON on
// stdout when -json is set; otherwise it is a no-op (human-readable
// output was already streamed via log).
func outputResults(cfg Config, results []NodeResult) {
	if !cfg.JSONOut {
		return
	}
	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	// Previously the Encode error was silently dropped; surface it on
	// stderr so a broken pipe / marshal failure is not invisible to
	// agents parsing the output.
	if err := enc.Encode(results); err != nil {
		fmt.Fprintf(os.Stderr, "encoding results: %v\n", err)
	}
}
// oneLineStatus collapses multi-line command output into a single line:
// blank lines are dropped, each remaining line is trimmed, and the parts
// are joined with " | ".
func oneLineStatus(raw string) string {
	var b strings.Builder
	for _, line := range strings.Split(strings.TrimSpace(raw), "\n") {
		trimmed := strings.TrimSpace(line)
		if trimmed == "" {
			continue
		}
		if b.Len() > 0 {
			b.WriteString(" | ")
		}
		b.WriteString(trimmed)
	}
	return b.String()
}
// lastLines returns the final n lines of s (after trimming surrounding
// whitespace); when s has n or fewer lines the whole trimmed string is
// returned.
func lastLines(s string, n int) string {
	trimmed := strings.TrimSpace(s)
	lines := strings.Split(trimmed, "\n")
	if len(lines) > n {
		lines = lines[len(lines)-n:]
	}
	return strings.Join(lines, "\n")
}
// requireKeys aborts with a fatal config error unless each named
// credential is present in cfg: "cf" requires the Cloudflare token,
// "ts" the Tailscale API key. Unrecognized keys are ignored.
func requireKeys(cfg Config, keys ...string) {
	for _, key := range keys {
		if key == "cf" && cfg.CFToken == "" {
			fatal("Cloudflare token required: set CF_API_TOKEN or -cf-token")
		}
		if key == "ts" && cfg.TSKey == "" {
			fatal("Tailscale API key required: set TS_API_KEY or -ts-key")
		}
	}
}
// fatal prints a FATAL-prefixed message to stderr and exits with code 2,
// the tool's "fatal / config error" status.
func fatal(format string, args ...interface{}) {
	msg := fmt.Sprintf("FATAL: "+format+"\n", args...)
	fmt.Fprint(os.Stderr, msg)
	os.Exit(2)
}
// --- Cloudflare API ---

// cfDNSRecord is a single DNS record as represented by the Cloudflare
// v4 API.
type cfDNSRecord struct {
	ID      string `json:"id"`
	Type    string `json:"type"`    // record type, e.g. "A"
	Name    string `json:"name"`    // fully-qualified name
	Content string `json:"content"` // record value (the IP for A records)
	Proxied bool   `json:"proxied"`
	TTL     int    `json:"ttl"` // 1 = "automatic" (value sent by the mutators below)
}

// cfListResponse is the response envelope for record-listing calls.
type cfListResponse struct {
	Success bool          `json:"success"`
	Result  []cfDNSRecord `json:"result"`
}

// cfMutateResponse is the response envelope for create/update calls;
// Errors carries the API's messages when Success is false.
type cfMutateResponse struct {
	Success bool        `json:"success"`
	Result  cfDNSRecord `json:"result"`
	Errors  []struct {
		Message string `json:"message"`
	} `json:"errors"`
}
// cfResolveZoneID looks up the Cloudflare zone ID for a zone name via
// the v4 zones API.
func cfResolveZoneID(token, zone string) (string, error) {
	req, _ := http.NewRequest("GET", "https://api.cloudflare.com/client/v4/zones?name="+zone, nil)
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	// Fail loudly on auth/permission errors. Previously a non-200 body
	// decoded to an empty result set and was misreported as "zone not
	// found".
	if resp.StatusCode != 200 {
		body, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}
	var result struct {
		Result []struct {
			ID string `json:"id"`
		} `json:"result"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	if len(result.Result) == 0 {
		return "", fmt.Errorf("zone %q not found", zone)
	}
	return result.Result[0].ID, nil
}
// cfListRecords fetches all DNS records for a zone, following the
// 100-per-page pagination until a short page signals the end.
func cfListRecords(token, zoneID string) ([]cfDNSRecord, error) {
	const perPage = 100
	var all []cfDNSRecord
	for page := 1; ; page++ {
		url := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records?per_page=%d&page=%d", zoneID, perPage, page)
		req, _ := http.NewRequest("GET", url, nil)
		req.Header.Set("Authorization", "Bearer "+token)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, err
		}
		// Fail loudly on non-200. Previously an expired token's error body
		// decoded to an empty page, so this silently returned an empty
		// record list — which syncDNS would then act on.
		if resp.StatusCode != 200 {
			body, _ := io.ReadAll(resp.Body)
			resp.Body.Close()
			return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
		}
		var result cfListResponse
		err = json.NewDecoder(resp.Body).Decode(&result)
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
		all = append(all, result.Result...)
		// A short page means we've reached the end.
		if len(result.Result) < perPage {
			return all, nil
		}
	}
}
// cfCreateRecord creates an unproxied A record with automatic TTL
// pointing fqdn at ip.
func cfCreateRecord(token, zoneID, fqdn, ip string) error {
	payload, _ := json.Marshal(map[string]interface{}{
		"type": "A", "name": fqdn, "content": ip, "ttl": 1, "proxied": false,
	})
	endpoint := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records", zoneID)
	req, _ := http.NewRequest("POST", endpoint, bytes.NewReader(payload))
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	var parsed cfMutateResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&parsed); decodeErr != nil {
		return decodeErr
	}
	if !parsed.Success {
		return fmt.Errorf("API error: %v", parsed.Errors)
	}
	return nil
}
// cfUpdateRecord overwrites an existing record (by ID) with an unproxied
// A record, automatic TTL, pointing fqdn at ip.
func cfUpdateRecord(token, zoneID, recordID, fqdn, ip string) error {
	payload, _ := json.Marshal(map[string]interface{}{
		"type": "A", "name": fqdn, "content": ip, "ttl": 1, "proxied": false,
	})
	endpoint := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records/%s", zoneID, recordID)
	req, _ := http.NewRequest("PUT", endpoint, bytes.NewReader(payload))
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	var parsed cfMutateResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&parsed); decodeErr != nil {
		return decodeErr
	}
	if !parsed.Success {
		return fmt.Errorf("API error: %v", parsed.Errors)
	}
	return nil
}
// cfDeleteRecord removes a DNS record by ID. The response body is read
// in full so a failure message can include Cloudflare's raw reply.
func cfDeleteRecord(token, zoneID, recordID string) error {
	endpoint := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records/%s", zoneID, recordID)
	req, _ := http.NewRequest("DELETE", endpoint, nil)
	req.Header.Set("Authorization", "Bearer "+token)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	raw, _ := io.ReadAll(resp.Body)
	var parsed struct {
		Success bool `json:"success"`
	}
	// Decode errors are deliberately folded into the !Success path below,
	// which reports the raw body.
	_ = json.Unmarshal(raw, &parsed)
	if !parsed.Success {
		return fmt.Errorf("delete failed: %s", string(raw))
	}
	return nil
}
// --- Tailscale API ---

// tsDevice is a device entry from the Tailscale v2 devices API.
type tsDevice struct {
	ID        string   `json:"id"`
	Hostname  string   `json:"hostname"`  // matched against POP subdomains in syncTailscale
	Name      string   `json:"name"`      // full tailnet name; isInfraDevice keys off this
	Addresses []string `json:"addresses"` // tailnet IPs; the first entry is used as primary
}

// tsListResponse is the response envelope for the device-listing call.
type tsListResponse struct {
	Devices []tsDevice `json:"devices"`
}
// tsListDevices lists every device in the default ("-") tailnet via the
// Tailscale v2 API, authenticating with the API key as the basic-auth
// user.
func tsListDevices(apiKey string) ([]tsDevice, error) {
	req, _ := http.NewRequest("GET", "https://api.tailscale.com/api/v2/tailnet/-/devices", nil)
	req.SetBasicAuth(apiKey, "")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(raw))
	}
	var parsed tsListResponse
	if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil {
		return nil, err
	}
	return parsed.Devices, nil
}
// tsDeleteDevice removes a device from the tailnet by device ID.
func tsDeleteDevice(apiKey, deviceID string) error {
	req, _ := http.NewRequest("DELETE", "https://api.tailscale.com/api/v2/device/"+deviceID, nil)
	req.SetBasicAuth(apiKey, "")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == 200 {
		return nil
	}
	raw, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(raw))
}