// Source: message-center/connector_email.go (Go, 627 lines, 14 KiB)

package main
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"log"
"mime"
"net/smtp"
"strings"
"sync"
"time"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// EmailConnector is the Connector implementation for IMAP email accounts.
// Replies are sent over SMTP using the separate smtpConfig.
type EmailConnector struct {
	// Identity and account settings, fixed by NewEmailConnector.
	name       string
	config     AccountConfig
	smtpConfig SMTPConfig

	// Watcher state: set by Start, cancelled by Stop.
	ctx      context.Context
	cancel   context.CancelFunc
	callback func()

	// mu is held by Start and Stop while they touch the watcher state.
	mu sync.Mutex
}
// SMTPConfig carries the outgoing-mail settings used when sending replies.
type SMTPConfig struct {
	// Host and Port locate the SMTP server.
	Host string
	Port int
	// Username and Password are the credentials used for SMTP authentication.
	Username, Password string
	// From is the sender address for both the envelope and the From header.
	From string
}
// NewEmailConnector returns an EmailConnector carrying the given name, IMAP
// account configuration and SMTP settings. No connection is opened here;
// the watcher fields stay at their zero values until Start is called.
func NewEmailConnector(name string, config AccountConfig, smtpConfig SMTPConfig) *EmailConnector {
	conn := new(EmailConnector)
	conn.name = name
	conn.config = config
	conn.smtpConfig = smtpConfig
	return conn
}
// Name reports the connector name given to NewEmailConnector.
func (e *EmailConnector) Name() string { return e.name }
// connect dials the configured IMAP server and logs in.
//
// The transport is chosen from config.TLS: "ssl" dials with implicit TLS,
// "starttls" upgrades via STARTTLS, and any other value uses an unencrypted
// connection. On login failure the connection is closed before returning.
// The caller owns the returned client and must close it.
func (e *EmailConnector) connect() (*imapclient.Client, error) {
	// Unencrypted is the fallback for every unrecognised TLS setting.
	dial := imapclient.DialInsecure
	switch e.config.TLS {
	case "ssl":
		dial = imapclient.DialTLS
	case "starttls":
		dial = imapclient.DialStartTLS
	}

	conn, dialErr := dial(fmt.Sprintf("%s:%d", e.config.Host, e.config.Port), nil)
	if dialErr != nil {
		return nil, fmt.Errorf("dial: %w", dialErr)
	}

	loginErr := conn.Login(e.config.Username, e.config.Password).Wait()
	if loginErr == nil {
		return conn, nil
	}
	conn.Close()
	return nil, fmt.Errorf("login: %w", loginErr)
}
// FetchNew opens a fresh connection and returns the messages lacking the
// \Seen flag from every watched folder (INBOX when none are configured).
// A folder that fails is logged and skipped, so the returned error is
// non-nil only when the connection itself cannot be established.
func (e *EmailConnector) FetchNew() ([]UnifiedMessage, error) {
	client, err := e.connect()
	if err != nil {
		return nil, err
	}
	defer client.Close()

	watched := e.config.Watch
	if len(watched) == 0 {
		watched = []string{"INBOX"}
	}

	var collected []UnifiedMessage
	for _, mailbox := range watched {
		// Zero time means "no date filter"; only the unseen filter applies.
		batch, fetchErr := e.fetchFromFolder(client, mailbox, true, time.Time{})
		if fetchErr == nil {
			collected = append(collected, batch...)
			continue
		}
		log.Printf("[%s] Error fetching from %s: %v", e.name, mailbox, fetchErr)
	}
	return collected, nil
}
// FetchSince opens a fresh connection and returns, from every watched folder
// (INBOX when none are configured), the messages matched by a search with
// the given since time, seen or not. A folder that fails is logged and
// skipped; only a connection failure is returned as an error.
func (e *EmailConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
	client, err := e.connect()
	if err != nil {
		return nil, err
	}
	defer client.Close()

	mailboxes := e.config.Watch
	if len(mailboxes) == 0 {
		mailboxes = []string{"INBOX"}
	}

	var result []UnifiedMessage
	for i := range mailboxes {
		found, folderErr := e.fetchFromFolder(client, mailboxes[i], false, since)
		if folderErr != nil {
			log.Printf("[%s] Error fetching from %s: %v", e.name, mailboxes[i], folderErr)
			continue
		}
		result = append(result, found...)
	}
	return result, nil
}
// fetchFromFolder selects folder on client, searches it, and fetches every
// match as a UnifiedMessage.
//
// unseenOnly restricts the search to messages without the \Seen flag; a
// non-zero since is passed as the search's Since criterion. It returns
// (nil, nil) when nothing matches.
func (e *EmailConnector) fetchFromFolder(client *imapclient.Client, folder string, unseenOnly bool, since time.Time) ([]UnifiedMessage, error) {
	if _, selErr := client.Select(folder, nil).Wait(); selErr != nil {
		return nil, fmt.Errorf("select %s: %w", folder, selErr)
	}

	// A zero since equals the field's default, so assigning it
	// unconditionally is the same as leaving the criterion unset.
	query := imap.SearchCriteria{Since: since}
	if unseenOnly {
		query.NotFlag = []imap.Flag{imap.FlagSeen}
	}

	found, searchErr := client.Search(&query, nil).Wait()
	if searchErr != nil {
		return nil, fmt.Errorf("search: %w", searchErr)
	}

	hits := found.AllSeqNums()
	if len(hits) == 0 {
		return nil, nil
	}

	var matched imap.SeqSet
	for i := range hits {
		matched.AddNum(hits[i])
	}
	return e.fetchMessages(client, matched, folder)
}
// fetchMessages fetches the messages in seqSet from the currently selected
// mailbox and converts each one to a UnifiedMessage tagged with folder.
//
// Envelope, flags, UID, internal date, body structure and the full body are
// requested. A message whose data cannot be collected is logged and skipped;
// an error reported when the command finishes fails the whole call.
func (e *EmailConnector) fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string) ([]UnifiedMessage, error) {
	options := &imap.FetchOptions{
		Envelope: true,
		Flags:    true,
		UID:      true,
		// convertMessage falls back to the internal date when the envelope
		// carries no Date; it is only populated if requested here.
		InternalDate:  true,
		BodyStructure: &imap.FetchItemBodyStructure{},
		BodySection:   []*imap.FetchItemBodySection{{}},
	}
	fetchCmd := client.Fetch(seqSet, options)

	var messages []UnifiedMessage
	for msgData := fetchCmd.Next(); msgData != nil; msgData = fetchCmd.Next() {
		buf, err := msgData.Collect()
		if err != nil {
			// Best effort: keep the rest of the batch, but leave a trace.
			log.Printf("[%s] Skipping unreadable message in %s: %v", e.name, folder, err)
			continue
		}
		messages = append(messages, e.convertMessage(*buf, folder))
	}
	if err := fetchCmd.Close(); err != nil {
		return nil, fmt.Errorf("fetch: %w", err)
	}
	return messages, nil
}
// convertMessage maps one fetched IMAP message onto a UnifiedMessage.
//
// The unified ID is "<connector name>:<UID>"; SourceUID holds the bare UID
// and SourceExtra the folder. Subject, timestamp, first From and first To
// address come from the envelope when present. The body prefers the plain
// text part and falls back to the HTML part with markup stripped.
// Attachments is always non-nil and lists metadata only.
func (e *EmailConnector) convertMessage(buf imapclient.FetchMessageBuffer, folder string) UnifiedMessage {
	out := UnifiedMessage{
		ID:          fmt.Sprintf("%s:%d", e.name, buf.UID),
		Source:      e.name,
		SourceUID:   fmt.Sprintf("%d", buf.UID),
		SourceExtra: folder,
		Attachments: []AttachmentMeta{},
	}

	if envelope := buf.Envelope; envelope != nil {
		out.Subject = envelope.Subject
		out.Timestamp = envelope.Date
		if out.Timestamp.IsZero() {
			// No Date in the envelope: use the fetched internal date.
			out.Timestamp = buf.InternalDate
		}
		if senders := envelope.From; len(senders) > 0 {
			out.From = fmt.Sprintf("%s@%s", senders[0].Mailbox, senders[0].Host)
			// An empty display name leaves FromName at its zero value.
			out.FromName = senders[0].Name
		}
		if recipients := envelope.To; len(recipients) > 0 {
			out.To = fmt.Sprintf("%s@%s", recipients[0].Mailbox, recipients[0].Host)
		}
	}

	for i := range buf.Flags {
		if buf.Flags[i] == imap.FlagSeen {
			out.Seen = true
			break
		}
	}

	if len(buf.BodySection) == 0 {
		return out
	}
	raw := buf.BodySection[0].Bytes

	body := ParseMIMEBody(raw)
	out.Body = body.Text
	if out.Body == "" {
		out.Body = stripHTML(body.HTML)
	}

	for _, part := range ExtractAttachments(raw) {
		meta := AttachmentMeta{
			Name: part.Filename,
			Mime: part.ContentType,
			Size: part.Size,
		}
		out.Attachments = append(out.Attachments, meta)
	}
	return out
}
// FetchOne retrieves a single message by UID.
//
// sourceID is the decimal IMAP UID. Each watched folder (INBOX when none are
// configured) is tried in order and the first folder that yields a message
// for that UID wins. Folders that cannot be selected are skipped. It returns
// an error for an unparsable sourceID, a connection failure, or when no
// folder yields the message.
//
// NOTE(review): the sourceID does not name a folder, so if the same UID
// exists in several watched folders the first match is returned.
func (e *EmailConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
	client, err := e.connect()
	if err != nil {
		return nil, err
	}
	defer client.Close()

	var uid uint32
	if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
		return nil, fmt.Errorf("invalid source ID: %s", sourceID)
	}
	uidSet := imap.UIDSetNum(imap.UID(uid))
	options := &imap.FetchOptions{
		Envelope: true,
		Flags:    true,
		UID:      true,
		// Needed for convertMessage's timestamp fallback.
		InternalDate:  true,
		BodyStructure: &imap.FetchItemBodyStructure{},
		BodySection:   []*imap.FetchItemBodySection{{}},
	}

	folders := e.config.Watch
	if len(folders) == 0 {
		folders = []string{"INBOX"}
	}

	for _, folder := range folders {
		if _, err := client.Select(folder, nil).Wait(); err != nil {
			continue
		}

		fetchCmd := client.Fetch(uidSet, options)
		msgData := fetchCmd.Next()
		if msgData == nil {
			// UID not present in this folder.
			fetchCmd.Close()
			continue
		}

		buf, collectErr := msgData.Collect()
		// Close exactly once, whatever Collect reported.
		fetchCmd.Close()
		if collectErr != nil {
			log.Printf("[%s] Error reading message %s from %s: %v", e.name, sourceID, folder, collectErr)
			continue
		}

		msg := e.convertMessage(*buf, folder)
		return &msg, nil
	}
	return nil, fmt.Errorf("message not found: %s", sourceID)
}
// Archive moves the message identified by sourceID into the mailbox named
// "Archive" by delegating to moveMessage.
func (e *EmailConnector) Archive(sourceID string) error {
	const archiveFolder = "Archive"
	return e.moveMessage(sourceID, archiveFolder)
}
// Delete permanently removes the message with the given UID.
//
// sourceID is the decimal IMAP UID. Watched folders (INBOX when none are
// configured) are tried in order; in the first folder that actually contains
// the UID the message is flagged \Deleted and the folder is expunged.
// It returns an error for an unparsable sourceID, a connection failure, a
// failed STORE or EXPUNGE, or when no folder contains the UID.
//
// NOTE(review): a plain EXPUNGE removes every message flagged \Deleted in
// the selected folder, not only this UID.
func (e *EmailConnector) Delete(sourceID string) error {
	client, err := e.connect()
	if err != nil {
		return err
	}
	defer client.Close()

	var uid uint32
	if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
		return fmt.Errorf("invalid source ID: %s", sourceID)
	}
	uidSet := imap.UIDSetNum(imap.UID(uid))

	folders := e.config.Watch
	if len(folders) == 0 {
		folders = []string{"INBOX"}
	}

	for _, folder := range folders {
		if _, err := client.Select(folder, nil).Wait(); err != nil {
			continue
		}

		// A UID STORE on a UID that does not exist still succeeds, so probe
		// first; otherwise the first folder would always "win" and be
		// expunged without the target message being touched.
		probe := client.Fetch(uidSet, &imap.FetchOptions{UID: true})
		found := probe.Next() != nil
		if err := probe.Close(); err != nil || !found {
			continue
		}

		storeCmd := client.Store(uidSet, &imap.StoreFlags{
			Op:     imap.StoreFlagsAdd,
			Silent: true,
			Flags:  []imap.Flag{imap.FlagDeleted},
		}, nil)
		if err := storeCmd.Close(); err != nil {
			return fmt.Errorf("flag %s as deleted in %s: %w", sourceID, folder, err)
		}

		if err := client.Expunge().Close(); err != nil {
			return fmt.Errorf("expunge %s: %w", folder, err)
		}
		return nil
	}
	return fmt.Errorf("message not found: %s", sourceID)
}
// moveMessage moves the message with the given UID into destFolder.
//
// sourceID is the decimal IMAP UID. Watched folders (INBOX when none are
// configured) are tried in order and the move is issued in the first folder
// that actually contains the UID. If the server reports that destFolder does
// not exist, the folder is created and the move retried once.
// It returns an error for an unparsable sourceID, a connection failure, a
// failed create or move, or when no folder contains the UID.
func (e *EmailConnector) moveMessage(sourceID, destFolder string) error {
	client, err := e.connect()
	if err != nil {
		return err
	}
	defer client.Close()

	var uid uint32
	if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
		return fmt.Errorf("invalid source ID: %s", sourceID)
	}
	uidSet := imap.UIDSetNum(imap.UID(uid))

	folders := e.config.Watch
	if len(folders) == 0 {
		folders = []string{"INBOX"}
	}

	for _, folder := range folders {
		if _, err := client.Select(folder, nil).Wait(); err != nil {
			continue
		}

		// Moving a UID that does not exist is not reported as an error by
		// the server, so check the message is really here before claiming
		// success for this folder.
		probe := client.Fetch(uidSet, &imap.FetchOptions{UID: true})
		found := probe.Next() != nil
		if err := probe.Close(); err != nil || !found {
			continue
		}

		_, moveErr := client.Move(uidSet, destFolder).Wait()
		if moveErr == nil {
			return nil
		}

		missingDest := strings.Contains(moveErr.Error(), "TRYCREATE") ||
			strings.Contains(moveErr.Error(), "no such mailbox")
		if !missingDest {
			// The message is in this folder, so trying other folders cannot
			// help; surface the real failure instead of "not found".
			return fmt.Errorf("move %s to %s: %w", sourceID, destFolder, moveErr)
		}

		if err := client.Create(destFolder, nil).Wait(); err != nil {
			return fmt.Errorf("create %s: %w", destFolder, err)
		}
		if _, err := client.Move(uidSet, destFolder).Wait(); err != nil {
			return fmt.Errorf("move %s to %s: %w", sourceID, destFolder, err)
		}
		return nil
	}
	return fmt.Errorf("message not found: %s", sourceID)
}
// Reply sends a plain-text reply to the sender of the message identified by
// sourceID.
//
// The original message is fetched with FetchOne; its From address becomes
// the recipient and its subject is reused with a "Re: " prefix unless one is
// already present (case-insensitive). The mail is sent over SMTP with PLAIN
// authentication using smtpConfig.
//
// attachments is accepted for interface compatibility but attachments are
// not sent; a non-empty list is logged so the omission is visible.
//
// It returns an error when the original cannot be fetched, when it has no
// sender address, or when the SMTP send fails.
func (e *EmailConnector) Reply(sourceID string, body string, attachments []string) error {
	msg, err := e.FetchOne(sourceID)
	if err != nil {
		return err
	}

	to := msg.From
	if to == "" {
		return fmt.Errorf("reply to %s: original message has no sender address", sourceID)
	}
	if len(attachments) > 0 {
		log.Printf("[%s] Reply to %s: ignoring %d attachment(s), attachments are not supported",
			e.name, sourceID, len(attachments))
	}

	subject := msg.Subject
	if !strings.HasPrefix(strings.ToLower(subject), "re:") {
		subject = "Re: " + subject
	}

	var msgBuf bytes.Buffer
	// Date and From are the two header fields a message must carry.
	fmt.Fprintf(&msgBuf, "Date: %s\r\n", time.Now().Format(time.RFC1123Z))
	fmt.Fprintf(&msgBuf, "To: %s\r\n", to)
	fmt.Fprintf(&msgBuf, "From: %s\r\n", e.smtpConfig.From)
	fmt.Fprintf(&msgBuf, "Subject: %s\r\n", mime.QEncoding.Encode("utf-8", subject))
	msgBuf.WriteString("MIME-Version: 1.0\r\n")
	msgBuf.WriteString("Content-Type: text/plain; charset=utf-8\r\n")
	// The body is written as raw UTF-8, so declare it as 8bit rather than
	// leaving the implicit 7bit default.
	msgBuf.WriteString("Content-Transfer-Encoding: 8bit\r\n")
	msgBuf.WriteString("\r\n")
	msgBuf.WriteString(body)

	smtpAddr := fmt.Sprintf("%s:%d", e.smtpConfig.Host, e.smtpConfig.Port)
	auth := smtp.PlainAuth("", e.smtpConfig.Username, e.smtpConfig.Password, e.smtpConfig.Host)
	return smtp.SendMail(smtpAddr, auth, e.smtpConfig.From, []string{to}, msgBuf.Bytes())
}
// MarkSeen adds the \Seen flag to the message with the given UID.
//
// sourceID is the decimal IMAP UID. Watched folders (INBOX when none are
// configured) are tried in order and the flag is set in the first folder
// that actually contains the UID. It returns an error for an unparsable
// sourceID, a connection failure, a failed STORE, or when no folder contains
// the UID.
func (e *EmailConnector) MarkSeen(sourceID string) error {
	client, err := e.connect()
	if err != nil {
		return err
	}
	defer client.Close()

	var uid uint32
	if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
		return fmt.Errorf("invalid source ID: %s", sourceID)
	}
	uidSet := imap.UIDSetNum(imap.UID(uid))

	folders := e.config.Watch
	if len(folders) == 0 {
		folders = []string{"INBOX"}
	}

	for _, folder := range folders {
		if _, err := client.Select(folder, nil).Wait(); err != nil {
			continue
		}

		// A UID STORE on a UID that does not exist still succeeds, so probe
		// first; otherwise success would be reported for the first folder
		// even when the message lives elsewhere.
		probe := client.Fetch(uidSet, &imap.FetchOptions{UID: true})
		found := probe.Next() != nil
		if err := probe.Close(); err != nil || !found {
			continue
		}

		storeCmd := client.Store(uidSet, &imap.StoreFlags{
			Op:     imap.StoreFlagsAdd,
			Silent: true,
			Flags:  []imap.Flag{imap.FlagSeen},
		}, nil)
		if err := storeCmd.Close(); err != nil {
			return fmt.Errorf("mark %s seen in %s: %w", sourceID, folder, err)
		}
		return nil
	}
	return fmt.Errorf("message not found: %s", sourceID)
}
// GetAttachment returns the decoded bytes of the attachment called filename
// on the message whose decimal UID is sourceID.
//
// Each watched folder (INBOX when none are configured) is searched in order.
// Folders that cannot be selected, do not yield the message, or yield it
// without a body are skipped. The first attachment whose filename matches
// exactly is base64-decoded and returned; a decode failure is returned as
// an error. If nothing matches, an "attachment not found" error is returned.
func (e *EmailConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
	client, err := e.connect()
	if err != nil {
		return nil, err
	}
	defer client.Close()

	var uid uint32
	if _, scanErr := fmt.Sscanf(sourceID, "%d", &uid); scanErr != nil {
		return nil, fmt.Errorf("invalid source ID: %s", sourceID)
	}

	watched := e.config.Watch
	if len(watched) == 0 {
		watched = []string{"INBOX"}
	}

	for _, mailbox := range watched {
		if _, selErr := client.Select(mailbox, nil).Wait(); selErr != nil {
			continue
		}

		// Only the UID and the full raw body are needed here.
		cmd := client.Fetch(imap.UIDSetNum(imap.UID(uid)), &imap.FetchOptions{
			UID:         true,
			BodySection: []*imap.FetchItemBodySection{{}},
		})
		item := cmd.Next()
		if item == nil {
			cmd.Close()
			continue
		}
		fetched, collectErr := item.Collect()
		cmd.Close()
		if collectErr != nil || len(fetched.BodySection) == 0 {
			continue
		}

		for _, part := range ExtractAttachments(fetched.BodySection[0].Bytes) {
			if part.Filename != filename {
				continue
			}
			payload, decodeErr := base64.StdEncoding.DecodeString(part.Content)
			if decodeErr != nil {
				return nil, decodeErr
			}
			return payload, nil
		}
	}
	return nil, fmt.Errorf("attachment not found: %s", filename)
}
// Start launches one background watcher goroutine per watched folder and
// records callback, which the watchers invoke when they detect new mail.
//
// When no folders are configured, INBOX is watched, matching the default
// used by the fetch and mutation methods. The watchers run until Stop
// cancels the context created here. Start always returns nil.
func (e *EmailConnector) Start(callback func()) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.ctx, e.cancel = context.WithCancel(context.Background())
	e.callback = callback

	folders := e.config.Watch
	if len(folders) == 0 {
		folders = []string{"INBOX"}
	}
	for _, folder := range folders {
		go e.watchFolder(folder)
	}
	return nil
}
// watchFolder keeps an IDLE session running on folder until the connector's
// context is cancelled. Whenever runIDLE fails, the error is logged and a
// new session is attempted after a 10-second pause; cancellation during the
// pause ends the watcher immediately.
func (e *EmailConnector) watchFolder(folder string) {
	log.Printf("[%s] Starting IDLE watcher for %s", e.name, folder)

	for e.ctx.Err() == nil {
		idleErr := e.runIDLE(folder)
		if idleErr == nil {
			// Loop back; the condition exits if the context was cancelled.
			continue
		}

		log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", e.name, folder, idleErr)
		select {
		case <-time.After(10 * time.Second):
		case <-e.ctx.Done():
			return
		}
	}
}
// runIDLE opens a dedicated connection, selects folder and cycles IDLE
// commands on it until the context is cancelled or an error occurs.
//
// Each cycle ends when the context is cancelled (returns nil), when four
// minutes have elapsed (the IDLE is closed so it can be re-issued), or when
// the IDLE command finishes by itself. After a cycle the folder is selected
// again and its message count is compared with the previous one; a higher
// count triggers the callback in its own goroutine. Connection, select and
// IDLE start failures are returned so the caller can reconnect.
func (e *EmailConnector) runIDLE(folder string) error {
	client, err := e.connect()
	if err != nil {
		return fmt.Errorf("connect: %w", err)
	}
	defer client.Close()

	selected, err := client.Select(folder, nil).Wait()
	if err != nil {
		return fmt.Errorf("select %s: %w", folder, err)
	}
	lastCount := selected.NumMessages
	log.Printf("[%s] IDLE connected to %s (%d messages)", e.name, folder, lastCount)

	for {
		idle, err := client.Idle()
		if err != nil {
			return fmt.Errorf("idle start: %w", err)
		}

		// Buffered so the waiter goroutine can finish even when nobody
		// reads its result (timeout and cancellation paths).
		finished := make(chan error, 1)
		go func() {
			finished <- idle.Wait()
		}()

		select {
		case <-e.ctx.Done():
			idle.Close()
			return nil
		case <-time.After(4 * time.Minute):
			idle.Close()
		case waitErr := <-finished:
			if waitErr != nil {
				log.Printf("[%s] IDLE ended with error: %v", e.name, waitErr)
			}
		}

		selected, err = client.Select(folder, nil).Wait()
		if err != nil {
			return fmt.Errorf("re-select %s: %w", folder, err)
		}

		current := selected.NumMessages
		if current > lastCount {
			log.Printf("[%s] New mail in %s: %d -> %d", e.name, folder, lastCount, current)
			if e.callback != nil {
				go e.callback()
			}
		}
		// Track the latest count whether it grew, shrank or stayed equal.
		lastCount = current
	}
}
// Stop cancels the context created by Start, which signals the folder
// watchers to exit. It is a no-op when Start has not been called.
func (e *EmailConnector) Stop() {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.cancel == nil {
		return
	}
	e.cancel()
}