Add detailed task assignment logging to dispatcher

This commit is contained in:
James 2026-04-09 04:56:12 -04:00
parent 8400acffb9
commit 7dbbadb62e
1 changed files with 299 additions and 176 deletions

View File

@ -7,6 +7,7 @@ import (
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
@ -21,10 +22,35 @@ const (
RepoName = "clavitor"
PollInterval = 60 * time.Second
TaskDir = "/home/johan/dev/clavitor/.agent-tasks"
WorkDir = "/home/johan/dev/clavitor"
LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log"
WebPort = "8098"
)
// Domain to agent mapping (from CLAVITOR-AGENT-HANDBOOK.md Section I)
var domainToAgent = map[string]string{
"clavis-vault": "sarah",
"clavis-cli": "charles",
"clavis-crypto": "maria",
"clavis-chrome": "james",
"clavis-firefox": "james",
"clavis-safari": "james",
"clavis-android": "xiao",
"clavis-ios": "xiao",
"clavitor.ai": "emma",
"clavis-telemetry": "hans",
"operations": "hans",
"monitoring": "hans",
"noc": "hans",
"security": "victoria",
"architecture": "arthur",
"design": "luna",
"docs": "thomas",
"legal": "hugo",
"qa": "shakib",
"test": "shakib",
}
type Issue struct {
ID int `json:"id"`
Number int `json:"number"`
@ -44,13 +70,27 @@ type Issue struct {
UpdatedAt string `json:"updated_at"`
}
type PullRequest struct {
Number int `json:"number"`
Title string `json:"title"`
Head struct {
Ref string `json:"ref"`
} `json:"head"`
Base struct {
Ref string `json:"ref"`
} `json:"base"`
Labels []struct {
Name string `json:"name"`
} `json:"labels"`
}
type DispatchedTask struct {
IssueNumber int `json:"issue_number"`
Title string `json:"title"`
Agent string `json:"agent"`
Priority string `json:"priority"`
IssueNumber int `json:"issue_number"`
Title string `json:"title"`
Agent string `json:"agent"`
Priority string `json:"priority"`
DispatchedAt time.Time `json:"dispatched_at"`
TaskFile string `json:"task_file"`
TaskFile string `json:"task_file"`
}
// Global state
@ -60,10 +100,10 @@ type Dispatcher struct {
lastDispatch time.Time
token string
logger *log.Logger
activeAgents map[string]time.Time
}
func NewDispatcher() *Dispatcher {
// Setup logging to file and stdout
logFile, err := os.OpenFile(LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Fatal("Failed to open log file:", err)
@ -76,12 +116,12 @@ func NewDispatcher() *Dispatcher {
logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0")
}
// Ensure task directory exists
os.MkdirAll(TaskDir, 0755)
return &Dispatcher{
token: token,
logger: logger,
token: token,
logger: logger,
activeAgents: make(map[string]time.Time),
}
}
@ -117,6 +157,57 @@ func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) {
return issues, nil
}
func (d *Dispatcher) fetchOpenPRs() []PullRequest {
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=open", GiteaURL, RepoOwner, RepoName)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
d.log("Failed to create PR request: %v", err)
return nil
}
req.Header.Set("Authorization", "token "+d.token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
d.log("Failed to fetch PRs: %v", err)
return nil
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
d.log("PR fetch failed: HTTP %d: %s", resp.StatusCode, string(body))
return nil
}
var prs []PullRequest
if err := json.NewDecoder(resp.Body).Decode(&prs); err != nil {
d.log("Failed to decode PRs: %v", err)
return nil
}
return prs
}
func (d *Dispatcher) prHasLabel(pr PullRequest, label string) bool {
for _, l := range pr.Labels {
if l.Name == label {
return true
}
}
return false
}
func (d *Dispatcher) getAssignee(issue Issue) string {
if issue.Assignee != nil && issue.Assignee.Login != "" {
return issue.Assignee.Login
}
if len(issue.Assignees) > 0 && issue.Assignees[0].Login != "" {
return issue.Assignees[0].Login
}
return ""
}
func (d *Dispatcher) getPriority(issue Issue) string {
for _, label := range issue.Labels {
if label.Name == "critical" {
@ -129,112 +220,144 @@ func (d *Dispatcher) getPriority(issue Issue) string {
return "NORMAL"
}
func (d *Dispatcher) getAssignee(issue Issue) string {
if issue.Assignee != nil && issue.Assignee.Login != "" {
return issue.Assignee.Login
func (d *Dispatcher) findAgentForIssue(issue Issue) string {
for _, label := range issue.Labels {
if agent, ok := domainToAgent[label.Name]; ok {
return agent
}
}
if len(issue.Assignees) > 0 {
return issue.Assignees[0].Login
text := strings.ToLower(issue.Title + " " + issue.Body)
domains := []string{
"clavis-telemetry", "clavis-vault", "clavis-cli", "clavis-crypto",
"clavis-chrome", "clavis-firefox", "clavis-safari",
"clavis-android", "clavis-ios", "clavitor.ai",
"operations", "monitoring", "noc",
"security", "architecture", "design", "docs", "legal", "qa", "test",
}
for _, domain := range domains {
if strings.Contains(text, domain) {
if agent, ok := domainToAgent[domain]; ok {
return agent
}
}
}
return ""
}
func (d *Dispatcher) isAgentActive(agent string) bool {
d.mu.RLock()
defer d.mu.RUnlock()
_, isActive := d.activeAgents[agent]
return isActive
}
func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool {
path := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issueNum))
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
func (d *Dispatcher) dispatchTask(issue Issue) error {
agent := d.getAssignee(issue)
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
func (d *Dispatcher) spawnAgent(agent string) {
d.mu.Lock()
d.activeAgents[agent] = time.Now()
d.mu.Unlock()
d.log("Spawning agent: %s", agent)
// Check for agent-specific token
agentToken := d.token
tokensFile := "/home/johan/dev/clavitor/forge/dispatcher/agent-tokens.json"
if data, err := os.ReadFile(tokensFile); err == nil {
var tokens map[string]string
if json.Unmarshal(data, &tokens) == nil {
if token, ok := tokens[agent]; ok && token != "" {
agentToken = token
d.log("Using agent-specific token for %s", agent)
}
}
}
cmd := exec.Command("opencode", "run",
"--agent", agent,
"--dangerously-skip-permissions",
"Check Gitea for work assigned to "+agent)
cmd.Dir = WorkDir
cmd.Env = append(os.Environ(), "GITEA_TOKEN="+agentToken)
go func() {
output, err := cmd.CombinedOutput()
if err != nil {
d.log("Agent %s failed: %v\nOutput: %s", agent, err, string(output))
} else {
d.log("Agent %s completed", agent)
}
d.mu.Lock()
delete(d.activeAgents, agent)
d.mu.Unlock()
}()
}
func (d *Dispatcher) dispatchTask(issue Issue, agent string) error {
if agent == "" {
return nil // Skip unassigned
return fmt.Errorf("no agent specified")
}
// Check if already dispatched
if d.taskFileExists(agent, issue.Number) {
return nil // Already dispatched
return nil
}
// Rate limit: 1 per minute
d.mu.Lock()
timeSinceLast := time.Since(d.lastDispatch)
if timeSinceLast < time.Minute {
d.mu.Unlock()
return fmt.Errorf("rate limited: only %v since last dispatch", timeSinceLast)
return fmt.Errorf("rate limited")
}
d.lastDispatch = time.Now()
d.mu.Unlock()
priority := d.getPriority(issue)
// Create task file
agentDir := filepath.Join(TaskDir, agent)
os.MkdirAll(agentDir, 0755)
taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number))
content := fmt.Sprintf(`# Agent Task Auto-Dispatched
content := fmt.Sprintf(`# Agent Task - Issue #%d
**Agent:** %s
**Issue:** #%d
**Title:** %s
**Priority:** %s
**Dispatched:** %s
**Title:** %s
---
## Instructions
## Your Instruction
1. Read CLAVITOR-AGENT-HANDBOOK.md completely (mandatory)
2. View issue: %s/%s/%s/issues/%d
3. Create branch: %s/fix-%d
4. Implement fix per issue requirements
5. Commit with: "fixes #%d"
6. Push and create PR
7. Add "needs-qa" label to PR
1. **Read QUICKSTART.md** (60 seconds):
/home/johan/dev/clavitor/QUICKSTART.md
2. **View this issue in Gitea:**
%s/%s/%s/issues/%d
3. **Execute per handbook Section III**
4. **Create branch:**
git checkout -b %s/fix-%d
5. **Implement fix** per issue spec
6. **Run daily review:**
./scripts/daily-review.sh (must pass)
7. **Commit with reference:**
git commit -m "telemetry: fix silent DB error. Fixes #%d"
8. **Push and create PR:**
git push -u origin %s/fix-%d
tea pulls create --title "%s: %s" --description "Fixes #%d"
9. **Wait for review** DO NOT merge your own PR
---
## Issue Context
## Issue Body
%s
---
*This task was auto-dispatched by the agent scheduler.*
*Dispatch time: %s*
`, agent, issue.Number, issue.Title, priority, time.Now().Format(time.RFC3339),
GiteaURL, RepoOwner, RepoName, issue.Number,
agent, issue.Number,
issue.Number,
agent, issue.Number,
agent, issue.Title, issue.Number,
truncate(issue.Body, 500),
time.Now().Format(time.RFC3339))
`, issue.Number, agent, priority, issue.Title, GiteaURL, RepoOwner, RepoName, issue.Number, agent, issue.Number, issue.Number, issue.Body)
if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil {
return err
}
// Record in memory
task := DispatchedTask{
IssueNumber: issue.Number,
Title: issue.Title,
@ -248,30 +371,80 @@ func (d *Dispatcher) dispatchTask(issue Issue) error {
d.tasks = append(d.tasks, task)
d.mu.Unlock()
d.log("DISPATCHED: %s → Issue #%d (%s) [Priority: %s]", agent, issue.Number, issue.Title, priority)
d.log("DISPATCHED: %s → Issue #%d (%s) [%s]", agent, issue.Number, truncate(issue.Title, 50), priority)
return nil
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
func (d *Dispatcher) pollAndDispatch() {
d.log("Polling Gitea for open issues...")
d.log("Polling Gitea for work...")
// 1. Check for PRs needing QA (Shakib)
qaPRs := d.fetchOpenPRs()
if qaPRs != nil {
d.log("Fetched %d open PRs", len(qaPRs))
}
var needsQA []PullRequest
for _, pr := range qaPRs {
if d.prHasLabel(pr, "needs-qa") && !d.prHasLabel(pr, "in-progress") {
needsQA = append(needsQA, pr)
}
}
if len(needsQA) > 0 && !d.isAgentActive("shakib") {
d.log("Assigning %d PRs to Shakib (QA):", len(needsQA))
for i, pr := range needsQA {
d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60))
}
d.spawnAgent("shakib")
}
// 2. Check for PRs needing security review (Victoria)
var needsSecurity []PullRequest
for _, pr := range qaPRs {
if d.prHasLabel(pr, "needs-review") &&
!d.prHasLabel(pr, "security-approved") &&
!d.prHasLabel(pr, "security-issues") {
needsSecurity = append(needsSecurity, pr)
}
}
if len(needsSecurity) > 0 && !d.isAgentActive("victoria") {
d.log("Assigning %d PRs to Victoria (Security Review):", len(needsSecurity))
for i, pr := range needsSecurity {
d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60))
}
d.spawnAgent("victoria")
}
// 3. Check for PRs needing final review (Yurii)
var needsFinal []PullRequest
for _, pr := range qaPRs {
if d.prHasLabel(pr, "needs-review") &&
d.prHasLabel(pr, "security-approved") &&
!d.prHasLabel(pr, "approved") {
needsFinal = append(needsFinal, pr)
}
}
if len(needsFinal) > 0 && !d.isAgentActive("yurii") {
d.log("Assigning %d PRs to Yurii (Final Review):", len(needsFinal))
for i, pr := range needsFinal {
d.log(" %d. PR #%d: %s", i+1, pr.Number, truncate(pr.Title, 60))
}
d.spawnAgent("yurii")
}
// 4. Check for issues assigned to engineers
issues, err := d.fetchOpenIssues()
if err != nil {
d.log("❌ Failed to fetch issues: %v", err)
d.log("Failed to fetch issues: %v", err)
return
}
d.log("Found %d open issues", len(issues))
// Sort by priority (CRITICAL first)
sort.Slice(issues, func(i, j int) bool {
pi := d.getPriority(issues[i])
pj := d.getPriority(issues[j])
@ -279,36 +452,42 @@ func (d *Dispatcher) pollAndDispatch() {
return priorityOrder[pi] < priorityOrder[pj]
})
// Try to dispatch one task
dispatched := false
for _, issue := range issues {
if d.getAssignee(issue) == "" {
agent := d.getAssignee(issue)
if agent == "" {
agent = d.findAgentForIssue(issue)
}
if agent == "" || agent == "shakib" || agent == "victoria" || agent == "yurii" {
continue
}
err := d.dispatchTask(issue)
if d.isAgentActive(agent) {
d.log("Agent %s is already working, skipping", agent)
continue
}
d.log("Assigning Issue #%d (%s) to %s", issue.Number, truncate(issue.Title, 50), agent)
err := d.dispatchTask(issue, agent)
if err != nil {
d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err)
d.log("Skipped Issue #%d: %v", issue.Number, err)
continue
}
if err == nil {
dispatched = true
break // Only 1 per poll
}
dispatched = true
break
}
if !dispatched {
d.log(" No new tasks to dispatch (rate limit or all caught up)")
if !dispatched && len(needsQA) == 0 && len(needsSecurity) == 0 && len(needsFinal) == 0 {
d.log("No new work to dispatch (rate limit or all caught up)")
}
}
// Web UI handlers
func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) {
d.mu.RLock()
defer d.mu.RUnlock()
// Count tasks by agent
agentCounts := make(map[string]int)
for _, t := range d.tasks {
agentCounts[t.Agent]++
@ -323,110 +502,59 @@ func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) {
body { font-family: sans-serif; margin: 40px; background: #f5f5f5; }
h1 { color: #333; }
.status { background: white; padding: 20px; border-radius: 8px; margin: 20px 0; }
.task { background: #e8f5e9; padding: 10px; margin: 5px 0; border-radius: 4px; }
.critical { background: #ffebee; border-left: 4px solid #f44336; }
.high { background: #fff3e0; border-left: 4px solid #ff9800; }
table { width: 100%%; border-collapse: collapse; }
th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; }
th { background: #333; color: white; }
.refresh { color: #666; font-size: 0.9em; }
.log { font-family: monospace; background: #263238; color: #aed581; padding: 20px; border-radius: 4px; overflow-x: auto; }
</style>
<meta http-equiv="refresh" content="10">
</head>
<body>
<h1>🔧 Agent Dispatcher Status</h1>
<h1>Agent Dispatcher Status</h1>
<p class="refresh">Auto-refresh every 10 seconds | Last poll: %s</p>
<div class="status">
<h2>Overview</h2>
<p><strong>Total dispatched:</strong> %d tasks</p>
<p><strong>Last dispatch:</strong> %s</p>
<p><strong>Rate limit:</strong> 1 task per minute</p>
<p><strong>Task directory:</strong> %s</p>
<h3>By Agent</h3>
<p><strong>Active agents:</strong> %d</p>
<p><strong>Web UI:</strong> http://localhost:%s</p>
</div>
<div class="status">
<h2>Active Agents</h2>
<table>
<tr><th>Agent</th><th>Tasks Dispatched</th></tr>
`, time.Now().Format(time.RFC3339), len(d.tasks),
d.lastDispatch.Format(time.RFC3339), TaskDir)
<tr><th>Agent</th><th>Started</th></tr>
`, time.Now().Format(time.RFC3339), len(d.tasks), len(d.activeAgents), WebPort)
for agent, count := range agentCounts {
fmt.Fprintf(w, "<tr><td>%s</td><td>%d</td></tr>\n", agent, count)
for agent, startTime := range d.activeAgents {
fmt.Fprintf(w, `<tr><td>%s</td><td>%s</td></tr>`, agent, startTime.Format(time.RFC3339))
}
fmt.Fprintf(w, `
</table>
</div>
<div class="status">
<h2>Recent Dispatches</h2>
`)
// Show last 10 tasks (reverse order)
for i := len(d.tasks) - 1; i >= 0 && i >= len(d.tasks)-10; i-- {
t := d.tasks[i]
priorityClass := ""
if t.Priority == "CRITICAL" {
priorityClass = "critical"
} else if t.Priority == "HIGH" {
priorityClass = "high"
}
fmt.Fprintf(w, `
<div class="task %s">
<strong>#%d</strong> %s <span style="float:right">%s | %s</span><br>
<small>Agent: %s | File: %s</small>
</div>
`, priorityClass, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format("15:04:05"), t.Agent, t.TaskFile)
}
fmt.Fprintf(w, `
</div>
<div class="status">
<h2>Live Log (tail -f %s)</h2>
<pre class="log">`, LogFile)
// Read last 50 lines of log
logContent, _ := os.ReadFile(LogFile)
lines := strings.Split(string(logContent), "\n")
start := 0
if len(lines) > 50 {
start = len(lines) - 50
}
for i := start; i < len(lines); i++ {
fmt.Fprintf(w, "%s\n", lines[i])
}
fmt.Fprintf(w, `</pre>
</div>
</body>
</html>
`)
</html>`)
}
func (d *Dispatcher) handleTasks(w http.ResponseWriter, r *http.Request) {
agent := r.URL.Query().Get("agent")
if agent == "" {
http.Error(w, "Missing agent parameter", 400)
return
}
// List task files for this agent
agentDir := filepath.Join(TaskDir, agent)
files, err := os.ReadDir(agentDir)
if err != nil {
fmt.Fprintf(w, "No tasks for %s\n", agent)
return
}
d.mu.RLock()
defer d.mu.RUnlock()
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(w, "Tasks for %s:\n\n", agent)
for _, f := range files {
if strings.HasSuffix(f.Name(), ".md") {
fmt.Fprintf(w, "- %s/%s\n", agentDir, f.Name())
fmt.Fprintf(w, "# Dispatched Tasks (%d total)\n\n", len(d.tasks))
for _, t := range d.tasks {
fmt.Fprintf(w, "- %s: Issue #%d (%s) [%s] at %s\n", t.Agent, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format(time.RFC3339))
}
agents, _ := os.ReadDir(TaskDir)
fmt.Fprintf(w, "\n# Agent Directories\n")
for _, agent := range agents {
if agent.IsDir() {
agentDir := filepath.Join(TaskDir, agent.Name())
files, _ := os.ReadDir(agentDir)
fmt.Fprintf(w, "- %s: %d files\n", agent.Name(), len(files))
}
}
}
@ -438,12 +566,9 @@ func main() {
d.log("Agent Dispatcher Starting")
d.log("Gitea: %s", GiteaURL)
d.log("Repo: %s/%s", RepoOwner, RepoName)
d.log("Task Dir: %s", TaskDir)
d.log("Web UI: http://localhost:%s", WebPort)
d.log("Rate Limit: 1 task per minute")
d.log("========================================")
// Start web server
go func() {
http.HandleFunc("/", d.handleStatus)
http.HandleFunc("/tasks", d.handleTasks)
@ -451,10 +576,8 @@ func main() {
log.Fatal(http.ListenAndServe(":"+WebPort, nil))
}()
// Immediate first poll
d.pollAndDispatch()
// Main loop
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()