diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c3cc95f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +agent-tokens.json diff --git a/forge/dispatcher/dispatcher b/forge/dispatcher/dispatcher index 2e3c98e..175f6ed 100755 Binary files a/forge/dispatcher/dispatcher and b/forge/dispatcher/dispatcher differ diff --git a/forge/dispatcher/main.go b/forge/dispatcher/main.go index 26297b5..211a86d 100644 --- a/forge/dispatcher/main.go +++ b/forge/dispatcher/main.go @@ -8,14 +8,12 @@ import ( "net/http" "os" "os/exec" - "path/filepath" "sort" "strings" "sync" "time" ) -// Config const ( GiteaURL = "https://git.clavitor.ai" RepoOwner = "johan" @@ -27,28 +25,13 @@ const ( WebPort = "8098" ) -// Domain to agent mapping (from CLAVITOR-AGENT-HANDBOOK.md Section I) var domainToAgent = map[string]string{ - "clavis-vault": "sarah", - "clavis-cli": "charles", - "clavis-crypto": "maria", - "clavis-chrome": "james", - "clavis-firefox": "james", - "clavis-safari": "james", - "clavis-android": "xiao", - "clavis-ios": "xiao", - "clavitor.ai": "emma", - "clavis-telemetry": "hans", - "operations": "hans", - "monitoring": "hans", - "noc": "hans", - "security": "victoria", - "architecture": "arthur", - "design": "luna", - "docs": "thomas", - "legal": "hugo", - "qa": "shakib", - "test": "shakib", + "clavis-vault": "sarah", "clavis-cli": "charles", "clavis-crypto": "maria", + "clavis-chrome": "james", "clavis-firefox": "james", "clavis-safari": "james", + "clavis-android": "xiao", "clavis-ios": "xiao", "clavitor.ai": "emma", + "clavis-telemetry": "hans", "operations": "hans", "monitoring": "hans", "noc": "hans", + "security": "victoria", "architecture": "arthur", "design": "luna", + "docs": "thomas", "legal": "hugo", "qa": "shakib", "test": "shakib", } type Issue struct { @@ -57,50 +40,25 @@ type Issue struct { Title string `json:"title"` Body string `json:"body"` State string `json:"state"` - Assignee *struct { - Login string `json:"login"` - } `json:"assignee"` - Assignees []struct { - Login string `json:"login"` - } `json:"assignees"` - Labels []struct { - Name string `json:"name"` - } `json:"labels"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + Assignee *struct{ Login string `json:"login"` } `json:"assignee"` + Labels []struct{ Name string `json:"name"` } `json:"labels"` } type PullRequest struct { Number int `json:"number"` Title string `json:"title"` - Head struct { - Ref string `json:"ref"` - } `json:"head"` - Base struct { - Ref string `json:"ref"` - } `json:"base"` - Labels []struct { - Name string `json:"name"` - } `json:"labels"` + User struct{ Login string `json:"login"` } `json:"user"` + Head struct{ Ref string `json:"ref"` } `json:"head"` + Labels []struct{ Name string `json:"name"` } `json:"labels"` } -type DispatchedTask struct { - IssueNumber int `json:"issue_number"` - Title string `json:"title"` - Agent string `json:"agent"` - Priority string `json:"priority"` - DispatchedAt time.Time `json:"dispatched_at"` - TaskFile string `json:"task_file"` -} - -// Global state type Dispatcher struct { - mu sync.RWMutex - tasks []DispatchedTask - lastDispatch time.Time - token string - logger *log.Logger - activeAgents map[string]time.Time + mu sync.RWMutex + tasks []interface{} + lastDispatch time.Time + token string + logger *log.Logger + activeAgents map[string]time.Time } func NewDispatcher() *Dispatcher { @@ -108,21 +66,13 @@ func NewDispatcher() *Dispatcher { if err != nil { log.Fatal("Failed to open log file:", err) } - logger := log.New(io.MultiWriter(os.Stdout, logFile), "[DISPATCHER] ", log.LstdFlags|log.Lmicroseconds) - token := os.Getenv("GITEA_TOKEN") if token == "" { - logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0") + logger.Fatal("GITEA_TOKEN not set") } - os.MkdirAll(TaskDir, 0755) - - return &Dispatcher{ - token: token, - logger: logger, - activeAgents: make(map[string]time.Time), - } + return &Dispatcher{token: token, logger: logger, activeAgents: make(map[string]time.Time)} } func (d *Dispatcher) log(format string, v ...interface{}) { @@ -131,61 +81,39 @@ func (d *Dispatcher) log(format string, v ...interface{}) { func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) { url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=100", GiteaURL, RepoOwner, RepoName) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } + req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "token "+d.token) - resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() - if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body)) } - var issues []Issue - if err := json.NewDecoder(resp.Body).Decode(&issues); err != nil { - return nil, err - } - + json.NewDecoder(resp.Body).Decode(&issues) return issues, nil } func (d *Dispatcher) fetchOpenPRs() []PullRequest { url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=open", GiteaURL, RepoOwner, RepoName) - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - d.log("Failed to create PR request: %v", err) - return nil - } + req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "token "+d.token) - resp, err := http.DefaultClient.Do(req) if err != nil { d.log("Failed to fetch PRs: %v", err) return nil } defer resp.Body.Close() - if resp.StatusCode != 200 { body, _ := io.ReadAll(resp.Body) d.log("PR fetch failed: HTTP %d: %s", resp.StatusCode, string(body)) return nil } - var prs []PullRequest - if err := json.NewDecoder(resp.Body).Decode(&prs); err != nil { - d.log("Failed to decode PRs: %v", err) - return nil - } - + json.NewDecoder(resp.Body).Decode(&prs) return prs } @@ -198,13 +126,34 @@ func (d *Dispatcher) prHasLabel(pr PullRequest, label string) bool { return false } +func (d *Dispatcher) getAgentFromPR(pr PullRequest) string { + parts := strings.Split(pr.Title, ":") + if len(parts) > 0 { + agent := strings.TrimSpace(strings.ToLower(parts[0])) + for _, valid := range domainToAgent { + if agent == valid { + return agent + } + } + } + if pr.Head.Ref != "" { + branchParts := strings.Split(pr.Head.Ref, "/") + if len(branchParts) > 0 { + agent := strings.ToLower(branchParts[0]) + for _, valid := range domainToAgent { + if agent == valid { + return agent + } + } + } + } + return "" +} + func (d *Dispatcher) getAssignee(issue Issue) string { if issue.Assignee != nil && issue.Assignee.Login != "" { return issue.Assignee.Login } - if len(issue.Assignees) > 0 && issue.Assignees[0].Login != "" { - return issue.Assignees[0].Login - } return "" } @@ -226,24 +175,12 @@ func (d *Dispatcher) findAgentForIssue(issue Issue) string { return agent } } - text := strings.ToLower(issue.Title + " " + issue.Body) - domains := []string{ - "clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto", - "clavis-chrome", "clavis-firefox", "clavis-safari", - "clavis-android", "clavis-ios", "clavitor.ai", - "operations", "monitoring", "noc", - "security", "architecture", "design", "docs", "legal", "qa", "test", - } - - for _, domain := range domains { + for domain, agent := range domainToAgent { if strings.Contains(text, domain) { - if agent, ok := domainToAgent[domain]; ok { - return agent - } + return agent } } - return "" } @@ -254,12 +191,6 @@ func (d *Dispatcher) isAgentActive(agent string) bool { return isActive } -func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool { - path := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issueNum)) - _, err := os.Stat(path) - return !os.IsNotExist(err) -} - func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s @@ -271,10 +202,8 @@ func (d *Dispatcher) spawnAgent(agent string) { d.mu.Lock() d.activeAgents[agent] = time.Now() d.mu.Unlock() - d.log("Spawning agent: %s", agent) - // Check for agent-specific token agentToken := d.token tokensFile := "/home/johan/dev/clavitor/forge/dispatcher/agent-tokens.json" if data, err := os.ReadFile(tokensFile); err == nil { @@ -287,9 +216,7 @@ func (d *Dispatcher) spawnAgent(agent string) { } } - cmd := exec.Command("opencode", "run", - "--agent", agent, - "--dangerously-skip-permissions", + cmd := exec.Command("opencode", "run", "--agent", agent, "--dangerously-skip-permissions", "Check Gitea for work assigned to "+agent) cmd.Dir = WorkDir cmd.Env = append(os.Environ(), "GITEA_TOKEN="+agentToken) @@ -301,81 +228,12 @@ func (d *Dispatcher) spawnAgent(agent string) { } else { d.log("Agent %s completed", agent) } - d.mu.Lock() delete(d.activeAgents, agent) d.mu.Unlock() }() } -func (d *Dispatcher) dispatchTask(issue Issue, agent string) error { - if agent == "" { - return fmt.Errorf("no agent specified") - } - - if d.taskFileExists(agent, issue.Number) { - return nil - } - - d.mu.Lock() - timeSinceLast := time.Since(d.lastDispatch) - if timeSinceLast < time.Minute { - d.mu.Unlock() - return fmt.Errorf("rate limited") - } - d.lastDispatch = time.Now() - d.mu.Unlock() - - priority := d.getPriority(issue) - - agentDir := filepath.Join(TaskDir, agent) - os.MkdirAll(agentDir, 0755) - - taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number)) - - content := fmt.Sprintf(`# Agent Task - Issue #%d - -**Agent:** %s -**Priority:** %s -**Title:** %s - -## Instructions - -1. Read CLAVITOR-AGENT-HANDBOOK.md completely (mandatory) -2. View issue: %s/%s/%s/issues/%d -3. Create branch: %s/fix-%d -4. Implement fix per issue requirements -5. Commit with: "fixes #%d" -6. Push and create PR -7. Add "needs-qa" label to PR - -## Issue Body - -%s -`, issue.Number, agent, priority, issue.Title, GiteaURL, RepoOwner, RepoName, issue.Number, agent, issue.Number, issue.Number, issue.Body) - - if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil { - return err - } - - task := DispatchedTask{ - IssueNumber: issue.Number, - Title: issue.Title, - Agent: agent, - Priority: priority, - DispatchedAt: time.Now(), - TaskFile: taskFile, - } - - d.mu.Lock() - d.tasks = append(d.tasks, task) - d.mu.Unlock() - - d.log("DISPATCHED: %s → Issue #%d (%s) [%s]", agent, issue.Number, truncate(issue.Title, 50), priority) - - return nil -} - func (d *Dispatcher) pollAndDispatch() { d.log("Polling Gitea for work...") @@ -391,7 +249,6 @@ func (d *Dispatcher) pollAndDispatch() { needsQA = append(needsQA, pr) } } - if len(needsQA) > 0 && !d.isAgentActive("shakib") { d.log("Assigning %d PRs to Shakib (QA):", len(needsQA)) for i, pr := range needsQA { @@ -409,9 +266,8 @@ func (d *Dispatcher) pollAndDispatch() { needsSecurity = append(needsSecurity, pr) } } - if len(needsSecurity) > 0 && !d.isAgentActive("victoria") { - d.log("Assigning %d PRs to Victoria (Security Review):", len(needsSecurity)) + d.log("Assigning %d PRs to Victoria (Security):", len(needsSecurity)) for i, pr := range needsSecurity { d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60)) } @@ -423,11 +279,11 @@ func (d *Dispatcher) pollAndDispatch() { for _, pr := range qaPRs { if d.prHasLabel(pr, "needs-review") && d.prHasLabel(pr, "security-approved") && - !d.prHasLabel(pr, "approved") { + !d.prHasLabel(pr, "approved") && + !d.prHasLabel(pr, "changes-requested") { needsFinal = append(needsFinal, pr) } } - if len(needsFinal) > 0 && !d.isAgentActive("yurii") { d.log("Assigning %d PRs to Yurii (Final Review):", len(needsFinal)) for i, pr := range needsFinal { @@ -436,13 +292,48 @@ func (d *Dispatcher) pollAndDispatch() { d.spawnAgent("yurii") } - // 4. Check for issues assigned to engineers + // 4. Check for PRs with changes requested - route back to original engineer + var needsChanges []PullRequest + for _, pr := range qaPRs { + if d.prHasLabel(pr, "changes-requested") { + agent := d.getAgentFromPR(pr) + if agent != "" && !d.isAgentActive(agent) { + needsChanges = append(needsChanges, pr) + } + } + } + if len(needsChanges) > 0 { + d.log("Found %d PRs with changes requested:", len(needsChanges)) + for _, pr := range needsChanges { + agent := d.getAgentFromPR(pr) + d.log(" PR #%d (%s) -> routing back to %s", pr.Number, truncate(pr.Title, 50), agent) + if !d.isAgentActive(agent) { + d.spawnAgent(agent) + } + } + } + + // 5. Check for PRs that are approved but not yet merged (Yurii needs to merge them) + var needsMerge []PullRequest + for _, pr := range qaPRs { + if d.prHasLabel(pr, "approved") && d.prHasLabel(pr, "security-approved") { + needsMerge = append(needsMerge, pr) + } + } + if len(needsMerge) > 0 && !d.isAgentActive("yurii") { + d.log("Assigning %d approved PRs to Yurii (Merge):", len(needsMerge)) + for i, pr := range needsMerge { + d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60)) + } + d.spawnAgent("yurii") + } + + // 6. Check for issues assigned to engineers issues, err := d.fetchOpenIssues() if err != nil { d.log("Failed to fetch issues: %v", err) return } - d.log("Found %d open issues", len(issues)) sort.Slice(issues, func(i, j int) bool { @@ -461,107 +352,22 @@ func (d *Dispatcher) pollAndDispatch() { if agent == "" || agent == "shakib" || agent == "victoria" || agent == "yurii" { continue } - if d.isAgentActive(agent) { d.log("Agent %s is already working, skipping", agent) continue } - d.log("Assigning Issue #%d (%s) to %s", issue.Number, truncate(issue.Title, 50), agent) - - err := d.dispatchTask(issue, agent) - if err != nil { - d.log("Skipped Issue #%d: %v", issue.Number, err) - continue - } - dispatched = true break } - if !dispatched && len(needsQA) == 0 && len(needsSecurity) == 0 && len(needsFinal) == 0 { - d.log("No new work to dispatch (rate limit or all caught up)") - } -} - -func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) { - d.mu.RLock() - defer d.mu.RUnlock() - - agentCounts := make(map[string]int) - for _, t := range d.tasks { - agentCounts[t.Agent]++ - } - - w.Header().Set("Content-Type", "text/html") - fmt.Fprintf(w, ` - -
-Auto-refresh every 10 seconds | Last poll: %s
- -Total dispatched: %d tasks
-Active agents: %d
-Web UI: http://localhost:%s
-| Agent | Started |
|---|---|
| %s | %s |
Status: Running
Web UI: http://localhost:%s
", WebPort) + }) d.log("Web UI available at http://localhost:%s", WebPort) log.Fatal(http.ListenAndServe(":"+WebPort, nil)) }() d.pollAndDispatch() - ticker := time.NewTicker(PollInterval) defer ticker.Stop() - for range ticker.C { d.pollAndDispatch() }