vault1984-dashboard/chat-ws.go

534 lines
14 KiB
Go

package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Fixed deployment settings for this process.
const (
// listenAddr is the host:port handed to http.ListenAndServe in main.
listenAddr = "100.85.192.60:1985"
// gatewayURL is the WebSocket endpoint dialed by Gateway.connect.
gatewayURL = "ws://127.0.0.1:18789/"
// gatewayToken is sent as the "auth" token in the gateway connect request.
// NOTE(review): this is a credential committed in source — consider loading
// it from the environment or a secrets store and rotating this value.
gatewayToken = "601267edaccf8cd3d6afe222c3ce63602e210ff1ecc9a268"
// sessionKey is the "sessionKey" parameter of every chat.send request and is
// also reported by the status endpoint.
sessionKey = "agent:main:main"
)
// ---------------------------------------------------------------------------
// Gateway connection
// ---------------------------------------------------------------------------
// Gateway holds the single WebSocket connection to the agent gateway together
// with the table used to route incoming frames to the callers waiting on them.
type Gateway struct {
// mu is held while writing to conn and while conn/connected are updated.
mu sync.Mutex
// conn is the live gateway connection, or nil while disconnected.
conn *websocket.Conn
// connected is set to true after a successful handshake and back to false
// when the read loop ends.
// NOTE(review): writes happen under mu; readers should take mu as well,
// otherwise the access is a data race.
connected bool
// Pending requests: reqId/runId → channel that receives gateway frames
pending map[string]chan map[string]interface{}
// pendingMu guards pending.
pendingMu sync.Mutex
}
// gw is the process-wide gateway client. The pending map is created here
// because writing to a nil map would panic in register.
var gw = &Gateway{pending: make(map[string]chan map[string]interface{})}
// register creates a buffered channel (capacity 128), stores it in the pending
// table under key — replacing any channel already stored there — and returns
// it. Frames dispatched to key are delivered on the returned channel.
func (g *Gateway) register(key string) chan map[string]interface{} {
	g.pendingMu.Lock()
	defer g.pendingMu.Unlock()

	frames := make(chan map[string]interface{}, 128)
	g.pending[key] = frames
	return frames
}
// unregister removes key from the pending table. Removing a key that is not
// present is a no-op. The channel itself is not closed.
func (g *Gateway) unregister(key string) {
	g.pendingMu.Lock()
	defer g.pendingMu.Unlock()

	delete(g.pending, key)
}
// dispatch hands msg to the channel registered under key, without blocking.
//
// It reports whether a channel was registered for key. A true result does not
// guarantee delivery: when the channel's buffer is full the frame is dropped.
func (g *Gateway) dispatch(key string, msg map[string]interface{}) bool {
	g.pendingMu.Lock()
	target, registered := g.pending[key]
	g.pendingMu.Unlock()

	if !registered {
		return false
	}

	// The send happens outside the lock and never blocks the read loop.
	select {
	case target <- msg:
	default: // drop if full
	}
	return true
}
// send encodes msg as JSON and writes it to the gateway as one text frame.
// It holds g.mu for the duration of the write and returns a "not connected"
// error when there is currently no gateway connection.
func (g *Gateway) send(msg interface{}) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	if conn := g.conn; conn != nil {
		payload := mustJSON(msg)
		return conn.WriteMessage(websocket.TextMessage, payload)
	}
	return fmt.Errorf("not connected")
}
// connect keeps the gateway connection alive for the lifetime of the process.
//
// Each iteration dials the gateway and performs the challenge/connect
// handshake, publishes the connection on g, reads frames until the first read
// error, then tears the connection down and retries. It never returns, so it
// is meant to run in its own goroutine.
func (g *Gateway) connect() {
	for {
		conn, err := dialGateway()
		if err != nil {
			log.Printf("[gw] %v", err)
			time.Sleep(3 * time.Second)
			continue
		}
		log.Println("[gw] connected")

		g.mu.Lock()
		g.conn = conn
		g.connected = true
		g.mu.Unlock()

		// done ties the pinger to this one connection, so a quick reconnect
		// cannot leave pingers from earlier connections running.
		done := make(chan struct{})
		go pingGateway(conn, done)

		g.readFrames(conn)
		close(done)

		// Disconnected — reset and retry
		g.mu.Lock()
		g.conn = nil
		g.connected = false
		g.mu.Unlock()

		// Release the socket. The close error is ignored on purpose: the
		// connection has already failed and is being discarded.
		_ = conn.Close()

		log.Println("[gw] disconnected, reconnecting...")
		time.Sleep(2 * time.Second)
	}
}

// dialGateway opens a WebSocket to gatewayURL and completes the handshake:
// read the server's challenge frame, send a "connect" request carrying
// gatewayToken, and require a response whose "ok" field is true. On failure
// the connection is closed and the error names the step that failed.
func dialGateway() (*websocket.Conn, error) {
	conn, _, err := websocket.DefaultDialer.Dial(gatewayURL, nil)
	if err != nil {
		return nil, fmt.Errorf("dial: %w", err)
	}
	fail := func(err error) (*websocket.Conn, error) {
		conn.Close()
		return nil, err
	}

	// 1. Read challenge (bounded, so a silent peer cannot stall reconnects)
	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return fail(fmt.Errorf("challenge deadline: %w", err))
	}
	var challenge map[string]interface{}
	if err := conn.ReadJSON(&challenge); err != nil {
		return fail(fmt.Errorf("challenge read: %w", err))
	}

	// 2. Send connect request
	connectReq := map[string]interface{}{
		"type":   "req",
		"id":     fmt.Sprintf("conn-%d", time.Now().UnixMilli()),
		"method": "connect",
		"params": map[string]interface{}{
			"minProtocol": 3,
			"maxProtocol": 3,
			"client": map[string]interface{}{
				"id":       "vault1984-chat",
				"version":  "2026.3.2",
				"platform": "linux",
				"mode":     "cli",
			},
			"auth":   map[string]string{"token": gatewayToken},
			"scopes": []string{"operator.admin"},
		},
	}
	if err := conn.WriteJSON(connectReq); err != nil {
		return fail(fmt.Errorf("connect request: %w", err))
	}

	// 3. Read connect response
	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return fail(fmt.Errorf("handshake deadline: %w", err))
	}
	var resp map[string]interface{}
	if err := conn.ReadJSON(&resp); err != nil || resp["ok"] != true {
		return fail(fmt.Errorf("handshake failed: %v resp=%v", err, resp))
	}
	return conn, nil
}

// pingGateway sends a WebSocket ping on conn every 25 seconds until done is
// closed or a ping fails. WriteControl is used because gorilla/websocket
// allows it to run concurrently with other writers on the same connection.
func pingGateway(conn *websocket.Conn, done <-chan struct{}) {
	ticker := time.NewTicker(25 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			deadline := time.Now().Add(10 * time.Second)
			if err := conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				log.Printf("[gw] ping: %v", err)
				return
			}
		}
	}
}

// readFrames reads JSON frames from conn until a read fails and routes each
// one to a waiting caller: a "res" frame goes to the channel registered under
// its "id"; an "event" frame named "chat" has its payload sent to the channel
// registered under the payload's "runId". Every other frame is ignored.
func (g *Gateway) readFrames(conn *websocket.Conn) {
	for {
		// Give up on the connection if no frame arrives for 90 seconds.
		if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil {
			log.Printf("[gw] read: %v", err)
			return
		}
		var frame map[string]interface{}
		if err := conn.ReadJSON(&frame); err != nil {
			log.Printf("[gw] read: %v", err)
			return
		}
		switch frame["type"] {
		case "res":
			// Response to a request — dispatch by id
			if id, _ := frame["id"].(string); id != "" {
				g.dispatch(id, frame)
			}
		case "event":
			if event, _ := frame["event"].(string); event != "chat" {
				// Ignore health, tick, presence, etc.
				continue
			}
			payload, ok := frame["payload"].(map[string]interface{})
			if !ok {
				continue
			}
			if runID, _ := payload["runId"].(string); runID != "" {
				g.dispatch(runID, payload)
			}
		}
	}
}
// Chat sends text to the agent session as a chat.send request and blocks until
// the reply is complete, the agent reports an error, or 120 seconds pass.
//
// It returns the concatenated text of the "delta" events, or the text of the
// "final" event when no delta was received ("[no response]" when both are
// empty). If the gateway rejects the request it returns a "gateway: ..."
// error. On an agent error or a timeout it returns whatever text was collected
// so far, and an error only when nothing was collected.
//
// NOTE(review): chat events are routed by runId, which is only registered
// once the ack has been processed here; events that arrive earlier are dropped
// by the dispatcher.
func (g *Gateway) Chat(text string) (string, error) {
	// Register to receive the ack (keyed by reqID). IDs are millisecond
	// timestamps, so two calls in the same millisecond would otherwise share a
	// key and overwrite each other's entry; a numeric suffix is appended until
	// the key is not in use. The channel mirrors the one register creates.
	ch := make(chan map[string]interface{}, 128)
	base := fmt.Sprintf("req-%d", time.Now().UnixMilli())
	reqID := base
	g.pendingMu.Lock()
	for n := 1; ; n++ {
		if _, taken := g.pending[reqID]; !taken {
			break
		}
		reqID = fmt.Sprintf("%s-%d", base, n)
	}
	g.pending[reqID] = ch
	g.pendingMu.Unlock()
	defer g.unregister(reqID)

	// Send chat.send
	err := g.send(map[string]interface{}{
		"type":   "req",
		"id":     reqID,
		"method": "chat.send",
		"params": map[string]interface{}{
			"sessionKey":     sessionKey,
			"message":        text,
			"idempotencyKey": reqID,
		},
	})
	if err != nil {
		return "", err
	}

	timeout := time.After(120 * time.Second)
	var buf strings.Builder
	runRegistered := false
	for {
		select {
		case <-timeout:
			if buf.Len() > 0 {
				return buf.String(), nil
			}
			return "", fmt.Errorf("timeout")

		case msg := <-ch:
			// Handle ack response to chat.send
			if msg["type"] == "res" {
				if msg["ok"] != true {
					errObj, _ := msg["error"].(map[string]interface{})
					errMsg, _ := errObj["message"].(string)
					return "", fmt.Errorf("gateway: %s", errMsg)
				}
				// Extract runId and register for chat events
				if payload, ok := msg["payload"].(map[string]interface{}); ok {
					if runID, _ := payload["runId"].(string); runID != "" && !runRegistered {
						g.pendingMu.Lock()
						g.pending[runID] = ch // reuse same channel
						g.pendingMu.Unlock()
						// Runs once, when Chat returns (guarded by runRegistered).
						defer g.unregister(runID)
						runRegistered = true
					}
				}
				continue
			}

			// Handle chat event (delta / final / error)
			state, _ := msg["state"].(string)
			switch state {
			case "delta":
				for _, part := range extractText(msg) {
					buf.WriteString(part)
				}
			case "final":
				// If we got deltas, use them; otherwise use the final message
				if buf.Len() == 0 {
					for _, part := range extractText(msg) {
						buf.WriteString(part)
					}
				}
				result := strings.TrimSpace(buf.String())
				if result == "" {
					return "[no response]", nil
				}
				return result, nil
			case "error":
				errMsg, _ := msg["errorMessage"].(string)
				if buf.Len() > 0 {
					return buf.String(), nil
				}
				return "", fmt.Errorf("agent error: %s", errMsg)
			}
		}
	}
}
// extractText returns, in order, the string "text" field of every object in
// evt["message"]["content"]. Entries that are not objects, or whose "text" is
// missing or not a string, are skipped. The result is nil when evt has no
// usable message/content or when no entry carries text.
func extractText(evt map[string]interface{}) []string {
	// Failed assertions yield nil, and both indexing a nil map and ranging
	// over a nil slice are safe, so no explicit guards are needed.
	message, _ := evt["message"].(map[string]interface{})
	content, _ := message["content"].([]interface{})

	var texts []string
	for i := range content {
		part, isObject := content[i].(map[string]interface{})
		if !isObject {
			continue
		}
		if s, isString := part["text"].(string); isString {
			texts = append(texts, s)
		}
	}
	return texts
}
// ---------------------------------------------------------------------------
// Chat message store
// ---------------------------------------------------------------------------
// Message is one chat entry as kept in the in-memory history and sent to UI
// clients as JSON.
type Message struct {
// ID is assigned from time.Now().UnixMilli() when the message is created.
ID int64 `json:"id"`
// Sender is the display name shown with the message.
Sender string `json:"sender"`
// SenderType is "human" for user messages and "ai" for agent replies; the
// embedded UI uses it as a CSS class.
SenderType string `json:"sender_type"`
// Content is the message text.
Content string `json:"content"`
// Timestamp is the creation time in Unix milliseconds.
Timestamp int64 `json:"timestamp"`
}
// Shared chat state.
var (
// messages is the in-memory chat history; addMessage keeps only the newest
// 200 entries.
messages []Message
// msgMu guards messages.
msgMu sync.RWMutex
// uiClients are the browser WebSocket connections that receive every new
// message.
uiClients []*websocket.Conn
// uiMu guards uiClients and is held while broadcastUI writes to them.
uiMu sync.Mutex
)
// addMessage appends m to the chat history, trims the history to its newest
// 200 entries, and then pushes m to every connected UI client. The broadcast
// runs after the history lock has been released.
func addMessage(m Message) {
	const historyLimit = 200

	func() {
		msgMu.Lock()
		defer msgMu.Unlock()

		messages = append(messages, m)
		if excess := len(messages) - historyLimit; excess > 0 {
			messages = messages[excess:]
		}
	}()

	broadcastUI(m)
}
// broadcastUI sends m, encoded as JSON, to every connected UI client.
//
// A client whose write fails is closed and removed from uiClients. Each write
// is bounded by a deadline so one stalled client cannot hold uiMu — and with
// it every other broadcast — indefinitely.
func broadcastUI(m Message) {
	const writeTimeout = 5 * time.Second

	data := mustJSON(m)
	uiMu.Lock()
	defer uiMu.Unlock()

	alive := make([]*websocket.Conn, 0, len(uiClients))
	for _, c := range uiClients {
		// A failure to arm the deadline surfaces as a write error below.
		_ = c.SetWriteDeadline(time.Now().Add(writeTimeout))
		if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
			log.Printf("[ui] dropping client: %v", err)
			// Closing releases the socket and unblocks the client's reader;
			// the connection is already broken, so the error is irrelevant.
			_ = c.Close()
			continue
		}
		alive = append(alive, c)
	}
	uiClients = alive
}
// ---------------------------------------------------------------------------
// HTTP handlers
// ---------------------------------------------------------------------------
// wsUpgrader upgrades HTTP requests to WebSocket connections for handleWS.
// NOTE(review): CheckOrigin accepts every Origin header, so pages served from
// any other site can open this socket from a user's browser — confirm that is
// intended for this deployment.
var wsUpgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
// handleWS serves one UI WebSocket client.
//
// The connection is added to the broadcast list, then every incoming JSON
// frame of the form {"content": ..., "sender": ...} is stored as a human
// message and forwarded to the agent in the background; the agent's reply (or
// an "[error: ...]" placeholder) is stored as an "ai" message from "Hans".
// Frames that are not valid JSON or have empty content are skipped. When the
// client goes away the connection is removed from the broadcast list and
// closed.
func handleWS(w http.ResponseWriter, r *http.Request) {
	conn, err := wsUpgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already written the HTTP error response.
		return
	}

	// Add to broadcast list
	uiMu.Lock()
	uiClients = append(uiClients, conn)
	uiMu.Unlock()

	// Without this the socket and its uiClients entry outlive the handler.
	defer func() {
		uiMu.Lock()
		for i, c := range uiClients {
			if c == conn {
				last := len(uiClients) - 1
				copy(uiClients[i:], uiClients[i+1:])
				uiClients[last] = nil // do not retain the pointer in the tail
				uiClients = uiClients[:last]
				break
			}
		}
		uiMu.Unlock()
		// The connection is finished either way; nothing to do on error.
		_ = conn.Close()
	}()

	// Read messages from the UI
	for {
		_, data, err := conn.ReadMessage()
		if err != nil {
			return
		}
		var incoming struct {
			Content string `json:"content"`
			Sender  string `json:"sender"`
		}
		if err := json.Unmarshal(data, &incoming); err != nil {
			log.Printf("[ui] ignoring malformed frame: %v", err)
			continue
		}
		if incoming.Content == "" {
			continue
		}
		if incoming.Sender == "" {
			incoming.Sender = "Johan"
		}
		now := time.Now().UnixMilli()
		addMessage(Message{
			ID:         now,
			Sender:     incoming.Sender,
			SenderType: "human",
			Content:    incoming.Content,
			Timestamp:  now,
		})

		// Get agent response without blocking the read loop
		go func(text string) {
			response, err := gw.Chat(text)
			if err != nil {
				response = fmt.Sprintf("[error: %v]", err)
			}
			now := time.Now().UnixMilli()
			addMessage(Message{
				ID:         now,
				Sender:     "Hans",
				SenderType: "ai",
				Content:    response,
				Timestamp:  now,
			})
		}(incoming.Content)
	}
}
func handleAPI(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "POST only", 405)
return
}
var msg Message
json.NewDecoder(r.Body).Decode(&msg)
if msg.Content == "" {
http.Error(w, "content required", 400)
return
}
if msg.Sender == "" {
msg.Sender = "Johan"
}
msg.SenderType = "human"
msg.Timestamp = time.Now().UnixMilli()
msg.ID = msg.Timestamp
addMessage(msg)
go func() {
response, err := gw.Chat(msg.Content)
if err != nil {
response = fmt.Sprintf("[error: %v]", err)
}
addMessage(Message{
ID: time.Now().UnixMilli(),
Sender: "Hans",
SenderType: "ai",
Content: response,
Timestamp: time.Now().UnixMilli(),
})
}()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]bool{"ok": true})
}
// handleMessages returns the chat history as a JSON array (JSON null while
// the history is empty).
//
// The history is copied under the read lock and encoded afterwards, so a slow
// client cannot keep msgMu held and stall addMessage.
func handleMessages(w http.ResponseWriter, r *http.Request) {
	msgMu.RLock()
	snapshot := append([]Message(nil), messages...)
	msgMu.RUnlock()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(snapshot); err != nil {
		log.Printf("[api] write messages: %v", err)
	}
}
// handleStatus reports, as JSON, whether the gateway connection is currently
// up ("gateway") and which session key is in use ("session").
func handleStatus(w http.ResponseWriter, r *http.Request) {
	// connected is written under gw.mu by the connect loop, so it must be read
	// under the same lock to avoid a data race.
	gw.mu.Lock()
	connected := gw.connected
	gw.mu.Unlock()

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(map[string]interface{}{
		"gateway": connected,
		"session": sessionKey,
	}); err != nil {
		log.Printf("[api] write status: %v", err)
	}
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
// main starts the gateway connection loop, waits until the first handshake
// has succeeded, then registers the HTTP routes and serves on listenAddr.
// It exits via log.Fatal if the HTTP server stops.
func main() {
	go gw.connect()

	// Wait for gateway. The flag is written under gw.mu by the connect
	// goroutine; reading it under the same lock avoids a data race and
	// guarantees the update is observed.
	for {
		gw.mu.Lock()
		up := gw.connected
		gw.mu.Unlock()
		if up {
			break
		}
		time.Sleep(200 * time.Millisecond)
	}

	log.Printf("Vault1984 Chat ready — http://%s", listenAddr)
	http.HandleFunc("/", serveHTML)
	http.HandleFunc("/ws", handleWS)
	http.HandleFunc("/api/send", handleAPI)
	http.HandleFunc("/api/messages", handleMessages)
	http.HandleFunc("/api/status", handleStatus)
	log.Fatal(http.ListenAndServe(listenAddr, nil))
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// mustJSON returns the JSON encoding of v.
//
// Despite the name it does not panic: when v cannot be encoded the failure is
// logged and nil is returned, so callers end up sending an empty payload
// rather than crashing.
func mustJSON(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		log.Printf("[json] marshal %T: %v", v, err)
		return nil
	}
	return b
}
// ---------------------------------------------------------------------------
// Embedded UI
// ---------------------------------------------------------------------------
// serveHTML writes the embedded single-page chat UI as text/html.
//
// The page opens a WebSocket to /ws on the same host (reconnecting two seconds
// after a close), renders each message it receives, sends the input box
// content as {"content", "sender"} frames, and loads the existing history from
// /api/messages on startup.
func serveHTML(w http.ResponseWriter, r *http.Request) {
	const page = `<!DOCTYPE html>
<html><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width,initial-scale=1">
<title>Vault1984 Chat</title>
<style>
*{margin:0;padding:0;box-sizing:border-box}
body{font-family:-apple-system,BlinkMacSystemFont,sans-serif;background:#050505;color:#e0e0e0;height:100vh;display:flex;justify-content:center}
.container{width:100%;max-width:640px;display:flex;flex-direction:column;height:100vh;border-left:1px solid #1a1a1a;border-right:1px solid #1a1a1a}
header{padding:16px 20px;border-bottom:1px solid #1a1a1a}
header h1{font-size:16px;color:#00ff88;letter-spacing:2px}
#chat{flex:1;overflow-y:auto;padding:20px;display:flex;flex-direction:column;gap:12px}
.msg{max-width:85%;padding:10px 14px;border-radius:8px;font-size:14px;line-height:1.5;white-space:pre-wrap;word-wrap:break-word}
.msg .who{font-size:11px;color:#666;margin-bottom:4px}
.msg.human{align-self:flex-end;background:#2a1a1a;border-right:3px solid #ff6b6b}
.msg.ai{align-self:flex-start;background:#1a2a1a;border-left:3px solid #00ff88}
#bar{padding:16px 20px;border-top:1px solid #1a1a1a;display:flex;gap:10px}
#bar input{flex:1;background:#0d0d0d;border:1px solid #333;color:#e0e0e0;padding:10px 14px;border-radius:8px;font-size:14px;outline:none}
#bar input:focus{border-color:#00ff88}
#bar button{background:#00ff88;color:#0a0a0a;border:none;padding:10px 20px;border-radius:8px;font-weight:700;font-size:14px;cursor:pointer}
</style></head>
<body><div class="container">
<header><h1>&#128274; VAULT1984 // CHAT</h1></header>
<div id="chat"></div>
<div id="bar"><input id="inp" placeholder="Type..." autofocus><button onclick="send()">SEND</button></div>
<script>
const chat=document.getElementById('chat'),inp=document.getElementById('inp');
let ws,reconn;
function connect(){
ws=new WebSocket('ws://'+location.host+'/ws');
ws.onmessage=e=>{const m=JSON.parse(e.data);show(m)};
ws.onclose=()=>{clearTimeout(reconn);reconn=setTimeout(connect,2000)};
ws.onerror=()=>ws.close();
}
function show(m){
const d=document.createElement('div');
d.className='msg '+(m.sender_type||'system');
d.innerHTML='<div class="who">'+esc(m.sender)+'</div>'+esc(m.content);
chat.appendChild(d);
chat.scrollTop=chat.scrollHeight;
}
function esc(s){return(s||'').replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;')}
function send(){
const v=inp.value.trim();if(!v||!ws||ws.readyState!==1)return;
ws.send(JSON.stringify({content:v,sender:'Johan'}));
inp.value='';
}
inp.addEventListener('keydown',e=>{if(e.key==='Enter')send()});
connect();
// Load history
fetch('/api/messages').then(r=>r.json()).then(msgs=>{if(msgs)msgs.forEach(show)});
</script>
</div></body></html>`

	headers := w.Header()
	headers.Set("Content-Type", "text/html")
	w.Write([]byte(page))
}