diff --git a/forge/dispatcher/main.go b/forge/dispatcher/main.go index 605d7f4..26297b5 100644 --- a/forge/dispatcher/main.go +++ b/forge/dispatcher/main.go @@ -7,6 +7,7 @@ import ( "log" "net/http" "os" + "os/exec" "path/filepath" "sort" "strings" @@ -21,10 +22,35 @@ const ( RepoName = "clavitor" PollInterval = 60 * time.Second TaskDir = "/home/johan/dev/clavitor/.agent-tasks" + WorkDir = "/home/johan/dev/clavitor" LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log" WebPort = "8098" ) +// Domain to agent mapping (from CLAVITOR-AGENT-HANDBOOK.md Section I) +var domainToAgent = map[string]string{ + "clavis-vault": "sarah", + "clavis-cli": "charles", + "clavis-crypto": "maria", + "clavis-chrome": "james", + "clavis-firefox": "james", + "clavis-safari": "james", + "clavis-android": "xiao", + "clavis-ios": "xiao", + "clavitor.ai": "emma", + "clavis-telemetry": "hans", + "operations": "hans", + "monitoring": "hans", + "noc": "hans", + "security": "victoria", + "architecture": "arthur", + "design": "luna", + "docs": "thomas", + "legal": "hugo", + "qa": "shakib", + "test": "shakib", +} + type Issue struct { ID int `json:"id"` Number int `json:"number"` @@ -44,13 +70,27 @@ type Issue struct { UpdatedAt string `json:"updated_at"` } +type PullRequest struct { + Number int `json:"number"` + Title string `json:"title"` + Head struct { + Ref string `json:"ref"` + } `json:"head"` + Base struct { + Ref string `json:"ref"` + } `json:"base"` + Labels []struct { + Name string `json:"name"` + } `json:"labels"` +} + type DispatchedTask struct { - IssueNumber int `json:"issue_number"` - Title string `json:"title"` - Agent string `json:"agent"` - Priority string `json:"priority"` + IssueNumber int `json:"issue_number"` + Title string `json:"title"` + Agent string `json:"agent"` + Priority string `json:"priority"` DispatchedAt time.Time `json:"dispatched_at"` - TaskFile string `json:"task_file"` + TaskFile string `json:"task_file"` } // Global state @@ -60,10 +100,10 @@ type Dispatcher struct { lastDispatch time.Time token string logger *log.Logger + activeAgents map[string]time.Time } func NewDispatcher() *Dispatcher { - // Setup logging to file and stdout logFile, err := os.OpenFile(LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { log.Fatal("Failed to open log file:", err) @@ -76,12 +116,12 @@ func NewDispatcher() *Dispatcher { logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0") } - // Ensure task directory exists os.MkdirAll(TaskDir, 0755) return &Dispatcher{ - token: token, - logger: logger, + token: token, + logger: logger, + activeAgents: make(map[string]time.Time), } } @@ -117,6 +157,57 @@ func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) { return issues, nil } +func (d *Dispatcher) fetchOpenPRs() []PullRequest { + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=open", GiteaURL, RepoOwner, RepoName) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + d.log("Failed to create PR request: %v", err) + return nil + } + req.Header.Set("Authorization", "token "+d.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + d.log("Failed to fetch PRs: %v", err) + return nil + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + d.log("PR fetch failed: HTTP %d: %s", resp.StatusCode, string(body)) + return nil + } + + var prs []PullRequest + if err := json.NewDecoder(resp.Body).Decode(&prs); err != nil { + d.log("Failed to decode PRs: %v", err) + return nil + } + + return prs +} + +func (d *Dispatcher) prHasLabel(pr PullRequest, label string) bool { + for _, l := range pr.Labels { + if l.Name == label { + return true + } + } + return false +} + +func (d *Dispatcher) getAssignee(issue Issue) string { + if issue.Assignee != nil && issue.Assignee.Login != "" { + return issue.Assignee.Login + } + if len(issue.Assignees) > 0 && issue.Assignees[0].Login != "" { + return issue.Assignees[0].Login + } + return "" +} + func (d *Dispatcher) getPriority(issue Issue) string { for _, label := range issue.Labels { if label.Name == "critical" { @@ -129,112 +220,144 @@ func (d *Dispatcher) getPriority(issue Issue) string { return "NORMAL" } -func (d *Dispatcher) getAssignee(issue Issue) string { - if issue.Assignee != nil && issue.Assignee.Login != "" { - return issue.Assignee.Login +func (d *Dispatcher) findAgentForIssue(issue Issue) string { + for _, label := range issue.Labels { + if agent, ok := domainToAgent[label.Name]; ok { + return agent + } } - if len(issue.Assignees) > 0 { - return issue.Assignees[0].Login + + text := strings.ToLower(issue.Title + " " + issue.Body) + domains := []string{ + "clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto", + "clavis-chrome", "clavis-firefox", "clavis-safari", + "clavis-android", "clavis-ios", "clavitor.ai", + "operations", "monitoring", "noc", + "security", "architecture", "design", "docs", "legal", "qa", "test", } + + for _, domain := range domains { + if strings.Contains(text, domain) { + if agent, ok := domainToAgent[domain]; ok { + return agent + } + } + } + return "" } +func (d *Dispatcher) isAgentActive(agent string) bool { + d.mu.RLock() + defer d.mu.RUnlock() + _, isActive := d.activeAgents[agent] + return isActive +} + func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool { path := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issueNum)) _, err := os.Stat(path) return !os.IsNotExist(err) } -func (d *Dispatcher) dispatchTask(issue Issue) error { - agent := d.getAssignee(issue) +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +func (d *Dispatcher) spawnAgent(agent string) { + d.mu.Lock() + d.activeAgents[agent] = time.Now() + d.mu.Unlock() + + d.log("Spawning agent: %s", agent) + + // Check for agent-specific token + agentToken := d.token + tokensFile := "/home/johan/dev/clavitor/forge/dispatcher/agent-tokens.json" + if data, err := os.ReadFile(tokensFile); err == nil { + var tokens map[string]string + if json.Unmarshal(data, &tokens) == nil { + if token, ok := tokens[agent]; ok && token != "" { + agentToken = token + d.log("Using agent-specific token for %s", agent) + } + } + } + + cmd := exec.Command("opencode", "run", + "--agent", agent, + "--dangerously-skip-permissions", + "Check Gitea for work assigned to "+agent) + cmd.Dir = WorkDir + cmd.Env = append(os.Environ(), "GITEA_TOKEN="+agentToken) + + go func() { + output, err := cmd.CombinedOutput() + if err != nil { + d.log("Agent %s failed: %v\nOutput: %s", agent, err, string(output)) + } else { + d.log("Agent %s completed", agent) + } + + d.mu.Lock() + delete(d.activeAgents, agent) + d.mu.Unlock() + }() +} + +func (d *Dispatcher) dispatchTask(issue Issue, agent string) error { if agent == "" { - return nil // Skip unassigned + return fmt.Errorf("no agent specified") } - // Check if already dispatched if d.taskFileExists(agent, issue.Number) { - return nil // Already dispatched + return nil } - // Rate limit: 1 per minute d.mu.Lock() timeSinceLast := time.Since(d.lastDispatch) if timeSinceLast < time.Minute { d.mu.Unlock() - return fmt.Errorf("rate limited: only %v since last dispatch", timeSinceLast) + return fmt.Errorf("rate limited") } d.lastDispatch = time.Now() d.mu.Unlock() priority := d.getPriority(issue) - // Create task file agentDir := filepath.Join(TaskDir, agent) os.MkdirAll(agentDir, 0755) taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number)) - content := fmt.Sprintf(`# Agent Task — Auto-Dispatched + content := fmt.Sprintf(`# Agent Task - Issue #%d **Agent:** %s -**Issue:** #%d -**Title:** %s **Priority:** %s -**Dispatched:** %s +**Title:** %s ---- +## Instructions -## Your Instruction +1. Read CLAVITOR-AGENT-HANDBOOK.md completely (mandatory) +2. View issue: %s/%s/%s/issues/%d +3. Create branch: %s/fix-%d +4. Implement fix per issue requirements +5. Commit with: "fixes #%d" +6. Push and create PR +7. Add "needs-qa" label to PR -1. **Read QUICKSTART.md** (60 seconds): - /home/johan/dev/clavitor/QUICKSTART.md - -2. **View this issue in Gitea:** - %s/%s/%s/issues/%d - -3. **Execute per handbook Section III** - -4. **Create branch:** - git checkout -b %s/fix-%d - -5. **Implement fix** per issue spec - -6. **Run daily review:** - ./scripts/daily-review.sh (must pass) - -7. **Commit with reference:** - git commit -m "telemetry: fix silent DB error. Fixes #%d" - -8. **Push and create PR:** - git push -u origin %s/fix-%d - tea pulls create --title "%s: %s" --description "Fixes #%d" - -9. **Wait for review** — DO NOT merge your own PR - ---- - -## Issue Context +## Issue Body %s - ---- - -*This task was auto-dispatched by the agent scheduler.* -*Dispatch time: %s* -`, agent, issue.Number, issue.Title, priority, time.Now().Format(time.RFC3339), - GiteaURL, RepoOwner, RepoName, issue.Number, - agent, issue.Number, - issue.Number, - agent, issue.Number, - agent, issue.Title, issue.Number, - truncate(issue.Body, 500), - time.Now().Format(time.RFC3339)) +`, issue.Number, agent, priority, issue.Title, GiteaURL, RepoOwner, RepoName, issue.Number, agent, issue.Number, issue.Number, issue.Body) if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil { return err } - // Record in memory task := DispatchedTask{ IssueNumber: issue.Number, Title: issue.Title, @@ -248,30 +371,80 @@ func (d *Dispatcher) dispatchTask(issue Issue) error { d.tasks = append(d.tasks, task) d.mu.Unlock() - d.log("✅ DISPATCHED: %s → Issue #%d (%s) [Priority: %s]", agent, issue.Number, issue.Title, priority) + d.log("DISPATCHED: %s → Issue #%d (%s) [%s]", agent, issue.Number, truncate(issue.Title, 50), priority) return nil } -func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - return s[:maxLen] + "..." -} - func (d *Dispatcher) pollAndDispatch() { - d.log("Polling Gitea for open issues...") + d.log("Polling Gitea for work...") + // 1. Check for PRs needing QA (Shakib) + qaPRs := d.fetchOpenPRs() + if qaPRs != nil { + d.log("Fetched %d open PRs", len(qaPRs)) + } + + var needsQA []PullRequest + for _, pr := range qaPRs { + if d.prHasLabel(pr, "needs-qa") && !d.prHasLabel(pr, "in-progress") { + needsQA = append(needsQA, pr) + } + } + + if len(needsQA) > 0 && !d.isAgentActive("shakib") { + d.log("Assigning %d PRs to Shakib (QA):", len(needsQA)) + for i, pr := range needsQA { + d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60)) + } + d.spawnAgent("shakib") + } + + // 2. Check for PRs needing security review (Victoria) + var needsSecurity []PullRequest + for _, pr := range qaPRs { + if d.prHasLabel(pr, "needs-review") && + !d.prHasLabel(pr, "security-approved") && + !d.prHasLabel(pr, "security-issues") { + needsSecurity = append(needsSecurity, pr) + } + } + + if len(needsSecurity) > 0 && !d.isAgentActive("victoria") { + d.log("Assigning %d PRs to Victoria (Security Review):", len(needsSecurity)) + for i, pr := range needsSecurity { + d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60)) + } + d.spawnAgent("victoria") + } + + // 3. Check for PRs needing final review (Yurii) + var needsFinal []PullRequest + for _, pr := range qaPRs { + if d.prHasLabel(pr, "needs-review") && + d.prHasLabel(pr, "security-approved") && + !d.prHasLabel(pr, "approved") { + needsFinal = append(needsFinal, pr) + } + } + + if len(needsFinal) > 0 && !d.isAgentActive("yurii") { + d.log("Assigning %d PRs to Yurii (Final Review):", len(needsFinal)) + for i, pr := range needsFinal { + d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60)) + } + d.spawnAgent("yurii") + } + + // 4. Check for issues assigned to engineers issues, err := d.fetchOpenIssues() if err != nil { - d.log("❌ Failed to fetch issues: %v", err) + d.log("Failed to fetch issues: %v", err) return } d.log("Found %d open issues", len(issues)) - // Sort by priority (CRITICAL first) sort.Slice(issues, func(i, j int) bool { pi := d.getPriority(issues[i]) pj := d.getPriority(issues[j]) @@ -279,36 +452,42 @@ func (d *Dispatcher) pollAndDispatch() { return priorityOrder[pi] < priorityOrder[pj] }) - // Try to dispatch one task dispatched := false for _, issue := range issues { - if d.getAssignee(issue) == "" { + agent := d.getAssignee(issue) + if agent == "" { + agent = d.findAgentForIssue(issue) + } + if agent == "" || agent == "shakib" || agent == "victoria" || agent == "yurii" { continue } - err := d.dispatchTask(issue) + if d.isAgentActive(agent) { + d.log("Agent %s is already working, skipping", agent) + continue + } + + d.log("Assigning Issue #%d (%s) to %s", issue.Number, truncate(issue.Title, 50), agent) + + err := d.dispatchTask(issue, agent) if err != nil { - d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err) + d.log("Skipped Issue #%d: %v", issue.Number, err) continue } - if err == nil { - dispatched = true - break // Only 1 per poll - } + dispatched = true + break } - if !dispatched { - d.log("ℹ️ No new tasks to dispatch (rate limit or all caught up)") + if !dispatched && len(needsQA) == 0 && len(needsSecurity) == 0 && len(needsFinal) == 0 { + d.log("No new work to dispatch (rate limit or all caught up)") } } -// Web UI handlers func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) { d.mu.RLock() defer d.mu.RUnlock() - // Count tasks by agent agentCounts := make(map[string]int) for _, t := range d.tasks { agentCounts[t.Agent]++ @@ -323,110 +502,59 @@ func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) { body { font-family: sans-serif; margin: 40px; background: #f5f5f5; } h1 { color: #333; } .status { background: white; padding: 20px; border-radius: 8px; margin: 20px 0; } - .task { background: #e8f5e9; padding: 10px; margin: 5px 0; border-radius: 4px; } - .critical { background: #ffebee; border-left: 4px solid #f44336; } - .high { background: #fff3e0; border-left: 4px solid #ff9800; } table { width: 100%%; border-collapse: collapse; } th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; } th { background: #333; color: white; } .refresh { color: #666; font-size: 0.9em; } - .log { font-family: monospace; background: #263238; color: #aed581; padding: 20px; border-radius: 4px; overflow-x: auto; } -

🔧 Agent Dispatcher Status

+

Agent Dispatcher Status

Auto-refresh every 10 seconds | Last poll: %s

Overview

Total dispatched: %d tasks

-

Last dispatch: %s

-

Rate limit: 1 task per minute

-

Task directory: %s

- -

By Agent

+

Active agents: %d

+

Web UI: http://localhost:%s

+
+ +
+

Active Agents

- -`, time.Now().Format(time.RFC3339), len(d.tasks), - d.lastDispatch.Format(time.RFC3339), TaskDir) + +`, time.Now().Format(time.RFC3339), len(d.tasks), len(d.activeAgents), WebPort) - for agent, count := range agentCounts { - fmt.Fprintf(w, "\n", agent, count) + for agent, startTime := range d.activeAgents { + fmt.Fprintf(w, ``, agent, startTime.Format(time.RFC3339)) } fmt.Fprintf(w, `
AgentTasks Dispatched
AgentStarted
%s%d
%s%s
- -
-

Recent Dispatches

-`) - - // Show last 10 tasks (reverse order) - for i := len(d.tasks) - 1; i >= 0 && i >= len(d.tasks)-10; i-- { - t := d.tasks[i] - priorityClass := "" - if t.Priority == "CRITICAL" { - priorityClass = "critical" - } else if t.Priority == "HIGH" { - priorityClass = "high" - } - - fmt.Fprintf(w, ` -
- #%d — %s %s | %s
- Agent: %s | File: %s -
-`, priorityClass, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format("15:04:05"), t.Agent, t.TaskFile) - } - - fmt.Fprintf(w, ` -
- -
-

Live Log (tail -f %s)

-
`, LogFile)
-	
-	// Read last 50 lines of log
-	logContent, _ := os.ReadFile(LogFile)
-	lines := strings.Split(string(logContent), "\n")
-	start := 0
-	if len(lines) > 50 {
-		start = len(lines) - 50
-	}
-	for i := start; i < len(lines); i++ {
-		fmt.Fprintf(w, "%s\n", lines[i])
-	}
-	
-	fmt.Fprintf(w, `
-
- - -`) +`) } func (d *Dispatcher) handleTasks(w http.ResponseWriter, r *http.Request) { - agent := r.URL.Query().Get("agent") - if agent == "" { - http.Error(w, "Missing agent parameter", 400) - return - } - - // List task files for this agent - agentDir := filepath.Join(TaskDir, agent) - files, err := os.ReadDir(agentDir) - if err != nil { - fmt.Fprintf(w, "No tasks for %s\n", agent) - return - } + d.mu.RLock() + defer d.mu.RUnlock() w.Header().Set("Content-Type", "text/plain") - fmt.Fprintf(w, "Tasks for %s:\n\n", agent) - for _, f := range files { - if strings.HasSuffix(f.Name(), ".md") { - fmt.Fprintf(w, "- %s/%s\n", agentDir, f.Name()) + fmt.Fprintf(w, "# Dispatched Tasks (%d total)\n\n", len(d.tasks)) + + for _, t := range d.tasks { + fmt.Fprintf(w, "- %s: Issue #%d (%s) [%s] at %s\n", t.Agent, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format(time.RFC3339)) + } + + agents, _ := os.ReadDir(TaskDir) + fmt.Fprintf(w, "\n# Agent Directories\n") + for _, agent := range agents { + if agent.IsDir() { + agentDir := filepath.Join(TaskDir, agent.Name()) + files, _ := os.ReadDir(agentDir) + fmt.Fprintf(w, "- %s: %d files\n", agent.Name(), len(files)) } } } @@ -438,12 +566,9 @@ func main() { d.log("Agent Dispatcher Starting") d.log("Gitea: %s", GiteaURL) d.log("Repo: %s/%s", RepoOwner, RepoName) - d.log("Task Dir: %s", TaskDir) d.log("Web UI: http://localhost:%s", WebPort) - d.log("Rate Limit: 1 task per minute") d.log("========================================") - // Start web server go func() { http.HandleFunc("/", d.handleStatus) http.HandleFunc("/tasks", d.handleTasks) @@ -451,10 +576,8 @@ func main() { log.Fatal(http.ListenAndServe(":"+WebPort, nil)) }() - // Immediate first poll d.pollAndDispatch() - // Main loop ticker := time.NewTicker(PollInterval) defer ticker.Stop()