Message Center: unified API with connector architecture
- Connector interface for pluggable message sources
- Email connector (refactored IMAP code)
- WhatsApp connector (HTTP wrapper for message-bridge)
- Unified message store with cursor tracking
- New API endpoints:
- GET /messages/new - unseen messages from all sources
- GET /messages?since=24h - replay window
- GET /messages/{id} - single message
- POST /messages/ack - advance cursor
- POST /messages/{id}/archive
- POST /messages/{id}/delete
- POST /messages/{id}/reply
- POST /messages/{id}/to-docs
- Simplified webhook (just sends {event: 'new'})
- Legacy /accounts/ endpoints preserved for compatibility
This commit is contained in:
parent
1dacfdbaf9
commit
c6c0ccb9bc
|
|
@ -4,3 +4,4 @@ node_modules/
|
|||
.DS_Store
|
||||
|
||||
mail-bridge
|
||||
message-center
|
||||
|
|
|
|||
24
config.yaml
24
config.yaml
|
|
@ -2,6 +2,8 @@ server:
|
|||
host: 127.0.0.1
|
||||
port: 8025
|
||||
|
||||
data_dir: ~/.message-center
|
||||
|
||||
accounts:
|
||||
proton:
|
||||
host: 127.0.0.1
|
||||
|
|
@ -11,6 +13,12 @@ accounts:
|
|||
tls: starttls
|
||||
watch:
|
||||
- INBOX
|
||||
smtp:
|
||||
host: 127.0.0.1
|
||||
port: 1025
|
||||
username: tj@jongsma.me
|
||||
password: ${PROTON_BRIDGE_PASSWORD}
|
||||
from: tj@jongsma.me
|
||||
johan:
|
||||
host: 127.0.0.1
|
||||
port: 1143
|
||||
|
|
@ -19,8 +27,20 @@ accounts:
|
|||
tls: starttls
|
||||
watch:
|
||||
- INBOX
|
||||
smtp:
|
||||
host: 127.0.0.1
|
||||
port: 1025
|
||||
username: johan@jongsma.me
|
||||
password: ${JOHAN_BRIDGE_PASSWORD}
|
||||
from: johan@jongsma.me
|
||||
|
||||
connectors:
|
||||
whatsapp:
|
||||
enabled: true
|
||||
name: whatsapp
|
||||
base_url: http://localhost:8030
|
||||
|
||||
webhook:
|
||||
enabled: true
|
||||
url: http://localhost:18789/hooks/mail
|
||||
token: "kuma-alert-token-2026"
|
||||
url: http://localhost:18789/hooks/messages
|
||||
token: "mc-webhook-token-2026"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,65 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// AttachmentMeta holds attachment metadata (content fetched on demand)
|
||||
type AttachmentMeta struct {
|
||||
Name string `json:"name"`
|
||||
Mime string `json:"mime"`
|
||||
Size int `json:"size"`
|
||||
}
|
||||
|
||||
// UnifiedMessage is the canonical message format across all sources
|
||||
type UnifiedMessage struct {
|
||||
ID string `json:"id"` // source:unique_id
|
||||
Source string `json:"source"` // proton, whatsapp, etc.
|
||||
From string `json:"from"`
|
||||
FromName string `json:"from_name,omitempty"`
|
||||
To string `json:"to"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Subject string `json:"subject,omitempty"`
|
||||
Body string `json:"body"`
|
||||
Attachments []AttachmentMeta `json:"attachments"`
|
||||
Seen bool `json:"seen"`
|
||||
// Internal fields (not exposed in JSON)
|
||||
SourceUID string `json:"-"` // Original UID/ID from source
|
||||
SourceExtra any `json:"-"` // Source-specific data (e.g., folder for email)
|
||||
}
|
||||
|
||||
// Connector defines the interface for message sources
|
||||
type Connector interface {
|
||||
// Name returns the connector identifier (e.g., "proton", "whatsapp")
|
||||
Name() string
|
||||
|
||||
// FetchNew returns unseen/new messages
|
||||
FetchNew() ([]UnifiedMessage, error)
|
||||
|
||||
// FetchSince returns messages since the given time
|
||||
FetchSince(since time.Time) ([]UnifiedMessage, error)
|
||||
|
||||
// FetchOne returns a single message by its source-specific ID
|
||||
FetchOne(sourceID string) (*UnifiedMessage, error)
|
||||
|
||||
// Archive marks a message as archived
|
||||
Archive(sourceID string) error
|
||||
|
||||
// Delete permanently removes a message
|
||||
Delete(sourceID string) error
|
||||
|
||||
// Reply sends a reply to a message
|
||||
Reply(sourceID string, body string, attachments []string) error
|
||||
|
||||
// MarkSeen marks a message as seen/read
|
||||
MarkSeen(sourceID string) error
|
||||
|
||||
// GetAttachment retrieves attachment content (base64)
|
||||
GetAttachment(sourceID string, filename string) ([]byte, error)
|
||||
|
||||
// Start begins watching for new messages (calls callback on new)
|
||||
Start(callback func()) error
|
||||
|
||||
// Stop stops the connector
|
||||
Stop()
|
||||
}
|
||||
|
|
@ -0,0 +1,626 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"log"
|
||||
"mime"
|
||||
"net/smtp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/emersion/go-imap/v2"
|
||||
"github.com/emersion/go-imap/v2/imapclient"
|
||||
)
|
||||
|
||||
// EmailConnector implements Connector for IMAP email accounts
|
||||
type EmailConnector struct {
|
||||
name string
|
||||
config AccountConfig
|
||||
smtpConfig SMTPConfig
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
callback func()
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// SMTPConfig holds SMTP settings for sending replies
|
||||
type SMTPConfig struct {
|
||||
Host string
|
||||
Port int
|
||||
Username string
|
||||
Password string
|
||||
From string
|
||||
}
|
||||
|
||||
// NewEmailConnector creates a new email connector
|
||||
func NewEmailConnector(name string, config AccountConfig, smtpConfig SMTPConfig) *EmailConnector {
|
||||
return &EmailConnector{
|
||||
name: name,
|
||||
config: config,
|
||||
smtpConfig: smtpConfig,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Name() string {
|
||||
return e.name
|
||||
}
|
||||
|
||||
func (e *EmailConnector) connect() (*imapclient.Client, error) {
|
||||
addr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port)
|
||||
|
||||
var client *imapclient.Client
|
||||
var err error
|
||||
|
||||
switch e.config.TLS {
|
||||
case "ssl":
|
||||
client, err = imapclient.DialTLS(addr, nil)
|
||||
case "starttls":
|
||||
client, err = imapclient.DialStartTLS(addr, nil)
|
||||
default:
|
||||
client, err = imapclient.DialInsecure(addr, nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial: %w", err)
|
||||
}
|
||||
|
||||
if err := client.Login(e.config.Username, e.config.Password).Wait(); err != nil {
|
||||
client.Close()
|
||||
return nil, fmt.Errorf("login: %w", err)
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (e *EmailConnector) FetchNew() ([]UnifiedMessage, error) {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Get messages from watched folders (default INBOX)
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
var messages []UnifiedMessage
|
||||
for _, folder := range folders {
|
||||
msgs, err := e.fetchFromFolder(client, folder, true, time.Time{})
|
||||
if err != nil {
|
||||
log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err)
|
||||
continue
|
||||
}
|
||||
messages = append(messages, msgs...)
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (e *EmailConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
var messages []UnifiedMessage
|
||||
for _, folder := range folders {
|
||||
msgs, err := e.fetchFromFolder(client, folder, false, since)
|
||||
if err != nil {
|
||||
log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err)
|
||||
continue
|
||||
}
|
||||
messages = append(messages, msgs...)
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (e *EmailConnector) fetchFromFolder(client *imapclient.Client, folder string, unseenOnly bool, since time.Time) ([]UnifiedMessage, error) {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select %s: %w", folder, err)
|
||||
}
|
||||
|
||||
// Build search criteria
|
||||
criteria := &imap.SearchCriteria{}
|
||||
if unseenOnly {
|
||||
criteria.NotFlag = []imap.Flag{imap.FlagSeen}
|
||||
}
|
||||
if !since.IsZero() {
|
||||
criteria.Since = since
|
||||
}
|
||||
|
||||
searchData, err := client.Search(criteria, nil).Wait()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("search: %w", err)
|
||||
}
|
||||
|
||||
seqNums := searchData.AllSeqNums()
|
||||
if len(seqNums) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var seqSet imap.SeqSet
|
||||
for _, num := range seqNums {
|
||||
seqSet.AddNum(num)
|
||||
}
|
||||
|
||||
return e.fetchMessages(client, seqSet, folder)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string) ([]UnifiedMessage, error) {
|
||||
options := &imap.FetchOptions{
|
||||
Envelope: true,
|
||||
Flags: true,
|
||||
UID: true,
|
||||
BodyStructure: &imap.FetchItemBodyStructure{},
|
||||
BodySection: []*imap.FetchItemBodySection{{}},
|
||||
}
|
||||
|
||||
fetchCmd := client.Fetch(seqSet, options)
|
||||
|
||||
var messages []UnifiedMessage
|
||||
for {
|
||||
msgData := fetchCmd.Next()
|
||||
if msgData == nil {
|
||||
break
|
||||
}
|
||||
|
||||
buf, err := msgData.Collect()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
msg := e.convertMessage(*buf, folder)
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
if err := fetchCmd.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (e *EmailConnector) convertMessage(buf imapclient.FetchMessageBuffer, folder string) UnifiedMessage {
|
||||
msg := UnifiedMessage{
|
||||
Source: e.name,
|
||||
SourceUID: fmt.Sprintf("%d", buf.UID),
|
||||
SourceExtra: folder,
|
||||
Seen: false,
|
||||
Attachments: []AttachmentMeta{},
|
||||
}
|
||||
|
||||
// Generate unified ID
|
||||
msg.ID = fmt.Sprintf("%s:%d", e.name, buf.UID)
|
||||
|
||||
// Parse envelope
|
||||
if env := buf.Envelope; env != nil {
|
||||
msg.Subject = env.Subject
|
||||
msg.Timestamp = env.Date
|
||||
if msg.Timestamp.IsZero() {
|
||||
msg.Timestamp = buf.InternalDate
|
||||
}
|
||||
|
||||
if len(env.From) > 0 {
|
||||
from := env.From[0]
|
||||
msg.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host)
|
||||
if from.Name != "" {
|
||||
msg.FromName = from.Name
|
||||
}
|
||||
}
|
||||
|
||||
if len(env.To) > 0 {
|
||||
to := env.To[0]
|
||||
msg.To = fmt.Sprintf("%s@%s", to.Mailbox, to.Host)
|
||||
}
|
||||
}
|
||||
|
||||
// Check seen flag
|
||||
for _, f := range buf.Flags {
|
||||
if f == imap.FlagSeen {
|
||||
msg.Seen = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Parse body
|
||||
if len(buf.BodySection) > 0 {
|
||||
raw := buf.BodySection[0].Bytes
|
||||
parsed := ParseMIMEBody(raw)
|
||||
msg.Body = parsed.Text
|
||||
if msg.Body == "" {
|
||||
msg.Body = stripHTML(parsed.HTML)
|
||||
}
|
||||
|
||||
// Extract attachment metadata
|
||||
attachments := ExtractAttachments(raw)
|
||||
for _, att := range attachments {
|
||||
msg.Attachments = append(msg.Attachments, AttachmentMeta{
|
||||
Name: att.Filename,
|
||||
Mime: att.ContentType,
|
||||
Size: att.Size,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return msg
|
||||
}
|
||||
|
||||
func (e *EmailConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Parse UID from sourceID
|
||||
var uid uint32
|
||||
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||
return nil, fmt.Errorf("invalid source ID: %s", sourceID)
|
||||
}
|
||||
|
||||
// Try each watched folder
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||
options := &imap.FetchOptions{
|
||||
Envelope: true,
|
||||
Flags: true,
|
||||
UID: true,
|
||||
BodyStructure: &imap.FetchItemBodyStructure{},
|
||||
BodySection: []*imap.FetchItemBodySection{{}},
|
||||
}
|
||||
|
||||
fetchCmd := client.Fetch(uidSet, options)
|
||||
msgData := fetchCmd.Next()
|
||||
if msgData != nil {
|
||||
buf, err := msgData.Collect()
|
||||
fetchCmd.Close()
|
||||
if err == nil {
|
||||
msg := e.convertMessage(*buf, folder)
|
||||
return &msg, nil
|
||||
}
|
||||
}
|
||||
fetchCmd.Close()
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("message not found: %s", sourceID)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Archive(sourceID string) error {
|
||||
return e.moveMessage(sourceID, "Archive")
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Delete(sourceID string) error {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var uid uint32
|
||||
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||
return fmt.Errorf("invalid source ID: %s", sourceID)
|
||||
}
|
||||
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||
|
||||
// Mark deleted
|
||||
storeCmd := client.Store(uidSet, &imap.StoreFlags{
|
||||
Op: imap.StoreFlagsAdd,
|
||||
Silent: true,
|
||||
Flags: []imap.Flag{imap.FlagDeleted},
|
||||
}, nil)
|
||||
storeCmd.Close()
|
||||
|
||||
// Expunge
|
||||
expungeCmd := client.Expunge()
|
||||
expungeCmd.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("message not found: %s", sourceID)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) moveMessage(sourceID, destFolder string) error {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var uid uint32
|
||||
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||
return fmt.Errorf("invalid source ID: %s", sourceID)
|
||||
}
|
||||
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||
moveCmd := client.Move(uidSet, destFolder)
|
||||
if _, err := moveCmd.Wait(); err != nil {
|
||||
// Try creating folder
|
||||
if strings.Contains(err.Error(), "TRYCREATE") || strings.Contains(err.Error(), "no such mailbox") {
|
||||
client.Create(destFolder, nil).Wait()
|
||||
moveCmd2 := client.Move(uidSet, destFolder)
|
||||
if _, err2 := moveCmd2.Wait(); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("message not found: %s", sourceID)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Reply(sourceID string, body string, attachments []string) error {
|
||||
// Get original message to extract reply-to address
|
||||
msg, err := e.FetchOne(sourceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build email
|
||||
to := msg.From
|
||||
subject := msg.Subject
|
||||
if !strings.HasPrefix(strings.ToLower(subject), "re:") {
|
||||
subject = "Re: " + subject
|
||||
}
|
||||
|
||||
// Simple SMTP send
|
||||
smtpAddr := fmt.Sprintf("%s:%d", e.smtpConfig.Host, e.smtpConfig.Port)
|
||||
|
||||
var msgBuf bytes.Buffer
|
||||
msgBuf.WriteString(fmt.Sprintf("To: %s\r\n", to))
|
||||
msgBuf.WriteString(fmt.Sprintf("From: %s\r\n", e.smtpConfig.From))
|
||||
msgBuf.WriteString(fmt.Sprintf("Subject: %s\r\n", mime.QEncoding.Encode("utf-8", subject)))
|
||||
msgBuf.WriteString("MIME-Version: 1.0\r\n")
|
||||
msgBuf.WriteString("Content-Type: text/plain; charset=utf-8\r\n")
|
||||
msgBuf.WriteString("\r\n")
|
||||
msgBuf.WriteString(body)
|
||||
|
||||
auth := smtp.PlainAuth("", e.smtpConfig.Username, e.smtpConfig.Password, e.smtpConfig.Host)
|
||||
return smtp.SendMail(smtpAddr, auth, e.smtpConfig.From, []string{to}, msgBuf.Bytes())
|
||||
}
|
||||
|
||||
func (e *EmailConnector) MarkSeen(sourceID string) error {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var uid uint32
|
||||
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||
return fmt.Errorf("invalid source ID: %s", sourceID)
|
||||
}
|
||||
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||
storeCmd := client.Store(uidSet, &imap.StoreFlags{
|
||||
Op: imap.StoreFlagsAdd,
|
||||
Silent: true,
|
||||
Flags: []imap.Flag{imap.FlagSeen},
|
||||
}, nil)
|
||||
storeCmd.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("message not found: %s", sourceID)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var uid uint32
|
||||
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||
return nil, fmt.Errorf("invalid source ID: %s", sourceID)
|
||||
}
|
||||
|
||||
folders := e.config.Watch
|
||||
if len(folders) == 0 {
|
||||
folders = []string{"INBOX"}
|
||||
}
|
||||
|
||||
for _, folder := range folders {
|
||||
_, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||
options := &imap.FetchOptions{
|
||||
UID: true,
|
||||
BodySection: []*imap.FetchItemBodySection{{}},
|
||||
}
|
||||
|
||||
fetchCmd := client.Fetch(uidSet, options)
|
||||
msgData := fetchCmd.Next()
|
||||
if msgData == nil {
|
||||
fetchCmd.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
buf, err := msgData.Collect()
|
||||
fetchCmd.Close()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(buf.BodySection) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
raw := buf.BodySection[0].Bytes
|
||||
attachments := ExtractAttachments(raw)
|
||||
for _, att := range attachments {
|
||||
if att.Filename == filename {
|
||||
decoded, err := base64.StdEncoding.DecodeString(att.Content)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return decoded, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("attachment not found: %s", filename)
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Start(callback func()) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.ctx, e.cancel = context.WithCancel(context.Background())
|
||||
e.callback = callback
|
||||
|
||||
for _, folder := range e.config.Watch {
|
||||
go e.watchFolder(folder)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EmailConnector) watchFolder(folder string) {
|
||||
log.Printf("[%s] Starting IDLE watcher for %s", e.name, folder)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
err := e.runIDLE(folder)
|
||||
if err != nil {
|
||||
log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", e.name, folder, err)
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
return
|
||||
case <-time.After(10 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EmailConnector) runIDLE(folder string) error {
|
||||
client, err := e.connect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect: %w", err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
mbox, err := client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
return fmt.Errorf("select %s: %w", folder, err)
|
||||
}
|
||||
|
||||
prevCount := mbox.NumMessages
|
||||
log.Printf("[%s] IDLE connected to %s (%d messages)", e.name, folder, prevCount)
|
||||
|
||||
for {
|
||||
idleCmd, err := client.Idle()
|
||||
if err != nil {
|
||||
return fmt.Errorf("idle start: %w", err)
|
||||
}
|
||||
|
||||
idleDone := make(chan error, 1)
|
||||
go func() {
|
||||
idleDone <- idleCmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
idleCmd.Close()
|
||||
return nil
|
||||
case <-time.After(4 * time.Minute):
|
||||
idleCmd.Close()
|
||||
case err := <-idleDone:
|
||||
if err != nil {
|
||||
log.Printf("[%s] IDLE ended with error: %v", e.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
mbox, err = client.Select(folder, nil).Wait()
|
||||
if err != nil {
|
||||
return fmt.Errorf("re-select %s: %w", folder, err)
|
||||
}
|
||||
|
||||
if mbox.NumMessages > prevCount {
|
||||
log.Printf("[%s] New mail in %s: %d -> %d", e.name, folder, prevCount, mbox.NumMessages)
|
||||
if e.callback != nil {
|
||||
go e.callback()
|
||||
}
|
||||
prevCount = mbox.NumMessages
|
||||
} else {
|
||||
prevCount = mbox.NumMessages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EmailConnector) Stop() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.cancel != nil {
|
||||
e.cancel()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,353 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WhatsAppConnector wraps the message-bridge HTTP API
|
||||
type WhatsAppConnector struct {
|
||||
name string
|
||||
baseURL string
|
||||
ctx chan struct{}
|
||||
callback func()
|
||||
mu sync.Mutex
|
||||
lastSeen map[string]bool // Track seen message IDs
|
||||
}
|
||||
|
||||
// WhatsAppMessage is the format from message-bridge
|
||||
type WhatsAppMessage struct {
|
||||
ID string `json:"id"`
|
||||
Platform string `json:"platform"`
|
||||
From string `json:"from"`
|
||||
FromName string `json:"from_name,omitempty"`
|
||||
To string `json:"to,omitempty"`
|
||||
Body string `json:"body"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
IsGroup bool `json:"is_group"`
|
||||
GroupName string `json:"group_name,omitempty"`
|
||||
MediaType string `json:"media_type,omitempty"`
|
||||
MediaURL string `json:"media_url,omitempty"`
|
||||
MediaPath string `json:"media_path,omitempty"`
|
||||
HasMedia bool `json:"has_media"`
|
||||
Transcription string `json:"transcription,omitempty"`
|
||||
}
|
||||
|
||||
// NewWhatsAppConnector creates a new WhatsApp connector
|
||||
func NewWhatsAppConnector(name, baseURL string) *WhatsAppConnector {
|
||||
return &WhatsAppConnector{
|
||||
name: name,
|
||||
baseURL: baseURL,
|
||||
lastSeen: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Name() string {
|
||||
return w.name
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) fetchMessages() ([]WhatsAppMessage, error) {
|
||||
resp, err := http.Get(w.baseURL + "/messages")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch messages: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return nil, fmt.Errorf("fetch messages: status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var messages []WhatsAppMessage
|
||||
if err := json.NewDecoder(resp.Body).Decode(&messages); err != nil {
|
||||
return nil, fmt.Errorf("decode messages: %w", err)
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) convertMessage(msg WhatsAppMessage) UnifiedMessage {
|
||||
um := UnifiedMessage{
|
||||
ID: fmt.Sprintf("%s:%s", w.name, msg.ID),
|
||||
Source: w.name,
|
||||
SourceUID: msg.ID,
|
||||
From: msg.From,
|
||||
FromName: msg.FromName,
|
||||
Timestamp: msg.Timestamp,
|
||||
Body: msg.Body,
|
||||
Seen: false, // WhatsApp doesn't have a seen concept in our bridge
|
||||
Attachments: []AttachmentMeta{},
|
||||
}
|
||||
|
||||
// Handle group vs direct
|
||||
if msg.IsGroup {
|
||||
um.To = msg.GroupName
|
||||
if um.To == "" {
|
||||
um.To = msg.To
|
||||
}
|
||||
} else {
|
||||
// For direct messages, "to" is the recipient (us)
|
||||
um.To = msg.To
|
||||
}
|
||||
|
||||
// Use transcription as body for voice messages if available
|
||||
if msg.Transcription != "" && msg.Body == "" {
|
||||
um.Body = msg.Transcription
|
||||
}
|
||||
|
||||
// Handle media as attachment
|
||||
if msg.HasMedia && msg.MediaPath != "" {
|
||||
filename := filepath.Base(msg.MediaPath)
|
||||
mimeType := "application/octet-stream"
|
||||
switch msg.MediaType {
|
||||
case "image":
|
||||
mimeType = "image/jpeg"
|
||||
case "video":
|
||||
mimeType = "video/mp4"
|
||||
case "voice", "audio":
|
||||
mimeType = "audio/ogg"
|
||||
case "document":
|
||||
mimeType = "application/pdf"
|
||||
}
|
||||
|
||||
// Get file size
|
||||
var size int
|
||||
if fi, err := os.Stat(msg.MediaPath); err == nil {
|
||||
size = int(fi.Size())
|
||||
}
|
||||
|
||||
um.Attachments = append(um.Attachments, AttachmentMeta{
|
||||
Name: filename,
|
||||
Mime: mimeType,
|
||||
Size: size,
|
||||
})
|
||||
|
||||
// Store media path for later retrieval
|
||||
um.SourceExtra = msg.MediaPath
|
||||
}
|
||||
|
||||
return um
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) FetchNew() ([]UnifiedMessage, error) {
|
||||
messages, err := w.fetchMessages()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []UnifiedMessage
|
||||
for _, msg := range messages {
|
||||
// WhatsApp bridge doesn't track seen status, so return all recent
|
||||
// Filter to last 24 hours for "new"
|
||||
if time.Since(msg.Timestamp) < 24*time.Hour {
|
||||
result = append(result, w.convertMessage(msg))
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
|
||||
messages, err := w.fetchMessages()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []UnifiedMessage
|
||||
for _, msg := range messages {
|
||||
if msg.Timestamp.After(since) || msg.Timestamp.Equal(since) {
|
||||
result = append(result, w.convertMessage(msg))
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
|
||||
messages, err := w.fetchMessages()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
if msg.ID == sourceID {
|
||||
um := w.convertMessage(msg)
|
||||
return &um, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("message not found: %s", sourceID)
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Archive(sourceID string) error {
|
||||
// WhatsApp doesn't support archiving individual messages
|
||||
// Mark it as seen in our local tracking
|
||||
w.mu.Lock()
|
||||
w.lastSeen[sourceID] = true
|
||||
w.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Delete(sourceID string) error {
|
||||
// WhatsApp doesn't support deleting messages via API
|
||||
return fmt.Errorf("WhatsApp does not support message deletion")
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Reply(sourceID string, body string, attachments []string) error {
|
||||
// Get original message to find the chat
|
||||
msg, err := w.FetchOne(sourceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build send request
|
||||
sendReq := map[string]interface{}{
|
||||
"to": msg.From, // Reply to sender
|
||||
"body": body,
|
||||
}
|
||||
|
||||
// Handle attachments if any
|
||||
if len(attachments) > 0 {
|
||||
sendReq["media"] = attachments[0] // message-bridge only supports one attachment
|
||||
}
|
||||
|
||||
reqBody, _ := json.Marshal(sendReq)
|
||||
resp, err := http.Post(w.baseURL+"/send", "application/json", bytes.NewReader(reqBody))
|
||||
if err != nil {
|
||||
return fmt.Errorf("send reply: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("send reply: status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) MarkSeen(sourceID string) error {
|
||||
// Track locally
|
||||
w.mu.Lock()
|
||||
w.lastSeen[sourceID] = true
|
||||
w.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
|
||||
// First, find the message to get the media path
|
||||
messages, err := w.fetchMessages()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
if msg.ID == sourceID && msg.MediaPath != "" {
|
||||
// Check if this matches the requested filename
|
||||
if strings.HasSuffix(msg.MediaPath, filename) || filepath.Base(msg.MediaPath) == filename {
|
||||
return os.ReadFile(msg.MediaPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try fetching from message-bridge media endpoint
|
||||
resp, err := http.Get(w.baseURL + "/media/" + filename)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch media: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("media not found: %s", filename)
|
||||
}
|
||||
|
||||
return io.ReadAll(resp.Body)
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Start(callback func()) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
w.ctx = make(chan struct{})
|
||||
w.callback = callback
|
||||
|
||||
// Poll for new messages periodically
|
||||
go w.pollLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) pollLoop() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Initial fetch to populate seen set
|
||||
msgs, err := w.fetchMessages()
|
||||
if err == nil {
|
||||
w.mu.Lock()
|
||||
for _, msg := range msgs {
|
||||
w.lastSeen[msg.ID] = true
|
||||
}
|
||||
w.mu.Unlock()
|
||||
}
|
||||
|
||||
log.Printf("[%s] Started polling for new messages", w.name)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-w.ctx:
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.checkForNew()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) checkForNew() {
|
||||
msgs, err := w.fetchMessages()
|
||||
if err != nil {
|
||||
log.Printf("[%s] Poll error: %v", w.name, err)
|
||||
return
|
||||
}
|
||||
|
||||
hasNew := false
|
||||
w.mu.Lock()
|
||||
for _, msg := range msgs {
|
||||
if !w.lastSeen[msg.ID] {
|
||||
hasNew = true
|
||||
w.lastSeen[msg.ID] = true
|
||||
log.Printf("[%s] New message from %s: %s", w.name, msg.FromName, truncateStr(msg.Body, 50))
|
||||
}
|
||||
}
|
||||
callback := w.callback
|
||||
w.mu.Unlock()
|
||||
|
||||
if hasNew && callback != nil {
|
||||
go callback()
|
||||
}
|
||||
}
|
||||
|
||||
func truncateStr(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
return s[:maxLen] + "..."
|
||||
}
|
||||
|
||||
func (w *WhatsAppConnector) Stop() {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.ctx != nil {
|
||||
close(w.ctx)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,322 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MessageStore manages unified messages and cursor tracking
|
||||
type MessageStore struct {
|
||||
connectors map[string]Connector
|
||||
cursors map[string]time.Time // consumer -> high-water mark
|
||||
mu sync.RWMutex
|
||||
dataDir string
|
||||
}
|
||||
|
||||
// CursorFile stores cursor positions persistently
|
||||
type CursorFile struct {
|
||||
Cursors map[string]time.Time `json:"cursors"`
|
||||
}
|
||||
|
||||
// NewMessageStore creates a new message store
|
||||
func NewMessageStore(dataDir string) *MessageStore {
|
||||
store := &MessageStore{
|
||||
connectors: make(map[string]Connector),
|
||||
cursors: make(map[string]time.Time),
|
||||
dataDir: dataDir,
|
||||
}
|
||||
store.loadCursors()
|
||||
return store
|
||||
}
|
||||
|
||||
func (s *MessageStore) loadCursors() {
|
||||
path := filepath.Join(s.dataDir, "cursors.json")
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return // File doesn't exist yet
|
||||
}
|
||||
|
||||
var cf CursorFile
|
||||
if err := json.Unmarshal(data, &cf); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.cursors = cf.Cursors
|
||||
}
|
||||
|
||||
func (s *MessageStore) saveCursors() error {
|
||||
if s.dataDir == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
path := filepath.Join(s.dataDir, "cursors.json")
|
||||
cf := CursorFile{Cursors: s.cursors}
|
||||
|
||||
data, err := json.MarshalIndent(cf, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.WriteFile(path, data, 0644)
|
||||
}
|
||||
|
||||
// RegisterConnector adds a connector to the store
|
||||
func (s *MessageStore) RegisterConnector(c Connector) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.connectors[c.Name()] = c
|
||||
}
|
||||
|
||||
// GetConnector returns a connector by name
|
||||
func (s *MessageStore) GetConnector(name string) (Connector, bool) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
c, ok := s.connectors[name]
|
||||
return c, ok
|
||||
}
|
||||
|
||||
// FetchNew returns unseen messages from all connectors
|
||||
func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) {
|
||||
s.mu.RLock()
|
||||
connectors := make([]Connector, 0, len(s.connectors))
|
||||
for _, c := range s.connectors {
|
||||
connectors = append(connectors, c)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
var allMessages []UnifiedMessage
|
||||
for _, c := range connectors {
|
||||
msgs, err := c.FetchNew()
|
||||
if err != nil {
|
||||
// Log but continue with other connectors
|
||||
continue
|
||||
}
|
||||
allMessages = append(allMessages, msgs...)
|
||||
}
|
||||
|
||||
// Sort by timestamp descending (newest first)
|
||||
sort.Slice(allMessages, func(i, j int) bool {
|
||||
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
||||
})
|
||||
|
||||
return allMessages, nil
|
||||
}
|
||||
|
||||
// FetchSince returns messages since the given duration
|
||||
func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) {
|
||||
since := time.Now().Add(-duration)
|
||||
return s.FetchSinceTime(since)
|
||||
}
|
||||
|
||||
// FetchSinceTime returns messages since the given time
|
||||
func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) {
|
||||
s.mu.RLock()
|
||||
connectors := make([]Connector, 0, len(s.connectors))
|
||||
for _, c := range s.connectors {
|
||||
connectors = append(connectors, c)
|
||||
}
|
||||
s.mu.RUnlock()
|
||||
|
||||
var allMessages []UnifiedMessage
|
||||
for _, c := range connectors {
|
||||
msgs, err := c.FetchSince(since)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
allMessages = append(allMessages, msgs...)
|
||||
}
|
||||
|
||||
// Sort by timestamp descending
|
||||
sort.Slice(allMessages, func(i, j int) bool {
|
||||
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
||||
})
|
||||
|
||||
return allMessages, nil
|
||||
}
|
||||
|
||||
// FetchOne returns a single message by ID (source:uid format)
|
||||
func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.FetchOne(sourceID)
|
||||
}
|
||||
|
||||
// Ack advances the cursor for a consumer
|
||||
func (s *MessageStore) Ack(consumer string, timestamp time.Time) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.cursors[consumer] = timestamp
|
||||
return s.saveCursors()
|
||||
}
|
||||
|
||||
// GetCursor returns the cursor position for a consumer
|
||||
func (s *MessageStore) GetCursor(consumer string) time.Time {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.cursors[consumer]
|
||||
}
|
||||
|
||||
// Archive archives a message
|
||||
func (s *MessageStore) Archive(id string) error {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.Archive(sourceID)
|
||||
}
|
||||
|
||||
// Delete deletes a message
|
||||
func (s *MessageStore) Delete(id string) error {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.Delete(sourceID)
|
||||
}
|
||||
|
||||
// Reply sends a reply to a message
|
||||
func (s *MessageStore) Reply(id string, body string, attachments []string) error {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.Reply(sourceID, body, attachments)
|
||||
}
|
||||
|
||||
// MarkSeen marks a message as seen
|
||||
func (s *MessageStore) MarkSeen(id string) error {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.MarkSeen(sourceID)
|
||||
}
|
||||
|
||||
// GetAttachment retrieves attachment content
|
||||
func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) {
|
||||
source, sourceID, err := parseMessageID(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
c, ok := s.connectors[source]
|
||||
s.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown source: %s", source)
|
||||
}
|
||||
|
||||
return c.GetAttachment(sourceID, filename)
|
||||
}
|
||||
|
||||
// StartAll starts all connectors with a unified callback
|
||||
func (s *MessageStore) StartAll(callback func()) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, c := range s.connectors {
|
||||
if err := c.Start(callback); err != nil {
|
||||
return fmt.Errorf("start %s: %w", c.Name(), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopAll stops all connectors
|
||||
func (s *MessageStore) StopAll() {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
for _, c := range s.connectors {
|
||||
c.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// parseMessageID splits "source:uid" into components
|
||||
func parseMessageID(id string) (source, sourceID string, err error) {
|
||||
for i := 0; i < len(id); i++ {
|
||||
if id[i] == ':' {
|
||||
return id[:i], id[i+1:], nil
|
||||
}
|
||||
}
|
||||
return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id)
|
||||
}
|
||||
|
||||
// ParseDuration parses duration strings like "24h", "7d", "1w"
|
||||
func ParseDuration(s string) (time.Duration, error) {
|
||||
if len(s) == 0 {
|
||||
return 0, fmt.Errorf("empty duration")
|
||||
}
|
||||
|
||||
// Check for special suffixes
|
||||
last := s[len(s)-1]
|
||||
switch last {
|
||||
case 'd': // days
|
||||
var n int
|
||||
if _, err := fmt.Sscanf(s, "%dd", &n); err == nil {
|
||||
return time.Duration(n) * 24 * time.Hour, nil
|
||||
}
|
||||
case 'w': // weeks
|
||||
var n int
|
||||
if _, err := fmt.Sscanf(s, "%dw", &n); err == nil {
|
||||
return time.Duration(n) * 7 * 24 * time.Hour, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to standard Go duration parsing
|
||||
return time.ParseDuration(s)
|
||||
}
|
||||
Loading…
Reference in New Issue