diff --git a/forge/dispatcher/main.go b/forge/dispatcher/main.go index 605d7f4..6c95230 100644 --- a/forge/dispatcher/main.go +++ b/forge/dispatcher/main.go @@ -7,8 +7,8 @@ import ( "log" "net/http" "os" + "os/exec" "path/filepath" - "sort" "strings" "sync" "time" @@ -21,6 +21,7 @@ const ( RepoName = "clavitor" PollInterval = 60 * time.Second TaskDir = "/home/johan/dev/clavitor/.agent-tasks" + WorkDir = "/home/johan/dev/clavitor" LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log" WebPort = "8098" ) @@ -53,6 +54,30 @@ type DispatchedTask struct { TaskFile string `json:"task_file"` } +// Domain to agent mapping (from CLAVITOR-AGENT-HANDBOOK.md Section I) +var domainToAgent = map[string]string{ + "clavis-vault": "sarah", + "clavis-cli": "charles", + "clavis-crypto": "maria", + "clavis-chrome": "james", + "clavis-firefox": "james", + "clavis-safari": "james", + "clavis-android": "xiao", + "clavis-ios": "xiao", + "clavitor.ai": "emma", + "clavis-telemetry": "hans", + "operations": "hans", + "monitoring": "hans", + "noc": "hans", + "security": "victoria", + "architecture": "arthur", + "design": "luna", + "docs": "thomas", + "legal": "hugo", + "qa": "shakib", + "test": "shakib", +} + // Global state type Dispatcher struct { mu sync.RWMutex @@ -60,6 +85,7 @@ type Dispatcher struct { lastDispatch time.Time token string logger *log.Logger + activeAgents map[string]time.Time // Track agents currently working } func NewDispatcher() *Dispatcher { @@ -80,8 +106,9 @@ func NewDispatcher() *Dispatcher { os.MkdirAll(TaskDir, 0755) return &Dispatcher{ - token: token, - logger: logger, + token: token, + logger: logger, + activeAgents: make(map[string]time.Time), } } @@ -145,13 +172,58 @@ func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool { return !os.IsNotExist(err) } -func (d *Dispatcher) dispatchTask(issue Issue) error { - agent := d.getAssignee(issue) - if agent == "" { - return nil // Skip unassigned +// Find agent for issue based on domain +func (d *Dispatcher) findAgentForIssue(issue Issue) string { + domain := d.getDomain(issue) + if domain == "" { + return "" // No domain found } - // Check if already dispatched + agent, ok := domainToAgent[domain] + if !ok { + return "" // Unknown domain + } + + return agent +} + +// Extract domain from issue (body, title, or labels) +func (d *Dispatcher) getDomain(issue Issue) string { + // Check labels first + for _, label := range issue.Labels { + if agent, ok := domainToAgent[label.Name]; ok { + return agent + } + } + + // Check title and body for domain keywords + text := issue.Title + " " + issue.Body + text = strings.ToLower(text) + + // Check domains in priority order (more specific first) + domains := []string{ + "clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto", + "clavis-chrome", "clavis-firefox", "clavis-safari", + "clavis-android", "clavis-ios", "clavitor.ai", + "operations", "monitoring", "noc", + "security", "architecture", "design", "docs", "legal", "qa", "test", + } + + for _, domain := range domains { + if strings.Contains(text, domain) { + return domain + } + } + + return "" +} + +func (d *Dispatcher) dispatchTask(issue Issue, agent string) error { + if agent == "" { + return fmt.Errorf("no agent available") + } + + // Check if already dispatched to this agent if d.taskFileExists(agent, issue.Number) { return nil // Already dispatched } @@ -263,43 +335,212 @@ func truncate(s string, maxLen int) string { func (d *Dispatcher) pollAndDispatch() { d.log("Polling Gitea for open issues...") + // Fetch all issues once issues, err := d.fetchOpenIssues() if err != nil { d.log("❌ Failed to fetch issues: %v", err) return } - d.log("Found %d open issues", len(issues)) + d.log("Found %d total open issues", len(issues)) - // Sort by priority (CRITICAL first) - sort.Slice(issues, func(i, j int) bool { - pi := d.getPriority(issues[i]) - pj := d.getPriority(issues[j]) - priorityOrder := map[string]int{"CRITICAL": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3} - return priorityOrder[pi] < priorityOrder[pj] - }) - - // Try to dispatch one task - dispatched := false + // Group issues by assignee + byAgent := make(map[string][]Issue) for _, issue := range issues { - if d.getAssignee(issue) == "" { - continue + assignee := d.getAssignee(issue) + if assignee == "" { + assignee = d.findAgentForIssue(issue) // Fallback to domain mapping } - - err := d.dispatchTask(issue) - if err != nil { - d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err) - continue - } - - if err == nil { - dispatched = true - break // Only 1 per poll + if assignee != "" { + byAgent[assignee] = append(byAgent[assignee], issue) } } - if !dispatched { - d.log("ℹ️ No new tasks to dispatch (rate limit or all caught up)") + // Spawn agents that have work and aren't already active + for agent, agentIssues := range byAgent { + if len(agentIssues) == 0 { + continue + } + + // Check if agent is already working + d.mu.Lock() + _, isActive := d.activeAgents[agent] + d.mu.Unlock() + + if isActive { + d.log("ℹ️ Agent %s is already working, skipping", agent) + continue + } + + // Spawn the agent + d.spawnAgent(agent) + } +} + +// spawnAgent launches opencode for an agent +func (d *Dispatcher) spawnAgent(agent string) { + d.mu.Lock() + d.activeAgents[agent] = time.Now() + d.mu.Unlock() + + d.log("🚀 Spawning agent: %s", agent) + + // Build command + cmd := exec.Command("opencode", "run", + "--agent", agent, + "--dangerously-skip-permissions", + "Check Gitea for issues assigned to "+agent+" and fix the highest priority one") + cmd.Dir = WorkDir + cmd.Env = append(os.Environ(), + "GITEA_TOKEN="+d.token, + ) + + // Run in background + go func() { + output, err := cmd.CombinedOutput() + if err != nil { + d.log("❌ Agent %s failed: %v\nOutput: %s", agent, err, string(output)) + } else { + d.log("✅ Agent %s completed", agent) + } + + // Mark agent as done + d.mu.Lock() + delete(d.activeAgents, agent) + d.mu.Unlock() + }() +} + +// Webhook handler for Gitea events +func (d *Dispatcher) handleWebhook(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", 405) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Bad request", 400) + return + } + + // Parse webhook payload + var payload struct { + Action string `json:"action"` + Issue *Issue `json:"issue"` + PullRequest *struct { + Number int `json:"number"` + Title string `json:"title"` + State string `json:"state"` + User struct { + Login string `json:"login"` + } `json:"user"` + Head struct { + Ref string `json:"ref"` + } `json:"head"` + } `json:"pull_request"` + Repository struct { + FullName string `json:"full_name"` + } `json:"repository"` + Sender struct { + Login string `json:"login"` + } `json:"sender"` + } + + if err := json.Unmarshal(body, &payload); err != nil { + d.log("❌ Webhook: Failed to parse payload: %v", err) + http.Error(w, "Bad request", 400) + return + } + + // Get event type from header + eventType := r.Header.Get("X-Gitea-Event") + if eventType == "" { + eventType = r.Header.Get("X-GitHub-Event") // Fallback + } + + d.log("📨 Webhook received: %s from %s", eventType, payload.Sender.Login) + + // Handle issue events + if eventType == "issues" && payload.Issue != nil { + switch payload.Action { + case "opened", "reopened", "edited", "assigned": + d.log("📋 Issue #%d %s - processing dispatch", payload.Issue.Number, payload.Action) + d.processSingleIssue(*payload.Issue) + case "closed": + d.log("✅ Issue #%d closed - checking for next task", payload.Issue.Number) + d.markTaskDone(*payload.Issue) + } + } + + // Handle PR merge (task completed) + if eventType == "pull_request" && payload.PullRequest != nil { + if payload.Action == "closed" && payload.PullRequest.State == "merged" { + d.log("🎉 PR #%d merged by %s - agent completed task", + payload.PullRequest.Number, payload.PullRequest.User.Login) + // Extract agent from branch name (agent/fix-123) + branch := payload.PullRequest.Head.Ref + if idx := strings.Index(branch, "/"); idx > 0 { + agent := branch[:idx] + d.markAgentTaskDone(agent) + } + } + } + + w.WriteHeader(200) + fmt.Fprintln(w, "OK") +} + +// Process a single issue immediately (for webhooks) +func (d *Dispatcher) processSingleIssue(issue Issue) { + agent := d.findAgentForIssue(issue) + if agent == "" { + d.log("ℹ️ Issue #%d: No available agent for domain", issue.Number) + return + } + + err := d.dispatchTask(issue, agent) + if err != nil { + d.log("⚠️ Issue #%d: Failed to dispatch to %s: %v", issue.Number, agent, err) + } else { + d.log("✅ Issue #%d dispatched to %s", issue.Number, agent) + } +} + +// Mark a task as done when issue is closed +func (d *Dispatcher) markTaskDone(issue Issue) { + // Find which agent had this task + for agent := range domainToAgent { + taskFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issue.Number)) + if _, err := os.Stat(taskFile); err == nil { + // Rename to .done.md + doneFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.done.md", issue.Number)) + os.Rename(taskFile, doneFile) + d.log("✅ Task for Issue #%d marked as done for %s", issue.Number, agent) + break + } + } +} + +// Mark task done when PR is merged +func (d *Dispatcher) markAgentTaskDone(agent string) { + agentDir := filepath.Join(TaskDir, agent) + files, err := os.ReadDir(agentDir) + if err != nil { + return + } + + for _, f := range files { + if strings.HasSuffix(f.Name(), ".md") && !strings.HasSuffix(f.Name(), ".done.md") { + taskFile := filepath.Join(agentDir, f.Name()) + doneFile := filepath.Join(agentDir, f.Name()[:len(f.Name())-3]+".done.md") + os.Rename(taskFile, doneFile) + d.log("✅ Agent %s task marked as done (PR merged)", agent) + + // Trigger dispatch of next task for this agent + go d.pollAndDispatch() + break + } } } @@ -440,18 +681,21 @@ func main() { d.log("Repo: %s/%s", RepoOwner, RepoName) d.log("Task Dir: %s", TaskDir) d.log("Web UI: http://localhost:%s", WebPort) - d.log("Rate Limit: 1 task per minute") + d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort) + d.log("Mode: Webhook listener + backup polling") d.log("========================================") - // Start web server + // Start web server (includes webhook listener) go func() { http.HandleFunc("/", d.handleStatus) http.HandleFunc("/tasks", d.handleTasks) + http.HandleFunc("/webhook", d.handleWebhook) // Gitea webhooks d.log("Web UI available at http://localhost:%s", WebPort) + d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort) log.Fatal(http.ListenAndServe(":"+WebPort, nil)) }() - // Immediate first poll + // Immediate first poll (backup for any missed webhooks) d.pollAndDispatch() // Main loop