message-center/main.go

768 lines
19 KiB
Go

package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"gopkg.in/yaml.v3"
)
// Config structures
type Config struct {
Server ServerConfig `yaml:"server"`
DataDir string `yaml:"data_dir"`
Accounts map[string]AccountConfig `yaml:"accounts"`
Connectors ConnectorsConfig `yaml:"connectors"`
Webhook WebhookConfig `yaml:"webhook"`
}
type ServerConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
}
type AccountConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
TLS string `yaml:"tls"`
Watch []string `yaml:"watch"`
SMTP struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
From string `yaml:"from"`
} `yaml:"smtp"`
}
type ConnectorsConfig struct {
WhatsApp struct {
Enabled bool `yaml:"enabled"`
Name string `yaml:"name"`
BaseURL string `yaml:"base_url"`
} `yaml:"whatsapp"`
SMS struct {
Enabled bool `yaml:"enabled"`
Name string `yaml:"name"`
GatewayURL string `yaml:"gateway_url"`
} `yaml:"sms"`
}
type WebhookConfig struct {
Enabled bool `yaml:"enabled"`
URL string `yaml:"url"`
Token string `yaml:"token"`
}
// Global state
var (
config Config
store *MessageStore
orch *OrchestrationDB
)
func main() {
configPath := flag.String("config", "config.yaml", "Path to config file")
flag.Parse()
// Load config
data, err := os.ReadFile(*configPath)
if err != nil {
log.Fatalf("Failed to read config: %v", err)
}
// Expand env vars in config
expanded := os.ExpandEnv(string(data))
if err := yaml.Unmarshal([]byte(expanded), &config); err != nil {
log.Fatalf("Failed to parse config: %v", err)
}
// Default data directory
if config.DataDir == "" {
home, _ := os.UserHomeDir()
config.DataDir = filepath.Join(home, ".message-center")
} else if strings.HasPrefix(config.DataDir, "~/") {
home, _ := os.UserHomeDir()
config.DataDir = filepath.Join(home, config.DataDir[2:])
}
os.MkdirAll(config.DataDir, 0755)
log.Printf("Message Center starting...")
log.Printf("Data directory: %s", config.DataDir)
// Initialize store
store = NewMessageStore(config.DataDir)
// Initialize orchestration DB
orchPath := filepath.Join(config.DataDir, "orchestration.db")
orch, err = NewOrchestrationDB(orchPath)
if err != nil {
log.Fatalf("Failed to open orchestration DB: %v", err)
}
defer orch.Close()
// Register email connectors
for name, acc := range config.Accounts {
smtpConfig := SMTPConfig{
Host: acc.SMTP.Host,
Port: acc.SMTP.Port,
Username: acc.SMTP.Username,
Password: acc.SMTP.Password,
From: acc.SMTP.From,
}
// Default SMTP to same credentials if not specified
if smtpConfig.Host == "" {
smtpConfig.Host = acc.Host
smtpConfig.Port = 1025 // Default Proton Bridge SMTP
smtpConfig.Username = acc.Username
smtpConfig.Password = acc.Password
smtpConfig.From = acc.Username
}
connector := NewEmailConnector(name, acc, smtpConfig)
store.RegisterConnector(connector)
log.Printf("Registered email connector: %s", name)
}
// Register WhatsApp connector
if config.Connectors.WhatsApp.Enabled {
name := config.Connectors.WhatsApp.Name
if name == "" {
name = "whatsapp"
}
baseURL := config.Connectors.WhatsApp.BaseURL
if baseURL == "" {
baseURL = "http://localhost:8030"
}
connector := NewWhatsAppConnector(name, baseURL)
store.RegisterConnector(connector)
log.Printf("Registered WhatsApp connector: %s -> %s", name, baseURL)
}
// Register SMS connector (via ClawdNode Gateway)
if config.Connectors.SMS.Enabled {
name := config.Connectors.SMS.Name
if name == "" {
name = "sms"
}
gatewayURL := config.Connectors.SMS.GatewayURL
if gatewayURL == "" {
gatewayURL = "http://localhost:9877"
}
connector := NewSmsConnector(name, gatewayURL)
store.RegisterConnector(connector)
log.Printf("Registered SMS connector: %s -> %s", name, gatewayURL)
}
// Start all connectors with webhook callback
webhookCallback := func() {
if config.Webhook.Enabled {
sendWebhook()
}
}
if err := store.StartAll(webhookCallback); err != nil {
log.Fatalf("Failed to start connectors: %v", err)
}
// HTTP server
mux := http.NewServeMux()
// Health check
mux.HandleFunc("/health", handleHealth)
// Unified message endpoints
mux.HandleFunc("/messages", handleMessages)
mux.HandleFunc("/messages/", handleMessageRoutes)
// Legacy account-based endpoints (for backwards compatibility)
mux.HandleFunc("/accounts", handleListAccounts)
mux.HandleFunc("/accounts/", handleAccountRoutes)
addr := fmt.Sprintf("%s:%d", config.Server.Host, config.Server.Port)
server := &http.Server{Addr: addr, Handler: logMiddleware(mux)}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
store.StopAll()
server.Shutdown(nil)
}()
log.Printf("Server listening on %s", addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}
func logMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
next.ServeHTTP(w, r)
log.Printf("%s %s %v", r.Method, r.URL.Path, time.Since(start).Round(time.Millisecond))
})
}
func sendWebhook() {
if !config.Webhook.Enabled || config.Webhook.URL == "" {
return
}
// Simple webhook: just notify that there are new messages
payload := map[string]string{"event": "new"}
data, _ := json.Marshal(payload)
req, err := http.NewRequest("POST", config.Webhook.URL, bytes.NewReader(data))
if err != nil {
log.Printf("Webhook request error: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
if config.Webhook.Token != "" {
req.Header.Set("Authorization", "Bearer "+config.Webhook.Token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("Webhook send error: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
log.Printf("Webhook error %d: %s", resp.StatusCode, string(body))
} else {
log.Printf("Webhook sent: new")
}
}
// HTTP handlers
func handleHealth(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
}
// GET /messages?since=24h - list messages
// GET /messages/new - unseen messages
// POST /messages/ack - acknowledge messages (advance cursor)
func handleMessages(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// Handle /messages/new and /messages/ack via the path
if strings.HasSuffix(r.URL.Path, "/new") {
handleMessagesNew(w, r)
return
}
if strings.HasSuffix(r.URL.Path, "/ack") {
handleMessagesAck(w, r)
return
}
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Parse since parameter
sinceStr := r.URL.Query().Get("since")
if sinceStr == "" {
sinceStr = "24h" // Default to last 24 hours
}
duration, err := ParseDuration(sinceStr)
if err != nil {
http.Error(w, fmt.Sprintf("Invalid since parameter: %v", err), http.StatusBadRequest)
return
}
messages, err := store.FetchSince(duration)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Filter by source if specified
sourceFilter := r.URL.Query().Get("source")
showAll := r.URL.Query().Get("all") == "true"
var filtered []UnifiedMessage
for _, msg := range messages {
if sourceFilter != "" && msg.Source != sourceFilter {
continue
}
// By default, exclude actioned messages (archived/deleted/etc.)
// Use ?all=true to include everything
if !showAll && orch.HasAction(msg.ID) {
continue
}
filtered = append(filtered, msg)
}
json.NewEncoder(w).Encode(filtered)
}
func handleMessagesNew(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
messages, err := store.FetchNew()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Filter by source if specified
sourceFilter := r.URL.Query().Get("source")
// Filter by orchestration state and record new messages
var filtered []UnifiedMessage
for _, msg := range messages {
// Skip if source filter is set and doesn't match
if sourceFilter != "" && msg.Source != sourceFilter {
continue
}
// Record that we've seen this message
source, sourceID, _ := parseMessageID(msg.ID)
orch.RecordSeen(msg.ID, source, sourceID, "INBOX")
// Only include if no action has been taken
if !orch.HasAction(msg.ID) {
filtered = append(filtered, msg)
}
}
json.NewEncoder(w).Encode(filtered)
}
func handleMessagesAck(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Consumer string `json:"consumer"`
Timestamp time.Time `json:"timestamp"`
IDs []string `json:"ids"` // Optional: specific message IDs to ack
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if req.Consumer == "" {
http.Error(w, "consumer is required", http.StatusBadRequest)
return
}
if req.Timestamp.IsZero() {
req.Timestamp = time.Now()
}
// Ack cursor-based
if err := store.Ack(req.Consumer, req.Timestamp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Also record per-message acks in orchestration DB
for _, id := range req.IDs {
orch.RecordAck(id, req.Consumer)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"consumer": req.Consumer,
"timestamp": req.Timestamp,
"acked": len(req.IDs),
})
}
// Handle /messages/{id} and /messages/{id}/action
func handleMessageRoutes(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// Parse path: /messages/{id}[/action]
path := strings.TrimPrefix(r.URL.Path, "/messages/")
if path == "" || path == "new" || path == "ack" {
handleMessages(w, r)
return
}
parts := strings.SplitN(path, "/", 2)
messageID := parts[0]
// Reconstruct full message ID if it doesn't contain ':'
// This handles URL-encoded colons
messageID = strings.ReplaceAll(messageID, "%3A", ":")
if len(parts) == 1 {
// GET /messages/{id}
handleGetMessage(w, r, messageID)
return
}
action := parts[1]
switch action {
case "archive":
handleArchive(w, r, messageID)
case "delete":
handleDelete(w, r, messageID)
case "reply":
handleReply(w, r, messageID)
case "to-docs":
handleToDocs(w, r, messageID)
case "attachments":
handleAttachments(w, r, messageID)
case "seen":
handleSeen(w, r, messageID)
default:
http.Error(w, "Unknown action", http.StatusNotFound)
}
}
func handleGetMessage(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
msg, err := store.FetchOne(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(msg)
}
func handleArchive(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if err := store.Archive(id); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Record action in orchestration DB
orch.RecordAction(id, "archive")
json.NewEncoder(w).Encode(map[string]string{"status": "archived"})
}
// POST /messages/{id}/seen - mark as seen without archiving/deleting
// Removes from /messages/new but keeps message in its original mailbox
func handleSeen(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
orch.RecordAction(id, "seen")
json.NewEncoder(w).Encode(map[string]string{"status": "seen"})
}
func handleDelete(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if err := store.Delete(id); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Record action in orchestration DB
orch.RecordAction(id, "delete")
json.NewEncoder(w).Encode(map[string]string{"status": "deleted"})
}
func handleReply(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Body string `json:"body"`
Attachments []string `json:"attachments"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if req.Body == "" {
http.Error(w, "body is required", http.StatusBadRequest)
return
}
if err := store.Reply(id, req.Body, req.Attachments); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Record action in orchestration DB
orch.RecordAction(id, "reply")
json.NewEncoder(w).Encode(map[string]string{"status": "sent"})
}
func handleToDocs(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Attachments []string `json:"attachments"` // Optional: specific attachments, empty = all
}
json.NewDecoder(r.Body).Decode(&req) // Ignore error, req is optional
// Get the message
msg, err := store.FetchOne(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
if len(msg.Attachments) == 0 {
http.Error(w, "Message has no attachments", http.StatusBadRequest)
return
}
// Create inbox directory
home, _ := os.UserHomeDir()
inboxDir := filepath.Join(home, "documents", "inbox")
os.MkdirAll(inboxDir, 0755)
// Download and save each attachment
var saved []string
var errors []string
wantedSet := make(map[string]bool)
for _, name := range req.Attachments {
wantedSet[name] = true
}
for _, att := range msg.Attachments {
// Skip if not in requested list (when list is specified)
if len(req.Attachments) > 0 && !wantedSet[att.Name] {
continue
}
content, err := store.GetAttachment(id, att.Name)
if err != nil {
errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err))
continue
}
destPath := filepath.Join(inboxDir, att.Name)
if err := os.WriteFile(destPath, content, 0644); err != nil {
errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err))
continue
}
saved = append(saved, destPath)
}
result := map[string]interface{}{
"saved": saved,
"errors": errors,
}
if len(errors) > 0 && len(saved) == 0 {
w.WriteHeader(http.StatusInternalServerError)
} else if len(saved) > 0 {
// Record action in orchestration DB (only if we saved something)
orch.RecordAction(id, "to-docs")
}
json.NewEncoder(w).Encode(result)
}
func handleAttachments(w http.ResponseWriter, r *http.Request, id string) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Check for specific attachment name in query
filename := r.URL.Query().Get("name")
if filename != "" {
content, err := store.GetAttachment(id, filename)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
// Return raw content
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename))
w.Write(content)
return
}
// Return attachment list
msg, err := store.FetchOne(id)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
json.NewEncoder(w).Encode(msg.Attachments)
}
// Legacy endpoints for backwards compatibility
func handleListAccounts(w http.ResponseWriter, r *http.Request) {
accounts := make([]string, 0, len(config.Accounts))
for name := range config.Accounts {
accounts = append(accounts, name)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(accounts)
}
func handleAccountRoutes(w http.ResponseWriter, r *http.Request) {
// Parse path: /accounts/{account}/...
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
if len(parts) < 2 {
http.Error(w, "Invalid path", http.StatusBadRequest)
return
}
accountName := parts[1]
acc, ok := config.Accounts[accountName]
if !ok {
http.Error(w, "Account not found", http.StatusNotFound)
return
}
if len(parts) == 2 {
// /accounts/{account} - account info
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"name": accountName,
"host": acc.Host,
"watch": acc.Watch,
})
return
}
switch parts[2] {
case "mailboxes":
handleLegacyMailboxes(w, r, accountName)
case "messages":
handleLegacyMessages(w, r, accountName, parts[3:])
default:
http.Error(w, "Not found", http.StatusNotFound)
}
}
func handleLegacyMailboxes(w http.ResponseWriter, r *http.Request, accountName string) {
// Get connector
c, ok := store.GetConnector(accountName)
if !ok {
http.Error(w, "Account not found", http.StatusNotFound)
return
}
// Only email connectors support mailboxes
emailConn, ok := c.(*EmailConnector)
if !ok {
http.Error(w, "Not an email account", http.StatusBadRequest)
return
}
client, err := emailConn.connect()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer client.Close()
mailboxes, err := client.List("", "*", nil).Collect()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result := make([]map[string]interface{}, 0, len(mailboxes))
for _, mbox := range mailboxes {
result = append(result, map[string]interface{}{
"name": mbox.Mailbox,
"delimiter": string(mbox.Delim),
})
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}
func handleLegacyMessages(w http.ResponseWriter, r *http.Request, accountName string, pathParts []string) {
// Redirect to unified API
if len(pathParts) == 0 {
// List messages from this account only
c, ok := store.GetConnector(accountName)
if !ok {
http.Error(w, "Account not found", http.StatusNotFound)
return
}
since := time.Now().Add(-24 * time.Hour)
if s := r.URL.Query().Get("since"); s != "" {
if d, err := ParseDuration(s); err == nil {
since = time.Now().Add(-d)
}
}
msgs, err := c.FetchSince(since)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(msgs)
return
}
// Single message - redirect to unified endpoint
uid := pathParts[0]
fullID := fmt.Sprintf("%s:%s", accountName, uid)
handleMessageRoutes(w, r)
_ = fullID // Silence unused variable warning
}