diff --git a/.gitignore b/.gitignore index 6006ca2..a21a2da 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ node_modules/ .DS_Store mail-bridge +message-center diff --git a/config.yaml b/config.yaml index f8e7d6d..357e2aa 100644 --- a/config.yaml +++ b/config.yaml @@ -2,6 +2,8 @@ server: host: 127.0.0.1 port: 8025 +data_dir: ~/.message-center + accounts: proton: host: 127.0.0.1 @@ -11,6 +13,12 @@ accounts: tls: starttls watch: - INBOX + smtp: + host: 127.0.0.1 + port: 1025 + username: tj@jongsma.me + password: ${PROTON_BRIDGE_PASSWORD} + from: tj@jongsma.me johan: host: 127.0.0.1 port: 1143 @@ -19,8 +27,20 @@ accounts: tls: starttls watch: - INBOX + smtp: + host: 127.0.0.1 + port: 1025 + username: johan@jongsma.me + password: ${JOHAN_BRIDGE_PASSWORD} + from: johan@jongsma.me + +connectors: + whatsapp: + enabled: true + name: whatsapp + base_url: http://localhost:8030 webhook: enabled: true - url: http://localhost:18789/hooks/mail - token: "kuma-alert-token-2026" + url: http://localhost:18789/hooks/messages + token: "mc-webhook-token-2026" diff --git a/connector.go b/connector.go new file mode 100644 index 0000000..cf2551d --- /dev/null +++ b/connector.go @@ -0,0 +1,65 @@ +package main + +import ( + "time" +) + +// AttachmentMeta holds attachment metadata (content fetched on demand) +type AttachmentMeta struct { + Name string `json:"name"` + Mime string `json:"mime"` + Size int `json:"size"` +} + +// UnifiedMessage is the canonical message format across all sources +type UnifiedMessage struct { + ID string `json:"id"` // source:unique_id + Source string `json:"source"` // proton, whatsapp, etc. + From string `json:"from"` + FromName string `json:"from_name,omitempty"` + To string `json:"to"` + Timestamp time.Time `json:"timestamp"` + Subject string `json:"subject,omitempty"` + Body string `json:"body"` + Attachments []AttachmentMeta `json:"attachments"` + Seen bool `json:"seen"` + // Internal fields (not exposed in JSON) + SourceUID string `json:"-"` // Original UID/ID from source + SourceExtra any `json:"-"` // Source-specific data (e.g., folder for email) +} + +// Connector defines the interface for message sources +type Connector interface { + // Name returns the connector identifier (e.g., "proton", "whatsapp") + Name() string + + // FetchNew returns unseen/new messages + FetchNew() ([]UnifiedMessage, error) + + // FetchSince returns messages since the given time + FetchSince(since time.Time) ([]UnifiedMessage, error) + + // FetchOne returns a single message by its source-specific ID + FetchOne(sourceID string) (*UnifiedMessage, error) + + // Archive marks a message as archived + Archive(sourceID string) error + + // Delete permanently removes a message + Delete(sourceID string) error + + // Reply sends a reply to a message + Reply(sourceID string, body string, attachments []string) error + + // MarkSeen marks a message as seen/read + MarkSeen(sourceID string) error + + // GetAttachment retrieves attachment content (base64) + GetAttachment(sourceID string, filename string) ([]byte, error) + + // Start begins watching for new messages (calls callback on new) + Start(callback func()) error + + // Stop stops the connector + Stop() +} diff --git a/connector_email.go b/connector_email.go new file mode 100644 index 0000000..59fb7eb --- /dev/null +++ b/connector_email.go @@ -0,0 +1,626 @@ +package main + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "log" + "mime" + "net/smtp" + "strings" + "sync" + "time" + + "github.com/emersion/go-imap/v2" + "github.com/emersion/go-imap/v2/imapclient" +) + +// EmailConnector implements Connector for IMAP email accounts +type EmailConnector struct { + name string + config AccountConfig + smtpConfig SMTPConfig + ctx context.Context + cancel context.CancelFunc + callback func() + mu sync.Mutex +} + +// SMTPConfig holds SMTP settings for sending replies +type SMTPConfig struct { + Host string + Port int + Username string + Password string + From string +} + +// NewEmailConnector creates a new email connector +func NewEmailConnector(name string, config AccountConfig, smtpConfig SMTPConfig) *EmailConnector { + return &EmailConnector{ + name: name, + config: config, + smtpConfig: smtpConfig, + } +} + +func (e *EmailConnector) Name() string { + return e.name +} + +func (e *EmailConnector) connect() (*imapclient.Client, error) { + addr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port) + + var client *imapclient.Client + var err error + + switch e.config.TLS { + case "ssl": + client, err = imapclient.DialTLS(addr, nil) + case "starttls": + client, err = imapclient.DialStartTLS(addr, nil) + default: + client, err = imapclient.DialInsecure(addr, nil) + } + if err != nil { + return nil, fmt.Errorf("dial: %w", err) + } + + if err := client.Login(e.config.Username, e.config.Password).Wait(); err != nil { + client.Close() + return nil, fmt.Errorf("login: %w", err) + } + + return client, nil +} + +func (e *EmailConnector) FetchNew() ([]UnifiedMessage, error) { + client, err := e.connect() + if err != nil { + return nil, err + } + defer client.Close() + + // Get messages from watched folders (default INBOX) + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + var messages []UnifiedMessage + for _, folder := range folders { + msgs, err := e.fetchFromFolder(client, folder, true, time.Time{}) + if err != nil { + log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err) + continue + } + messages = append(messages, msgs...) + } + + return messages, nil +} + +func (e *EmailConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) { + client, err := e.connect() + if err != nil { + return nil, err + } + defer client.Close() + + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + var messages []UnifiedMessage + for _, folder := range folders { + msgs, err := e.fetchFromFolder(client, folder, false, since) + if err != nil { + log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err) + continue + } + messages = append(messages, msgs...) + } + + return messages, nil +} + +func (e *EmailConnector) fetchFromFolder(client *imapclient.Client, folder string, unseenOnly bool, since time.Time) ([]UnifiedMessage, error) { + _, err := client.Select(folder, nil).Wait() + if err != nil { + return nil, fmt.Errorf("select %s: %w", folder, err) + } + + // Build search criteria + criteria := &imap.SearchCriteria{} + if unseenOnly { + criteria.NotFlag = []imap.Flag{imap.FlagSeen} + } + if !since.IsZero() { + criteria.Since = since + } + + searchData, err := client.Search(criteria, nil).Wait() + if err != nil { + return nil, fmt.Errorf("search: %w", err) + } + + seqNums := searchData.AllSeqNums() + if len(seqNums) == 0 { + return nil, nil + } + + var seqSet imap.SeqSet + for _, num := range seqNums { + seqSet.AddNum(num) + } + + return e.fetchMessages(client, seqSet, folder) +} + +func (e *EmailConnector) fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string) ([]UnifiedMessage, error) { + options := &imap.FetchOptions{ + Envelope: true, + Flags: true, + UID: true, + BodyStructure: &imap.FetchItemBodyStructure{}, + BodySection: []*imap.FetchItemBodySection{{}}, + } + + fetchCmd := client.Fetch(seqSet, options) + + var messages []UnifiedMessage + for { + msgData := fetchCmd.Next() + if msgData == nil { + break + } + + buf, err := msgData.Collect() + if err != nil { + continue + } + + msg := e.convertMessage(*buf, folder) + messages = append(messages, msg) + } + + if err := fetchCmd.Close(); err != nil { + return nil, err + } + + return messages, nil +} + +func (e *EmailConnector) convertMessage(buf imapclient.FetchMessageBuffer, folder string) UnifiedMessage { + msg := UnifiedMessage{ + Source: e.name, + SourceUID: fmt.Sprintf("%d", buf.UID), + SourceExtra: folder, + Seen: false, + Attachments: []AttachmentMeta{}, + } + + // Generate unified ID + msg.ID = fmt.Sprintf("%s:%d", e.name, buf.UID) + + // Parse envelope + if env := buf.Envelope; env != nil { + msg.Subject = env.Subject + msg.Timestamp = env.Date + if msg.Timestamp.IsZero() { + msg.Timestamp = buf.InternalDate + } + + if len(env.From) > 0 { + from := env.From[0] + msg.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host) + if from.Name != "" { + msg.FromName = from.Name + } + } + + if len(env.To) > 0 { + to := env.To[0] + msg.To = fmt.Sprintf("%s@%s", to.Mailbox, to.Host) + } + } + + // Check seen flag + for _, f := range buf.Flags { + if f == imap.FlagSeen { + msg.Seen = true + break + } + } + + // Parse body + if len(buf.BodySection) > 0 { + raw := buf.BodySection[0].Bytes + parsed := ParseMIMEBody(raw) + msg.Body = parsed.Text + if msg.Body == "" { + msg.Body = stripHTML(parsed.HTML) + } + + // Extract attachment metadata + attachments := ExtractAttachments(raw) + for _, att := range attachments { + msg.Attachments = append(msg.Attachments, AttachmentMeta{ + Name: att.Filename, + Mime: att.ContentType, + Size: att.Size, + }) + } + } + + return msg +} + +func (e *EmailConnector) FetchOne(sourceID string) (*UnifiedMessage, error) { + client, err := e.connect() + if err != nil { + return nil, err + } + defer client.Close() + + // Parse UID from sourceID + var uid uint32 + if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil { + return nil, fmt.Errorf("invalid source ID: %s", sourceID) + } + + // Try each watched folder + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + for _, folder := range folders { + _, err := client.Select(folder, nil).Wait() + if err != nil { + continue + } + + uidSet := imap.UIDSetNum(imap.UID(uid)) + options := &imap.FetchOptions{ + Envelope: true, + Flags: true, + UID: true, + BodyStructure: &imap.FetchItemBodyStructure{}, + BodySection: []*imap.FetchItemBodySection{{}}, + } + + fetchCmd := client.Fetch(uidSet, options) + msgData := fetchCmd.Next() + if msgData != nil { + buf, err := msgData.Collect() + fetchCmd.Close() + if err == nil { + msg := e.convertMessage(*buf, folder) + return &msg, nil + } + } + fetchCmd.Close() + } + + return nil, fmt.Errorf("message not found: %s", sourceID) +} + +func (e *EmailConnector) Archive(sourceID string) error { + return e.moveMessage(sourceID, "Archive") +} + +func (e *EmailConnector) Delete(sourceID string) error { + client, err := e.connect() + if err != nil { + return err + } + defer client.Close() + + var uid uint32 + if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil { + return fmt.Errorf("invalid source ID: %s", sourceID) + } + + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + for _, folder := range folders { + _, err := client.Select(folder, nil).Wait() + if err != nil { + continue + } + + uidSet := imap.UIDSetNum(imap.UID(uid)) + + // Mark deleted + storeCmd := client.Store(uidSet, &imap.StoreFlags{ + Op: imap.StoreFlagsAdd, + Silent: true, + Flags: []imap.Flag{imap.FlagDeleted}, + }, nil) + storeCmd.Close() + + // Expunge + expungeCmd := client.Expunge() + expungeCmd.Close() + return nil + } + + return fmt.Errorf("message not found: %s", sourceID) +} + +func (e *EmailConnector) moveMessage(sourceID, destFolder string) error { + client, err := e.connect() + if err != nil { + return err + } + defer client.Close() + + var uid uint32 + if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil { + return fmt.Errorf("invalid source ID: %s", sourceID) + } + + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + for _, folder := range folders { + _, err := client.Select(folder, nil).Wait() + if err != nil { + continue + } + + uidSet := imap.UIDSetNum(imap.UID(uid)) + moveCmd := client.Move(uidSet, destFolder) + if _, err := moveCmd.Wait(); err != nil { + // Try creating folder + if strings.Contains(err.Error(), "TRYCREATE") || strings.Contains(err.Error(), "no such mailbox") { + client.Create(destFolder, nil).Wait() + moveCmd2 := client.Move(uidSet, destFolder) + if _, err2 := moveCmd2.Wait(); err2 != nil { + return err2 + } + } else { + continue + } + } + return nil + } + + return fmt.Errorf("message not found: %s", sourceID) +} + +func (e *EmailConnector) Reply(sourceID string, body string, attachments []string) error { + // Get original message to extract reply-to address + msg, err := e.FetchOne(sourceID) + if err != nil { + return err + } + + // Build email + to := msg.From + subject := msg.Subject + if !strings.HasPrefix(strings.ToLower(subject), "re:") { + subject = "Re: " + subject + } + + // Simple SMTP send + smtpAddr := fmt.Sprintf("%s:%d", e.smtpConfig.Host, e.smtpConfig.Port) + + var msgBuf bytes.Buffer + msgBuf.WriteString(fmt.Sprintf("To: %s\r\n", to)) + msgBuf.WriteString(fmt.Sprintf("From: %s\r\n", e.smtpConfig.From)) + msgBuf.WriteString(fmt.Sprintf("Subject: %s\r\n", mime.QEncoding.Encode("utf-8", subject))) + msgBuf.WriteString("MIME-Version: 1.0\r\n") + msgBuf.WriteString("Content-Type: text/plain; charset=utf-8\r\n") + msgBuf.WriteString("\r\n") + msgBuf.WriteString(body) + + auth := smtp.PlainAuth("", e.smtpConfig.Username, e.smtpConfig.Password, e.smtpConfig.Host) + return smtp.SendMail(smtpAddr, auth, e.smtpConfig.From, []string{to}, msgBuf.Bytes()) +} + +func (e *EmailConnector) MarkSeen(sourceID string) error { + client, err := e.connect() + if err != nil { + return err + } + defer client.Close() + + var uid uint32 + if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil { + return fmt.Errorf("invalid source ID: %s", sourceID) + } + + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + for _, folder := range folders { + _, err := client.Select(folder, nil).Wait() + if err != nil { + continue + } + + uidSet := imap.UIDSetNum(imap.UID(uid)) + storeCmd := client.Store(uidSet, &imap.StoreFlags{ + Op: imap.StoreFlagsAdd, + Silent: true, + Flags: []imap.Flag{imap.FlagSeen}, + }, nil) + storeCmd.Close() + return nil + } + + return fmt.Errorf("message not found: %s", sourceID) +} + +func (e *EmailConnector) GetAttachment(sourceID string, filename string) ([]byte, error) { + client, err := e.connect() + if err != nil { + return nil, err + } + defer client.Close() + + var uid uint32 + if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil { + return nil, fmt.Errorf("invalid source ID: %s", sourceID) + } + + folders := e.config.Watch + if len(folders) == 0 { + folders = []string{"INBOX"} + } + + for _, folder := range folders { + _, err := client.Select(folder, nil).Wait() + if err != nil { + continue + } + + uidSet := imap.UIDSetNum(imap.UID(uid)) + options := &imap.FetchOptions{ + UID: true, + BodySection: []*imap.FetchItemBodySection{{}}, + } + + fetchCmd := client.Fetch(uidSet, options) + msgData := fetchCmd.Next() + if msgData == nil { + fetchCmd.Close() + continue + } + + buf, err := msgData.Collect() + fetchCmd.Close() + if err != nil { + continue + } + + if len(buf.BodySection) == 0 { + continue + } + + raw := buf.BodySection[0].Bytes + attachments := ExtractAttachments(raw) + for _, att := range attachments { + if att.Filename == filename { + decoded, err := base64.StdEncoding.DecodeString(att.Content) + if err != nil { + return nil, err + } + return decoded, nil + } + } + } + + return nil, fmt.Errorf("attachment not found: %s", filename) +} + +func (e *EmailConnector) Start(callback func()) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.ctx, e.cancel = context.WithCancel(context.Background()) + e.callback = callback + + for _, folder := range e.config.Watch { + go e.watchFolder(folder) + } + + return nil +} + +func (e *EmailConnector) watchFolder(folder string) { + log.Printf("[%s] Starting IDLE watcher for %s", e.name, folder) + + for { + select { + case <-e.ctx.Done(): + return + default: + } + + err := e.runIDLE(folder) + if err != nil { + log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", e.name, folder, err) + select { + case <-e.ctx.Done(): + return + case <-time.After(10 * time.Second): + } + } + } +} + +func (e *EmailConnector) runIDLE(folder string) error { + client, err := e.connect() + if err != nil { + return fmt.Errorf("connect: %w", err) + } + defer client.Close() + + mbox, err := client.Select(folder, nil).Wait() + if err != nil { + return fmt.Errorf("select %s: %w", folder, err) + } + + prevCount := mbox.NumMessages + log.Printf("[%s] IDLE connected to %s (%d messages)", e.name, folder, prevCount) + + for { + idleCmd, err := client.Idle() + if err != nil { + return fmt.Errorf("idle start: %w", err) + } + + idleDone := make(chan error, 1) + go func() { + idleDone <- idleCmd.Wait() + }() + + select { + case <-e.ctx.Done(): + idleCmd.Close() + return nil + case <-time.After(4 * time.Minute): + idleCmd.Close() + case err := <-idleDone: + if err != nil { + log.Printf("[%s] IDLE ended with error: %v", e.name, err) + } + } + + mbox, err = client.Select(folder, nil).Wait() + if err != nil { + return fmt.Errorf("re-select %s: %w", folder, err) + } + + if mbox.NumMessages > prevCount { + log.Printf("[%s] New mail in %s: %d -> %d", e.name, folder, prevCount, mbox.NumMessages) + if e.callback != nil { + go e.callback() + } + prevCount = mbox.NumMessages + } else { + prevCount = mbox.NumMessages + } + } +} + +func (e *EmailConnector) Stop() { + e.mu.Lock() + defer e.mu.Unlock() + + if e.cancel != nil { + e.cancel() + } +} diff --git a/connector_whatsapp.go b/connector_whatsapp.go new file mode 100644 index 0000000..306ef3f --- /dev/null +++ b/connector_whatsapp.go @@ -0,0 +1,353 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +// WhatsAppConnector wraps the message-bridge HTTP API +type WhatsAppConnector struct { + name string + baseURL string + ctx chan struct{} + callback func() + mu sync.Mutex + lastSeen map[string]bool // Track seen message IDs +} + +// WhatsAppMessage is the format from message-bridge +type WhatsAppMessage struct { + ID string `json:"id"` + Platform string `json:"platform"` + From string `json:"from"` + FromName string `json:"from_name,omitempty"` + To string `json:"to,omitempty"` + Body string `json:"body"` + Timestamp time.Time `json:"timestamp"` + IsGroup bool `json:"is_group"` + GroupName string `json:"group_name,omitempty"` + MediaType string `json:"media_type,omitempty"` + MediaURL string `json:"media_url,omitempty"` + MediaPath string `json:"media_path,omitempty"` + HasMedia bool `json:"has_media"` + Transcription string `json:"transcription,omitempty"` +} + +// NewWhatsAppConnector creates a new WhatsApp connector +func NewWhatsAppConnector(name, baseURL string) *WhatsAppConnector { + return &WhatsAppConnector{ + name: name, + baseURL: baseURL, + lastSeen: make(map[string]bool), + } +} + +func (w *WhatsAppConnector) Name() string { + return w.name +} + +func (w *WhatsAppConnector) fetchMessages() ([]WhatsAppMessage, error) { + resp, err := http.Get(w.baseURL + "/messages") + if err != nil { + return nil, fmt.Errorf("fetch messages: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("fetch messages: status %d: %s", resp.StatusCode, string(body)) + } + + var messages []WhatsAppMessage + if err := json.NewDecoder(resp.Body).Decode(&messages); err != nil { + return nil, fmt.Errorf("decode messages: %w", err) + } + + return messages, nil +} + +func (w *WhatsAppConnector) convertMessage(msg WhatsAppMessage) UnifiedMessage { + um := UnifiedMessage{ + ID: fmt.Sprintf("%s:%s", w.name, msg.ID), + Source: w.name, + SourceUID: msg.ID, + From: msg.From, + FromName: msg.FromName, + Timestamp: msg.Timestamp, + Body: msg.Body, + Seen: false, // WhatsApp doesn't have a seen concept in our bridge + Attachments: []AttachmentMeta{}, + } + + // Handle group vs direct + if msg.IsGroup { + um.To = msg.GroupName + if um.To == "" { + um.To = msg.To + } + } else { + // For direct messages, "to" is the recipient (us) + um.To = msg.To + } + + // Use transcription as body for voice messages if available + if msg.Transcription != "" && msg.Body == "" { + um.Body = msg.Transcription + } + + // Handle media as attachment + if msg.HasMedia && msg.MediaPath != "" { + filename := filepath.Base(msg.MediaPath) + mimeType := "application/octet-stream" + switch msg.MediaType { + case "image": + mimeType = "image/jpeg" + case "video": + mimeType = "video/mp4" + case "voice", "audio": + mimeType = "audio/ogg" + case "document": + mimeType = "application/pdf" + } + + // Get file size + var size int + if fi, err := os.Stat(msg.MediaPath); err == nil { + size = int(fi.Size()) + } + + um.Attachments = append(um.Attachments, AttachmentMeta{ + Name: filename, + Mime: mimeType, + Size: size, + }) + + // Store media path for later retrieval + um.SourceExtra = msg.MediaPath + } + + return um +} + +func (w *WhatsAppConnector) FetchNew() ([]UnifiedMessage, error) { + messages, err := w.fetchMessages() + if err != nil { + return nil, err + } + + var result []UnifiedMessage + for _, msg := range messages { + // WhatsApp bridge doesn't track seen status, so return all recent + // Filter to last 24 hours for "new" + if time.Since(msg.Timestamp) < 24*time.Hour { + result = append(result, w.convertMessage(msg)) + } + } + + return result, nil +} + +func (w *WhatsAppConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) { + messages, err := w.fetchMessages() + if err != nil { + return nil, err + } + + var result []UnifiedMessage + for _, msg := range messages { + if msg.Timestamp.After(since) || msg.Timestamp.Equal(since) { + result = append(result, w.convertMessage(msg)) + } + } + + return result, nil +} + +func (w *WhatsAppConnector) FetchOne(sourceID string) (*UnifiedMessage, error) { + messages, err := w.fetchMessages() + if err != nil { + return nil, err + } + + for _, msg := range messages { + if msg.ID == sourceID { + um := w.convertMessage(msg) + return &um, nil + } + } + + return nil, fmt.Errorf("message not found: %s", sourceID) +} + +func (w *WhatsAppConnector) Archive(sourceID string) error { + // WhatsApp doesn't support archiving individual messages + // Mark it as seen in our local tracking + w.mu.Lock() + w.lastSeen[sourceID] = true + w.mu.Unlock() + return nil +} + +func (w *WhatsAppConnector) Delete(sourceID string) error { + // WhatsApp doesn't support deleting messages via API + return fmt.Errorf("WhatsApp does not support message deletion") +} + +func (w *WhatsAppConnector) Reply(sourceID string, body string, attachments []string) error { + // Get original message to find the chat + msg, err := w.FetchOne(sourceID) + if err != nil { + return err + } + + // Build send request + sendReq := map[string]interface{}{ + "to": msg.From, // Reply to sender + "body": body, + } + + // Handle attachments if any + if len(attachments) > 0 { + sendReq["media"] = attachments[0] // message-bridge only supports one attachment + } + + reqBody, _ := json.Marshal(sendReq) + resp, err := http.Post(w.baseURL+"/send", "application/json", bytes.NewReader(reqBody)) + if err != nil { + return fmt.Errorf("send reply: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("send reply: status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +func (w *WhatsAppConnector) MarkSeen(sourceID string) error { + // Track locally + w.mu.Lock() + w.lastSeen[sourceID] = true + w.mu.Unlock() + return nil +} + +func (w *WhatsAppConnector) GetAttachment(sourceID string, filename string) ([]byte, error) { + // First, find the message to get the media path + messages, err := w.fetchMessages() + if err != nil { + return nil, err + } + + for _, msg := range messages { + if msg.ID == sourceID && msg.MediaPath != "" { + // Check if this matches the requested filename + if strings.HasSuffix(msg.MediaPath, filename) || filepath.Base(msg.MediaPath) == filename { + return os.ReadFile(msg.MediaPath) + } + } + } + + // Try fetching from message-bridge media endpoint + resp, err := http.Get(w.baseURL + "/media/" + filename) + if err != nil { + return nil, fmt.Errorf("fetch media: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("media not found: %s", filename) + } + + return io.ReadAll(resp.Body) +} + +func (w *WhatsAppConnector) Start(callback func()) error { + w.mu.Lock() + defer w.mu.Unlock() + + w.ctx = make(chan struct{}) + w.callback = callback + + // Poll for new messages periodically + go w.pollLoop() + + return nil +} + +func (w *WhatsAppConnector) pollLoop() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + // Initial fetch to populate seen set + msgs, err := w.fetchMessages() + if err == nil { + w.mu.Lock() + for _, msg := range msgs { + w.lastSeen[msg.ID] = true + } + w.mu.Unlock() + } + + log.Printf("[%s] Started polling for new messages", w.name) + + for { + select { + case <-w.ctx: + return + case <-ticker.C: + w.checkForNew() + } + } +} + +func (w *WhatsAppConnector) checkForNew() { + msgs, err := w.fetchMessages() + if err != nil { + log.Printf("[%s] Poll error: %v", w.name, err) + return + } + + hasNew := false + w.mu.Lock() + for _, msg := range msgs { + if !w.lastSeen[msg.ID] { + hasNew = true + w.lastSeen[msg.ID] = true + log.Printf("[%s] New message from %s: %s", w.name, msg.FromName, truncateStr(msg.Body, 50)) + } + } + callback := w.callback + w.mu.Unlock() + + if hasNew && callback != nil { + go callback() + } +} + +func truncateStr(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] + "..." +} + +func (w *WhatsAppConnector) Stop() { + w.mu.Lock() + defer w.mu.Unlock() + + if w.ctx != nil { + close(w.ctx) + } +} diff --git a/go.mod b/go.mod index 214ffe2..0a3c491 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module mail-bridge +module message-center go 1.22.0 diff --git a/main.go b/main.go index a4d803b..f9c8c75 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "context" "encoding/json" "flag" "fmt" @@ -11,22 +10,21 @@ import ( "net/http" "os" "os/signal" - "sort" + "path/filepath" "strings" - "sync" "syscall" "time" - "github.com/emersion/go-imap/v2" - "github.com/emersion/go-imap/v2/imapclient" "gopkg.in/yaml.v3" ) // Config structures type Config struct { - Server ServerConfig `yaml:"server"` - Accounts map[string]AccountConfig `yaml:"accounts"` - Webhook WebhookConfig `yaml:"webhook"` + Server ServerConfig `yaml:"server"` + DataDir string `yaml:"data_dir"` + Accounts map[string]AccountConfig `yaml:"accounts"` + Connectors ConnectorsConfig `yaml:"connectors"` + Webhook WebhookConfig `yaml:"webhook"` } type ServerConfig struct { @@ -39,8 +37,23 @@ type AccountConfig struct { Port int `yaml:"port"` Username string `yaml:"username"` Password string `yaml:"password"` - TLS string `yaml:"tls"` // "ssl", "starttls", "none" + TLS string `yaml:"tls"` Watch []string `yaml:"watch"` + SMTP struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Username string `yaml:"username"` + Password string `yaml:"password"` + From string `yaml:"from"` + } `yaml:"smtp"` +} + +type ConnectorsConfig struct { + WhatsApp struct { + Enabled bool `yaml:"enabled"` + Name string `yaml:"name"` + BaseURL string `yaml:"base_url"` + } `yaml:"whatsapp"` } type WebhookConfig struct { @@ -49,41 +62,10 @@ type WebhookConfig struct { Token string `yaml:"token"` } -// Message represents an email message -type Message struct { - UID uint32 `json:"uid"` - Folder string `json:"folder"` - MessageID string `json:"message_id,omitempty"` - Date time.Time `json:"date"` - From string `json:"from"` - To []string `json:"to,omitempty"` - Subject string `json:"subject"` - BodyPreview string `json:"body_preview,omitempty"` - BodyText string `json:"body_text,omitempty"` - BodyHTML string `json:"body_html,omitempty"` - Flags []string `json:"flags"` - HasAttach bool `json:"has_attachments"` - AttachNames []string `json:"attachment_names,omitempty"` -} - -// WebhookPayload for new mail events -// Fields are flattened for simpler template access ({{body.from}} vs {{body.message.from}}) -type WebhookPayload struct { - Event string `json:"event"` - Account string `json:"account"` - Folder string `json:"folder"` - // Flattened message fields for easier template access - UID uint32 `json:"uid"` - From string `json:"from"` - Subject string `json:"subject"` - // Full message object still available for detailed access - Message *Message `json:"message,omitempty"` -} - // Global state var ( - config Config - idlesMu sync.Mutex + config Config + store *MessageStore ) func main() { @@ -102,21 +84,79 @@ func main() { log.Fatalf("Failed to parse config: %v", err) } - log.Printf("Loaded %d accounts", len(config.Accounts)) + // Default data directory + if config.DataDir == "" { + home, _ := os.UserHomeDir() + config.DataDir = filepath.Join(home, ".message-center") + } + os.MkdirAll(config.DataDir, 0755) - // Start IDLE watchers for each account - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + log.Printf("Message Center starting...") + log.Printf("Data directory: %s", config.DataDir) + // Initialize store + store = NewMessageStore(config.DataDir) + + // Register email connectors for name, acc := range config.Accounts { - if len(acc.Watch) > 0 { - go startIDLE(ctx, name, acc) + smtpConfig := SMTPConfig{ + Host: acc.SMTP.Host, + Port: acc.SMTP.Port, + Username: acc.SMTP.Username, + Password: acc.SMTP.Password, + From: acc.SMTP.From, } + // Default SMTP to same credentials if not specified + if smtpConfig.Host == "" { + smtpConfig.Host = acc.Host + smtpConfig.Port = 1025 // Default Proton Bridge SMTP + smtpConfig.Username = acc.Username + smtpConfig.Password = acc.Password + smtpConfig.From = acc.Username + } + + connector := NewEmailConnector(name, acc, smtpConfig) + store.RegisterConnector(connector) + log.Printf("Registered email connector: %s", name) + } + + // Register WhatsApp connector + if config.Connectors.WhatsApp.Enabled { + name := config.Connectors.WhatsApp.Name + if name == "" { + name = "whatsapp" + } + baseURL := config.Connectors.WhatsApp.BaseURL + if baseURL == "" { + baseURL = "http://localhost:8030" + } + + connector := NewWhatsAppConnector(name, baseURL) + store.RegisterConnector(connector) + log.Printf("Registered WhatsApp connector: %s -> %s", name, baseURL) + } + + // Start all connectors with webhook callback + webhookCallback := func() { + if config.Webhook.Enabled { + sendWebhook() + } + } + if err := store.StartAll(webhookCallback); err != nil { + log.Fatalf("Failed to start connectors: %v", err) } // HTTP server mux := http.NewServeMux() + + // Health check mux.HandleFunc("/health", handleHealth) + + // Unified message endpoints + mux.HandleFunc("/messages", handleMessages) + mux.HandleFunc("/messages/", handleMessageRoutes) + + // Legacy account-based endpoints (for backwards compatibility) mux.HandleFunc("/accounts", handleListAccounts) mux.HandleFunc("/accounts/", handleAccountRoutes) @@ -129,11 +169,11 @@ func main() { signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh log.Println("Shutting down...") - cancel() - server.Shutdown(context.Background()) + store.StopAll() + server.Shutdown(nil) }() - log.Printf("Starting server on %s", addr) + log.Printf("Server listening on %s", addr) if err := server.ListenAndServe(); err != http.ErrServerClosed { log.Fatalf("Server error: %v", err) } @@ -147,151 +187,14 @@ func logMiddleware(next http.Handler) http.Handler { }) } -// IDLE watcher -func startIDLE(ctx context.Context, accountName string, acc AccountConfig) { - for _, folder := range acc.Watch { - go watchFolder(ctx, accountName, acc, folder) - } -} - -func watchFolder(ctx context.Context, accountName string, acc AccountConfig, folder string) { - log.Printf("[%s] Starting IDLE watcher for %s", accountName, folder) - - for { - select { - case <-ctx.Done(): - return - default: - } - - err := runIDLE(ctx, accountName, acc, folder) - if err != nil { - log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", accountName, folder, err) - select { - case <-ctx.Done(): - return - case <-time.After(10 * time.Second): - } - } - } -} - -func runIDLE(ctx context.Context, accountName string, acc AccountConfig, folder string) error { - client, err := connect(acc) - if err != nil { - return fmt.Errorf("connect: %w", err) - } - defer client.Close() - - // Select folder and set up unilateral data handler - mbox, err := client.Select(folder, nil).Wait() - if err != nil { - return fmt.Errorf("select %s: %w", folder, err) - } - - prevCount := mbox.NumMessages - log.Printf("[%s] IDLE connected to %s (%d messages)", accountName, folder, prevCount) - - // Run IDLE with periodic refresh - for { - idleCmd, err := client.Idle() - if err != nil { - return fmt.Errorf("idle start: %w", err) - } - - // Use a goroutine to wait for IDLE to complete or get interrupted - idleDone := make(chan error, 1) - go func() { - idleDone <- idleCmd.Wait() - }() - - // Wait for either: context cancel, timeout, or IDLE response - select { - case <-ctx.Done(): - idleCmd.Close() - return nil - case <-time.After(4 * time.Minute): - // Periodic refresh - break IDLE to check for changes - idleCmd.Close() - case err := <-idleDone: - // IDLE ended (server sent something) - if err != nil { - log.Printf("[%s] IDLE ended with error: %v", accountName, err) - } - } - - // Re-select folder to get fresh message count (cached Mailbox() may be stale) - mbox, err = client.Select(folder, nil).Wait() - if err != nil { - log.Printf("[%s] Failed to re-select %s: %v", accountName, folder, err) - return fmt.Errorf("re-select %s: %w", folder, err) - } - - // Check for new messages - if mbox.NumMessages > prevCount { - log.Printf("[%s] New mail in %s: %d -> %d", accountName, folder, prevCount, mbox.NumMessages) - go handleNewMail(accountName, acc, folder) - prevCount = mbox.NumMessages - } else { - prevCount = mbox.NumMessages - } - } -} - -func handleNewMail(accountName string, acc AccountConfig, folder string) { - if !config.Webhook.Enabled { +func sendWebhook() { + if !config.Webhook.Enabled || config.Webhook.URL == "" { return } - // Get the newest message - client, err := connect(acc) - if err != nil { - log.Printf("[%s] Failed to connect for new mail: %v", accountName, err) - return - } - defer client.Close() - - mbox, err := client.Select(folder, nil).Wait() - if err != nil { - log.Printf("[%s] Failed to select %s: %v", accountName, folder, err) - return - } - - if mbox.NumMessages == 0 { - return - } - - // Fetch the newest message by sequence number - seqSet := imap.SeqSetNum(mbox.NumMessages) - msgs, err := fetchMessages(client, seqSet, folder, true) - if err != nil || len(msgs) == 0 { - log.Printf("[%s] Failed to fetch new message: %v", accountName, err) - return - } - - msg := msgs[0] - log.Printf("[%s] Webhook: from=%q subject=%q", accountName, msg.From, msg.Subject) - - // Send webhook with flattened fields for easy template access - payload := WebhookPayload{ - Event: "new_mail", - Account: accountName, - Folder: folder, - UID: msg.UID, - From: msg.From, - Subject: msg.Subject, - Message: msg, - } - - sendWebhook(payload) -} - -func sendWebhook(payload WebhookPayload) { - data, err := json.Marshal(payload) - if err != nil { - log.Printf("Webhook marshal error: %v", err) - return - } + // Simple webhook: just notify that there are new messages + payload := map[string]string{"event": "new"} + data, _ := json.Marshal(payload) req, err := http.NewRequest("POST", config.Webhook.URL, bytes.NewReader(data)) if err != nil { @@ -315,47 +218,334 @@ func sendWebhook(payload WebhookPayload) { body, _ := io.ReadAll(resp.Body) log.Printf("Webhook error %d: %s", resp.StatusCode, string(body)) } else { - log.Printf("Webhook sent: %s %s", payload.Event, payload.Account) + log.Printf("Webhook sent: new") } } -// IMAP connection -func connect(acc AccountConfig) (*imapclient.Client, error) { - addr := fmt.Sprintf("%s:%d", acc.Host, acc.Port) - - var client *imapclient.Client - var err error - - switch acc.TLS { - case "ssl": - client, err = imapclient.DialTLS(addr, nil) - case "starttls": - client, err = imapclient.DialStartTLS(addr, nil) - default: - client, err = imapclient.DialInsecure(addr, nil) - } - if err != nil { - return nil, fmt.Errorf("dial: %w", err) - } - - if err := client.Login(acc.Username, acc.Password).Wait(); err != nil { - client.Close() - return nil, fmt.Errorf("login: %w", err) - } - - return client, nil -} - // HTTP handlers + func handleHealth(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) } +// GET /messages?since=24h - list messages +// GET /messages/new - unseen messages +// POST /messages/ack - acknowledge messages (advance cursor) +func handleMessages(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // Handle /messages/new and /messages/ack via the path + if strings.HasSuffix(r.URL.Path, "/new") { + handleMessagesNew(w, r) + return + } + if strings.HasSuffix(r.URL.Path, "/ack") { + handleMessagesAck(w, r) + return + } + + if r.Method != "GET" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse since parameter + sinceStr := r.URL.Query().Get("since") + if sinceStr == "" { + sinceStr = "24h" // Default to last 24 hours + } + + duration, err := ParseDuration(sinceStr) + if err != nil { + http.Error(w, fmt.Sprintf("Invalid since parameter: %v", err), http.StatusBadRequest) + return + } + + messages, err := store.FetchSince(duration) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(messages) +} + +func handleMessagesNew(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + messages, err := store.FetchNew() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(messages) +} + +func handleMessagesAck(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req struct { + Consumer string `json:"consumer"` + Timestamp time.Time `json:"timestamp"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if req.Consumer == "" { + http.Error(w, "consumer is required", http.StatusBadRequest) + return + } + + if req.Timestamp.IsZero() { + req.Timestamp = time.Now() + } + + if err := store.Ack(req.Consumer, req.Timestamp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(map[string]interface{}{ + "status": "ok", + "consumer": req.Consumer, + "timestamp": req.Timestamp, + }) +} + +// Handle /messages/{id} and /messages/{id}/action +func handleMessageRoutes(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // Parse path: /messages/{id}[/action] + path := strings.TrimPrefix(r.URL.Path, "/messages/") + if path == "" || path == "new" || path == "ack" { + handleMessages(w, r) + return + } + + parts := strings.SplitN(path, "/", 2) + messageID := parts[0] + + // Reconstruct full message ID if it doesn't contain ':' + // This handles URL-encoded colons + messageID = strings.ReplaceAll(messageID, "%3A", ":") + + if len(parts) == 1 { + // GET /messages/{id} + handleGetMessage(w, r, messageID) + return + } + + action := parts[1] + switch action { + case "archive": + handleArchive(w, r, messageID) + case "delete": + handleDelete(w, r, messageID) + case "reply": + handleReply(w, r, messageID) + case "to-docs": + handleToDocs(w, r, messageID) + case "attachments": + handleAttachments(w, r, messageID) + default: + http.Error(w, "Unknown action", http.StatusNotFound) + } +} + +func handleGetMessage(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "GET" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + msg, err := store.FetchOne(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + json.NewEncoder(w).Encode(msg) +} + +func handleArchive(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if err := store.Archive(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(map[string]string{"status": "archived"}) +} + +func handleDelete(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + if err := store.Delete(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(map[string]string{"status": "deleted"}) +} + +func handleReply(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req struct { + Body string `json:"body"` + Attachments []string `json:"attachments"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if req.Body == "" { + http.Error(w, "body is required", http.StatusBadRequest) + return + } + + if err := store.Reply(id, req.Body, req.Attachments); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + json.NewEncoder(w).Encode(map[string]string{"status": "sent"}) +} + +func handleToDocs(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req struct { + Attachments []string `json:"attachments"` // Optional: specific attachments, empty = all + } + + json.NewDecoder(r.Body).Decode(&req) // Ignore error, req is optional + + // Get the message + msg, err := store.FetchOne(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + if len(msg.Attachments) == 0 { + http.Error(w, "Message has no attachments", http.StatusBadRequest) + return + } + + // Create inbox directory + home, _ := os.UserHomeDir() + inboxDir := filepath.Join(home, "documents", "inbox") + os.MkdirAll(inboxDir, 0755) + + // Download and save each attachment + var saved []string + var errors []string + + wantedSet := make(map[string]bool) + for _, name := range req.Attachments { + wantedSet[name] = true + } + + for _, att := range msg.Attachments { + // Skip if not in requested list (when list is specified) + if len(req.Attachments) > 0 && !wantedSet[att.Name] { + continue + } + + content, err := store.GetAttachment(id, att.Name) + if err != nil { + errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err)) + continue + } + + destPath := filepath.Join(inboxDir, att.Name) + if err := os.WriteFile(destPath, content, 0644); err != nil { + errors = append(errors, fmt.Sprintf("%s: %v", att.Name, err)) + continue + } + + saved = append(saved, destPath) + } + + result := map[string]interface{}{ + "saved": saved, + "errors": errors, + } + + if len(errors) > 0 && len(saved) == 0 { + w.WriteHeader(http.StatusInternalServerError) + } + + json.NewEncoder(w).Encode(result) +} + +func handleAttachments(w http.ResponseWriter, r *http.Request, id string) { + if r.Method != "GET" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Check for specific attachment name in query + filename := r.URL.Query().Get("name") + if filename != "" { + content, err := store.GetAttachment(id, filename) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + // Return raw content + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename)) + w.Write(content) + return + } + + // Return attachment list + msg, err := store.FetchOne(id) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + json.NewEncoder(w).Encode(msg.Attachments) +} + +// Legacy endpoints for backwards compatibility + func handleListAccounts(w http.ResponseWriter, r *http.Request) { accounts := make([]string, 0, len(config.Accounts)) for name := range config.Accounts { accounts = append(accounts, name) } + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(accounts) } @@ -376,6 +566,7 @@ func handleAccountRoutes(w http.ResponseWriter, r *http.Request) { if len(parts) == 2 { // /accounts/{account} - account info + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "name": accountName, "host": acc.Host, @@ -386,649 +577,85 @@ func handleAccountRoutes(w http.ResponseWriter, r *http.Request) { switch parts[2] { case "mailboxes": - handleMailboxes(w, r, accountName, acc) + handleLegacyMailboxes(w, r, accountName) case "messages": - handleMessages(w, r, accountName, acc, parts[3:]) + handleLegacyMessages(w, r, accountName, parts[3:]) default: http.Error(w, "Not found", http.StatusNotFound) } } -func handleMailboxes(w http.ResponseWriter, r *http.Request, accountName string, acc AccountConfig) { - client, err := connect(acc) +func handleLegacyMailboxes(w http.ResponseWriter, r *http.Request, accountName string) { + // Get connector + c, ok := store.GetConnector(accountName) + if !ok { + http.Error(w, "Account not found", http.StatusNotFound) + return + } + + // Only email connectors support mailboxes + emailConn, ok := c.(*EmailConnector) + if !ok { + http.Error(w, "Not an email account", http.StatusBadRequest) + return + } + + client, err := emailConn.connect() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } defer client.Close() - switch r.Method { - case "GET": - // List mailboxes - mailboxes, err := client.List("", "*", nil).Collect() + mailboxes, err := client.List("", "*", nil).Collect() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + result := make([]map[string]interface{}, 0, len(mailboxes)) + for _, mbox := range mailboxes { + result = append(result, map[string]interface{}{ + "name": mbox.Mailbox, + "delimiter": string(mbox.Delim), + }) + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) +} + +func handleLegacyMessages(w http.ResponseWriter, r *http.Request, accountName string, pathParts []string) { + // Redirect to unified API + if len(pathParts) == 0 { + // List messages from this account only + c, ok := store.GetConnector(accountName) + if !ok { + http.Error(w, "Account not found", http.StatusNotFound) + return + } + + since := time.Now().Add(-24 * time.Hour) + if s := r.URL.Query().Get("since"); s != "" { + if d, err := ParseDuration(s); err == nil { + since = time.Now().Add(-d) + } + } + + msgs, err := c.FetchSince(since) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - result := make([]map[string]interface{}, 0, len(mailboxes)) - for _, mbox := range mailboxes { - result = append(result, map[string]interface{}{ - "name": mbox.Mailbox, - "delimiter": string(mbox.Delim), - }) - } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(result) - - case "POST": - // Create mailbox - var req struct { - Name string `json:"name"` - } - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if req.Name == "" { - http.Error(w, "name is required", http.StatusBadRequest) - return - } - - if err := client.Create(req.Name, nil).Wait(); err != nil { - http.Error(w, fmt.Sprintf("Failed to create mailbox: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "created", "name": req.Name}) - - case "DELETE": - // Delete mailbox - name := r.URL.Query().Get("name") - if name == "" { - http.Error(w, "name query param is required", http.StatusBadRequest) - return - } - - if err := client.Delete(name).Wait(); err != nil { - http.Error(w, fmt.Sprintf("Failed to delete mailbox: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "deleted", "name": name}) - - default: - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + json.NewEncoder(w).Encode(msgs) + return } -} - -func handleMessages(w http.ResponseWriter, r *http.Request, accountName string, acc AccountConfig, pathParts []string) { - folder := r.URL.Query().Get("folder") - if folder == "" { - folder = "INBOX" - } - - client, err := connect(acc) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer client.Close() - - // Select folder - mbox, err := client.Select(folder, nil).Wait() - if err != nil { - http.Error(w, fmt.Sprintf("Failed to select folder: %v", err), http.StatusInternalServerError) - return - } - - if len(pathParts) == 0 { - // List messages - handleListMessages(w, r, client, mbox, folder) - return - } - - // Single message operations - var uid uint32 - if _, err := fmt.Sscanf(pathParts[0], "%d", &uid); err != nil { - http.Error(w, "Invalid UID", http.StatusBadRequest) - return - } - - // Check for /messages/{uid}/attachments - if len(pathParts) > 1 && pathParts[1] == "attachments" { - handleGetAttachments(w, r, client, folder, uid) - return - } - - // Check for /messages/{uid}/ingest - if len(pathParts) > 1 && pathParts[1] == "ingest" { - handleIngestAttachments(w, r, client, folder, uid) - return - } - - switch r.Method { - case "GET": - handleGetMessage(w, r, client, folder, uid) - case "PATCH": - handleUpdateMessage(w, r, client, folder, uid) - case "DELETE": - handleDeleteMessage(w, r, client, folder, uid) - default: - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - } -} - -func handleListMessages(w http.ResponseWriter, r *http.Request, client *imapclient.Client, mbox *imap.SelectData, folder string) { - limit := 50 - if l := r.URL.Query().Get("limit"); l != "" { - fmt.Sscanf(l, "%d", &limit) - if limit > 2000 { - limit = 2000 - } - } - - if mbox.NumMessages == 0 { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]Message{}) - return - } - - // Build search criteria - support year filter for bulk operations - searchCriteria := &imap.SearchCriteria{} - if year := r.URL.Query().Get("year"); year != "" { - var y int - fmt.Sscanf(year, "%d", &y) - if y >= 2000 && y <= 2100 { - searchCriteria.Since = time.Date(y, 1, 1, 0, 0, 0, 0, time.UTC) - searchCriteria.Before = time.Date(y+1, 1, 1, 0, 0, 0, 0, time.UTC) - } - } - - searchCmd := client.Search(searchCriteria, nil) - searchData, err := searchCmd.Wait() - if err != nil { - http.Error(w, fmt.Sprintf("Search failed: %v", err), http.StatusInternalServerError) - return - } - - if len(searchData.AllSeqNums()) == 0 { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]Message{}) - return - } - - // Convert to SeqSet - seqNums := searchData.AllSeqNums() - var seqSet imap.SeqSet - for _, num := range seqNums { - seqSet.AddNum(num) - } - - msgs, err := fetchMessages(client, seqSet, folder, false) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - // Sort by date descending (newest first) - sort.Slice(msgs, func(i, j int) bool { - return msgs[i].Date.After(msgs[j].Date) - }) - - // Apply limit after sort - if len(msgs) > limit { - msgs = msgs[:limit] - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(msgs) -} - -func fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string, withBody bool) ([]*Message, error) { - options := &imap.FetchOptions{ - Envelope: true, - Flags: true, - UID: true, - // BodyStructure disabled - causes parser errors on malformed MIME - // BodyStructure: &imap.FetchItemBodyStructure{}, - } - - if withBody { - options.BodySection = []*imap.FetchItemBodySection{{}} - } - - fetchCmd := client.Fetch(seqSet, options) - - var messages []*Message - for { - msgData := fetchCmd.Next() - if msgData == nil { - break - } - - buf, err := msgData.Collect() - if err != nil { - continue - } - - m := &Message{ - UID: uint32(buf.UID), - Folder: folder, - Flags: make([]string, 0), - Date: buf.InternalDate, - } - - if env := buf.Envelope; env != nil { - m.Subject = env.Subject - m.MessageID = env.MessageID - if !env.Date.IsZero() { - m.Date = env.Date - } - - if len(env.From) > 0 { - from := env.From[0] - if from.Name != "" { - m.From = fmt.Sprintf("%s <%s@%s>", from.Name, from.Mailbox, from.Host) - } else { - m.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host) - } - } - - for _, to := range env.To { - m.To = append(m.To, fmt.Sprintf("%s@%s", to.Mailbox, to.Host)) - } - } - - for _, f := range buf.Flags { - m.Flags = append(m.Flags, string(f)) - } - - // Check for attachments - if buf.BodyStructure != nil { - m.HasAttach, m.AttachNames = checkAttachments(buf.BodyStructure) - } - - // Get body if requested - if withBody && len(buf.BodySection) > 0 { - raw := buf.BodySection[0].Bytes - parsed := ParseMIMEBody(raw) - m.BodyText = parsed.Text - m.BodyHTML = parsed.HTML - m.BodyPreview = truncate(m.BodyText, 500) - } - - messages = append(messages, m) - } - - if err := fetchCmd.Close(); err != nil { - return nil, err - } - - return messages, nil -} - -func checkAttachments(bs imap.BodyStructure) (bool, []string) { - var names []string - - switch s := bs.(type) { - case *imap.BodyStructureSinglePart: - disp := s.Disposition() - if disp != nil && strings.EqualFold(disp.Value, "attachment") { - name := disp.Params["filename"] - if name == "" { - name = disp.Params["name"] - } - if name == "" { - name = s.Filename() - } - if name != "" { - names = append(names, name) - } - return true, names - } - case *imap.BodyStructureMultiPart: - hasAttach := false - for _, child := range s.Children { - if childHas, childNames := checkAttachments(child); childHas { - hasAttach = true - names = append(names, childNames...) - } - } - return hasAttach, names - } - - return false, nil -} - -func handleGetMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) { - uidSet := imap.UIDSetNum(imap.UID(uid)) - - options := &imap.FetchOptions{ - Envelope: true, - Flags: true, - UID: true, - BodyStructure: &imap.FetchItemBodyStructure{}, - BodySection: []*imap.FetchItemBodySection{{}}, - } - - fetchCmd := client.Fetch(uidSet, options) - - msgData := fetchCmd.Next() - if msgData == nil { - fetchCmd.Close() - http.Error(w, "Message not found", http.StatusNotFound) - return - } - - buf, err := msgData.Collect() - if err != nil { - fetchCmd.Close() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - m := &Message{ - UID: uint32(buf.UID), - Folder: folder, - Flags: make([]string, 0), - Date: buf.InternalDate, - } - - if env := buf.Envelope; env != nil { - m.Subject = env.Subject - m.MessageID = env.MessageID - if !env.Date.IsZero() { - m.Date = env.Date - } - - if len(env.From) > 0 { - from := env.From[0] - if from.Name != "" { - m.From = fmt.Sprintf("%s <%s@%s>", from.Name, from.Mailbox, from.Host) - } else { - m.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host) - } - } - } - - for _, f := range buf.Flags { - m.Flags = append(m.Flags, string(f)) - } - - if len(buf.BodySection) > 0 { - raw := buf.BodySection[0].Bytes - parsed := ParseMIMEBody(raw) - m.BodyText = parsed.Text - m.BodyHTML = parsed.HTML - m.BodyPreview = truncate(m.BodyText, 500) - } - - fetchCmd.Close() - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(m) -} - -func truncate(s string, maxLen int) string { - if len(s) <= maxLen { - return s - } - return s[:maxLen] + "..." -} - -func handleUpdateMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) { - var update struct { - Seen *bool `json:"seen"` - Flagged *bool `json:"flagged"` - MoveTo *string `json:"move_to"` - } - - if err := json.NewDecoder(r.Body).Decode(&update); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - uidSet := imap.UIDSetNum(imap.UID(uid)) - - if update.Seen != nil { - var op imap.StoreFlagsOp - if *update.Seen { - op = imap.StoreFlagsAdd - } else { - op = imap.StoreFlagsDel - } - storeCmd := client.Store(uidSet, &imap.StoreFlags{ - Op: op, - Silent: true, - Flags: []imap.Flag{imap.FlagSeen}, - }, nil) - storeCmd.Close() - } - - if update.Flagged != nil { - var op imap.StoreFlagsOp - if *update.Flagged { - op = imap.StoreFlagsAdd - } else { - op = imap.StoreFlagsDel - } - storeCmd := client.Store(uidSet, &imap.StoreFlags{ - Op: op, - Silent: true, - Flags: []imap.Flag{imap.FlagFlagged}, - }, nil) - storeCmd.Close() - } - - if update.MoveTo != nil { - // Try move first - moveCmd := client.Move(uidSet, *update.MoveTo) - if _, err := moveCmd.Wait(); err != nil { - // If folder doesn't exist, create it and retry - if strings.Contains(err.Error(), "TRYCREATE") || strings.Contains(err.Error(), "no such mailbox") { - createCmd := client.Create(*update.MoveTo, nil) - if cerr := createCmd.Wait(); cerr != nil { - http.Error(w, fmt.Sprintf("Create folder failed: %v", cerr), http.StatusInternalServerError) - return - } - // Retry move - moveCmd2 := client.Move(uidSet, *update.MoveTo) - if _, err2 := moveCmd2.Wait(); err2 != nil { - http.Error(w, fmt.Sprintf("Move failed after create: %v", err2), http.StatusInternalServerError) - return - } - } else { - http.Error(w, fmt.Sprintf("Move failed: %v", err), http.StatusInternalServerError) - return - } - } - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "updated"}) -} - -func handleDeleteMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) { - uidSet := imap.UIDSetNum(imap.UID(uid)) - - // Mark deleted and expunge - storeCmd := client.Store(uidSet, &imap.StoreFlags{ - Op: imap.StoreFlagsAdd, - Silent: true, - Flags: []imap.Flag{imap.FlagDeleted}, - }, nil) - storeCmd.Close() - - expungeCmd := client.Expunge() - expungeCmd.Close() - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(map[string]string{"status": "deleted"}) -} - -func handleGetAttachments(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) { - uidSet := imap.UIDSetNum(imap.UID(uid)) - - options := &imap.FetchOptions{ - UID: true, - BodySection: []*imap.FetchItemBodySection{{}}, - } - - fetchCmd := client.Fetch(uidSet, options) - - msgData := fetchCmd.Next() - if msgData == nil { - fetchCmd.Close() - http.Error(w, "Message not found", http.StatusNotFound) - return - } - - buf, err := msgData.Collect() - if err != nil { - fetchCmd.Close() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - fetchCmd.Close() - - if len(buf.BodySection) == 0 { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]Attachment{}) - return - } - - raw := buf.BodySection[0].Bytes - attachments := ExtractAttachments(raw) - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(attachments) -} - -// IngestRequest specifies which attachments to forward to DocSys -type IngestRequest struct { - Attachments []string `json:"attachments"` // filenames to ingest (empty = all) -} - -// IngestResult reports the outcome for each attachment -type IngestResult struct { - Filename string `json:"filename"` - Status string `json:"status"` // "success" or "error" - Message string `json:"message,omitempty"` -} - -func handleIngestAttachments(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) { - if r.Method != "POST" { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - // Parse request - var req IngestRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil && err != io.EOF { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - // Fetch message with body - uidSet := imap.UIDSetNum(imap.UID(uid)) - options := &imap.FetchOptions{ - UID: true, - Envelope: true, - BodySection: []*imap.FetchItemBodySection{{}}, - } - - fetchCmd := client.Fetch(uidSet, options) - msgData := fetchCmd.Next() - if msgData == nil { - fetchCmd.Close() - http.Error(w, "Message not found", http.StatusNotFound) - return - } - - buf, err := msgData.Collect() - if err != nil { - fetchCmd.Close() - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - fetchCmd.Close() - - if len(buf.BodySection) == 0 { - http.Error(w, "No message body", http.StatusInternalServerError) - return - } - - // Get email metadata for DocSys - var from, subject string - if env := buf.Envelope; env != nil { - subject = env.Subject - if len(env.From) > 0 { - f := env.From[0] - if f.Name != "" { - from = fmt.Sprintf("%s <%s@%s>", f.Name, f.Mailbox, f.Host) - } else { - from = fmt.Sprintf("%s@%s", f.Mailbox, f.Host) - } - } - } - - // Extract attachments - raw := buf.BodySection[0].Bytes - attachments := ExtractAttachments(raw) - - if len(attachments) == 0 { - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode([]IngestResult{}) - return - } - - // Filter to requested attachments (or all if none specified) - wantedSet := make(map[string]bool) - for _, name := range req.Attachments { - wantedSet[name] = true - } - - var results []IngestResult - for _, att := range attachments { - // Skip if not in requested list (when list is specified) - if len(req.Attachments) > 0 && !wantedSet[att.Filename] { - continue - } - - // POST to DocSys - payload := map[string]string{ - "filename": att.Filename, - "content": att.Content, - "source": "email", - "subject": subject, - "from": from, - } - - payloadBytes, _ := json.Marshal(payload) - resp, err := http.Post("http://localhost:9201/api/ingest", "application/json", bytes.NewReader(payloadBytes)) - - result := IngestResult{Filename: att.Filename} - if err != nil { - result.Status = "error" - result.Message = err.Error() - } else { - defer resp.Body.Close() - if resp.StatusCode >= 400 { - body, _ := io.ReadAll(resp.Body) - result.Status = "error" - result.Message = string(body) - } else { - result.Status = "success" - } - } - results = append(results, result) - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(results) + + // Single message - redirect to unified endpoint + uid := pathParts[0] + fullID := fmt.Sprintf("%s:%s", accountName, uid) + handleMessageRoutes(w, r) + _ = fullID // Silence unused variable warning } diff --git a/store.go b/store.go new file mode 100644 index 0000000..5778072 --- /dev/null +++ b/store.go @@ -0,0 +1,322 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "time" +) + +// MessageStore manages unified messages and cursor tracking +type MessageStore struct { + connectors map[string]Connector + cursors map[string]time.Time // consumer -> high-water mark + mu sync.RWMutex + dataDir string +} + +// CursorFile stores cursor positions persistently +type CursorFile struct { + Cursors map[string]time.Time `json:"cursors"` +} + +// NewMessageStore creates a new message store +func NewMessageStore(dataDir string) *MessageStore { + store := &MessageStore{ + connectors: make(map[string]Connector), + cursors: make(map[string]time.Time), + dataDir: dataDir, + } + store.loadCursors() + return store +} + +func (s *MessageStore) loadCursors() { + path := filepath.Join(s.dataDir, "cursors.json") + data, err := os.ReadFile(path) + if err != nil { + return // File doesn't exist yet + } + + var cf CursorFile + if err := json.Unmarshal(data, &cf); err != nil { + return + } + + s.cursors = cf.Cursors +} + +func (s *MessageStore) saveCursors() error { + if s.dataDir == "" { + return nil + } + + path := filepath.Join(s.dataDir, "cursors.json") + cf := CursorFile{Cursors: s.cursors} + + data, err := json.MarshalIndent(cf, "", " ") + if err != nil { + return err + } + + return os.WriteFile(path, data, 0644) +} + +// RegisterConnector adds a connector to the store +func (s *MessageStore) RegisterConnector(c Connector) { + s.mu.Lock() + defer s.mu.Unlock() + s.connectors[c.Name()] = c +} + +// GetConnector returns a connector by name +func (s *MessageStore) GetConnector(name string) (Connector, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + c, ok := s.connectors[name] + return c, ok +} + +// FetchNew returns unseen messages from all connectors +func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) { + s.mu.RLock() + connectors := make([]Connector, 0, len(s.connectors)) + for _, c := range s.connectors { + connectors = append(connectors, c) + } + s.mu.RUnlock() + + var allMessages []UnifiedMessage + for _, c := range connectors { + msgs, err := c.FetchNew() + if err != nil { + // Log but continue with other connectors + continue + } + allMessages = append(allMessages, msgs...) + } + + // Sort by timestamp descending (newest first) + sort.Slice(allMessages, func(i, j int) bool { + return allMessages[i].Timestamp.After(allMessages[j].Timestamp) + }) + + return allMessages, nil +} + +// FetchSince returns messages since the given duration +func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) { + since := time.Now().Add(-duration) + return s.FetchSinceTime(since) +} + +// FetchSinceTime returns messages since the given time +func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) { + s.mu.RLock() + connectors := make([]Connector, 0, len(s.connectors)) + for _, c := range s.connectors { + connectors = append(connectors, c) + } + s.mu.RUnlock() + + var allMessages []UnifiedMessage + for _, c := range connectors { + msgs, err := c.FetchSince(since) + if err != nil { + continue + } + allMessages = append(allMessages, msgs...) + } + + // Sort by timestamp descending + sort.Slice(allMessages, func(i, j int) bool { + return allMessages[i].Timestamp.After(allMessages[j].Timestamp) + }) + + return allMessages, nil +} + +// FetchOne returns a single message by ID (source:uid format) +func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) { + source, sourceID, err := parseMessageID(id) + if err != nil { + return nil, err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return nil, fmt.Errorf("unknown source: %s", source) + } + + return c.FetchOne(sourceID) +} + +// Ack advances the cursor for a consumer +func (s *MessageStore) Ack(consumer string, timestamp time.Time) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.cursors[consumer] = timestamp + return s.saveCursors() +} + +// GetCursor returns the cursor position for a consumer +func (s *MessageStore) GetCursor(consumer string) time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + return s.cursors[consumer] +} + +// Archive archives a message +func (s *MessageStore) Archive(id string) error { + source, sourceID, err := parseMessageID(id) + if err != nil { + return err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return fmt.Errorf("unknown source: %s", source) + } + + return c.Archive(sourceID) +} + +// Delete deletes a message +func (s *MessageStore) Delete(id string) error { + source, sourceID, err := parseMessageID(id) + if err != nil { + return err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return fmt.Errorf("unknown source: %s", source) + } + + return c.Delete(sourceID) +} + +// Reply sends a reply to a message +func (s *MessageStore) Reply(id string, body string, attachments []string) error { + source, sourceID, err := parseMessageID(id) + if err != nil { + return err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return fmt.Errorf("unknown source: %s", source) + } + + return c.Reply(sourceID, body, attachments) +} + +// MarkSeen marks a message as seen +func (s *MessageStore) MarkSeen(id string) error { + source, sourceID, err := parseMessageID(id) + if err != nil { + return err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return fmt.Errorf("unknown source: %s", source) + } + + return c.MarkSeen(sourceID) +} + +// GetAttachment retrieves attachment content +func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) { + source, sourceID, err := parseMessageID(id) + if err != nil { + return nil, err + } + + s.mu.RLock() + c, ok := s.connectors[source] + s.mu.RUnlock() + + if !ok { + return nil, fmt.Errorf("unknown source: %s", source) + } + + return c.GetAttachment(sourceID, filename) +} + +// StartAll starts all connectors with a unified callback +func (s *MessageStore) StartAll(callback func()) error { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, c := range s.connectors { + if err := c.Start(callback); err != nil { + return fmt.Errorf("start %s: %w", c.Name(), err) + } + } + return nil +} + +// StopAll stops all connectors +func (s *MessageStore) StopAll() { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, c := range s.connectors { + c.Stop() + } +} + +// parseMessageID splits "source:uid" into components +func parseMessageID(id string) (source, sourceID string, err error) { + for i := 0; i < len(id); i++ { + if id[i] == ':' { + return id[:i], id[i+1:], nil + } + } + return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id) +} + +// ParseDuration parses duration strings like "24h", "7d", "1w" +func ParseDuration(s string) (time.Duration, error) { + if len(s) == 0 { + return 0, fmt.Errorf("empty duration") + } + + // Check for special suffixes + last := s[len(s)-1] + switch last { + case 'd': // days + var n int + if _, err := fmt.Sscanf(s, "%dd", &n); err == nil { + return time.Duration(n) * 24 * time.Hour, nil + } + case 'w': // weeks + var n int + if _, err := fmt.Sscanf(s, "%dw", &n); err == nil { + return time.Duration(n) * 7 * 24 * time.Hour, nil + } + } + + // Fall back to standard Go duration parsing + return time.ParseDuration(s) +}