clavitor/forge/dispatcher/main.go

709 lines
18 KiB
Go
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
// Config: compile-time settings for the dispatcher. The filesystem paths are
// absolute and tied to the host this program runs on.
const (
GiteaURL = "https://git.clavitor.ai" // base URL used for the issues API call and for issue links in task files
RepoOwner = "johan" // owner segment of the repository path
RepoName = "clavitor" // repository whose open issues are polled
PollInterval = 60 * time.Second // period of the polling ticker in main
TaskDir = "/home/johan/dev/clavitor/.agent-tasks" // root directory; task files live in one subdirectory per agent
WorkDir = "/home/johan/dev/clavitor" // working directory for spawned agent processes
LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log" // log file appended to by the logger and tailed by the status page
WebPort = "8098" // TCP port of the HTTP server (status UI, /tasks, /webhook)
)
// Issue is the subset of an issue's JSON representation that the dispatcher
// decodes, both from the issues API response and from the "issue" member of a
// webhook payload.
type Issue struct {
ID int `json:"id"`
Number int `json:"number"` // number used in task file names and issue links
Title string `json:"title"`
Body string `json:"body"`
State string `json:"state"`
// Assignee is nil when the JSON field is absent or null.
Assignee *struct {
Login string `json:"login"`
} `json:"assignee"`
// Assignees is consulted only when Assignee yields no login.
Assignees []struct {
Login string `json:"login"`
} `json:"assignees"`
// Labels drive priority ("critical", "high") and domain detection.
Labels []struct {
Name string `json:"name"`
} `json:"labels"`
CreatedAt string `json:"created_at"` // kept as the raw string; never parsed in this file
UpdatedAt string `json:"updated_at"` // kept as the raw string; never parsed in this file
}
// DispatchedTask is the in-memory record of one task file written by
// dispatchTask. Records are kept only in Dispatcher.tasks and are shown on the
// status page.
type DispatchedTask struct {
IssueNumber int `json:"issue_number"`
Title string `json:"title"`
Agent string `json:"agent"` // agent the task was written for
Priority string `json:"priority"` // "CRITICAL", "HIGH" or "NORMAL"
DispatchedAt time.Time `json:"dispatched_at"`
TaskFile string `json:"task_file"` // full path of the generated markdown file
}
// Domain to agent mapping (from CLAVITOR-AGENT-HANDBOOK.md Section I).
// Keys are domain names that are matched against issue labels and against the
// lowercased issue title/body; values are agent names, which are also used as
// the per-agent subdirectory under TaskDir. Several domains may share an agent.
var domainToAgent = map[string]string{
"clavis-vault": "sarah",
"clavis-cli": "charles",
"clavis-crypto": "maria",
"clavis-chrome": "james",
"clavis-firefox": "james",
"clavis-safari": "james",
"clavis-android": "xiao",
"clavis-ios": "xiao",
"clavitor.ai": "emma",
"clavis-telemetry": "hans",
"operations": "hans",
"monitoring": "hans",
"noc": "hans",
"security": "victoria",
"architecture": "arthur",
"design": "luna",
"docs": "thomas",
"legal": "hugo",
"qa": "shakib",
"test": "shakib",
}
// Global state
// Dispatcher bundles everything shared between the polling loop, the HTTP
// handlers and the goroutines that wait for spawned agents.
type Dispatcher struct {
mu sync.RWMutex // taken around accesses to tasks, lastDispatch and activeAgents
tasks []DispatchedTask // every task written by dispatchTask since startup
lastDispatch time.Time // time of the most recent dispatch; used for the 1/minute rate limit
token string // Gitea API token, read from GITEA_TOKEN in NewDispatcher
logger *log.Logger // writes to stdout and LogFile
activeAgents map[string]time.Time // Track agents currently working (agent name -> spawn time)
}
// NewDispatcher builds a Dispatcher whose logger writes to both stdout and
// LogFile and whose Gitea token is taken from the GITEA_TOKEN environment
// variable.
//
// The process is terminated if the log file cannot be opened or if
// GITEA_TOKEN is unset. A failure to create TaskDir is only logged, because
// polling and agent spawning do not need the directory.
func NewDispatcher() *Dispatcher {
	// Setup logging to file and stdout
	logFile, err := os.OpenFile(LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		log.Fatal("Failed to open log file:", err)
	}
	logger := log.New(io.MultiWriter(os.Stdout, logFile), "[DISPATCHER] ", log.LstdFlags|log.Lmicroseconds)

	token := os.Getenv("GITEA_TOKEN")
	if token == "" {
		// SECURITY: this hint must only ever show a placeholder. A credential
		// written here ends up in the repository, the binary and the log
		// file; any token that was previously embedded in this message should
		// be revoked and re-issued.
		logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=<your-gitea-api-token>")
	}

	// Ensure task directory exists
	if err := os.MkdirAll(TaskDir, 0755); err != nil {
		logger.Printf("⚠️ Failed to create task directory %s: %v", TaskDir, err)
	}

	return &Dispatcher{
		token:        token,
		logger:       logger,
		activeAgents: make(map[string]time.Time),
	}
}
// log formats a message Printf-style and writes it through the dispatcher's
// logger, i.e. to every destination that logger was created with.
func (d *Dispatcher) log(format string, v ...interface{}) {
d.logger.Printf(format, v...)
}
// fetchOpenIssues requests the open issues of RepoOwner/RepoName from the
// Gitea API and decodes them.
//
// Only a single page (limit=100) is requested; issues beyond that are not
// seen. A non-200 response is returned as an error carrying the status code
// and the beginning of the response body.
func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) {
	url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=100", GiteaURL, RepoOwner, RepoName)
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "token "+d.token)

	// http.DefaultClient has no timeout, so an unresponsive server would
	// block the polling loop forever. A Client with a nil Transport still
	// uses http.DefaultTransport, so connections keep being pooled.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Cap the error body so a misbehaving server cannot flood the log.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
	}

	var issues []Issue
	if err := json.NewDecoder(resp.Body).Decode(&issues); err != nil {
		return nil, err
	}
	return issues, nil
}
// getPriority derives a task priority from the issue's labels.
//
// It returns "CRITICAL" if any label is named "critical", otherwise "HIGH" if
// any label is named "high", otherwise "NORMAL". The order of the labels does
// not matter: "critical" always wins over "high".
func (d *Dispatcher) getPriority(issue Issue) string {
	priority := "NORMAL"
	for _, label := range issue.Labels {
		switch label.Name {
		case "critical":
			// Nothing outranks critical, so stop scanning.
			return "CRITICAL"
		case "high":
			// Keep scanning: a later "critical" label must still win.
			priority = "HIGH"
		}
	}
	return priority
}
// getAssignee reports the login the issue is assigned to: the single
// Assignee when it carries a non-empty login, otherwise the first entry of
// Assignees, otherwise the empty string.
func (d *Dispatcher) getAssignee(issue Issue) string {
	if primary := issue.Assignee; primary != nil && primary.Login != "" {
		return primary.Login
	}
	if len(issue.Assignees) == 0 {
		return ""
	}
	return issue.Assignees[0].Login
}
// taskFileExists reports whether TaskDir/<agent>/issue-<issueNum>.md is
// present. Only a "does not exist" result from os.Stat counts as absent; any
// other outcome, including other stat errors, is reported as present.
func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool {
	name := fmt.Sprintf("issue-%d.md", issueNum)
	if _, statErr := os.Stat(filepath.Join(TaskDir, agent, name)); os.IsNotExist(statErr) {
		return false
	}
	return true
}
// findAgentForIssue resolves the issue's domain via getDomain and returns the
// agent registered for it in domainToAgent. The result is "" when no domain
// was detected or the detected domain has no entry in the map.
func (d *Dispatcher) findAgentForIssue(issue Issue) string {
	if domain := d.getDomain(issue); domain != "" {
		// A missing key yields the zero value "", which is exactly the
		// "unknown domain" result.
		return domainToAgent[domain]
	}
	return ""
}
// getDomain extracts the domain an issue belongs to, returning a key of
// domainToAgent or "" when none is found.
//
// Labels are checked first: the first label whose name is a known domain
// wins. Otherwise the lowercased title and body are searched for the domain
// names as plain substrings, in the fixed order below (multi-word product
// names before generic words). Because this is substring matching, a short
// domain such as "test" also matches inside longer words.
func (d *Dispatcher) getDomain(issue Issue) string {
	// Check labels first
	for _, label := range issue.Labels {
		if _, ok := domainToAgent[label.Name]; ok {
			// Return the domain (the map key), not the mapped agent: callers
			// look the result up in domainToAgent again.
			return label.Name
		}
	}

	// Check title and body for domain keywords
	text := strings.ToLower(issue.Title + " " + issue.Body)

	// Check domains in priority order (more specific first)
	domains := []string{
		"clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto",
		"clavis-chrome", "clavis-firefox", "clavis-safari",
		"clavis-android", "clavis-ios", "clavitor.ai",
		"operations", "monitoring", "noc",
		"security", "architecture", "design", "docs", "legal", "qa", "test",
	}
	for _, domain := range domains {
		if strings.Contains(text, domain) {
			return domain
		}
	}
	return ""
}
// dispatchTask writes the markdown task file for issue into
// TaskDir/<agent>/issue-<number>.md and records the dispatch in memory.
//
// It returns nil without doing anything when the task file already exists,
// and an error when agent is empty, when the last dispatch was less than a
// minute ago, or when the directory or file cannot be written. A dispatch
// that fails on the filesystem does not consume the rate-limit slot.
func (d *Dispatcher) dispatchTask(issue Issue, agent string) error {
	if agent == "" {
		return fmt.Errorf("no agent available")
	}
	// Check if already dispatched to this agent
	if d.taskFileExists(agent, issue.Number) {
		return nil // Already dispatched
	}

	// Rate limit: 1 per minute. The slot is claimed under the lock so two
	// concurrent callers cannot both pass the check.
	now := time.Now()
	d.mu.Lock()
	previous := d.lastDispatch
	if sinceLast := now.Sub(previous); sinceLast < time.Minute {
		d.mu.Unlock()
		return fmt.Errorf("rate limited: only %v since last dispatch", sinceLast)
	}
	d.lastDispatch = now
	d.mu.Unlock()

	// release hands the slot back after a failed write, unless another
	// dispatch has claimed a newer one in the meantime.
	release := func() {
		d.mu.Lock()
		if d.lastDispatch.Equal(now) {
			d.lastDispatch = previous
		}
		d.mu.Unlock()
	}

	priority := d.getPriority(issue)
	stamp := now.Format(time.RFC3339)

	// Create task file
	agentDir := filepath.Join(TaskDir, agent)
	if err := os.MkdirAll(agentDir, 0755); err != nil {
		release()
		return fmt.Errorf("creating task directory %s: %w", agentDir, err)
	}
	taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number))

	// NOTE(review): step 7 of the template hard-codes the commit subject
	// "telemetry: fix silent DB error" for every issue; it looks like a
	// leftover example and probably should be derived from the issue.
	content := fmt.Sprintf(`# Agent Task — Auto-Dispatched
**Agent:** %s
**Issue:** #%d
**Title:** %s
**Priority:** %s
**Dispatched:** %s
---
## Your Instruction
1. **Read QUICKSTART.md** (60 seconds):
/home/johan/dev/clavitor/QUICKSTART.md
2. **View this issue in Gitea:**
%s/%s/%s/issues/%d
3. **Execute per handbook Section III**
4. **Create branch:**
git checkout -b %s/fix-%d
5. **Implement fix** per issue spec
6. **Run daily review:**
./scripts/daily-review.sh (must pass)
7. **Commit with reference:**
git commit -m "telemetry: fix silent DB error. Fixes #%d"
8. **Push and create PR:**
git push -u origin %s/fix-%d
tea pulls create --title "%s: %s" --description "Fixes #%d"
9. **Wait for review** — DO NOT merge your own PR
---
## Issue Context
%s
---
*This task was auto-dispatched by the agent scheduler.*
*Dispatch time: %s*
`, agent, issue.Number, issue.Title, priority, stamp,
		GiteaURL, RepoOwner, RepoName, issue.Number,
		agent, issue.Number,
		issue.Number,
		agent, issue.Number,
		agent, issue.Title, issue.Number,
		truncate(issue.Body, 500),
		stamp)

	if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil {
		release()
		return err
	}

	// Record in memory
	task := DispatchedTask{
		IssueNumber:  issue.Number,
		Title:        issue.Title,
		Agent:        agent,
		Priority:     priority,
		DispatchedAt: now,
		TaskFile:     taskFile,
	}
	d.mu.Lock()
	d.tasks = append(d.tasks, task)
	d.mu.Unlock()

	d.log("✅ DISPATCHED: %s → Issue #%d (%s) [Priority: %s]", agent, issue.Number, issue.Title, priority)
	return nil
}
// truncate shortens s to at most maxLen bytes and appends "..." when
// anything was cut off; strings that already fit are returned unchanged.
//
// The cut never lands inside a multi-byte UTF-8 sequence: it is moved back
// to the start of the rune that straddles the limit, so valid input stays
// valid. A negative maxLen is treated as 0.
func truncate(s string, maxLen int) string {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(s) <= maxLen {
		return s
	}
	cut := maxLen
	// Bytes of the form 10xxxxxx are UTF-8 continuation bytes. If the byte
	// right after the cut is one, the cut is in the middle of a rune.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + "..."
}
// pollAndDispatch fetches the open issues once, buckets them by the agent
// responsible (explicit assignee first, domain mapping as fallback) and
// spawns every agent that has at least one issue and is not currently
// recorded as active. Issues with no resolvable agent are ignored. A fetch
// failure is logged and ends the poll.
func (d *Dispatcher) pollAndDispatch() {
	d.log("Polling Gitea for open issues...")

	openIssues, fetchErr := d.fetchOpenIssues()
	if fetchErr != nil {
		d.log("❌ Failed to fetch issues: %v", fetchErr)
		return
	}
	d.log("Found %d total open issues", len(openIssues))

	// Bucket the issues per responsible agent.
	workByAgent := make(map[string][]Issue)
	for _, iss := range openIssues {
		owner := d.getAssignee(iss)
		if owner == "" {
			// No explicit assignee: fall back to the domain mapping.
			owner = d.findAgentForIssue(iss)
		}
		if owner == "" {
			continue
		}
		workByAgent[owner] = append(workByAgent[owner], iss)
	}

	// Start every agent that has work and is not already running.
	for name, queue := range workByAgent {
		if len(queue) == 0 {
			continue
		}

		d.mu.Lock()
		_, busy := d.activeAgents[name]
		d.mu.Unlock()

		if busy {
			d.log(" Agent %s is already working, skipping", name)
			continue
		}
		d.spawnAgent(name)
	}
}
// spawnAgent launches `opencode run` for the given agent in WorkDir and
// returns immediately; a background goroutine waits for the process, logs the
// outcome and removes the agent from activeAgents.
//
// The "is it already running?" check and the "mark as running" update happen
// under a single lock acquisition, so concurrent callers (the ticker loop and
// the poll triggered from a webhook) cannot start the same agent twice. A
// call for an agent that is already active is a logged no-op.
func (d *Dispatcher) spawnAgent(agent string) {
	d.mu.Lock()
	if _, running := d.activeAgents[agent]; running {
		d.mu.Unlock()
		d.log(" Agent %s is already working, skipping", agent)
		return
	}
	d.activeAgents[agent] = time.Now()
	d.mu.Unlock()

	d.log("🚀 Spawning agent: %s", agent)

	// Build command. The arguments are passed directly to the process (no
	// shell), so the agent name cannot inject extra commands.
	cmd := exec.Command("opencode", "run",
		"--agent", agent,
		"--dangerously-skip-permissions",
		"Check Gitea for issues assigned to "+agent+" and fix the highest priority one")
	cmd.Dir = WorkDir
	cmd.Env = append(os.Environ(),
		"GITEA_TOKEN="+d.token,
	)

	// Run in background
	go func() {
		output, err := cmd.CombinedOutput()
		if err != nil {
			d.log("❌ Agent %s failed: %v\nOutput: %s", agent, err, string(output))
		} else {
			d.log("✅ Agent %s completed", agent)
		}

		// Mark agent as done
		d.mu.Lock()
		delete(d.activeAgents, agent)
		d.mu.Unlock()
	}()
}
// handleWebhook is the HTTP handler for Gitea webhook deliveries.
//
// It accepts POST only, reads at most maxWebhookBody bytes of JSON and
// dispatches on the event type header (X-Gitea-Event, falling back to
// X-GitHub-Event):
//   - "issues" with action opened/reopened/edited/assigned: dispatch the issue
//   - "issues" with action closed: mark the issue's task file as done
//   - "pull_request" closed and merged: mark the first open task of the agent
//     named by the branch prefix ("<agent>/...") as done
//
// Everything else is acknowledged with 200 "OK" and otherwise ignored.
//
// NOTE(review): deliveries are not authenticated (no shared-secret or
// signature check), so anyone who can reach this port can trigger dispatches.
func (d *Dispatcher) handleWebhook(w http.ResponseWriter, r *http.Request) {
	// Upper bound for a delivery; larger bodies fail the read below and are
	// answered with 400 instead of being buffered in full.
	const maxWebhookBody = 10 << 20

	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxWebhookBody))
	if err != nil {
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}

	// Parse webhook payload
	var payload struct {
		Action      string `json:"action"`
		Issue       *Issue `json:"issue"`
		PullRequest *struct {
			Number int    `json:"number"`
			Title  string `json:"title"`
			State  string `json:"state"`
			// Merged is how a merge is reported: a merged pull request has
			// state "closed" together with merged=true.
			Merged bool `json:"merged"`
			User   struct {
				Login string `json:"login"`
			} `json:"user"`
			Head struct {
				Ref string `json:"ref"`
			} `json:"head"`
		} `json:"pull_request"`
		Repository struct {
			FullName string `json:"full_name"`
		} `json:"repository"`
		Sender struct {
			Login string `json:"login"`
		} `json:"sender"`
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		d.log("❌ Webhook: Failed to parse payload: %v", err)
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}

	// Get event type from header
	eventType := r.Header.Get("X-Gitea-Event")
	if eventType == "" {
		eventType = r.Header.Get("X-GitHub-Event") // Fallback
	}
	d.log("📨 Webhook received: %s from %s", eventType, payload.Sender.Login)

	// Handle issue events
	if eventType == "issues" && payload.Issue != nil {
		switch payload.Action {
		case "opened", "reopened", "edited", "assigned":
			d.log("📋 Issue #%d %s - processing dispatch", payload.Issue.Number, payload.Action)
			d.processSingleIssue(*payload.Issue)
		case "closed":
			d.log("✅ Issue #%d closed - checking for next task", payload.Issue.Number)
			d.markTaskDone(*payload.Issue)
		}
	}

	// Handle PR merge (task completed)
	if pr := payload.PullRequest; eventType == "pull_request" && pr != nil {
		// The literal state "merged" is still accepted for senders that
		// report a merge that way.
		merged := pr.Merged || pr.State == "merged"
		if payload.Action == "closed" && merged {
			d.log("🎉 PR #%d merged by %s - agent completed task",
				pr.Number, pr.User.Login)
			// Extract agent from branch name (agent/fix-123)
			branch := pr.Head.Ref
			if idx := strings.Index(branch, "/"); idx > 0 {
				d.markAgentTaskDone(branch[:idx])
			}
		}
	}

	w.WriteHeader(200)
	fmt.Fprintln(w, "OK")
}
// processSingleIssue handles one issue right away (used by the webhook
// handler): it picks the agent from the domain mapping and hands the issue to
// dispatchTask, logging the outcome. Issues without a resolvable agent are
// logged and skipped.
func (d *Dispatcher) processSingleIssue(issue Issue) {
	target := d.findAgentForIssue(issue)
	if target == "" {
		d.log(" Issue #%d: No available agent for domain", issue.Number)
		return
	}

	if dispatchErr := d.dispatchTask(issue, target); dispatchErr != nil {
		d.log("⚠️ Issue #%d: Failed to dispatch to %s: %v", issue.Number, target, dispatchErr)
		return
	}
	d.log("✅ Issue #%d dispatched to %s", issue.Number, target)
}
// markTaskDone marks the task file of a closed issue as done by renaming
// TaskDir/<agent>/issue-<n>.md to issue-<n>.done.md for the first agent that
// has such a file. It does nothing when no agent has a task for the issue.
func (d *Dispatcher) markTaskDone(issue Issue) {
	// Task directories are named after agents, i.e. the VALUES of
	// domainToAgent. Several domains share an agent, so remember which agents
	// were already checked.
	checked := make(map[string]bool, len(domainToAgent))
	for _, agent := range domainToAgent {
		if checked[agent] {
			continue
		}
		checked[agent] = true

		taskFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issue.Number))
		if _, err := os.Stat(taskFile); err != nil {
			continue // this agent has no open task for the issue
		}

		// Rename to .done.md
		doneFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.done.md", issue.Number))
		if err := os.Rename(taskFile, doneFile); err != nil {
			d.log("❌ Failed to mark task for Issue #%d as done for %s: %v", issue.Number, agent, err)
			return
		}
		d.log("✅ Task for Issue #%d marked as done for %s", issue.Number, agent)
		return
	}
}
// markAgentTaskDone is called when a pull request of the given agent was
// merged. It renames the first open task file (*.md that is not *.done.md) in
// TaskDir/<agent> to *.done.md and then triggers a poll in the background so
// the agent can pick up its next task.
//
// agent is taken from a branch name and is therefore untrusted: names that
// are empty, ".", ".." or contain a path separator are rejected so the lookup
// cannot leave TaskDir.
//
// NOTE(review): the merged pull request is not matched to a specific issue;
// whichever open task file the directory listing returns first is closed.
func (d *Dispatcher) markAgentTaskDone(agent string) {
	if agent == "" || agent == "." || agent == ".." || strings.ContainsAny(agent, `/\`) {
		d.log("⚠️ Ignoring invalid agent name %q from branch", agent)
		return
	}

	agentDir := filepath.Join(TaskDir, agent)
	files, err := os.ReadDir(agentDir)
	if err != nil {
		return
	}
	for _, f := range files {
		name := f.Name()
		if f.IsDir() || !strings.HasSuffix(name, ".md") || strings.HasSuffix(name, ".done.md") {
			continue
		}

		taskFile := filepath.Join(agentDir, name)
		doneFile := filepath.Join(agentDir, strings.TrimSuffix(name, ".md")+".done.md")
		if err := os.Rename(taskFile, doneFile); err != nil {
			d.log("❌ Failed to mark %s as done for %s: %v", taskFile, agent, err)
			return
		}
		d.log("✅ Agent %s task marked as done (PR merged)", agent)

		// Trigger dispatch of next task for this agent
		go d.pollAndDispatch()
		return
	}
}
// Web UI handlers

// statusHTMLEscaper escapes the five characters that are significant in HTML
// text and attribute values (the same set html.EscapeString handles). It is
// used for every piece of externally supplied text on the status page.
var statusHTMLEscaper = strings.NewReplacer(
	"&", "&amp;",
	"'", "&#39;",
	"<", "&lt;",
	">", "&gt;",
	`"`, "&#34;",
)

// handleStatus renders the HTML status page: overview numbers, dispatch
// counts per agent, the ten most recent dispatches (newest first) and the
// last 50 lines of the log file.
//
// Issue titles, agent names, file paths and log lines originate outside this
// program (Gitea issues, webhook senders), so they are HTML-escaped before
// being written. The dispatcher state is copied under the read lock and the
// lock is released before any file or network I/O, so a slow client cannot
// block dispatching.
func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) {
	esc := statusHTMLEscaper.Replace

	// Snapshot shared state.
	d.mu.RLock()
	tasks := make([]DispatchedTask, len(d.tasks))
	copy(tasks, d.tasks)
	lastDispatch := d.lastDispatch
	d.mu.RUnlock()

	// Count tasks by agent
	agentCounts := make(map[string]int)
	for _, t := range tasks {
		agentCounts[t.Agent]++
	}

	// The page contains non-ASCII characters, so declare the encoding.
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	fmt.Fprintf(w, `<!DOCTYPE html>
<html>
<head>
<title>Agent Dispatcher Status</title>
<style>
body { font-family: sans-serif; margin: 40px; background: #f5f5f5; }
h1 { color: #333; }
.status { background: white; padding: 20px; border-radius: 8px; margin: 20px 0; }
.task { background: #e8f5e9; padding: 10px; margin: 5px 0; border-radius: 4px; }
.critical { background: #ffebee; border-left: 4px solid #f44336; }
.high { background: #fff3e0; border-left: 4px solid #ff9800; }
table { width: 100%%; border-collapse: collapse; }
th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; }
th { background: #333; color: white; }
.refresh { color: #666; font-size: 0.9em; }
.log { font-family: monospace; background: #263238; color: #aed581; padding: 20px; border-radius: 4px; overflow-x: auto; }
</style>
<meta http-equiv="refresh" content="10">
</head>
<body>
<h1>🔧 Agent Dispatcher Status</h1>
<p class="refresh">Auto-refresh every 10 seconds | Last poll: %s</p>
<div class="status">
<h2>Overview</h2>
<p><strong>Total dispatched:</strong> %d tasks</p>
<p><strong>Last dispatch:</strong> %s</p>
<p><strong>Rate limit:</strong> 1 task per minute</p>
<p><strong>Task directory:</strong> %s</p>
<h3>By Agent</h3>
<table>
<tr><th>Agent</th><th>Tasks Dispatched</th></tr>
`, time.Now().Format(time.RFC3339), len(tasks),
		lastDispatch.Format(time.RFC3339), TaskDir)

	for agent, count := range agentCounts {
		fmt.Fprintf(w, "<tr><td>%s</td><td>%d</td></tr>\n", esc(agent), count)
	}
	fmt.Fprint(w, `
</table>
</div>
<div class="status">
<h2>Recent Dispatches</h2>
`)

	// Show last 10 tasks (reverse order)
	for i := len(tasks) - 1; i >= 0 && i >= len(tasks)-10; i-- {
		t := tasks[i]
		// Priority only ever holds one of three fixed strings, so it needs
		// no escaping.
		priorityClass := ""
		switch t.Priority {
		case "CRITICAL":
			priorityClass = "critical"
		case "HIGH":
			priorityClass = "high"
		}
		fmt.Fprintf(w, `
<div class="task %s">
<strong>#%d</strong> — %s <span style="float:right">%s | %s</span><br>
<small>Agent: %s | File: %s</small>
</div>
`, priorityClass, t.IssueNumber, esc(t.Title), t.Priority, t.DispatchedAt.Format("15:04:05"), esc(t.Agent), esc(t.TaskFile))
	}

	fmt.Fprintf(w, `
</div>
<div class="status">
<h2>Live Log (tail -f %s)</h2>
<pre class="log">`, LogFile)

	// Read last 50 lines of log. A read error simply yields an empty log
	// section. NOTE(review): the whole file is read on every request; this
	// grows with the log.
	logContent, _ := os.ReadFile(LogFile)
	lines := strings.Split(string(logContent), "\n")
	start := 0
	if len(lines) > 50 {
		start = len(lines) - 50
	}
	for _, line := range lines[start:] {
		fmt.Fprintf(w, "%s\n", esc(line))
	}

	fmt.Fprint(w, `</pre>
</div>
</body>
</html>
`)
}
// handleTasks lists, as plain text, the *.md task files (open and done) in
// the directory of the agent named by the "agent" query parameter.
//
// It answers 400 when the parameter is missing or is not a plain directory
// name; the name is joined into a filesystem path, so ".", ".." and anything
// containing a path separator are rejected to keep the lookup inside TaskDir.
// An agent without a readable task directory gets "No tasks for <agent>".
func (d *Dispatcher) handleTasks(w http.ResponseWriter, r *http.Request) {
	agent := r.URL.Query().Get("agent")
	if agent == "" {
		http.Error(w, "Missing agent parameter", http.StatusBadRequest)
		return
	}
	if agent == "." || agent == ".." || strings.ContainsAny(agent, `/\`) {
		http.Error(w, "Invalid agent parameter", http.StatusBadRequest)
		return
	}

	// Set the type up front so the response echoing the agent name is never
	// content-sniffed.
	w.Header().Set("Content-Type", "text/plain")

	// List task files for this agent
	agentDir := filepath.Join(TaskDir, agent)
	files, err := os.ReadDir(agentDir)
	if err != nil {
		fmt.Fprintf(w, "No tasks for %s\n", agent)
		return
	}

	fmt.Fprintf(w, "Tasks for %s:\n\n", agent)
	for _, f := range files {
		if strings.HasSuffix(f.Name(), ".md") {
			fmt.Fprintf(w, "- %s/%s\n", agentDir, f.Name())
		}
	}
}
// main wires the dispatcher together: it logs a startup banner, serves the
// status UI, the task listing and the webhook endpoint on WebPort in a
// background goroutine, runs one poll immediately and then polls again on
// every PollInterval tick. It never returns normally; a failure of the HTTP
// server terminates the process.
func main() {
	d := NewDispatcher()

	const rule = "========================================"
	d.log(rule)
	d.log("Agent Dispatcher Starting")
	d.log("Gitea: %s", GiteaURL)
	d.log("Repo: %s/%s", RepoOwner, RepoName)
	d.log("Task Dir: %s", TaskDir)
	d.log("Web UI: http://localhost:%s", WebPort)
	d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort)
	d.log("Mode: Webhook listener + backup polling")
	d.log(rule)

	// Web server (status page, task listing and Gitea webhook listener),
	// registered on the default mux.
	serve := func() {
		http.HandleFunc("/", d.handleStatus)
		http.HandleFunc("/tasks", d.handleTasks)
		http.HandleFunc("/webhook", d.handleWebhook)
		d.log("Web UI available at http://localhost:%s", WebPort)
		d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort)
		log.Fatal(http.ListenAndServe(":"+WebPort, nil))
	}
	go serve()

	// First poll right away so nothing waits for the first tick; polling is
	// the backup for webhooks that were missed.
	d.pollAndDispatch()

	// Main loop: one poll per tick, forever.
	pollTicker := time.NewTicker(PollInterval)
	defer pollTicker.Stop()
	for range pollTicker.C {
		d.pollAndDispatch()
	}
}