dispatcher: add domain-to-agent mapping and opencode agent spawning
Implements the dispatcher flow for routing issues to specialized agents:

- Domain-to-agent mapping from CLAVITOR-AGENT-HANDBOOK.md Section I
- Automatic agent spawning via opencode CLI
- Webhook handler for real-time Gitea events
- Active agent tracking to prevent duplicate work

Fixes #5
This commit is contained in:
parent
6c2b708c4d
commit
30a904247d
|
|
@ -7,8 +7,8 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -21,6 +21,7 @@ const (
|
|||
RepoName = "clavitor"
|
||||
PollInterval = 60 * time.Second
|
||||
TaskDir = "/home/johan/dev/clavitor/.agent-tasks"
|
||||
WorkDir = "/home/johan/dev/clavitor"
|
||||
LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log"
|
||||
WebPort = "8098"
|
||||
)
|
||||
|
|
@ -53,6 +54,30 @@ type DispatchedTask struct {
|
|||
TaskFile string `json:"task_file"`
|
||||
}
|
||||
|
||||
// domainToAgent routes an issue's domain to the responsible agent,
// per CLAVITOR-AGENT-HANDBOOK.md Section I.
var domainToAgent = map[string]string{
	// Core product components.
	"clavis-vault":  "sarah",
	"clavis-cli":    "charles",
	"clavis-crypto": "maria",

	// Browser extensions — all owned by james.
	"clavis-chrome":  "james",
	"clavis-firefox": "james",
	"clavis-safari":  "james",

	// Mobile — both platforms owned by xiao.
	"clavis-android": "xiao",
	"clavis-ios":     "xiao",

	// Web / AI product.
	"clavitor.ai": "emma",

	// Operations and observability — hans.
	"clavis-telemetry": "hans",
	"operations":       "hans",
	"monitoring":       "hans",
	"noc":              "hans",

	// Cross-cutting specialties.
	"security":     "victoria",
	"architecture": "arthur",
	"design":       "luna",
	"docs":         "thomas",
	"legal":        "hugo",

	// Quality assurance — shakib.
	"qa":   "shakib",
	"test": "shakib",
}
|
||||
|
||||
// Global state
|
||||
type Dispatcher struct {
|
||||
mu sync.RWMutex
|
||||
|
|
@ -60,6 +85,7 @@ type Dispatcher struct {
|
|||
lastDispatch time.Time
|
||||
token string
|
||||
logger *log.Logger
|
||||
activeAgents map[string]time.Time // Track agents currently working
|
||||
}
|
||||
|
||||
func NewDispatcher() *Dispatcher {
|
||||
|
|
@ -80,8 +106,9 @@ func NewDispatcher() *Dispatcher {
|
|||
os.MkdirAll(TaskDir, 0755)
|
||||
|
||||
return &Dispatcher{
|
||||
token: token,
|
||||
logger: logger,
|
||||
token: token,
|
||||
logger: logger,
|
||||
activeAgents: make(map[string]time.Time),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -145,13 +172,58 @@ func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool {
|
|||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
func (d *Dispatcher) dispatchTask(issue Issue) error {
|
||||
agent := d.getAssignee(issue)
|
||||
if agent == "" {
|
||||
return nil // Skip unassigned
|
||||
// Find agent for issue based on domain
|
||||
func (d *Dispatcher) findAgentForIssue(issue Issue) string {
|
||||
domain := d.getDomain(issue)
|
||||
if domain == "" {
|
||||
return "" // No domain found
|
||||
}
|
||||
|
||||
// Check if already dispatched
|
||||
agent, ok := domainToAgent[domain]
|
||||
if !ok {
|
||||
return "" // Unknown domain
|
||||
}
|
||||
|
||||
return agent
|
||||
}
|
||||
|
||||
// Extract domain from issue (body, title, or labels)
|
||||
func (d *Dispatcher) getDomain(issue Issue) string {
|
||||
// Check labels first
|
||||
for _, label := range issue.Labels {
|
||||
if agent, ok := domainToAgent[label.Name]; ok {
|
||||
return agent
|
||||
}
|
||||
}
|
||||
|
||||
// Check title and body for domain keywords
|
||||
text := issue.Title + " " + issue.Body
|
||||
text = strings.ToLower(text)
|
||||
|
||||
// Check domains in priority order (more specific first)
|
||||
domains := []string{
|
||||
"clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto",
|
||||
"clavis-chrome", "clavis-firefox", "clavis-safari",
|
||||
"clavis-android", "clavis-ios", "clavitor.ai",
|
||||
"operations", "monitoring", "noc",
|
||||
"security", "architecture", "design", "docs", "legal", "qa", "test",
|
||||
}
|
||||
|
||||
for _, domain := range domains {
|
||||
if strings.Contains(text, domain) {
|
||||
return domain
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func (d *Dispatcher) dispatchTask(issue Issue, agent string) error {
|
||||
if agent == "" {
|
||||
return fmt.Errorf("no agent available")
|
||||
}
|
||||
|
||||
// Check if already dispatched to this agent
|
||||
if d.taskFileExists(agent, issue.Number) {
|
||||
return nil // Already dispatched
|
||||
}
|
||||
|
|
@ -263,43 +335,212 @@ func truncate(s string, maxLen int) string {
|
|||
func (d *Dispatcher) pollAndDispatch() {
|
||||
d.log("Polling Gitea for open issues...")
|
||||
|
||||
// Fetch all issues once
|
||||
issues, err := d.fetchOpenIssues()
|
||||
if err != nil {
|
||||
d.log("❌ Failed to fetch issues: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
d.log("Found %d open issues", len(issues))
|
||||
d.log("Found %d total open issues", len(issues))
|
||||
|
||||
// Sort by priority (CRITICAL first)
|
||||
sort.Slice(issues, func(i, j int) bool {
|
||||
pi := d.getPriority(issues[i])
|
||||
pj := d.getPriority(issues[j])
|
||||
priorityOrder := map[string]int{"CRITICAL": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3}
|
||||
return priorityOrder[pi] < priorityOrder[pj]
|
||||
})
|
||||
|
||||
// Try to dispatch one task
|
||||
dispatched := false
|
||||
// Group issues by assignee
|
||||
byAgent := make(map[string][]Issue)
|
||||
for _, issue := range issues {
|
||||
if d.getAssignee(issue) == "" {
|
||||
continue
|
||||
assignee := d.getAssignee(issue)
|
||||
if assignee == "" {
|
||||
assignee = d.findAgentForIssue(issue) // Fallback to domain mapping
|
||||
}
|
||||
|
||||
err := d.dispatchTask(issue)
|
||||
if err != nil {
|
||||
d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
dispatched = true
|
||||
break // Only 1 per poll
|
||||
if assignee != "" {
|
||||
byAgent[assignee] = append(byAgent[assignee], issue)
|
||||
}
|
||||
}
|
||||
|
||||
if !dispatched {
|
||||
d.log("ℹ️ No new tasks to dispatch (rate limit or all caught up)")
|
||||
// Spawn agents that have work and aren't already active
|
||||
for agent, agentIssues := range byAgent {
|
||||
if len(agentIssues) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if agent is already working
|
||||
d.mu.Lock()
|
||||
_, isActive := d.activeAgents[agent]
|
||||
d.mu.Unlock()
|
||||
|
||||
if isActive {
|
||||
d.log("ℹ️ Agent %s is already working, skipping", agent)
|
||||
continue
|
||||
}
|
||||
|
||||
// Spawn the agent
|
||||
d.spawnAgent(agent)
|
||||
}
|
||||
}
|
||||
|
||||
// spawnAgent launches opencode for an agent
|
||||
func (d *Dispatcher) spawnAgent(agent string) {
|
||||
d.mu.Lock()
|
||||
d.activeAgents[agent] = time.Now()
|
||||
d.mu.Unlock()
|
||||
|
||||
d.log("🚀 Spawning agent: %s", agent)
|
||||
|
||||
// Build command
|
||||
cmd := exec.Command("opencode", "run",
|
||||
"--agent", agent,
|
||||
"--dangerously-skip-permissions",
|
||||
"Check Gitea for issues assigned to "+agent+" and fix the highest priority one")
|
||||
cmd.Dir = WorkDir
|
||||
cmd.Env = append(os.Environ(),
|
||||
"GITEA_TOKEN="+d.token,
|
||||
)
|
||||
|
||||
// Run in background
|
||||
go func() {
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
d.log("❌ Agent %s failed: %v\nOutput: %s", agent, err, string(output))
|
||||
} else {
|
||||
d.log("✅ Agent %s completed", agent)
|
||||
}
|
||||
|
||||
// Mark agent as done
|
||||
d.mu.Lock()
|
||||
delete(d.activeAgents, agent)
|
||||
d.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Webhook handler for Gitea events
|
||||
func (d *Dispatcher) handleWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, "Method not allowed", 405)
|
||||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Bad request", 400)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse webhook payload
|
||||
var payload struct {
|
||||
Action string `json:"action"`
|
||||
Issue *Issue `json:"issue"`
|
||||
PullRequest *struct {
|
||||
Number int `json:"number"`
|
||||
Title string `json:"title"`
|
||||
State string `json:"state"`
|
||||
User struct {
|
||||
Login string `json:"login"`
|
||||
} `json:"user"`
|
||||
Head struct {
|
||||
Ref string `json:"ref"`
|
||||
} `json:"head"`
|
||||
} `json:"pull_request"`
|
||||
Repository struct {
|
||||
FullName string `json:"full_name"`
|
||||
} `json:"repository"`
|
||||
Sender struct {
|
||||
Login string `json:"login"`
|
||||
} `json:"sender"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
d.log("❌ Webhook: Failed to parse payload: %v", err)
|
||||
http.Error(w, "Bad request", 400)
|
||||
return
|
||||
}
|
||||
|
||||
// Get event type from header
|
||||
eventType := r.Header.Get("X-Gitea-Event")
|
||||
if eventType == "" {
|
||||
eventType = r.Header.Get("X-GitHub-Event") // Fallback
|
||||
}
|
||||
|
||||
d.log("📨 Webhook received: %s from %s", eventType, payload.Sender.Login)
|
||||
|
||||
// Handle issue events
|
||||
if eventType == "issues" && payload.Issue != nil {
|
||||
switch payload.Action {
|
||||
case "opened", "reopened", "edited", "assigned":
|
||||
d.log("📋 Issue #%d %s - processing dispatch", payload.Issue.Number, payload.Action)
|
||||
d.processSingleIssue(*payload.Issue)
|
||||
case "closed":
|
||||
d.log("✅ Issue #%d closed - checking for next task", payload.Issue.Number)
|
||||
d.markTaskDone(*payload.Issue)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle PR merge (task completed)
|
||||
if eventType == "pull_request" && payload.PullRequest != nil {
|
||||
if payload.Action == "closed" && payload.PullRequest.State == "merged" {
|
||||
d.log("🎉 PR #%d merged by %s - agent completed task",
|
||||
payload.PullRequest.Number, payload.PullRequest.User.Login)
|
||||
// Extract agent from branch name (agent/fix-123)
|
||||
branch := payload.PullRequest.Head.Ref
|
||||
if idx := strings.Index(branch, "/"); idx > 0 {
|
||||
agent := branch[:idx]
|
||||
d.markAgentTaskDone(agent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprintln(w, "OK")
|
||||
}
|
||||
|
||||
// Process a single issue immediately (for webhooks)
|
||||
func (d *Dispatcher) processSingleIssue(issue Issue) {
|
||||
agent := d.findAgentForIssue(issue)
|
||||
if agent == "" {
|
||||
d.log("ℹ️ Issue #%d: No available agent for domain", issue.Number)
|
||||
return
|
||||
}
|
||||
|
||||
err := d.dispatchTask(issue, agent)
|
||||
if err != nil {
|
||||
d.log("⚠️ Issue #%d: Failed to dispatch to %s: %v", issue.Number, agent, err)
|
||||
} else {
|
||||
d.log("✅ Issue #%d dispatched to %s", issue.Number, agent)
|
||||
}
|
||||
}
|
||||
|
||||
// Mark a task as done when issue is closed
|
||||
func (d *Dispatcher) markTaskDone(issue Issue) {
|
||||
// Find which agent had this task
|
||||
for agent := range domainToAgent {
|
||||
taskFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issue.Number))
|
||||
if _, err := os.Stat(taskFile); err == nil {
|
||||
// Rename to .done.md
|
||||
doneFile := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.done.md", issue.Number))
|
||||
os.Rename(taskFile, doneFile)
|
||||
d.log("✅ Task for Issue #%d marked as done for %s", issue.Number, agent)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark task done when PR is merged
|
||||
func (d *Dispatcher) markAgentTaskDone(agent string) {
|
||||
agentDir := filepath.Join(TaskDir, agent)
|
||||
files, err := os.ReadDir(agentDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if strings.HasSuffix(f.Name(), ".md") && !strings.HasSuffix(f.Name(), ".done.md") {
|
||||
taskFile := filepath.Join(agentDir, f.Name())
|
||||
doneFile := filepath.Join(agentDir, f.Name()[:len(f.Name())-3]+".done.md")
|
||||
os.Rename(taskFile, doneFile)
|
||||
d.log("✅ Agent %s task marked as done (PR merged)", agent)
|
||||
|
||||
// Trigger dispatch of next task for this agent
|
||||
go d.pollAndDispatch()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -440,18 +681,21 @@ func main() {
|
|||
d.log("Repo: %s/%s", RepoOwner, RepoName)
|
||||
d.log("Task Dir: %s", TaskDir)
|
||||
d.log("Web UI: http://localhost:%s", WebPort)
|
||||
d.log("Rate Limit: 1 task per minute")
|
||||
d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort)
|
||||
d.log("Mode: Webhook listener + backup polling")
|
||||
d.log("========================================")
|
||||
|
||||
// Start web server
|
||||
// Start web server (includes webhook listener)
|
||||
go func() {
|
||||
http.HandleFunc("/", d.handleStatus)
|
||||
http.HandleFunc("/tasks", d.handleTasks)
|
||||
http.HandleFunc("/webhook", d.handleWebhook) // Gitea webhooks
|
||||
d.log("Web UI available at http://localhost:%s", WebPort)
|
||||
d.log("Webhook endpoint: http://localhost:%s/webhook", WebPort)
|
||||
log.Fatal(http.ListenAndServe(":"+WebPort, nil))
|
||||
}()
|
||||
|
||||
// Immediate first poll
|
||||
// Immediate first poll (backup for any missed webhooks)
|
||||
d.pollAndDispatch()
|
||||
|
||||
// Main loop
|
||||
|
|
|
|||
Loading…
Reference in New Issue