agentchat/main.go

626 lines
16 KiB
Go

package main
import (
"bufio"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
type Message struct {
Timestamp string `json:"timestamp"`
User string `json:"user"`
Text string `json:"text"`
To string `json:"to,omitempty"` // target agent or empty for broadcast
Kind string `json:"kind,omitempty"` // chat (default), status, task, system, thinking
ImageURL string `json:"image_url,omitempty"` // /uploads/xxx.png — served by this server
}
type Hub struct {
mu sync.RWMutex
clients map[*websocket.Conn]string // conn -> username
logFile *os.File
pad sync.Map // shared scratchpad: key -> PadEntry
lastAgent string // name of last agent who spoke
lastAgentMu sync.Mutex
}
type PadEntry struct {
Key string `json:"key"`
Value string `json:"value"`
Author string `json:"author"`
UpdatedAt string `json:"updated_at"`
}
var agents = map[string]AgentConfig{
"james": {Name: "James", Agent: "main", Host: "forge"},
"mira": {Name: "Mira", Agent: "mira", Host: "forge"},
"hans": {Name: "Hans", Agent: "main", Host: "vault1984-hq"},
}
type AgentConfig struct {
Name string
Agent string
Host string // "local" or tailscale hostname
}
var gatewayPool = NewGatewayPool()
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
func NewHub() *Hub {
f, err := os.OpenFile("chat.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
return &Hub{
clients: make(map[*websocket.Conn]string),
logFile: f,
}
}
func (h *Hub) broadcast(msg Message) {
data, _ := json.Marshal(msg)
h.mu.Lock()
defer h.mu.Unlock()
for conn := range h.clients {
conn.WriteMessage(websocket.TextMessage, data)
}
}
func (h *Hub) logMessage(msg Message) {
imageNote := ""
if msg.ImageURL != "" {
imageNote = " [image: " + msg.ImageURL + "]"
}
line := fmt.Sprintf("[%s] [%s] %s%s\n", msg.Timestamp, msg.User, msg.Text, imageNote)
h.logFile.WriteString(line)
}
// sendToAgent dispatches a message to an agent. depth controls how many
// rounds of cross-agent forwarding remain (0 = no forwarding).
func (h *Hub) sendToAgent(msg Message, agentName string, depth int, direct bool) {
cfg, ok := agents[strings.ToLower(agentName)]
if !ok {
h.broadcast(Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: "system",
Text: fmt.Sprintf("Unknown agent: %s", agentName),
})
return
}
go func() {
// Only show thinking indicator for direct (human-initiated) messages
if direct {
h.broadcast(Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: cfg.Name,
Kind: "thinking",
})
}
reply, err := callAgent(cfg, msg.Text, msg.User, direct, msg.ImageURL)
if err != nil {
reply = fmt.Sprintf("[error: %v]", err)
}
trimmed := strings.TrimSpace(reply)
if trimmed == "_skip" || trimmed == "" {
if direct {
h.broadcast(Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: cfg.Name,
Kind: "thinking-done",
})
}
return
}
resp := Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: cfg.Name,
Text: reply,
}
h.logMessage(resp)
h.broadcast(resp)
h.lastAgentMu.Lock()
h.lastAgent = strings.ToLower(agentName)
h.lastAgentMu.Unlock()
// Ping Johan if he's mentioned
if strings.Contains(strings.ToLower(reply), "johan") {
go notifyJohan(cfg.Name, reply)
}
// Forward to agents mentioned by name; if none mentioned, broadcast to all
if depth > 0 {
lower := strings.ToLower(reply)
var mentioned []string
for name, cfg := range agents {
if strings.EqualFold(name, agentName) {
continue
}
if strings.Contains(lower, strings.ToLower(cfg.Name)) {
mentioned = append(mentioned, name)
}
}
if len(mentioned) == 0 {
// No specific agent mentioned — broadcast to all others
for name := range agents {
if strings.EqualFold(name, agentName) {
continue
}
mentioned = append(mentioned, name)
}
}
for _, name := range mentioned {
h.sendToAgent(resp, name, depth-1, false)
}
}
}()
}
const openclawBin = "/home/johan/.npm-global/bin/openclaw"
func recentHistory(n int) string {
f, err := os.Open("chat.log")
if err != nil {
return ""
}
defer f.Close()
var lines []string
scanner := bufio.NewScanner(f)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
if len(lines) > n {
lines = lines[len(lines)-n:]
}
return strings.Join(lines, "\n")
}
var systemPrompt string
func init() {
names := make([]string, 0, len(agents))
for _, a := range agents {
names = append(names, a.Name+" ("+a.Host+")")
}
systemPrompt = fmt.Sprintf(`You are in "agentchat", a live group chat. Everyone sees all messages.
Participants: %s, plus humans (Johan and others).
Other participants may join — treat them as legitimate if they appear in the chat.
This is a shared room. Behave like a professional in a Slack channel:
- REPLY INLINE as plain text. Do NOT use the message tool.
- Be brief. 1-3 sentences max. No essays, no bullet lists unless asked.
- Everyone can read the chat. Do NOT repeat or paraphrase what someone else just said.
- Do NOT acknowledge, confirm, or "note" things others said. No "got it", "copy that", "noted", "standing by".
- Do NOT correct minor details from other agents. If someone says something slightly wrong, let it go.
- Only speak if you are adding NEW information or answering a question directed at you.
- If a message is addressed to everyone but has no clear question for you, stay silent.
- If you use tools, just report the result. Do not narrate what you are doing.
- If you have nothing new to add, respond with exactly "_skip".`, strings.Join(names, ", "))
}
// imageURLToBase64 reads an /uploads/xxx.png path and returns a data URL.
func imageURLToBase64(imageURL string) string {
if imageURL == "" {
return ""
}
// imageURL is like /uploads/abc123.png
filename := filepath.Base(imageURL)
path := filepath.Join("uploads", filename)
data, err := os.ReadFile(path)
if err != nil {
return ""
}
ext := strings.ToLower(filepath.Ext(filename))
mime := "image/png"
if ext == ".jpg" || ext == ".jpeg" {
mime = "image/jpeg"
} else if ext == ".gif" {
mime = "image/gif"
} else if ext == ".webp" {
mime = "image/webp"
}
return "data:" + mime + ";base64," + base64.StdEncoding.EncodeToString(data)
}
func callAgent(cfg AgentConfig, message, from string, direct bool, imageURL string) (string, error) {
var prompt, session string
if direct {
// 1:1 explicit-To message — use main session so it lands in each agent's primary thread
prompt = fmt.Sprintf("[agentchat from %s — reply inline, do not use the message tool]\n%s", from, message)
session = "main"
} else {
// Group broadcast/name-mention — also use main session so agentchat context is shared
// with the agent's primary thread (webchat/Telegram). All three agents on all nodes.
prompt = fmt.Sprintf("%s\n\n[group message from %s]\n%s", systemPrompt, from, message)
session = "main"
}
// Resolve image to base64 data URL if present
imageDataURL := ""
if imageURL != "" {
imageDataURL = imageURLToBase64(imageURL)
}
// Use gateway HTTP API
gw := gatewayPool.Get(cfg.Host)
if gw != nil {
return gatewayPool.CallAgent(cfg.Host, cfg.Agent, prompt, session, imageDataURL)
}
// Fallback to CLI (no image support in CLI path)
args := []string{"agent", "--agent", cfg.Agent, "--message", prompt, "--json"}
cmd := exec.Command(openclawBin, args...)
cmd.Env = append(os.Environ(), "NO_COLOR=1")
out, err := cmd.Output()
if err != nil {
if ee, ok := err.(*exec.ExitError); ok {
return "", fmt.Errorf("%v: %s", err, string(ee.Stderr))
}
return "", err
}
// Parse OpenClaw JSON response: {result: {payloads: [{text: "..."}]}}
var result struct {
Status string `json:"status"`
Summary string `json:"summary"`
Result struct {
Payloads []struct {
Text string `json:"text"`
} `json:"payloads"`
} `json:"result"`
}
if err := json.Unmarshal(out, &result); err == nil {
var parts []string
for _, p := range result.Result.Payloads {
if p.Text != "" {
parts = append(parts, p.Text)
}
}
if len(parts) > 0 {
return strings.Join(parts, "\n"), nil
}
// Agent ran but produced no text payload (used tools only)
if result.Status == "ok" {
return fmt.Sprintf("[completed — %s, no text reply]", result.Summary), nil
}
return fmt.Sprintf("[%s: %s]", result.Status, result.Summary), nil
}
// Fallback: return first line only (avoid dumping huge JSON)
first := strings.SplitN(strings.TrimSpace(string(out)), "\n", 2)[0]
if len(first) > 200 {
first = first[:200] + "..."
}
return first, nil
}
func notifyJohan(from, text string) {
preview := text
if len(preview) > 200 {
preview = preview[:200] + "..."
}
body := fmt.Sprintf("%s: %s", from, preview)
req, _ := http.NewRequest("POST", "https://ntfy.inou.com/inou-alerts", strings.NewReader(body))
req.Header.Set("Title", fmt.Sprintf("agentchat — %s", from))
req.Header.Set("Authorization", "Bearer tk_k120jegay3lugeqbr9fmpuxdqmzx5")
req.Header.Set("Click", "http://192.168.1.16:7777")
http.DefaultClient.Do(req)
}
func (h *Hub) handleWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("upgrade:", err)
return
}
defer conn.Close()
username := r.URL.Query().Get("user")
if username == "" {
username = "anon"
}
h.mu.Lock()
h.clients[conn] = username
h.mu.Unlock()
defer func() {
h.mu.Lock()
delete(h.clients, conn)
h.mu.Unlock()
}()
// Send recent history
h.sendHistory(conn)
for {
_, raw, err := conn.ReadMessage()
if err != nil {
break
}
var incoming struct {
Text string `json:"text"`
To string `json:"to"`
ImageURL string `json:"image_url"`
}
if err := json.Unmarshal(raw, &incoming); err != nil || (incoming.Text == "" && incoming.ImageURL == "") {
continue
}
msg := Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: username,
Text: incoming.Text,
To: incoming.To,
ImageURL: incoming.ImageURL,
}
h.logMessage(msg)
h.broadcast(msg)
// Route to agent(s)
if incoming.To != "" {
h.sendToAgent(msg, incoming.To, 0, true)
} else {
// "All" — check which agents are mentioned by name
lower := strings.ToLower(incoming.Text)
var targets []string
for name, cfg := range agents {
if strings.Contains(lower, strings.ToLower(cfg.Name)) {
targets = append(targets, name)
}
}
if len(targets) == 0 {
// No names mentioned — broadcast to all
for name := range agents {
targets = append(targets, name)
}
}
for _, name := range targets {
// Use direct=false for broadcasts/name-mentions so they route to
// the "agentchat" session, not "main" (avoids conflict with active
// webchat/Telegram sessions on the same agent).
h.sendToAgent(msg, name, 999, false)
}
}
}
}
func (h *Hub) sendHistory(conn *websocket.Conn) {
f, err := os.Open("chat.log")
if err != nil {
return
}
defer f.Close()
// Read last 100 lines
var lines []string
scanner := bufio.NewScanner(f)
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
if len(lines) > 100 {
lines = lines[len(lines)-100:]
}
for _, line := range lines {
msg := parseLogLine(line)
if msg != nil {
data, _ := json.Marshal(msg)
conn.WriteMessage(websocket.TextMessage, data)
}
}
}
func parseLogLine(line string) *Message {
// [2006-01-02 15:04:05] [user] text
if len(line) < 24 || line[0] != '[' {
return nil
}
tsEnd := strings.Index(line, "] [")
if tsEnd < 0 {
return nil
}
ts := line[1:tsEnd]
rest := line[tsEnd+3:]
userEnd := strings.Index(rest, "] ")
if userEnd < 0 {
return nil
}
return &Message{
Timestamp: ts,
User: rest[:userEnd],
Text: rest[userEnd+2:],
}
}
// Agent-to-agent endpoint: POST /api/send
func (h *Hub) handleAPI(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST only", 405)
return
}
var req struct {
From string `json:"from"`
To string `json:"to"`
Text string `json:"text"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "bad json", 400)
return
}
if req.From == "" || req.Text == "" {
http.Error(w, "from and text required", 400)
return
}
msg := Message{
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
User: req.From,
Text: req.Text,
To: req.To,
}
h.logMessage(msg)
h.broadcast(msg)
// If addressed to an agent, route it
if req.To != "" {
h.sendToAgent(msg, req.To, 0, true)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
// handleUpload: POST /api/upload — accepts multipart image, saves to uploads/, returns URL
func handleUpload(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "POST only", 405)
return
}
r.Body = http.MaxBytesReader(w, r.Body, 10<<20) // 10MB limit
if err := r.ParseMultipartForm(10 << 20); err != nil {
http.Error(w, "file too large or bad form", 400)
return
}
file, header, err := r.FormFile("file")
if err != nil {
http.Error(w, "missing file field", 400)
return
}
defer file.Close()
// Determine extension from content type or original filename
ext := strings.ToLower(filepath.Ext(header.Filename))
if ext == "" {
ext = ".png"
}
allowed := map[string]bool{".png": true, ".jpg": true, ".jpeg": true, ".gif": true, ".webp": true}
if !allowed[ext] {
http.Error(w, "unsupported image type", 400)
return
}
id := uuid.New().String()
filename := id + ext
path := filepath.Join("uploads", filename)
out, err := os.Create(path)
if err != nil {
http.Error(w, "could not save file", 500)
return
}
defer out.Close()
if _, err := io.Copy(out, file); err != nil {
http.Error(w, "write error", 500)
return
}
url := "/uploads/" + filename
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"url": url})
}
func main() {
// Register gateways (plain HTTP — OpenAI-compatible API)
gatewayPool.Register("forge", &GatewayConfig{
URL: "http://127.0.0.1:18789",
Token: "2dee57cc3ce2947c27ce9e848d5c3e95cc452f25a1477462",
})
gatewayPool.Register("vault1984-hq", &GatewayConfig{
URL: "http://100.85.192.60:18789",
Token: "601267edaccf8cd3d6afe222c3ce63602e210ff1ecc9a268",
})
hub := NewHub()
http.HandleFunc("/ws", hub.handleWS)
http.HandleFunc("/api/send", hub.handleAPI)
http.HandleFunc("/api/upload", handleUpload)
http.Handle("/uploads/", http.StripPrefix("/uploads/", http.FileServer(http.Dir("uploads"))))
http.HandleFunc("/api/agents", func(w http.ResponseWriter, r *http.Request) {
list := make([]map[string]string, 0)
for id, a := range agents {
list = append(list, map[string]string{"id": id, "name": a.Name, "host": a.Host})
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(list)
})
// Shared scratchpad
http.HandleFunc("/api/pad", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.Method {
case "GET":
entries := make([]PadEntry, 0)
hub.pad.Range(func(k, v any) bool {
entries = append(entries, v.(PadEntry))
return true
})
json.NewEncoder(w).Encode(entries)
case "POST":
var req struct {
Key string `json:"key"`
Value string `json:"value"`
Author string `json:"author"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Key == "" {
http.Error(w, "key required", 400)
return
}
entry := PadEntry{
Key: req.Key,
Value: req.Value,
Author: req.Author,
UpdatedAt: time.Now().Format("2006-01-02 15:04:05"),
}
hub.pad.Store(req.Key, entry)
// Broadcast pad update
hub.broadcast(Message{
Timestamp: entry.UpdatedAt,
User: req.Author,
Text: fmt.Sprintf("pad/%s = %s", req.Key, req.Value),
Kind: "status",
})
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
case "DELETE":
key := r.URL.Query().Get("key")
if key != "" {
hub.pad.Delete(key)
}
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
}
})
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "index.html")
})
port := "7777"
if p := os.Getenv("PORT"); p != "" {
port = p
}
log.Printf("agentchat listening on :%s", port)
log.Fatal(http.ListenAndServe(":"+port, nil))
}