package main import ( "bytes" "encoding/json" "flag" "fmt" "io" "log" "net/http" "os" "os/signal" "path/filepath" "strings" "syscall" "time" "gopkg.in/yaml.v3" ) // Config structures type Config struct { Server ServerConfig `yaml:"server"` DataDir string `yaml:"data_dir"` Accounts map[string]AccountConfig `yaml:"accounts"` Connectors ConnectorsConfig `yaml:"connectors"` Webhook WebhookConfig `yaml:"webhook"` } type ServerConfig struct { Host string `yaml:"host"` Port int `yaml:"port"` } type AccountConfig struct { Host string `yaml:"host"` Port int `yaml:"port"` Username string `yaml:"username"` Password string `yaml:"password"` TLS string `yaml:"tls"` Watch []string `yaml:"watch"` SMTP struct { Host string `yaml:"host"` Port int `yaml:"port"` Username string `yaml:"username"` Password string `yaml:"password"` From string `yaml:"from"` } `yaml:"smtp"` } type ConnectorsConfig struct { WhatsApp struct { Enabled bool `yaml:"enabled"` Name string `yaml:"name"` BaseURL string `yaml:"base_url"` } `yaml:"whatsapp"` } type WebhookConfig struct { Enabled bool `yaml:"enabled"` URL string `yaml:"url"` Token string `yaml:"token"` } // Global state var ( config Config store *MessageStore orch *OrchestrationDB ) func main() { configPath := flag.String("config", "config.yaml", "Path to config file") flag.Parse() // Load config data, err := os.ReadFile(*configPath) if err != nil { log.Fatalf("Failed to read config: %v", err) } // Expand env vars in config expanded := os.ExpandEnv(string(data)) if err := yaml.Unmarshal([]byte(expanded), &config); err != nil { log.Fatalf("Failed to parse config: %v", err) } // Default data directory if config.DataDir == "" { home, _ := os.UserHomeDir() config.DataDir = filepath.Join(home, ".message-center") } else if strings.HasPrefix(config.DataDir, "~/") { home, _ := os.UserHomeDir() config.DataDir = filepath.Join(home, config.DataDir[2:]) } os.MkdirAll(config.DataDir, 0755) log.Printf("Message Center starting...") log.Printf("Data directory: %s", config.DataDir) // Initialize store store = NewMessageStore(config.DataDir) // Initialize orchestration DB orchPath := filepath.Join(config.DataDir, "orchestration.db") orch, err = NewOrchestrationDB(orchPath) if err != nil { log.Fatalf("Failed to open orchestration DB: %v", err) } defer orch.Close() // Register email connectors for name, acc := range config.Accounts { smtpConfig := SMTPConfig{ Host: acc.SMTP.Host, Port: acc.SMTP.Port, Username: acc.SMTP.Username, Password: acc.SMTP.Password, From: acc.SMTP.From, } // Default SMTP to same credentials if not specified if smtpConfig.Host == "" { smtpConfig.Host = acc.Host smtpConfig.Port = 1025 // Default Proton Bridge SMTP smtpConfig.Username = acc.Username smtpConfig.Password = acc.Password smtpConfig.From = acc.Username } connector := NewEmailConnector(name, acc, smtpConfig) store.RegisterConnector(connector) log.Printf("Registered email connector: %s", name) } // Register WhatsApp connector if config.Connectors.WhatsApp.Enabled { name := config.Connectors.WhatsApp.Name if name == "" { name = "whatsapp" } baseURL := config.Connectors.WhatsApp.BaseURL if baseURL == "" { baseURL = "http://localhost:8030" } connector := NewWhatsAppConnector(name, baseURL) store.RegisterConnector(connector) log.Printf("Registered WhatsApp connector: %s -> %s", name, baseURL) } // Start all connectors with webhook callback webhookCallback := func() { if config.Webhook.Enabled { sendWebhook() } } if err := store.StartAll(webhookCallback); err != nil { log.Fatalf("Failed to start connectors: %v", err) } // HTTP server mux := http.NewServeMux() // Health check mux.HandleFunc("/health", handleHealth) // Unified message endpoints mux.HandleFunc("/messages", handleMessages) mux.HandleFunc("/messages/", handleMessageRoutes) // Legacy account-based endpoints (for backwards compatibility) mux.HandleFunc("/accounts", handleListAccounts) mux.HandleFunc("/accounts/", handleAccountRoutes) addr := fmt.Sprintf("%s:%d", config.Server.Host, config.Server.Port) server := &http.Server{Addr: addr, Handler: logMiddleware(mux)} // Graceful shutdown go func() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh log.Println("Shutting down...") store.StopAll() server.Shutdown(nil) }() log.Printf("Server listening on %s", addr) if err := server.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } } func logMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() next.ServeHTTP(w, r) log.Printf("%s %s %v", r.Method, r.URL.Path, time.Since(start).Round(time.Millisecond)) }) } func sendWebhook() { if !config.Webhook.Enabled || config.Webhook.URL == "" { return } // Simple webhook: just notify that there are new messages payload := map[string]string{"event": "new"} data, _ := json.Marshal(payload) req, err := http.NewRequest("POST", config.Webhook.URL, bytes.NewReader(data)) if err != nil { log.Printf("Webhook request error: %v", err) return } req.Header.Set("Content-Type", "application/json") if config.Webhook.Token != "" { req.Header.Set("Authorization", "Bearer "+config.Webhook.Token) } resp, err := http.DefaultClient.Do(req) if err != nil { log.Printf("Webhook send error: %v", err) return } defer resp.Body.Close() if resp.StatusCode >= 400 { body, _ := io.ReadAll(resp.Body) log.Printf("Webhook error %d: %s", resp.StatusCode, string(body)) } else { log.Printf("Webhook sent: new") } } // HTTP handlers func handleHealth(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) } // GET /messages?since=24h - list messages // GET /messages/new - unseen messages // POST /messages/ack - acknowledge messages (advance cursor) func handleMessages(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") // Handle /messages/new and /messages/ack via the path if strings.HasSuffix(r.URL.Path, "/new") { handleMessagesNew(w, r) return } if strings.HasSuffix(r.URL.Path, "/ack") { handleMessagesAck(w, r) return } if r.Method != "GET" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Parse since parameter sinceStr := r.URL.Query().Get("since") if sinceStr == "" { sinceStr = "24h" // Default to last 24 hours } duration, err := ParseDuration(sinceStr) if err != nil { http.Error(w, fmt.Sprintf("Invalid since parameter: %v", err), http.StatusBadRequest) return } messages, err := store.FetchSince(duration) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } json.NewEncoder(w).Encode(messages) } func handleMessagesNew(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } messages, err := store.FetchNew() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Filter by orchestration state and record new messages var filtered []UnifiedMessage for _, msg := range messages { // Record that we've seen this message source, sourceID, _ := parseMessageID(msg.ID) orch.RecordSeen(msg.ID, source, sourceID, "INBOX") // Only include if no action has been taken if !orch.HasAction(msg.ID) { filtered = append(filtered, msg) } } json.NewEncoder(w).Encode(filtered) } func handleMessagesAck(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req struct { Consumer string `json:"consumer"` Timestamp time.Time `json:"timestamp"` IDs []string `json:"ids"` // Optional: specific message IDs to ack } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if req.Consumer == "" { http.Error(w, "consumer is required", http.StatusBadRequest) return } if req.Timestamp.IsZero() { req.Timestamp = time.Now() } // Ack cursor-based if err := store.Ack(req.Consumer, req.Timestamp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Also record per-message acks in orchestration DB for _, id := range req.IDs { orch.RecordAck(id, req.Consumer) } json.NewEncoder(w).Encode(map[string]interface{}{ "status": "ok", "consumer": req.Consumer, "timestamp": req.Timestamp, "acked": len(req.IDs), }) } // Handle /messages/{id} and /messages/{id}/action func handleMessageRoutes(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") // Parse path: /messages/{id}[/action] path := strings.TrimPrefix(r.URL.Path, "/messages/") if path == "" || path == "new" || path == "ack" { handleMessages(w, r) return } parts := strings.SplitN(path, "/", 2) messageID := parts[0] // Reconstruct full message ID if it doesn't contain ':' // This handles URL-encoded colons messageID = strings.ReplaceAll(messageID, "%3A", ":") if len(parts) == 1 { // GET /messages/{id} handleGetMessage(w, r, messageID) return } action := parts[1] switch action { case "archive": handleArchive(w, r, messageID) case "delete": handleDelete(w, r, messageID) case "reply": handleReply(w, r, messageID) case "to-docs": handleToDocs(w, r, messageID) case "attachments": handleAttachments(w, r, messageID) default: http.Error(w, "Unknown action", http.StatusNotFound) } } func handleGetMessage(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "GET" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } msg, err := store.FetchOne(id) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } json.NewEncoder(w).Encode(msg) } func handleArchive(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } if err := store.Archive(id); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Record action in orchestration DB orch.RecordAction(id, "archive") json.NewEncoder(w).Encode(map[string]string{"status": "archived"}) } func handleDelete(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } if err := store.Delete(id); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Record action in orchestration DB orch.RecordAction(id, "delete") json.NewEncoder(w).Encode(map[string]string{"status": "deleted"}) } func handleReply(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req struct { Body string `json:"body"` Attachments []string `json:"attachments"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } if req.Body == "" { http.Error(w, "body is required", http.StatusBadRequest) return } if err := store.Reply(id, req.Body, req.Attachments); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Record action in orchestration DB orch.RecordAction(id, "reply") json.NewEncoder(w).Encode(map[string]string{"status": "sent"}) } func handleToDocs(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } var req struct { Attachments []string `json:"attachments"` // Optional: specific attachments, empty = all } json.NewDecoder(r.Body).Decode(&req) // Ignore error, req is optional // Get the message msg, err := store.FetchOne(id) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } if len(msg.Attachments) == 0 { http.Error(w, "Message has no attachments", http.StatusBadRequest) return } // Create inbox directory home, _ := os.UserHomeDir() inboxDir := filepath.Join(home, "documents", "inbox") os.MkdirAll(inboxDir, 0755) // Download and save each attachment var saved []string var errors []string wantedSet := make(map[string]bool) for _, name := range req.Attachments { wantedSet[name] = true } for _, att := range msg.Attachments { // Skip if not in requested list (when list is specified) if len(req.Attachments) > 0 && !wantedSet[att.Name] { continue } content, err := store.GetAttachment(id, att.Name) if err != nil { errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err)) continue } destPath := filepath.Join(inboxDir, att.Name) if err := os.WriteFile(destPath, content, 0644); err != nil { errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err)) continue } saved = append(saved, destPath) } result := map[string]interface{}{ "saved": saved, "errors": errors, } if len(errors) > 0 && len(saved) == 0 { w.WriteHeader(http.StatusInternalServerError) } else if len(saved) > 0 { // Record action in orchestration DB (only if we saved something) orch.RecordAction(id, "to-docs") } json.NewEncoder(w).Encode(result) } func handleAttachments(w http.ResponseWriter, r *http.Request, id string) { if r.Method != "GET" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // Check for specific attachment name in query filename := r.URL.Query().Get("name") if filename != "" { content, err := store.GetAttachment(id, filename) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } // Return raw content w.Header().Set("Content-Type", "application/octet-stream") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename)) w.Write(content) return } // Return attachment list msg, err := store.FetchOne(id) if err != nil { http.Error(w, err.Error(), http.StatusNotFound) return } json.NewEncoder(w).Encode(msg.Attachments) } // Legacy endpoints for backwards compatibility func handleListAccounts(w http.ResponseWriter, r *http.Request) { accounts := make([]string, 0, len(config.Accounts)) for name := range config.Accounts { accounts = append(accounts, name) } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(accounts) } func handleAccountRoutes(w http.ResponseWriter, r *http.Request) { // Parse path: /accounts/{account}/... parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/") if len(parts) < 2 { http.Error(w, "Invalid path", http.StatusBadRequest) return } accountName := parts[1] acc, ok := config.Accounts[accountName] if !ok { http.Error(w, "Account not found", http.StatusNotFound) return } if len(parts) == 2 { // /accounts/{account} - account info w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "name": accountName, "host": acc.Host, "watch": acc.Watch, }) return } switch parts[2] { case "mailboxes": handleLegacyMailboxes(w, r, accountName) case "messages": handleLegacyMessages(w, r, accountName, parts[3:]) default: http.Error(w, "Not found", http.StatusNotFound) } } func handleLegacyMailboxes(w http.ResponseWriter, r *http.Request, accountName string) { // Get connector c, ok := store.GetConnector(accountName) if !ok { http.Error(w, "Account not found", http.StatusNotFound) return } // Only email connectors support mailboxes emailConn, ok := c.(*EmailConnector) if !ok { http.Error(w, "Not an email account", http.StatusBadRequest) return } client, err := emailConn.connect() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } defer client.Close() mailboxes, err := client.List("", "*", nil).Collect() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } result := make([]map[string]interface{}, 0, len(mailboxes)) for _, mbox := range mailboxes { result = append(result, map[string]interface{}{ "name": mbox.Mailbox, "delimiter": string(mbox.Delim), }) } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(result) } func handleLegacyMessages(w http.ResponseWriter, r *http.Request, accountName string, pathParts []string) { // Redirect to unified API if len(pathParts) == 0 { // List messages from this account only c, ok := store.GetConnector(accountName) if !ok { http.Error(w, "Account not found", http.StatusNotFound) return } since := time.Now().Add(-24 * time.Hour) if s := r.URL.Query().Get("since"); s != "" { if d, err := ParseDuration(s); err == nil { since = time.Now().Add(-d) } } msgs, err := c.FetchSince(since) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(msgs) return } // Single message - redirect to unified endpoint uid := pathParts[0] fullID := fmt.Sprintf("%s:%s", accountName, uid) handleMessageRoutes(w, r) _ = fullID // Silence unused variable warning }