chore: auto-commit uncommitted changes
This commit is contained in:
parent
ee0bade365
commit
90485d381d
|
|
@ -48,3 +48,12 @@ webhook:
|
|||
enabled: true
|
||||
url: http://localhost:18789/hooks/messages
|
||||
token: "kuma-alert-token-2026"
|
||||
|
||||
triage:
|
||||
enabled: true
|
||||
prompt_file: /home/johan/clawd/config/email-triage-prompt.md
|
||||
provider:
|
||||
base_url: https://api.fireworks.ai/inference/v1
|
||||
api_key: ${FIREWORKS_API_KEY}
|
||||
model: accounts/fireworks/models/kimi-k2p5
|
||||
dashboard_url: http://localhost:9202/api/alerts
|
||||
|
|
|
|||
32
main.go
32
main.go
|
|
@ -25,6 +25,7 @@ type Config struct {
|
|||
Accounts map[string]AccountConfig `yaml:"accounts"`
|
||||
Connectors ConnectorsConfig `yaml:"connectors"`
|
||||
Webhook WebhookConfig `yaml:"webhook"`
|
||||
Triage TriageConfig `yaml:"triage"`
|
||||
}
|
||||
|
||||
type ServerConfig struct {
|
||||
|
|
@ -169,13 +170,38 @@ func main() {
|
|||
log.Printf("Registered SMS connector: %s -> %s", name, gatewayURL)
|
||||
}
|
||||
|
||||
// Start all connectors with webhook callback
|
||||
webhookCallback := func() {
|
||||
// Log triage config
|
||||
if config.Triage.Enabled {
|
||||
log.Printf("Email triage enabled (model: %s, prompt: %s)", config.Triage.Provider.Model, config.Triage.PromptFile)
|
||||
}
|
||||
|
||||
// Start connectors with appropriate callbacks
|
||||
// Email connectors: triage first, only webhook on escalation
|
||||
// Other connectors: webhook directly
|
||||
emailCallback := func() {
|
||||
if !config.Webhook.Enabled {
|
||||
return
|
||||
}
|
||||
if config.Triage.Enabled {
|
||||
shouldEscalate := false
|
||||
for name := range config.Accounts {
|
||||
if triageNewEmails(name, config.Triage) {
|
||||
shouldEscalate = true
|
||||
}
|
||||
}
|
||||
if shouldEscalate {
|
||||
sendWebhook()
|
||||
}
|
||||
} else {
|
||||
sendWebhook()
|
||||
}
|
||||
}
|
||||
directCallback := func() {
|
||||
if config.Webhook.Enabled {
|
||||
sendWebhook()
|
||||
}
|
||||
}
|
||||
if err := store.StartAll(webhookCallback); err != nil {
|
||||
if err := store.StartAllWithCallbacks(emailCallback, directCallback); err != nil {
|
||||
log.Fatalf("Failed to start connectors: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
17
store.go
17
store.go
|
|
@ -276,6 +276,23 @@ func (s *MessageStore) StartAll(callback func()) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// StartAllWithCallbacks starts connectors with different callbacks for email vs other connectors
|
||||
func (s *MessageStore) StartAllWithCallbacks(emailCallback, otherCallback func()) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, c := range s.connectors {
|
||||
cb := otherCallback
|
||||
if _, isEmail := c.(*EmailConnector); isEmail {
|
||||
cb = emailCallback
|
||||
}
|
||||
if err := c.Start(cb); err != nil {
|
||||
return fmt.Errorf("start %s: %w", c.Name(), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopAll stops all connectors
|
||||
func (s *MessageStore) StopAll() {
|
||||
s.mu.RLock()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,281 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TriageConfig holds LLM triage configuration
|
||||
type TriageConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
PromptFile string `yaml:"prompt_file"`
|
||||
Provider ProviderConfig `yaml:"provider"`
|
||||
DashboardURL string `yaml:"dashboard_url"`
|
||||
}
|
||||
|
||||
type ProviderConfig struct {
|
||||
BaseURL string `yaml:"base_url"`
|
||||
APIKey string `yaml:"api_key"`
|
||||
Model string `yaml:"model"`
|
||||
}
|
||||
|
||||
// TriageResult is the expected LLM response
|
||||
type TriageResult struct {
|
||||
Action string `json:"action"` // trash, archive, keep, escalate
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
// triageEmail runs LLM triage on an email message.
|
||||
// Returns the triage result, or nil if triage should be skipped (fall through to webhook).
|
||||
func triageEmail(msg UnifiedMessage, cfg TriageConfig) *TriageResult {
|
||||
// Read prompt file fresh every time
|
||||
promptBytes, err := os.ReadFile(cfg.PromptFile)
|
||||
if err != nil {
|
||||
log.Printf("[triage] Failed to read prompt file %s: %v — falling through to webhook", cfg.PromptFile, err)
|
||||
return nil
|
||||
}
|
||||
systemPrompt := string(promptBytes)
|
||||
|
||||
// Build user message with structured email data
|
||||
attachmentNames := make([]string, 0, len(msg.Attachments))
|
||||
for _, a := range msg.Attachments {
|
||||
attachmentNames = append(attachmentNames, a.Name)
|
||||
}
|
||||
|
||||
userMsg := fmt.Sprintf(`From: %s (%s)
|
||||
To: %s
|
||||
Account: %s
|
||||
Subject: %s
|
||||
Date: %s
|
||||
Has Attachments: %v
|
||||
Attachment Names: %s
|
||||
|
||||
Body:
|
||||
%s`,
|
||||
msg.From, msg.FromName,
|
||||
msg.To,
|
||||
msg.Source,
|
||||
msg.Subject,
|
||||
msg.Timestamp.Format(time.RFC1123),
|
||||
len(msg.Attachments) > 0,
|
||||
strings.Join(attachmentNames, ", "),
|
||||
msg.Body,
|
||||
)
|
||||
|
||||
// Truncate body to avoid huge payloads
|
||||
if len(userMsg) > 12000 {
|
||||
userMsg = userMsg[:12000] + "\n...[truncated]"
|
||||
}
|
||||
|
||||
// Call LLM
|
||||
result, err := callLLM(cfg.Provider, systemPrompt, userMsg)
|
||||
if err != nil {
|
||||
log.Printf("[triage] LLM call failed: %v — falling through to webhook", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// callLLM calls the OpenAI-compatible API
|
||||
func callLLM(provider ProviderConfig, systemPrompt, userMessage string) (*TriageResult, error) {
|
||||
reqBody := map[string]interface{}{
|
||||
"model": provider.Model,
|
||||
"messages": []map[string]string{
|
||||
{"role": "system", "content": systemPrompt},
|
||||
{"role": "user", "content": userMessage},
|
||||
},
|
||||
"temperature": 0.1,
|
||||
"max_tokens": 256,
|
||||
"response_format": map[string]string{"type": "json_object"},
|
||||
}
|
||||
|
||||
data, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal request: %w", err)
|
||||
}
|
||||
|
||||
url := strings.TrimRight(provider.BaseURL, "/") + "/chat/completions"
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+provider.APIKey)
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("send request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// Parse OpenAI-compatible response
|
||||
var apiResp struct {
|
||||
Choices []struct {
|
||||
Message struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"message"`
|
||||
} `json:"choices"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(body, &apiResp); err != nil {
|
||||
return nil, fmt.Errorf("parse API response: %w", err)
|
||||
}
|
||||
|
||||
if len(apiResp.Choices) == 0 {
|
||||
return nil, fmt.Errorf("no choices in API response")
|
||||
}
|
||||
|
||||
content := apiResp.Choices[0].Message.Content
|
||||
|
||||
// Parse the triage result JSON
|
||||
var result TriageResult
|
||||
if err := json.Unmarshal([]byte(content), &result); err != nil {
|
||||
return nil, fmt.Errorf("parse triage result %q: %w", content, err)
|
||||
}
|
||||
|
||||
// Validate action
|
||||
switch result.Action {
|
||||
case "trash", "archive", "keep", "escalate":
|
||||
// valid
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown triage action: %q", result.Action)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// executeTriageAction performs the action decided by the LLM
|
||||
func executeTriageAction(msg UnifiedMessage, result TriageResult) error {
|
||||
switch result.Action {
|
||||
case "trash":
|
||||
if err := store.Delete(msg.ID); err != nil {
|
||||
return fmt.Errorf("trash: %w", err)
|
||||
}
|
||||
orch.RecordAction(msg.ID, "delete")
|
||||
case "archive":
|
||||
if err := store.Archive(msg.ID); err != nil {
|
||||
return fmt.Errorf("archive: %w", err)
|
||||
}
|
||||
orch.RecordAction(msg.ID, "archive")
|
||||
case "keep":
|
||||
if err := store.MarkSeen(msg.ID); err != nil {
|
||||
return fmt.Errorf("mark seen: %w", err)
|
||||
}
|
||||
orch.RecordAction(msg.ID, "seen")
|
||||
case "escalate":
|
||||
// Record as seen but don't action — webhook will fire
|
||||
source, sourceID, _ := parseMessageID(msg.ID)
|
||||
orch.RecordSeen(msg.ID, source, sourceID, "INBOX")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// logTriageToDashboard posts a log entry to the alert dashboard
|
||||
func logTriageToDashboard(dashboardURL string, msg UnifiedMessage, result TriageResult) {
|
||||
if dashboardURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
sender := msg.FromName
|
||||
if sender == "" {
|
||||
sender = msg.From
|
||||
}
|
||||
|
||||
priority := "info"
|
||||
if result.Action == "escalate" {
|
||||
priority = "warning"
|
||||
}
|
||||
|
||||
alert := map[string]string{
|
||||
"message": fmt.Sprintf("📧 [%s] %s: %s — %s", result.Action, sender, msg.Subject, result.Reason),
|
||||
"priority": priority,
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(alert)
|
||||
resp, err := http.Post(dashboardURL, "application/json", bytes.NewReader(data))
|
||||
if err != nil {
|
||||
log.Printf("[triage] Dashboard alert failed: %v", err)
|
||||
return
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
// triageNewEmails fetches new emails from a specific account, triages them,
|
||||
// and returns true if any messages should escalate (fire webhook).
|
||||
func triageNewEmails(accountName string, cfg TriageConfig) bool {
|
||||
c, ok := store.GetConnector(accountName)
|
||||
if !ok {
|
||||
log.Printf("[triage] Connector %s not found", accountName)
|
||||
return true // fall through to webhook
|
||||
}
|
||||
|
||||
// Only triage email connectors
|
||||
_, isEmail := c.(*EmailConnector)
|
||||
if !isEmail {
|
||||
return true // non-email connectors always webhook
|
||||
}
|
||||
|
||||
msgs, err := c.FetchNew()
|
||||
if err != nil {
|
||||
log.Printf("[triage] Failed to fetch new from %s: %v", accountName, err)
|
||||
return true
|
||||
}
|
||||
|
||||
if len(msgs) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
shouldEscalate := false
|
||||
|
||||
for _, msg := range msgs {
|
||||
// Skip already-actioned messages
|
||||
if orch.HasAction(msg.ID) {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("[triage] Processing %s: %s from %s", msg.ID, msg.Subject, msg.From)
|
||||
|
||||
result := triageEmail(msg, cfg)
|
||||
if result == nil {
|
||||
// Triage failed — escalate this message
|
||||
shouldEscalate = true
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("[triage] %s → %s: %s", msg.ID, result.Action, result.Reason)
|
||||
|
||||
if err := executeTriageAction(msg, *result); err != nil {
|
||||
log.Printf("[triage] Action failed for %s: %v — escalating", msg.ID, err)
|
||||
shouldEscalate = true
|
||||
continue
|
||||
}
|
||||
|
||||
logTriageToDashboard(cfg.DashboardURL, msg, *result)
|
||||
|
||||
if result.Action == "escalate" {
|
||||
shouldEscalate = true
|
||||
}
|
||||
}
|
||||
|
||||
return shouldEscalate
|
||||
}
|
||||
Loading…
Reference in New Issue