clavitor/forge/dispatcher/main.go

465 lines
11 KiB
Go
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
)
// Config
const (
GiteaURL = "https://git.clavitor.ai"
RepoOwner = "johan"
RepoName = "clavitor"
PollInterval = 60 * time.Second
TaskDir = "/home/johan/dev/clavitor/.agent-tasks"
LogFile = "/home/johan/dev/clavitor/.agent-dispatcher.log"
WebPort = "8098"
)
type Issue struct {
ID int `json:"id"`
Number int `json:"number"`
Title string `json:"title"`
Body string `json:"body"`
State string `json:"state"`
Assignee *struct {
Login string `json:"login"`
} `json:"assignee"`
Assignees []struct {
Login string `json:"login"`
} `json:"assignees"`
Labels []struct {
Name string `json:"name"`
} `json:"labels"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
type DispatchedTask struct {
IssueNumber int `json:"issue_number"`
Title string `json:"title"`
Agent string `json:"agent"`
Priority string `json:"priority"`
DispatchedAt time.Time `json:"dispatched_at"`
TaskFile string `json:"task_file"`
}
// Global state
type Dispatcher struct {
mu sync.RWMutex
tasks []DispatchedTask
lastDispatch time.Time
token string
logger *log.Logger
}
func NewDispatcher() *Dispatcher {
// Setup logging to file and stdout
logFile, err := os.OpenFile(LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Fatal("Failed to open log file:", err)
}
logger := log.New(io.MultiWriter(os.Stdout, logFile), "[DISPATCHER] ", log.LstdFlags|log.Lmicroseconds)
token := os.Getenv("GITEA_TOKEN")
if token == "" {
logger.Fatal("GITEA_TOKEN not set. Run: export GITEA_TOKEN=775a12730a65cbaf1673da048b7d01859b8b58e0")
}
// Ensure task directory exists
os.MkdirAll(TaskDir, 0755)
return &Dispatcher{
token: token,
logger: logger,
}
}
func (d *Dispatcher) log(format string, v ...interface{}) {
d.logger.Printf(format, v...)
}
func (d *Dispatcher) fetchOpenIssues() ([]Issue, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&limit=100", GiteaURL, RepoOwner, RepoName)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+d.token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(body))
}
var issues []Issue
if err := json.NewDecoder(resp.Body).Decode(&issues); err != nil {
return nil, err
}
return issues, nil
}
func (d *Dispatcher) getPriority(issue Issue) string {
for _, label := range issue.Labels {
if label.Name == "critical" {
return "CRITICAL"
}
if label.Name == "high" {
return "HIGH"
}
}
return "NORMAL"
}
func (d *Dispatcher) getAssignee(issue Issue) string {
if issue.Assignee != nil && issue.Assignee.Login != "" {
return issue.Assignee.Login
}
if len(issue.Assignees) > 0 {
return issue.Assignees[0].Login
}
return ""
}
func (d *Dispatcher) taskFileExists(agent string, issueNum int) bool {
path := filepath.Join(TaskDir, agent, fmt.Sprintf("issue-%d.md", issueNum))
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
func (d *Dispatcher) dispatchTask(issue Issue) error {
agent := d.getAssignee(issue)
if agent == "" {
return nil // Skip unassigned
}
// Check if already dispatched
if d.taskFileExists(agent, issue.Number) {
return nil // Already dispatched
}
// Rate limit: 1 per minute
d.mu.Lock()
timeSinceLast := time.Since(d.lastDispatch)
if timeSinceLast < time.Minute {
d.mu.Unlock()
return fmt.Errorf("rate limited: only %v since last dispatch", timeSinceLast)
}
d.lastDispatch = time.Now()
d.mu.Unlock()
priority := d.getPriority(issue)
// Create task file
agentDir := filepath.Join(TaskDir, agent)
os.MkdirAll(agentDir, 0755)
taskFile := filepath.Join(agentDir, fmt.Sprintf("issue-%d.md", issue.Number))
content := fmt.Sprintf(`# Agent Task — Auto-Dispatched
**Agent:** %s
**Issue:** #%d
**Title:** %s
**Priority:** %s
**Dispatched:** %s
---
## Your Instruction
1. **Read QUICKSTART.md** (60 seconds):
/home/johan/dev/clavitor/QUICKSTART.md
2. **View this issue in Gitea:**
%s/%s/%s/issues/%d
3. **Execute per handbook Section III**
4. **Create branch:**
git checkout -b %s/fix-%d
5. **Implement fix** per issue spec
6. **Run daily review:**
./scripts/daily-review.sh (must pass)
7. **Commit with reference:**
git commit -m "telemetry: fix silent DB error. Fixes #%d"
8. **Push and create PR:**
git push -u origin %s/fix-%d
tea pulls create --title "%s: %s" --description "Fixes #%d"
9. **Wait for review** — DO NOT merge your own PR
---
## Issue Context
%s
---
*This task was auto-dispatched by the agent scheduler.*
*Dispatch time: %s*
`, agent, issue.Number, issue.Title, priority, time.Now().Format(time.RFC3339),
GiteaURL, RepoOwner, RepoName, issue.Number,
agent, issue.Number,
issue.Number,
agent, issue.Number,
agent, issue.Title, issue.Number,
truncate(issue.Body, 500),
time.Now().Format(time.RFC3339))
if err := os.WriteFile(taskFile, []byte(content), 0644); err != nil {
return err
}
// Record in memory
task := DispatchedTask{
IssueNumber: issue.Number,
Title: issue.Title,
Agent: agent,
Priority: priority,
DispatchedAt: time.Now(),
TaskFile: taskFile,
}
d.mu.Lock()
d.tasks = append(d.tasks, task)
d.mu.Unlock()
d.log("✅ DISPATCHED: %s → Issue #%d (%s) [Priority: %s]", agent, issue.Number, issue.Title, priority)
return nil
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
func (d *Dispatcher) pollAndDispatch() {
d.log("Polling Gitea for open issues...")
issues, err := d.fetchOpenIssues()
if err != nil {
d.log("❌ Failed to fetch issues: %v", err)
return
}
d.log("Found %d open issues", len(issues))
// Sort by priority (CRITICAL first)
sort.Slice(issues, func(i, j int) bool {
pi := d.getPriority(issues[i])
pj := d.getPriority(issues[j])
priorityOrder := map[string]int{"CRITICAL": 0, "HIGH": 1, "NORMAL": 2, "LOW": 3}
return priorityOrder[pi] < priorityOrder[pj]
})
// Try to dispatch one task
dispatched := false
for _, issue := range issues {
if d.getAssignee(issue) == "" {
continue
}
err := d.dispatchTask(issue)
if err != nil {
d.log("⚠️ Skipped Issue #%d: %v", issue.Number, err)
continue
}
if err == nil {
dispatched = true
break // Only 1 per poll
}
}
if !dispatched {
d.log(" No new tasks to dispatch (rate limit or all caught up)")
}
}
// Web UI handlers
func (d *Dispatcher) handleStatus(w http.ResponseWriter, r *http.Request) {
d.mu.RLock()
defer d.mu.RUnlock()
// Count tasks by agent
agentCounts := make(map[string]int)
for _, t := range d.tasks {
agentCounts[t.Agent]++
}
w.Header().Set("Content-Type", "text/html")
fmt.Fprintf(w, `<!DOCTYPE html>
<html>
<head>
<title>Agent Dispatcher Status</title>
<style>
body { font-family: sans-serif; margin: 40px; background: #f5f5f5; }
h1 { color: #333; }
.status { background: white; padding: 20px; border-radius: 8px; margin: 20px 0; }
.task { background: #e8f5e9; padding: 10px; margin: 5px 0; border-radius: 4px; }
.critical { background: #ffebee; border-left: 4px solid #f44336; }
.high { background: #fff3e0; border-left: 4px solid #ff9800; }
table { width: 100%%; border-collapse: collapse; }
th, td { text-align: left; padding: 12px; border-bottom: 1px solid #ddd; }
th { background: #333; color: white; }
.refresh { color: #666; font-size: 0.9em; }
.log { font-family: monospace; background: #263238; color: #aed581; padding: 20px; border-radius: 4px; overflow-x: auto; }
</style>
<meta http-equiv="refresh" content="10">
</head>
<body>
<h1>🔧 Agent Dispatcher Status</h1>
<p class="refresh">Auto-refresh every 10 seconds | Last poll: %s</p>
<div class="status">
<h2>Overview</h2>
<p><strong>Total dispatched:</strong> %d tasks</p>
<p><strong>Last dispatch:</strong> %s</p>
<p><strong>Rate limit:</strong> 1 task per minute</p>
<p><strong>Task directory:</strong> %s</p>
<h3>By Agent</h3>
<table>
<tr><th>Agent</th><th>Tasks Dispatched</th></tr>
`, time.Now().Format(time.RFC3339), len(d.tasks),
d.lastDispatch.Format(time.RFC3339), TaskDir)
for agent, count := range agentCounts {
fmt.Fprintf(w, "<tr><td>%s</td><td>%d</td></tr>\n", agent, count)
}
fmt.Fprintf(w, `
</table>
</div>
<div class="status">
<h2>Recent Dispatches</h2>
`)
// Show last 10 tasks (reverse order)
for i := len(d.tasks) - 1; i >= 0 && i >= len(d.tasks)-10; i-- {
t := d.tasks[i]
priorityClass := ""
if t.Priority == "CRITICAL" {
priorityClass = "critical"
} else if t.Priority == "HIGH" {
priorityClass = "high"
}
fmt.Fprintf(w, `
<div class="task %s">
<strong>#%d</strong> — %s <span style="float:right">%s | %s</span><br>
<small>Agent: %s | File: %s</small>
</div>
`, priorityClass, t.IssueNumber, t.Title, t.Priority, t.DispatchedAt.Format("15:04:05"), t.Agent, t.TaskFile)
}
fmt.Fprintf(w, `
</div>
<div class="status">
<h2>Live Log (tail -f %s)</h2>
<pre class="log">`, LogFile)
// Read last 50 lines of log
logContent, _ := os.ReadFile(LogFile)
lines := strings.Split(string(logContent), "\n")
start := 0
if len(lines) > 50 {
start = len(lines) - 50
}
for i := start; i < len(lines); i++ {
fmt.Fprintf(w, "%s\n", lines[i])
}
fmt.Fprintf(w, `</pre>
</div>
</body>
</html>
`)
}
func (d *Dispatcher) handleTasks(w http.ResponseWriter, r *http.Request) {
agent := r.URL.Query().Get("agent")
if agent == "" {
http.Error(w, "Missing agent parameter", 400)
return
}
// List task files for this agent
agentDir := filepath.Join(TaskDir, agent)
files, err := os.ReadDir(agentDir)
if err != nil {
fmt.Fprintf(w, "No tasks for %s\n", agent)
return
}
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintf(w, "Tasks for %s:\n\n", agent)
for _, f := range files {
if strings.HasSuffix(f.Name(), ".md") {
fmt.Fprintf(w, "- %s/%s\n", agentDir, f.Name())
}
}
}
func main() {
d := NewDispatcher()
d.log("========================================")
d.log("Agent Dispatcher Starting")
d.log("Gitea: %s", GiteaURL)
d.log("Repo: %s/%s", RepoOwner, RepoName)
d.log("Task Dir: %s", TaskDir)
d.log("Web UI: http://localhost:%s", WebPort)
d.log("Rate Limit: 1 task per minute")
d.log("========================================")
// Start web server
go func() {
http.HandleFunc("/", d.handleStatus)
http.HandleFunc("/tasks", d.handleTasks)
d.log("Web UI available at http://localhost:%s", WebPort)
log.Fatal(http.ListenAndServe(":"+WebPort, nil))
}()
// Immediate first poll
d.pollAndDispatch()
// Main loop
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
for range ticker.C {
d.pollAndDispatch()
}
}