diff --git a/forge/dispatcher/README.md b/forge/dispatcher/README.md new file mode 100644 index 0000000..f6e90c6 --- /dev/null +++ b/forge/dispatcher/README.md @@ -0,0 +1,143 @@ +# Agent Dispatcher + +Auto-dispatches Gitea issues to agent task files. Runs on **forge** (development center). + +## Quick Start + +```bash +# 1. Set token +export GITEA_TOKEN="775a12730a65cbaf1673da048b7d01859b8b58e0" + +# 2. Build +cd /home/johan/dev/clavitor/forge/dispatcher +go build -o dispatcher . + +# 3. Run +./dispatcher +``` + +## Web Dashboard + +Open: **http://forge:8098** (or http://localhost:8098) + +Shows: +- Real-time dispatch status +- Tasks by agent +- Recent dispatches (with priority highlighting) +- Live log tail + +## How It Works + +1. **Polls Gitea every 60 seconds** (from forge, connects to Zurich) +2. **Finds open issues** with assignees +3. **Dispatches 1 task per minute max** (rate limit) +4. **Priority order:** CRITICAL → HIGH → NORMAL → LOW +5. **Writes to:** `/home/johan/dev/clavitor/.agent-tasks//issue-#.md` + +## Task File Format + +Each dispatched task creates a file like: +``` +/home/johan/dev/clavitor/.agent-tasks/hans/issue-2.md +``` + +Contains full instructions, links, context from the issue. + +## Monitoring + +### Web UI +http://localhost:8098 — Auto-refresh every 10 seconds + +### Command Line +```bash +# Watch live log +tail -f /home/johan/dev/clavitor/.agent-dispatcher.log + +# See tasks for an agent +ls /home/johan/dev/clavitor/.agent-tasks/hans/ + +# Read specific task +cat /home/johan/dev/clavitor/.agent-tasks/hans/issue-2.md +``` + +## Systemd Service (Run Always) + +```bash +# Create service file +sudo tee /etc/systemd/system/clavitor-dispatcher.service << 'EOF' +[Unit] +Description=Clavitor Agent Dispatcher +After=network.target + +[Service] +Type=simple +User=johan +WorkingDirectory=/home/johan/dev/clavitor/forge/dispatcher +Environment="GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0" +ExecStart=/home/johan/dev/clavitor/forge/dispatcher/dispatcher +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target +EOF + +# Enable and start +sudo systemctl daemon-reload +sudo systemctl enable clavitor-dispatcher +sudo systemctl start clavitor-dispatcher + +# Check status +sudo systemctl status clavitor-dispatcher +journalctl -u clavitor-dispatcher -f +``` + +## Logs + +All activity logged to: +- Console (stdout) +- File: `/home/johan/dev/clavitor/.agent-dispatcher.log` + +Log format: +``` +[DISPATCHER] 2026/04/09 14:32:15.123456 ✅ DISPATCHED: hans → Issue #2 (CRITICAL: silent DB errors) [Priority: CRITICAL] +``` + +## Rate Limiting + +**Hard limit: 1 task per minute** + +Prevents runaway dispatching. Even if 10 CRITICAL issues appear, only 1 dispatched per minute. + +## Manual Override + +If you need to dispatch urgently: + +```bash +# Just tell the agent directly: +"Hans, execute issue #5 now: cat /home/johan/dev/clavitor/.agent-tasks/hans/issue-5.md" +``` + +The dispatcher will skip already-dispatched issues (checks file existence). + +## Troubleshooting + +| Problem | Check | +|---------|-------| +| No tasks appearing | `tail -f .agent-dispatcher.log` — is Gitea reachable? | +| Web UI not loading | `netstat -tlnp \| grep 8098` — port in use? | +| Auth errors | `echo $GITEA_TOKEN` — token set? | +| Not dispatching | Issues must be: open + assigned + not already dispatched | + +## Architecture + +``` +Forge (This Machine) Zurich (Gitea) +┌──────────────────┐ ┌─────────────┐ +│ Dispatcher │◄────────►│ git.clavitor.ai +│ :8098 │ HTTPS │ (issues API) +│ .agent-tasks/ │ └─────────────┘ +└──────────────────┘ +``` + +**Key point:** Dispatcher runs on forge (your control), polls Zurich (Gitea). diff --git a/forge/dispatcher/dispatcher b/forge/dispatcher/dispatcher new file mode 100755 index 0000000..2e3c98e Binary files /dev/null and b/forge/dispatcher/dispatcher differ diff --git a/forge/dispatcher/go.mod b/forge/dispatcher/go.mod new file mode 100644 index 0000000..339de65 --- /dev/null +++ b/forge/dispatcher/go.mod @@ -0,0 +1,3 @@ +module dispatcher + +go 1.26.1 diff --git a/forge/dispatcher/main.go b/forge/dispatcher/main.go new file mode 100644 index 0000000..605d7f4 --- /dev/null +++ b/forge/dispatcher/main.go @@ -0,0 +1,464 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "time" +) + +// Config +const ( + GiteaURL = "https://git.clavitor.ai" + RepoOwner = "johan" + RepoName = "clavitor" + PollInterval = 60 * time.Second + TaskDir = "/home/johan/dev/clavitor/.agent-tasks" + LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log" + WebPort = "8098" +) + +type Issue struct { + ID int `json:"id"` + Number int `json:"number"` + Title string `json:"title"` + Body string `json:"body"` + State string `json:"state"` + Assignee *struct { + Login string `json:"login"` + } `json:"assignee"` + Assignees []struct { + Login string `json:"login"` + } `json:"assignees"` + Labels []struct { + Name string `json:"name"` + } `json:"labels"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +type DispatchedTask struct { + IssueNumber int `json:"issue_number"` + Title string `json:"title"` + Agent string `json:"agent"` + Priority string `json:"priority"` + DispatchedAt time.Time `json:"dispatched_at"` + TaskFile string `json:"task_file"` +} + +// Global state +type Dispatcher struct { + mu sync.RWMutex + tasks []DispatchedTask + lastDispatch time.Time + token string + logger *log.Logger +} + +func NewDispatcher() *Dispatcher { + // Setup logging to file and stdout + logFile, err := os.OpenFile(LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + log.Fatal("Failed to open log file:", err) + } + + logger := log.New(io.MultiWriter(os.Stdout, logFile), "[DISPATCHER] ", log.LstdFlags|log.Lmicroseconds) + + token := os.Getenv("GITEA_TOKEN") + if token == "" { + logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0") + } + + // Ensure task directory exists + os.MkdirAll(TaskDir, 0755) + + return &Dispatcher{ + token: token, + logger: logger, + } +} + +func (d *Dispatcher) log(format string, v ...interface{}) { + d.logger.Printf(format, v...) +} + +func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) { + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=100", GiteaURL, RepoOwner, RepoName) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "token "+d.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) + } + + var issues []Issue + if err := json.NewDecoder(resp.Body).Decode(&issues); err != nil { + return nil, err + } + + return issues, nil +} + +func (d *Dispatcher) getPriority(issue Issue) string { + for _, label := range issue.Labels { + if label.Name == "critical" { + return "CRITICAL" + } + if label.Name == "high" { + return "HIGH" + } + } + return "NORMAL" +} + +func (d *Dispatcher) getAssignee(issue Issue) string { + if issue.Assignee != nil && issue.Assignee.Login != "" { + return issue.Assignee.Login + } + if len(issue.Assignees) > 0 { + return issue.Assignees[0].Login + } + return "" +} + +func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool { + path := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issueNum)) + _, err := os.Stat(path) + return !os.IsNotExist(err) +} + +func (d *Dispatcher) dispatchTask(issue Issue) error { + agent := d.getAssignee(issue) + if agent == "" { + return nil // Skip unassigned + } + + // Check if already dispatched + if d.taskFileExists(agent, issue.Number) { + return nil // Already dispatched + } + + // Rate limit: 1 per minute + d.mu.Lock() + timeSinceLast := time.Since(d.lastDispatch) + if timeSinceLast < time.Minute { + d.mu.Unlock() + return fmt.Errorf("rate limited: only %v since last dispatch", timeSinceLast) + } + d.lastDispatch = time.Now() + d.mu.Unlock() + + priority := d.getPriority(issue) + + // Create task file + agentDir := filepath.Join(TaskDir, agent) + os.MkdirAll(agentDir, 0755) + + taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number)) + + content := fmt.Sprintf(`# Agent Task — Auto-Dispatched + +**Agent:** %s +**Issue:** #%d +**Title:** %s +**Priority:** %s +**Dispatched:** %s + +--- + +## Your Instruction + +1. **Read QUICKSTART.md** (60 seconds): + /home/johan/dev/clavitor/QUICKSTART.md + +2. **View this issue in Gitea:** + %s/%s/%s/issues/%d + +3. **Execute per handbook Section III** + +4. **Create branch:** + git checkout -b %s/fix-%d + +5. **Implement fix** per issue spec + +6. **Run daily review:** + ./scripts/daily-review.sh (must pass) + +7. **Commit with reference:** + git commit -m "telemetry: fix silent DB error. Fixes #%d" + +8. **Push and create PR:** + git push -u origin %s/fix-%d + tea pulls create --title "%s: %s" --description "Fixes #%d" + +9. **Wait for review** — DO NOT merge your own PR + +--- + +## Issue Context + +%s + +--- + +*This task was auto-dispatched by the agent scheduler.* +*Dispatch time: %s* +`, agent, issue.Number, issue.Title, priority, time.Now().Format(time.RFC3339), + GiteaURL, RepoOwner, RepoName, issue.Number, + agent, issue.Number, + issue.Number, + agent, issue.Number, + agent, issue.Title, issue.Number, + truncate(issue.Body, 500), + time.Now().Format(time.RFC3339)) + + if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil { + return err + } + + // Record in memory + task := DispatchedTask{ + IssueNumber: issue.Number, + Title: issue.Title, + Agent: agent, + Priority: priority, + DispatchedAt: time.Now(), + TaskFile: taskFile, + } + + d.mu.Lock() + d.tasks = append(d.tasks, task) + d.mu.Unlock() + + d.log("✅ DISPATCHED: %s → Issue #%d (%s) [Priority: %s]", agent, issue.Number, issue.Title, priority) + + return nil +} + +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +func (d *Dispatcher) pollAndDispatch() { + d.log("Polling Gitea for open issues...") + + issues, err := d.fetchOpenIssues() + if err != nil { + d.log("❌ Failed to fetch issues: %v", err) + return + } + + d.log("Found %d open issues", len(issues)) + + // Sort by priority (CRITICAL first) + sort.Slice(issues, func(i, j int) bool { + pi := d.getPriority(issues[i]) + pj := d.getPriority(issues[j]) + priorityOrder := map[string]int{"CRITICAL": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3} + return priorityOrder[pi] < priorityOrder[pj] + }) + + // Try to dispatch one task + dispatched := false + for _, issue := range issues { + if d.getAssignee(issue) == "" { + continue + } + + err := d.dispatchTask(issue) + if err != nil { + d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err) + continue + } + + if err == nil { + dispatched = true + break // Only 1 per poll + } + } + + if !dispatched { + d.log("ℹ️ No new tasks to dispatch (rate limit or all caught up)") + } +} + +// Web UI handlers +func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) { + d.mu.RLock() + defer d.mu.RUnlock() + + // Count tasks by agent + agentCounts := make(map[string]int) + for _, t := range d.tasks { + agentCounts[t.Agent]++ + } + + w.Header().Set("Content-Type", "text/html") + fmt.Fprintf(w, ` + + + Agent Dispatcher Status + + + + +

🔧 Agent Dispatcher Status

+

Auto-refresh every 10 seconds | Last poll: %s

+ +
+

Overview

+

Total dispatched: %d tasks

+

Last dispatch: %s

+

Rate limit: 1 task per minute

+

Task directory: %s

+ +

By Agent

+ + +`, time.Now().Format(time.RFC3339), len(d.tasks), + d.lastDispatch.Format(time.RFC3339), TaskDir) + + for agent, count := range agentCounts { + fmt.Fprintf(w, "\n", agent, count) + } + + fmt.Fprintf(w, ` +
AgentTasks Dispatched
%s%d
+
+ +
+

Recent Dispatches

+`) + + // Show last 10 tasks (reverse order) + for i := len(d.tasks) - 1; i >= 0 && i >= len(d.tasks)-10; i-- { + t := d.tasks[i] + priorityClass := "" + if t.Priority == "CRITICAL" { + priorityClass = "critical" + } else if t.Priority == "HIGH" { + priorityClass = "high" + } + + fmt.Fprintf(w, ` +
+ #%d — %s %s | %s
+ Agent: %s | File: %s +
+`, priorityClass, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format("15:04:05"), t.Agent, t.TaskFile) + } + + fmt.Fprintf(w, ` +
+ +
+

Live Log (tail -f %s)

+
`, LogFile)
+	
+	// Read last 50 lines of log
+	logContent, _ := os.ReadFile(LogFile)
+	lines := strings.Split(string(logContent), "\n")
+	start := 0
+	if len(lines) > 50 {
+		start = len(lines) - 50
+	}
+	for i := start; i < len(lines); i++ {
+		fmt.Fprintf(w, "%s\n", lines[i])
+	}
+	
+	fmt.Fprintf(w, `
+
+ + + +`) +} + +func (d *Dispatcher) handleTasks(w http.ResponseWriter, r *http.Request) { + agent := r.URL.Query().Get("agent") + if agent == "" { + http.Error(w, "Missing agent parameter", 400) + return + } + + // List task files for this agent + agentDir := filepath.Join(TaskDir, agent) + files, err := os.ReadDir(agentDir) + if err != nil { + fmt.Fprintf(w, "No tasks for %s\n", agent) + return + } + + w.Header().Set("Content-Type", "text/plain") + fmt.Fprintf(w, "Tasks for %s:\n\n", agent) + for _, f := range files { + if strings.HasSuffix(f.Name(), ".md") { + fmt.Fprintf(w, "- %s/%s\n", agentDir, f.Name()) + } + } +} + +func main() { + d := NewDispatcher() + + d.log("========================================") + d.log("Agent Dispatcher Starting") + d.log("Gitea: %s", GiteaURL) + d.log("Repo: %s/%s", RepoOwner, RepoName) + d.log("Task Dir: %s", TaskDir) + d.log("Web UI: http://localhost:%s", WebPort) + d.log("Rate Limit: 1 task per minute") + d.log("========================================") + + // Start web server + go func() { + http.HandleFunc("/", d.handleStatus) + http.HandleFunc("/tasks", d.handleTasks) + d.log("Web UI available at http://localhost:%s", WebPort) + log.Fatal(http.ListenAndServe(":"+WebPort, nil)) + }() + + // Immediate first poll + d.pollAndDispatch() + + // Main loop + ticker := time.NewTicker(PollInterval) + defer ticker.Stop() + + for range ticker.C { + d.pollAndDispatch() + } +}