From 90485d381d6605c0896a95a7e61f32f866e33f1c Mon Sep 17 00:00:00 2001 From: James Date: Fri, 13 Feb 2026 23:30:11 -0500 Subject: [PATCH] chore: auto-commit uncommitted changes --- config.yaml | 9 ++ main.go | 32 +++++- store.go | 17 ++++ triage.go | 281 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 triage.go diff --git a/config.yaml b/config.yaml index 258264a..3de412f 100644 --- a/config.yaml +++ b/config.yaml @@ -48,3 +48,12 @@ webhook: enabled: true url: http://localhost:18789/hooks/messages token: "kuma-alert-token-2026" + +triage: + enabled: true + prompt_file: /home/johan/clawd/config/email-triage-prompt.md + provider: + base_url: https://api.fireworks.ai/inference/v1 + api_key: ${FIREWORKS_API_KEY} + model: accounts/fireworks/models/kimi-k2p5 + dashboard_url: http://localhost:9202/api/alerts diff --git a/main.go b/main.go index d5fb39c..b0c7d4e 100644 --- a/main.go +++ b/main.go @@ -25,6 +25,7 @@ type Config struct { Accounts map[string]AccountConfig `yaml:"accounts"` Connectors ConnectorsConfig `yaml:"connectors"` Webhook WebhookConfig `yaml:"webhook"` + Triage TriageConfig `yaml:"triage"` } type ServerConfig struct { @@ -169,13 +170,38 @@ func main() { log.Printf("Registered SMS connector: %s -> %s", name, gatewayURL) } - // Start all connectors with webhook callback - webhookCallback := func() { + // Log triage config + if config.Triage.Enabled { + log.Printf("Email triage enabled (model: %s, prompt: %s)", config.Triage.Provider.Model, config.Triage.PromptFile) + } + + // Start connectors with appropriate callbacks + // Email connectors: triage first, only webhook on escalation + // Other connectors: webhook directly + emailCallback := func() { + if !config.Webhook.Enabled { + return + } + if config.Triage.Enabled { + shouldEscalate := false + for name := range config.Accounts { + if triageNewEmails(name, config.Triage) { + shouldEscalate = true + } + } + if shouldEscalate { + sendWebhook() + } + } else { + sendWebhook() + } + } + directCallback := func() { if config.Webhook.Enabled { sendWebhook() } } - if err := store.StartAll(webhookCallback); err != nil { + if err := store.StartAllWithCallbacks(emailCallback, directCallback); err != nil { log.Fatalf("Failed to start connectors: %v", err) } diff --git a/store.go b/store.go index 5778072..06557b1 100644 --- a/store.go +++ b/store.go @@ -276,6 +276,23 @@ func (s *MessageStore) StartAll(callback func()) error { return nil } +// StartAllWithCallbacks starts connectors with different callbacks for email vs other connectors +func (s *MessageStore) StartAllWithCallbacks(emailCallback, otherCallback func()) error { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, c := range s.connectors { + cb := otherCallback + if _, isEmail := c.(*EmailConnector); isEmail { + cb = emailCallback + } + if err := c.Start(cb); err != nil { + return fmt.Errorf("start %s: %w", c.Name(), err) + } + } + return nil +} + // StopAll stops all connectors func (s *MessageStore) StopAll() { s.mu.RLock() diff --git a/triage.go b/triage.go new file mode 100644 index 0000000..e809880 --- /dev/null +++ b/triage.go @@ -0,0 +1,281 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "strings" + "time" +) + +// TriageConfig holds LLM triage configuration +type TriageConfig struct { + Enabled bool `yaml:"enabled"` + PromptFile string `yaml:"prompt_file"` + Provider ProviderConfig `yaml:"provider"` + DashboardURL string `yaml:"dashboard_url"` +} + +type ProviderConfig struct { + BaseURL string `yaml:"base_url"` + APIKey string `yaml:"api_key"` + Model string `yaml:"model"` +} + +// TriageResult is the expected LLM response +type TriageResult struct { + Action string `json:"action"` // trash, archive, keep, escalate + Reason string `json:"reason"` +} + +// triageEmail runs LLM triage on an email message. +// Returns the triage result, or nil if triage should be skipped (fall through to webhook). +func triageEmail(msg UnifiedMessage, cfg TriageConfig) *TriageResult { + // Read prompt file fresh every time + promptBytes, err := os.ReadFile(cfg.PromptFile) + if err != nil { + log.Printf("[triage] Failed to read prompt file %s: %v — falling through to webhook", cfg.PromptFile, err) + return nil + } + systemPrompt := string(promptBytes) + + // Build user message with structured email data + attachmentNames := make([]string, 0, len(msg.Attachments)) + for _, a := range msg.Attachments { + attachmentNames = append(attachmentNames, a.Name) + } + + userMsg := fmt.Sprintf(`From: %s (%s) +To: %s +Account: %s +Subject: %s +Date: %s +Has Attachments: %v +Attachment Names: %s + +Body: +%s`, + msg.From, msg.FromName, + msg.To, + msg.Source, + msg.Subject, + msg.Timestamp.Format(time.RFC1123), + len(msg.Attachments) > 0, + strings.Join(attachmentNames, ", "), + msg.Body, + ) + + // Truncate body to avoid huge payloads + if len(userMsg) > 12000 { + userMsg = userMsg[:12000] + "\n...[truncated]" + } + + // Call LLM + result, err := callLLM(cfg.Provider, systemPrompt, userMsg) + if err != nil { + log.Printf("[triage] LLM call failed: %v — falling through to webhook", err) + return nil + } + + return result +} + +// callLLM calls the OpenAI-compatible API +func callLLM(provider ProviderConfig, systemPrompt, userMessage string) (*TriageResult, error) { + reqBody := map[string]interface{}{ + "model": provider.Model, + "messages": []map[string]string{ + {"role": "system", "content": systemPrompt}, + {"role": "user", "content": userMessage}, + }, + "temperature": 0.1, + "max_tokens": 256, + "response_format": map[string]string{"type": "json_object"}, + } + + data, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + url := strings.TrimRight(provider.BaseURL, "/") + "/chat/completions" + req, err := http.NewRequest("POST", url, bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+provider.APIKey) + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("send request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(body)) + } + + // Parse OpenAI-compatible response + var apiResp struct { + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + } + + if err := json.Unmarshal(body, &apiResp); err != nil { + return nil, fmt.Errorf("parse API response: %w", err) + } + + if len(apiResp.Choices) == 0 { + return nil, fmt.Errorf("no choices in API response") + } + + content := apiResp.Choices[0].Message.Content + + // Parse the triage result JSON + var result TriageResult + if err := json.Unmarshal([]byte(content), &result); err != nil { + return nil, fmt.Errorf("parse triage result %q: %w", content, err) + } + + // Validate action + switch result.Action { + case "trash", "archive", "keep", "escalate": + // valid + default: + return nil, fmt.Errorf("unknown triage action: %q", result.Action) + } + + return &result, nil +} + +// executeTriageAction performs the action decided by the LLM +func executeTriageAction(msg UnifiedMessage, result TriageResult) error { + switch result.Action { + case "trash": + if err := store.Delete(msg.ID); err != nil { + return fmt.Errorf("trash: %w", err) + } + orch.RecordAction(msg.ID, "delete") + case "archive": + if err := store.Archive(msg.ID); err != nil { + return fmt.Errorf("archive: %w", err) + } + orch.RecordAction(msg.ID, "archive") + case "keep": + if err := store.MarkSeen(msg.ID); err != nil { + return fmt.Errorf("mark seen: %w", err) + } + orch.RecordAction(msg.ID, "seen") + case "escalate": + // Record as seen but don't action — webhook will fire + source, sourceID, _ := parseMessageID(msg.ID) + orch.RecordSeen(msg.ID, source, sourceID, "INBOX") + } + + return nil +} + +// logTriageToDashboard posts a log entry to the alert dashboard +func logTriageToDashboard(dashboardURL string, msg UnifiedMessage, result TriageResult) { + if dashboardURL == "" { + return + } + + sender := msg.FromName + if sender == "" { + sender = msg.From + } + + priority := "info" + if result.Action == "escalate" { + priority = "warning" + } + + alert := map[string]string{ + "message": fmt.Sprintf("📧 [%s] %s: %s — %s", result.Action, sender, msg.Subject, result.Reason), + "priority": priority, + } + + data, _ := json.Marshal(alert) + resp, err := http.Post(dashboardURL, "application/json", bytes.NewReader(data)) + if err != nil { + log.Printf("[triage] Dashboard alert failed: %v", err) + return + } + resp.Body.Close() +} + +// triageNewEmails fetches new emails from a specific account, triages them, +// and returns true if any messages should escalate (fire webhook). +func triageNewEmails(accountName string, cfg TriageConfig) bool { + c, ok := store.GetConnector(accountName) + if !ok { + log.Printf("[triage] Connector %s not found", accountName) + return true // fall through to webhook + } + + // Only triage email connectors + _, isEmail := c.(*EmailConnector) + if !isEmail { + return true // non-email connectors always webhook + } + + msgs, err := c.FetchNew() + if err != nil { + log.Printf("[triage] Failed to fetch new from %s: %v", accountName, err) + return true + } + + if len(msgs) == 0 { + return false + } + + shouldEscalate := false + + for _, msg := range msgs { + // Skip already-actioned messages + if orch.HasAction(msg.ID) { + continue + } + + log.Printf("[triage] Processing %s: %s from %s", msg.ID, msg.Subject, msg.From) + + result := triageEmail(msg, cfg) + if result == nil { + // Triage failed — escalate this message + shouldEscalate = true + continue + } + + log.Printf("[triage] %s → %s: %s", msg.ID, result.Action, result.Reason) + + if err := executeTriageAction(msg, *result); err != nil { + log.Printf("[triage] Action failed for %s: %v — escalating", msg.ID, err) + shouldEscalate = true + continue + } + + logTriageToDashboard(cfg.DashboardURL, msg, *result) + + if result.Action == "escalate" { + shouldEscalate = true + } + } + + return shouldEscalate +}