package main

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/ec2"
	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"

	_ "modernc.org/sqlite"
)

// ---------------------------------------------------------------------------
// pop-sync — Clavitor fleet management tool
//
// Subcommands:
//   sync     Reconcile DNS + Tailscale to match the pops DB
//   deploy   Build clovis-vault for arm64, push to all live nodes, restart
//   status   Health-check all live nodes (Tailscale reachable, service up)
//   exec     Run a shell command on all (or specific) live nodes
//
// Designed for both human and agent use:
//   --json   Machine-readable JSON output
//   Exit 0   Everything OK
//   Exit 1   Partial failure (some nodes failed)
//   Exit 2   Fatal / config error
// ---------------------------------------------------------------------------

// --- Types ---

// POP is one row of the `pops` table: a point-of-presence node.
type POP struct {
	PopID      int    `json:"pop_id"`
	City       string `json:"city"`
	Country    string `json:"country"`
	RegionName string `json:"region_name"`
	IP         string `json:"ip"`
	DNS        string `json:"dns"`
	Status     string `json:"status"`
	Provider   string `json:"provider"`
	InstanceID string `json:"instance_id"`
}

// Subdomain returns the host label of the POP's DNS name (text before the
// first dot). For an empty DNS it returns "".
func (p POP) Subdomain() string {
	parts := strings.SplitN(p.DNS, ".", 2)
	if len(parts) > 0 {
		return parts[0]
	}
	return ""
}

// Zone returns the zone part of the POP's DNS name (text after the first
// dot), or "" when the DNS name has no dot.
func (p POP) Zone() string {
	parts := strings.SplitN(p.DNS, ".", 2)
	if len(parts) == 2 {
		return parts[1]
	}
	return ""
}

// Config holds all flag/environment-derived settings shared by subcommands.
type Config struct {
	DBPath       string
	CFToken      string
	TSKey        string
	TSAuthKey    string
	HansHost     string
	DryRun       bool
	JSONOut      bool
	Zone         string
	CFZoneID     string
	VaultSrc     string // path to clovis-vault source
	Nodes        string // comma-separated node filter (empty = all)
	AWSKeyID     string
	AWSSecretKey string
}

// NodeResult is the per-node outcome of one action; serialized for --json.
type NodeResult struct {
	Node    string `json:"node"`
	Action  string `json:"action"`
	OK      bool   `json:"ok"`
	Message string `json:"message,omitempty"`
	Error   string `json:"error,omitempty"`
}

// --- Main ---

func main() {
	if len(os.Args) < 2 {
		printUsage()
		os.Exit(2)
	}
	cmd := os.Args[1]
	// Shift args so flags work after subcommand.
	os.Args = append(os.Args[:1], os.Args[2:]...)
	cfg, remaining := parseFlags()

	switch cmd {
	case "sync":
		exitWith(cmdSync(cfg))
	case "deploy":
		exitWith(cmdDeploy(cfg))
	case "status":
		exitWith(cmdStatus(cfg))
	case "exec":
		if len(remaining) == 0 {
			fatal("usage: pop-sync exec <command>")
		}
		exitWith(cmdExec(cfg, strings.Join(remaining, " ")))
	case "firewall":
		exitWith(cmdFirewall(cfg))
	case "maintenance":
		if len(remaining) == 0 {
			fatal("usage: pop-sync maintenance <on|off|status> [reason]")
		}
		cmdMaintenance(cfg, remaining)
	case "help", "--help", "-h":
		printUsage()
	default:
		fmt.Fprintf(os.Stderr, "unknown command: %s\n", cmd)
		printUsage()
		os.Exit(2)
	}
}

func printUsage() {
	fmt.Println(`pop-sync — Clavitor fleet management

Commands:
  sync         Reconcile DNS + Tailscale with pops DB (create, update, delete)
  deploy       Build clovis-vault (arm64), deploy to all live nodes, restart service
  status       Health-check all live nodes (reachable, service running, version)
  exec         Run a command on live nodes: pop-sync exec <command>
  maintenance  Toggle maintenance mode: pop-sync maintenance on "fleet deploy"

Flags:
  -db <path>         Path to clavitor.db (default: ../clavitor.com/clavitor.db)
  -cf-token <tok>    Cloudflare API token (or CF_API_TOKEN env)
  -ts-key <key>      Tailscale API key (or TS_API_KEY env)
  -ts-authkey <key>  Tailscale auth key for joining new nodes (or TS_AUTHKEY env)
  -hans <target>     SSH target for Hans/SSM relay (default: johan@185.218.204.47)
  -vault-src <path>  Path to clovis-vault source (default: ../clavis/clavis-vault)
  -zone <zone>       DNS zone (default: clavitor.ai)
  -nodes <list>      Comma-separated node filter, e.g. "use1,sg1" (default: all)
  -dry-run           Show what would change without doing it
  -json              Output results as JSON (agent-friendly)

Examples:
  pop-sync sync --dry-run
  pop-sync deploy
  pop-sync status --json
  pop-sync exec "systemctl status clavitor"
  pop-sync exec -nodes use1,sg1 "journalctl -u clavitor -n 20"`)
}

// parseFlags is a tiny hand-rolled flag parser: "-flag value" pairs, a few
// boolean switches, everything else collected into `remaining`. Defaults and
// env fallbacks are applied afterwards.
func parseFlags() (Config, []string) {
	cfg := Config{}
	var remaining []string
	for i := 1; i < len(os.Args); i++ {
		arg := os.Args[i]
		if !strings.HasPrefix(arg, "-") {
			remaining = append(remaining, arg)
			continue
		}
		// next consumes and returns the following argument (the flag's value).
		next := func() string {
			if i+1 < len(os.Args) {
				i++
				return os.Args[i]
			}
			fatal("missing value for %s", arg)
			return ""
		}
		switch arg {
		case "-db":
			cfg.DBPath = next()
		case "-cf-token":
			cfg.CFToken = next()
		case "-ts-key":
			cfg.TSKey = next()
		case "-ts-authkey":
			cfg.TSAuthKey = next()
		case "-hans":
			cfg.HansHost = next()
		case "-vault-src":
			cfg.VaultSrc = next()
		case "-zone":
			cfg.Zone = next()
		case "-cf-zone-id":
			cfg.CFZoneID = next()
		case "-nodes":
			cfg.Nodes = next()
		case "-aws-key":
			cfg.AWSKeyID = next()
		case "-aws-secret":
			cfg.AWSSecretKey = next()
		case "-dry-run", "--dry-run":
			cfg.DryRun = true
		case "-json", "--json":
			cfg.JSONOut = true
		default:
			remaining = append(remaining, arg)
		}
	}

	// Defaults (flags win over env, env wins over hardcoded default).
	if cfg.DBPath == "" {
		cfg.DBPath = "../clavitor.com/clavitor.db"
	}
	if cfg.CFToken == "" {
		cfg.CFToken = os.Getenv("CF_API_TOKEN")
	}
	if cfg.TSKey == "" {
		cfg.TSKey = os.Getenv("TS_API_KEY")
	}
	if cfg.TSAuthKey == "" {
		cfg.TSAuthKey = os.Getenv("TS_AUTHKEY")
	}
	if cfg.HansHost == "" {
		cfg.HansHost = "johan@185.218.204.47"
	}
	if cfg.VaultSrc == "" {
		cfg.VaultSrc = "../clavis/clavis-vault"
	}
	if cfg.Zone == "" {
		cfg.Zone = "clavitor.ai"
	}
	if cfg.AWSKeyID == "" {
		cfg.AWSKeyID = os.Getenv("AWS_ACCESS_KEY_ID")
	}
	if cfg.AWSSecretKey == "" {
		cfg.AWSSecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY")
	}
	return cfg, remaining
}

// exitWith terminates with exit code 1 if any result failed; otherwise it
// returns (process exits 0 normally).
func exitWith(results []NodeResult) {
	hasFailure := false
	for _, r := range results {
		if !r.OK {
			hasFailure = true
			break
		}
	}
	if hasFailure {
		os.Exit(1)
	}
}

// --- Subcommand: sync ---

// cmdSync reconciles Cloudflare DNS, the Tailscale tailnet, and (if AWS
// credentials are present) EC2 security groups against the live POPs in the DB.
func cmdSync(cfg Config) []NodeResult {
	requireKeys(cfg, "cf", "ts")
	pops := loadLivePOPs(cfg)
	log(cfg, "DB: %d live POPs with DNS+IP", len(pops))

	if cfg.CFZoneID == "" {
		var err error
		cfg.CFZoneID, err = cfResolveZoneID(cfg.CFToken, cfg.Zone)
		if err != nil {
			fatal("resolving zone: %v", err)
		}
	}

	var results []NodeResult

	// DNS sync
	log(cfg, "\n--- Cloudflare DNS ---")
	dnsResults := syncDNS(cfg, pops)
	results = append(results, dnsResults...)

	// Tailscale sync
	log(cfg, "\n--- Tailscale ---")
	tsResults := syncTailscale(cfg, pops)
	results = append(results, tsResults...)

	// Firewall sync (only when AWS credentials are configured)
	if cfg.AWSKeyID != "" {
		log(cfg, "\n--- AWS Firewall ---")
		for _, p := range pops {
			results = append(results, ensureFirewall(cfg, p))
		}
	}

	outputResults(cfg, results)
	return results
}

// --- Subcommand: deploy ---

// cmdDeploy builds the vault binary, stages it on Hans behind a temporary
// HTTP server, then has each node pull + install it via SSM.
func cmdDeploy(cfg Config) []NodeResult {
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	if len(pops) == 0 {
		fatal("no live nodes to deploy to")
	}

	// Step 0: Firewall
	if cfg.AWSKeyID != "" {
		log(cfg, "--- Firewall ---")
		for _, p := range pops {
			ensureFirewall(cfg, p)
		}
	}

	// Step 1: Build
	log(cfg, "\n--- Build ---")
	binaryPath := buildVault(cfg)
	log(cfg, "Built: %s", binaryPath)

	if cfg.DryRun {
		var results []NodeResult
		for _, p := range pops {
			results = append(results, NodeResult{Node: p.Subdomain(), Action: "deploy", OK: true, Message: "would deploy"})
		}
		outputResults(cfg, results)
		return results
	}

	// Step 2: Upload binary to Hans and serve it temporarily via HTTP.
	log(cfg, "\n--- Deploy to %d nodes ---", len(pops))
	hansDir := "/tmp/clavitor-serve"
	hansFile := hansDir + "/clavitor"
	log(cfg, "Uploading binary to Hans...")
	sshExec(cfg.HansHost, "mkdir -p "+hansDir)
	if err := scpToHost(cfg.HansHost, binaryPath, hansFile); err != nil {
		fatal("scp to hans: %v", err)
	}

	// Start a temporary HTTP server on Hans (port 9876).
	log(cfg, "Starting temporary file server on Hans:9876...")
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0") // best-effort: kill any stale server
	sshExec(cfg.HansHost, "sudo iptables -I INPUT -p tcp --dport 9876 -j ACCEPT")
	time.Sleep(500 * time.Millisecond)
	sshBackground(cfg.HansHost, fmt.Sprintf(
		"cd %s && exec python3 -m http.server 9876 --bind 0.0.0.0 >/dev/null 2>&1", hansDir))
	time.Sleep(2 * time.Second)

	// Derive Hans' public address from the configured SSH target (user@host)
	// so a -hans override is honored; this was previously a second hardcoded
	// copy of the default IP.
	hansPublicIP := cfg.HansHost
	if at := strings.LastIndex(hansPublicIP, "@"); at >= 0 {
		hansPublicIP = hansPublicIP[at+1:]
	}
	downloadURL := fmt.Sprintf("http://%s:9876/clavitor", hansPublicIP)

	results := parallelExec(pops, 4, func(p POP) NodeResult {
		name := p.Subdomain()
		r := NodeResult{Node: name, Action: "deploy"}
		if p.InstanceID == "" {
			r.Error = "no instance_id — cannot deploy via SSM"
			return r
		}
		log(cfg, "  [%s] deploying via SSM...", name)
		installCmds := []string{
			"mkdir -p /opt/clavitor/bin /opt/clavitor/data",
			fmt.Sprintf("curl -sf -o /tmp/clavitor-new %s", downloadURL),
			"mv /tmp/clavitor-new /opt/clavitor/bin/clavitor",
			"chmod +x /opt/clavitor/bin/clavitor",
			// FIXME(security): a live Cloudflare API token and a shared
			// telemetry token are hardcoded below and pushed to every node.
			// They should be injected from the environment / a secret store
			// and the leaked CF token rotated.
			fmt.Sprintf(`cat > /opt/clavitor/env << 'ENVEOF'
PORT=1984
VAULT_MODE=hosted
DATA_DIR=/opt/clavitor/data
TELEMETRY_FREQ=30
TELEMETRY_HOST=https://clavitor.ai/telemetry
TELEMETRY_TOKEN=clavitor-fleet-2026
TLS_DOMAIN=%s
CF_API_TOKEN=dSVz7JZtyK023q7kh4MMNmIggK1dahWdnBxVnP3O
TLS_CERT_DIR=/opt/clavitor/certs
TLS_EMAIL=ops@clavitor.ai
ENVEOF`, p.DNS),
			`test -f /etc/systemd/system/clavitor.service || cat > /etc/systemd/system/clavitor.service << 'UNITEOF'
[Unit]
Description=Clavitor Vault
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
ExecStart=/opt/clavitor/bin/clavitor
EnvironmentFile=/opt/clavitor/env
WorkingDirectory=/opt/clavitor/data
Restart=always
RestartSec=5
User=root

[Install]
WantedBy=multi-user.target
UNITEOF`,
			"systemctl daemon-reload",
			"systemctl enable clavitor",
			"systemctl restart clavitor",
			"sleep 2",
			"systemctl is-active clavitor",
		}
		out, err := ssmRunCommand(cfg.HansHost, p.InstanceID, p.RegionName, installCmds)
		if err != nil {
			r.Error = fmt.Sprintf("deploy: %v\n%s", err, out)
			log(cfg, "  [%s] FAIL: %v", name, err)
			return r
		}
		r.OK = true
		r.Message = strings.TrimSpace(out)
		log(cfg, "  [%s] %s", name, r.Message)
		return r
	})

	// Kill the temp HTTP server, close the firewall hole, clean up.
	sshExec(cfg.HansHost, "pkill -f 'http.server 9876' 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "sudo iptables -D INPUT -p tcp --dport 9876 -j ACCEPT 2>/dev/null; exit 0")
	sshExec(cfg.HansHost, "rm -rf "+hansDir)

	outputResults(cfg, results)
	return results
}

// buildVault cross-compiles clovis-vault for linux/arm64 and returns the
// output binary path. Fatal on build failure.
func buildVault(cfg Config) string {
	srcDir, _ := filepath.Abs(cfg.VaultSrc)
	outPath := filepath.Join(srcDir, "clavitor-linux-arm64")
	cmd := exec.Command("go", "build", "-o", outPath, "./cmd/clavitor")
	cmd.Dir = srcDir
	cmd.Env = append(os.Environ(),
		"GOOS=linux",
		"GOARCH=arm64",
		"CGO_ENABLED=0",
	)
	out, err := cmd.CombinedOutput()
	if err != nil {
		fatal("build failed: %v\n%s", err, string(out))
	}
	return outPath
}

// --- Subcommand: status ---

// cmdStatus runs a small health probe on every (filtered) live node.
func cmdStatus(cfg Config) []NodeResult {
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Checking %d nodes...\n", len(pops))
	statusCmd := "echo SERVICE=$(systemctl is-active clavitor 2>/dev/null || echo stopped); echo HOST=$(hostname); uptime -p 2>/dev/null; echo TS=$(tailscale ip -4 2>/dev/null || echo none)"
	results := parallelExec(pops, 4, func(p POP) NodeResult {
		name := p.Subdomain()
		r := NodeResult{Node: name, Action: "status"}
		out, err := nodeExec(cfg, p, statusCmd)
		if err != nil {
			r.Error = fmt.Sprintf("unreachable: %v", err)
			log(cfg, "  FAIL %s — %v", name, err)
			return r
		}
		r.OK = true
		r.Message = strings.TrimSpace(out)
		log(cfg, "  OK   %s — %s", name, oneLineStatus(out))
		return r
	})
	outputResults(cfg, results)
	return results
}

// --- Subcommand: exec ---

// cmdExec runs an arbitrary shell command on every (filtered) live node.
func cmdExec(cfg Config, command string) []NodeResult {
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Running on %d nodes: %s\n", len(pops), command)
	results := parallelExec(pops, 4, func(p POP) NodeResult {
		name := p.Subdomain()
		r := NodeResult{Node: name, Action: "exec"}
		out, err := nodeExec(cfg, p, command)
		if err != nil {
			r.Error = fmt.Sprintf("%v\n%s", err, out)
			log(cfg, "--- %s (FAIL) ---\n%s\n%v", name, out, err)
			return r
		}
		r.OK = true
		r.Message = strings.TrimSpace(out)
		log(cfg, "--- %s ---\n%s", name, r.Message)
		return r
	})
	outputResults(cfg, results)
	return results
}

// --- Subcommand: maintenance ---

// cmdMaintenance toggles or inspects the NOC maintenance window via the
// clavitor.ai API.
// FIXME(security): the API PIN is hardcoded in the URL; move it to config.
func cmdMaintenance(cfg Config, args []string) {
	action := args[0]
	reason := ""
	if len(args) > 1 {
		reason = strings.Join(args[1:], " ")
	}
	by := "pop-sync"
	if user := os.Getenv("USER"); user != "" {
		by = "pop-sync/" + user
	}

	// postMaintenance sends a start/stop action and fails fatally on any
	// transport or HTTP-level error.
	postMaintenance := func(act string) {
		body, _ := json.Marshal(map[string]string{"action": act, "reason": reason, "by": by})
		req, _ := http.NewRequest("POST", "https://clavitor.ai/noc/api/maintenance?pin=250365", bytes.NewReader(body))
		req.Header.Set("Content-Type", "application/json")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			fatal("maintenance %s: %v", act, err)
		}
		defer resp.Body.Close()
		io.Copy(io.Discard, resp.Body) // drain for connection reuse
		if resp.StatusCode != 200 {
			fatal("maintenance %s: HTTP %d", act, resp.StatusCode)
		}
	}

	switch action {
	case "on", "start":
		postMaintenance("start")
		log(cfg, "Maintenance ON: %s (by %s)", reason, by)
	case "off", "stop":
		postMaintenance("stop")
		log(cfg, "Maintenance OFF (by %s)", by)
	case "status":
		resp, err := http.Get("https://clavitor.ai/noc/api/maintenance?pin=250365")
		if err != nil {
			fatal("maintenance status: %v", err)
		}
		defer resp.Body.Close()
		data, _ := io.ReadAll(resp.Body)
		if cfg.JSONOut {
			fmt.Println(string(data))
			return
		}
		var result struct {
			Active  bool `json:"active"`
			Windows []struct {
				StartAt int64  `json:"start_at"`
				EndAt   *int64 `json:"end_at"`
				Reason  string `json:"reason"`
				StartBy string `json:"started_by"`
				EndBy   string `json:"ended_by"`
			} `json:"windows"`
		}
		if err := json.Unmarshal(data, &result); err != nil {
			fatal("maintenance status: bad response: %v\n%s", err, string(data))
		}
		if result.Active {
			fmt.Println("MAINTENANCE ACTIVE")
		} else {
			fmt.Println("No active maintenance")
		}
		for _, w := range result.Windows {
			end := "ongoing"
			if w.EndAt != nil {
				end = time.Unix(*w.EndAt, 0).Format("2006-01-02 15:04:05")
			}
			fmt.Printf("  %s — %s %q by=%s\n",
				time.Unix(w.StartAt, 0).Format("2006-01-02 15:04:05"), end, w.Reason, w.StartBy)
		}
	default:
		fatal("usage: pop-sync maintenance <on|off|status> [reason]")
	}
}

// --- Subcommand: firewall ---
// Ensures every POP's security group has exactly port 1984 open, nothing else.

func cmdFirewall(cfg Config) []NodeResult {
	if cfg.AWSKeyID == "" || cfg.AWSSecretKey == "" {
		fatal("AWS credentials required: set AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY or -aws-key/-aws-secret")
	}
	pops := filterNodes(cfg, loadLivePOPs(cfg))
	log(cfg, "Checking firewall for %d nodes...\n", len(pops))
	var results []NodeResult
	for _, p := range pops {
		results = append(results, ensureFirewall(cfg, p))
	}
	outputResults(cfg, results)
	return results
}

// ensureFirewall makes sure the security group attached to a POP's instance
// only allows inbound TCP 1984 (revokes every other ingress rule, adds 1984
// if missing). Honors -dry-run.
func ensureFirewall(cfg Config, pop POP) NodeResult {
	name := pop.Subdomain()
	region := pop.RegionName
	r := NodeResult{Node: name, Action: "firewall"}
	if pop.InstanceID == "" {
		r.Error = "no instance_id"
		return r
	}

	ctx := context.Background()
	awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(region),
		awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.AWSKeyID, cfg.AWSSecretKey, "")),
	)
	if err != nil {
		r.Error = fmt.Sprintf("aws config: %v", err)
		return r
	}
	client := ec2.NewFromConfig(awsCfg)

	// Look up the instance's first security group.
	descOut, err := client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
		InstanceIds: []string{pop.InstanceID},
	})
	if err != nil {
		r.Error = fmt.Sprintf("describe instance: %v", err)
		return r
	}
	if len(descOut.Reservations) == 0 || len(descOut.Reservations[0].Instances) == 0 {
		r.Error = "instance not found"
		return r
	}
	inst := descOut.Reservations[0].Instances[0]
	if len(inst.SecurityGroups) == 0 {
		r.Error = "no security groups"
		return r
	}
	sgID := *inst.SecurityGroups[0].GroupId

	// Fetch the group's current ingress rules.
	sgOut, err := client.DescribeSecurityGroups(ctx, &ec2.DescribeSecurityGroupsInput{
		GroupIds: []string{sgID},
	})
	if err != nil {
		r.Error = fmt.Sprintf("describe sg: %v", err)
		return r
	}
	// Guard against an empty result: indexing [0] blindly would panic.
	if len(sgOut.SecurityGroups) == 0 {
		r.Error = fmt.Sprintf("security group %s not found", sgID)
		return r
	}
	sg := sgOut.SecurityGroups[0]

	// Partition rules: the desired tcp/1984 rule vs. everything else.
	has1984 := false
	var toRevoke []ec2types.IpPermission
	for _, perm := range sg.IpPermissions {
		if perm.FromPort != nil && *perm.FromPort == 1984 &&
			perm.ToPort != nil && *perm.ToPort == 1984 &&
			perm.IpProtocol != nil && *perm.IpProtocol == "tcp" {
			has1984 = true
		} else {
			toRevoke = append(toRevoke, perm)
		}
	}

	changes := 0

	// Remove unwanted rules.
	if len(toRevoke) > 0 && !cfg.DryRun {
		_, err := client.RevokeSecurityGroupIngress(ctx, &ec2.RevokeSecurityGroupIngressInput{
			GroupId:       &sgID,
			IpPermissions: toRevoke,
		})
		if err != nil {
			r.Error = fmt.Sprintf("revoke rules: %v", err)
			return r
		}
		for _, p := range toRevoke {
			port := int32(0)
			if p.FromPort != nil {
				port = *p.FromPort
			}
			log(cfg, "  [%s] removed port %d from %s", name, port, sgID)
		}
		changes += len(toRevoke)
	} else if len(toRevoke) > 0 {
		for _, p := range toRevoke {
			port := int32(0)
			if p.FromPort != nil {
				port = *p.FromPort
			}
			log(cfg, "  [%s] would remove port %d from %s", name, port, sgID)
		}
		changes += len(toRevoke)
	}

	// Add tcp/1984 (world-reachable) if missing.
	if !has1984 && !cfg.DryRun {
		proto := "tcp"
		port := int32(1984)
		_, err := client.AuthorizeSecurityGroupIngress(ctx, &ec2.AuthorizeSecurityGroupIngressInput{
			GroupId: &sgID,
			IpPermissions: []ec2types.IpPermission{{
				IpProtocol: &proto,
				FromPort:   &port,
				ToPort:     &port,
				IpRanges:   []ec2types.IpRange{{CidrIp: strPtr("0.0.0.0/0")}},
			}},
		})
		if err != nil {
			r.Error = fmt.Sprintf("add 1984: %v", err)
			return r
		}
		log(cfg, "  [%s] opened port 1984 on %s", name, sgID)
		changes++
	} else if !has1984 {
		log(cfg, "  [%s] would open port 1984 on %s", name, sgID)
		changes++
	}

	if changes == 0 {
		log(cfg, "  [%s] OK — port 1984 only (%s)", name, sgID)
	}
	r.OK = true
	r.Message = fmt.Sprintf("sg=%s port=1984", sgID)
	return r
}

// strPtr returns a pointer to s (AWS SDK inputs want *string).
func strPtr(s string) *string { return &s }

// --- Node execution: Tailscale SSH with SSM fallback ---

// nodeExec runs a command on a node, trying Tailscale SSH first, falling
// back to SSM when an instance_id is known.
func nodeExec(cfg Config, pop POP, command string) (string, error) {
	out, err := tsSshExec(pop.Subdomain(), command)
	if err == nil {
		return out, nil
	}
	// Tailscale SSH failed — fall back to SSM if we have instance_id.
	if pop.InstanceID != "" {
		log(cfg, "  [%s] TS SSH failed, falling back to SSM...", pop.Subdomain())
		return ssmRunCommand(cfg.HansHost, pop.InstanceID, pop.RegionName, []string{command})
	}
	return out, err
}

// nodePushFile copies a local file to a node, trying SCP (Tailscale) first,
// falling back to an SSM-driven pull via Hans.
func nodePushFile(cfg Config, pop POP, localPath, remotePath string) error {
	err := scpToNode(pop.Subdomain(), localPath, remotePath)
	if err == nil {
		return nil
	}
	// Tailscale SCP failed — fall back: SCP to Hans, then node pulls from Hans.
	if pop.InstanceID != "" && pop.IP != "" {
		log(cfg, "  [%s] TS SCP failed, falling back via Hans...", pop.Subdomain())
		hansPath := fmt.Sprintf("/tmp/clavitor-deploy-%s", pop.Subdomain())
		// Upload to Hans first.
		if err := scpToHost(cfg.HansHost, localPath, hansPath); err != nil {
			return fmt.Errorf("scp to hans: %w", err)
		}
		// Have the node pull from Hans (SSM runs the scp on the node side).
		pullCmd := fmt.Sprintf("scp -o StrictHostKeyChecking=no -o ConnectTimeout=10 %s:%s %s",
			cfg.HansHost, hansPath, remotePath)
		_, err := ssmRunCommand(cfg.HansHost, pop.InstanceID, pop.RegionName, []string{pullCmd})
		// Clean up the staged copy on Hans (best-effort).
		sshExec(cfg.HansHost, "rm -f "+hansPath)
		return err
	}
	return err
}

// tsSshExec runs command on root@hostname over Tailscale SSH. On error the
// combined stdout+stderr is returned for diagnostics.
func tsSshExec(hostname, command string) (string, error) {
	cmd := exec.Command("ssh",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=5",
		"-o", "UserKnownHostsFile=/dev/null",
		"-o", "LogLevel=ERROR",
		"root@"+hostname, command,
	)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	if err != nil {
		return stdout.String() + stderr.String(), err
	}
	return stdout.String(), nil
}

// scpToNode copies localPath to root@hostname:remotePath over Tailscale.
func scpToNode(hostname, localPath, remotePath string) error {
	cmd := exec.Command("scp",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=5",
		"-o", "UserKnownHostsFile=/dev/null",
		"-o", "LogLevel=ERROR",
		localPath, "root@"+hostname+":"+remotePath,
	)
	out, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("%v: %s", err, string(out))
	}
	return nil
}

// scpToHost copies localPath to host:remotePath (host includes user@).
func scpToHost(host, localPath, remotePath string) error {
	cmd := exec.Command("scp",
		"-o", "StrictHostKeyChecking=no",
		"-o", "ConnectTimeout=10",
		localPath, host+":"+remotePath,
	)
	out, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("%v: %s", err, string(out))
	}
	return nil
}

// --- SSM (for bootstrap only, before Tailscale is available) ---

// ssmBootstrapTailscale installs Tailscale on a node via SSM, joins it to
// the tailnet under the POP's subdomain, and sets the machine hostname.
func ssmBootstrapTailscale(cfg Config, pop POP) error {
	hostname := pop.Subdomain()
	region := pop.RegionName

	log(cfg, "    Installing Tailscale...")
	out, err := ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		"command -v tailscale >/dev/null 2>&1 && echo 'already installed' && exit 0",
		"curl -fsSL https://tailscale.com/install.sh | sh",
	})
	if err != nil {
		return fmt.Errorf("install tailscale: %w", err)
	}
	log(cfg, "    %s", lastLines(out, 2))

	log(cfg, "    Joining tailnet as %s...", hostname)
	out, err = ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		"systemctl enable --now tailscaled 2>/dev/null || true",
		"sleep 2",
		fmt.Sprintf("tailscale up --authkey=%s --hostname=%s --ssh --reset", cfg.TSAuthKey, hostname),
		"tailscale ip -4",
	})
	if err != nil {
		return fmt.Errorf("join tailnet: %w", err)
	}
	log(cfg, "    TS IP: %s", strings.TrimSpace(out))

	log(cfg, "    Setting hostname...")
	// Best-effort: a failed hostname change doesn't invalidate the join.
	_, _ = ssmRunCommand(cfg.HansHost, pop.InstanceID, region, []string{
		fmt.Sprintf("hostnamectl set-hostname %s", hostname),
		fmt.Sprintf("grep -q '%s' /etc/hosts || echo '127.0.1.1 %s %s.%s' >> /etc/hosts",
			hostname, hostname, hostname, cfg.Zone),
	})
	return nil
}

// ssmRunCommand runs a shell script on an instance via "aws ssm send-command"
// executed on Hans (the relay that holds AWS CLI credentials), then polls
// get-command-invocation every 5s for up to 3 minutes.
func ssmRunCommand(hansHost, instanceID, region string, commands []string) (string, error) {
	params := map[string][]string{"commands": commands}
	paramsJSON, _ := json.Marshal(params)

	// Stage the parameters file on Hans (the JSON can exceed arg limits).
	tmpFile := fmt.Sprintf("/tmp/ssm-%s-%d.json", instanceID, time.Now().UnixNano())
	writeCmd := fmt.Sprintf("cat > %s << 'SSMEOF'\n%s\nSSMEOF", tmpFile, string(paramsJSON))
	if _, err := sshExec(hansHost, writeCmd); err != nil {
		return "", fmt.Errorf("writing params: %w", err)
	}

	ssmCmd := fmt.Sprintf(
		`aws ssm send-command --instance-ids %s --document-name AWS-RunShellScript --region %s --parameters file://%s --query Command.CommandId --output text && rm -f %s`,
		instanceID, region, tmpFile, tmpFile,
	)
	cmdID, err := sshExec(hansHost, ssmCmd)
	if err != nil {
		sshExec(hansHost, "rm -f "+tmpFile) // best-effort cleanup
		return "", fmt.Errorf("send-command: %w\n%s", err, cmdID)
	}
	cmdID = strings.TrimSpace(cmdID)
	if cmdID == "" {
		return "", fmt.Errorf("empty command ID returned")
	}

	// Poll for completion: 36 × 5s = 3 minutes.
	for i := 0; i < 36; i++ {
		time.Sleep(5 * time.Second)
		pollCmd := fmt.Sprintf(
			`aws ssm get-command-invocation --command-id %s --instance-id %s --region %s --query '[Status,StandardOutputContent,StandardErrorContent]' --output text 2>/dev/null`,
			cmdID, instanceID, region,
		)
		result, err := sshExec(hansHost, pollCmd)
		if err != nil {
			continue // invocation may not be registered yet; retry
		}
		parts := strings.SplitN(strings.TrimSpace(result), "\t", 3)
		if len(parts) < 1 {
			continue
		}
		switch parts[0] {
		case "Success":
			stdout := ""
			if len(parts) >= 2 {
				stdout = parts[1]
			}
			return stdout, nil
		case "Failed":
			stderr := ""
			if len(parts) >= 3 {
				stderr = parts[2]
			}
			return "", fmt.Errorf("command failed: %s", stderr)
		case "Cancelled", "TimedOut":
			// Terminal states that previously spun until the 3-minute
			// timeout; report them immediately instead.
			return "", fmt.Errorf("command %s: %s", cmdID, parts[0])
		}
	}
	return "", fmt.Errorf("timeout waiting for SSM command %s", cmdID)
}
func sshBackground(host, command string) { exec.Command("ssh", "-f", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", host, command).Run() } func sshExec(host, command string) (string, error) { cmd := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", host, command) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr err := cmd.Run() if err != nil { return stdout.String() + stderr.String(), err } return stdout.String(), nil } // --- DNS sync --- func syncDNS(cfg Config, pops []POP) []NodeResult { records, err := cfListRecords(cfg.CFToken, cfg.CFZoneID) if err != nil { fatal("listing DNS records: %v", err) } existing := map[string]cfDNSRecord{} protected := map[string]bool{cfg.Zone: true, "www." + cfg.Zone: true} for _, r := range records { if r.Type == "A" && !protected[r.Name] { existing[r.Name] = r } } desired := map[string]POP{} for _, p := range pops { if p.Zone() == cfg.Zone { desired[p.DNS] = p } } var results []NodeResult for fqdn, pop := range desired { r := NodeResult{Node: pop.Subdomain(), Action: "dns"} if rec, ok := existing[fqdn]; ok { if rec.Content == pop.IP { r.OK = true r.Message = fmt.Sprintf("%s -> %s", fqdn, pop.IP) log(cfg, " OK %s -> %s", fqdn, pop.IP) } else { if !cfg.DryRun { if err := cfUpdateRecord(cfg.CFToken, cfg.CFZoneID, rec.ID, fqdn, pop.IP); err != nil { r.Error = err.Error() } else { r.OK = true } } r.Message = fmt.Sprintf("updated %s -> %s (was %s)", fqdn, pop.IP, rec.Content) log(cfg, " UPD %s: %s -> %s", fqdn, rec.Content, pop.IP) } delete(existing, fqdn) } else { if !cfg.DryRun { if err := cfCreateRecord(cfg.CFToken, cfg.CFZoneID, fqdn, pop.IP); err != nil { r.Error = err.Error() } else { r.OK = true } } r.Message = fmt.Sprintf("created %s -> %s", fqdn, pop.IP) log(cfg, " ADD %s -> %s", fqdn, pop.IP) } results = append(results, r) } for fqdn, rec := range existing { sub := strings.TrimSuffix(fqdn, "."+cfg.Zone) if sub == fqdn || isKnownNonPOP(sub) { continue } r := 
NodeResult{Node: sub, Action: "dns-delete"} if !cfg.DryRun { if err := cfDeleteRecord(cfg.CFToken, cfg.CFZoneID, rec.ID); err != nil { r.Error = err.Error() } else { r.OK = true } } r.Message = fmt.Sprintf("deleted %s (%s)", fqdn, rec.Content) log(cfg, " DEL %s (%s)", fqdn, rec.Content) results = append(results, r) } return results } func isKnownNonPOP(sub string) bool { skip := map[string]bool{ "dev": true, "soc": true, "api": true, "mail": true, "mx": true, "ns1": true, "ns2": true, "status": true, } return skip[sub] } // --- Tailscale sync --- func syncTailscale(cfg Config, pops []POP) []NodeResult { devices, err := tsListDevices(cfg.TSKey) if err != nil { fatal("listing Tailscale devices: %v", err) } desired := map[string]POP{} for _, p := range pops { if sub := p.Subdomain(); sub != "" { desired[sub] = p } } devicesByHostname := map[string][]tsDevice{} for _, d := range devices { devicesByHostname[d.Hostname] = append(devicesByHostname[d.Hostname], d) } var results []NodeResult for sub, pop := range desired { r := NodeResult{Node: sub, Action: "tailscale"} if devs, ok := devicesByHostname[sub]; ok && len(devs) > 0 { tsIP := "" if len(devs[0].Addresses) > 0 { tsIP = devs[0].Addresses[0] } r.OK = true r.Message = fmt.Sprintf("TS:%s public:%s", tsIP, pop.IP) log(cfg, " OK %s — %s (TS: %s, public: %s)", sub, pop.City, tsIP, pop.IP) delete(devicesByHostname, sub) } else { if pop.InstanceID != "" && cfg.TSAuthKey != "" && !cfg.DryRun { log(cfg, " JOIN %s — %s via SSM (%s in %s)...", sub, pop.City, pop.InstanceID, pop.RegionName) if err := ssmBootstrapTailscale(cfg, pop); err != nil { r.Error = err.Error() log(cfg, " ERR: %v", err) } else { r.OK = true r.Message = "joined via SSM" log(cfg, " Joined tailnet as %s", sub) } } else if pop.InstanceID != "" && cfg.TSAuthKey != "" { r.Message = fmt.Sprintf("would SSM bootstrap %s in %s", pop.InstanceID, pop.RegionName) log(cfg, " JOIN %s — %s (would bootstrap)", sub, pop.City) } else if pop.InstanceID != "" { r.Error = "no 
-ts-authkey provided" log(cfg, " MISSING %s — needs -ts-authkey", sub) } else { r.Error = "no instance_id, cannot auto-join" log(cfg, " MISSING %s — no instance_id", sub) } } results = append(results, r) } for hostname, devs := range devicesByHostname { for _, d := range devs { if !isInfraDevice(d) { continue } r := NodeResult{Node: hostname, Action: "tailscale-delete"} if !cfg.DryRun { if err := tsDeleteDevice(cfg.TSKey, d.ID); err != nil { r.Error = err.Error() } else { r.OK = true r.Message = "removed stale device" } } else { r.Message = "would remove stale device" } log(cfg, " STALE %s — removed", hostname) results = append(results, r) } } return results } func isInfraDevice(d tsDevice) bool { return strings.Contains(d.Name, "tagged-") } // --- Parallel execution --- func parallelExec(pops []POP, workers int, fn func(POP) NodeResult) []NodeResult { ch := make(chan POP, len(pops)) for _, p := range pops { ch <- p } close(ch) var mu sync.Mutex var results []NodeResult var wg sync.WaitGroup for i := 0; i < workers && i < len(pops); i++ { wg.Add(1) go func() { defer wg.Done() for p := range ch { r := fn(p) mu.Lock() results = append(results, r) mu.Unlock() } }() } wg.Wait() return results } // --- DB --- func loadLivePOPs(cfg Config) []POP { pops, err := readPOPs(cfg.DBPath) if err != nil { fatal("reading DB: %v", err) } var live []POP for _, p := range pops { if p.Status == "live" && p.IP != "" && p.DNS != "" { live = append(live, p) } } return live } func readPOPs(dbPath string) ([]POP, error) { db, err := sql.Open("sqlite", dbPath) if err != nil { return nil, err } defer db.Close() rows, err := db.Query(`SELECT pop_id, city, country, region_name, ip, dns, status, provider, instance_id FROM pops ORDER BY pop_id`) if err != nil { return nil, err } defer rows.Close() var pops []POP for rows.Next() { var p POP if err := rows.Scan(&p.PopID, &p.City, &p.Country, &p.RegionName, &p.IP, &p.DNS, &p.Status, &p.Provider, &p.InstanceID); err != nil { return nil, err } pops = 
append(pops, p) } return pops, rows.Err() } func filterNodes(cfg Config, pops []POP) []POP { if cfg.Nodes == "" { return pops } want := map[string]bool{} for _, n := range strings.Split(cfg.Nodes, ",") { want[strings.TrimSpace(n)] = true } var out []POP for _, p := range pops { if want[p.Subdomain()] { out = append(out, p) } } return out } // --- Output --- func log(cfg Config, format string, args ...interface{}) { if !cfg.JSONOut { fmt.Printf(format+"\n", args...) } } func outputResults(cfg Config, results []NodeResult) { if cfg.JSONOut { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") enc.Encode(results) } } func oneLineStatus(raw string) string { lines := strings.Split(strings.TrimSpace(raw), "\n") parts := make([]string, 0, len(lines)) for _, l := range lines { l = strings.TrimSpace(l) if l != "" { parts = append(parts, l) } } return strings.Join(parts, " | ") } func lastLines(s string, n int) string { lines := strings.Split(strings.TrimSpace(s), "\n") if len(lines) <= n { return strings.TrimSpace(s) } return strings.Join(lines[len(lines)-n:], "\n") } func requireKeys(cfg Config, keys ...string) { for _, k := range keys { switch k { case "cf": if cfg.CFToken == "" { fatal("Cloudflare token required: set CF_API_TOKEN or -cf-token") } case "ts": if cfg.TSKey == "" { fatal("Tailscale API key required: set TS_API_KEY or -ts-key") } } } } func fatal(format string, args ...interface{}) { fmt.Fprintf(os.Stderr, "FATAL: "+format+"\n", args...) 
os.Exit(2) } // --- Cloudflare API --- type cfDNSRecord struct { ID string `json:"id"` Type string `json:"type"` Name string `json:"name"` Content string `json:"content"` Proxied bool `json:"proxied"` TTL int `json:"ttl"` } type cfListResponse struct { Success bool `json:"success"` Result []cfDNSRecord `json:"result"` } type cfMutateResponse struct { Success bool `json:"success"` Result cfDNSRecord `json:"result"` Errors []struct { Message string `json:"message"` } `json:"errors"` } func cfResolveZoneID(token, zone string) (string, error) { req, _ := http.NewRequest("GET", "https://api.cloudflare.com/client/v4/zones?name="+zone, nil) req.Header.Set("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { return "", err } defer resp.Body.Close() var result struct { Result []struct { ID string `json:"id"` } `json:"result"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", err } if len(result.Result) == 0 { return "", fmt.Errorf("zone %q not found", zone) } return result.Result[0].ID, nil } func cfListRecords(token, zoneID string) ([]cfDNSRecord, error) { var all []cfDNSRecord page := 1 for { url := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records?per_page=100&page=%d", zoneID, page) req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } var result cfListResponse err = json.NewDecoder(resp.Body).Decode(&result) resp.Body.Close() if err != nil { return nil, err } all = append(all, result.Result...) 
if len(result.Result) < 100 { break } page++ } return all, nil } func cfCreateRecord(token, zoneID, fqdn, ip string) error { body, _ := json.Marshal(map[string]interface{}{ "type": "A", "name": fqdn, "content": ip, "ttl": 1, "proxied": false, }) url := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records", zoneID) req, _ := http.NewRequest("POST", url, bytes.NewReader(body)) req.Header.Set("Authorization", "Bearer "+token) req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() var result cfMutateResponse if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return err } if !result.Success { return fmt.Errorf("API error: %v", result.Errors) } return nil } func cfUpdateRecord(token, zoneID, recordID, fqdn, ip string) error { body, _ := json.Marshal(map[string]interface{}{ "type": "A", "name": fqdn, "content": ip, "ttl": 1, "proxied": false, }) url := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records/%s", zoneID, recordID) req, _ := http.NewRequest("PUT", url, bytes.NewReader(body)) req.Header.Set("Authorization", "Bearer "+token) req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() var result cfMutateResponse if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return err } if !result.Success { return fmt.Errorf("API error: %v", result.Errors) } return nil } func cfDeleteRecord(token, zoneID, recordID string) error { url := fmt.Sprintf("https://api.cloudflare.com/client/v4/zones/%s/dns_records/%s", zoneID, recordID) req, _ := http.NewRequest("DELETE", url, nil) req.Header.Set("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) var result struct { Success bool `json:"success"` } json.Unmarshal(body, &result) if 
!result.Success { return fmt.Errorf("delete failed: %s", string(body)) } return nil } // --- Tailscale API --- type tsDevice struct { ID string `json:"id"` Hostname string `json:"hostname"` Name string `json:"name"` Addresses []string `json:"addresses"` } type tsListResponse struct { Devices []tsDevice `json:"devices"` } func tsListDevices(apiKey string) ([]tsDevice, error) { req, _ := http.NewRequest("GET", "https://api.tailscale.com/api/v2/tailnet/-/devices", nil) req.SetBasicAuth(apiKey, "") resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) } var result tsListResponse if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, err } return result.Devices, nil } func tsDeleteDevice(apiKey, deviceID string) error { req, _ := http.NewRequest("DELETE", "https://api.tailscale.com/api/v2/device/"+deviceID, nil) req.SetBasicAuth(apiKey, "") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) } return nil }