message-center/main.go

1035 lines
25 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"time"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
"gopkg.in/yaml.v3"
)
// Config structures
type Config struct {
Server ServerConfig `yaml:"server"`
Accounts map[string]AccountConfig `yaml:"accounts"`
Webhook WebhookConfig `yaml:"webhook"`
}
type ServerConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
}
type AccountConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
TLS string `yaml:"tls"` // "ssl", "starttls", "none"
Watch []string `yaml:"watch"`
}
type WebhookConfig struct {
Enabled bool `yaml:"enabled"`
URL string `yaml:"url"`
Token string `yaml:"token"`
}
// Message represents an email message
type Message struct {
UID uint32 `json:"uid"`
Folder string `json:"folder"`
MessageID string `json:"message_id,omitempty"`
Date time.Time `json:"date"`
From string `json:"from"`
To []string `json:"to,omitempty"`
Subject string `json:"subject"`
BodyPreview string `json:"body_preview,omitempty"`
BodyText string `json:"body_text,omitempty"`
BodyHTML string `json:"body_html,omitempty"`
Flags []string `json:"flags"`
HasAttach bool `json:"has_attachments"`
AttachNames []string `json:"attachment_names,omitempty"`
}
// WebhookPayload for new mail events
// Fields are flattened for simpler template access ({{body.from}} vs {{body.message.from}})
type WebhookPayload struct {
Event string `json:"event"`
Account string `json:"account"`
Folder string `json:"folder"`
// Flattened message fields for easier template access
UID uint32 `json:"uid"`
From string `json:"from"`
Subject string `json:"subject"`
// Full message object still available for detailed access
Message *Message `json:"message,omitempty"`
}
// Global state
var (
config Config
idlesMu sync.Mutex
)
func main() {
configPath := flag.String("config", "config.yaml", "Path to config file")
flag.Parse()
// Load config
data, err := os.ReadFile(*configPath)
if err != nil {
log.Fatalf("Failed to read config: %v", err)
}
// Expand env vars in config
expanded := os.ExpandEnv(string(data))
if err := yaml.Unmarshal([]byte(expanded), &config); err != nil {
log.Fatalf("Failed to parse config: %v", err)
}
log.Printf("Loaded %d accounts", len(config.Accounts))
// Start IDLE watchers for each account
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for name, acc := range config.Accounts {
if len(acc.Watch) > 0 {
go startIDLE(ctx, name, acc)
}
}
// HTTP server
mux := http.NewServeMux()
mux.HandleFunc("/health", handleHealth)
mux.HandleFunc("/accounts", handleListAccounts)
mux.HandleFunc("/accounts/", handleAccountRoutes)
addr := fmt.Sprintf("%s:%d", config.Server.Host, config.Server.Port)
server := &http.Server{Addr: addr, Handler: logMiddleware(mux)}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
cancel()
server.Shutdown(context.Background())
}()
log.Printf("Starting server on %s", addr)
if err := server.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}
func logMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
next.ServeHTTP(w, r)
log.Printf("%s %s %v", r.Method, r.URL.Path, time.Since(start).Round(time.Millisecond))
})
}
// IDLE watcher
func startIDLE(ctx context.Context, accountName string, acc AccountConfig) {
for _, folder := range acc.Watch {
go watchFolder(ctx, accountName, acc, folder)
}
}
func watchFolder(ctx context.Context, accountName string, acc AccountConfig, folder string) {
log.Printf("[%s] Starting IDLE watcher for %s", accountName, folder)
for {
select {
case <-ctx.Done():
return
default:
}
err := runIDLE(ctx, accountName, acc, folder)
if err != nil {
log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", accountName, folder, err)
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
}
}
}
}
func runIDLE(ctx context.Context, accountName string, acc AccountConfig, folder string) error {
client, err := connect(acc)
if err != nil {
return fmt.Errorf("connect: %w", err)
}
defer client.Close()
// Select folder and set up unilateral data handler
mbox, err := client.Select(folder, nil).Wait()
if err != nil {
return fmt.Errorf("select %s: %w", folder, err)
}
prevCount := mbox.NumMessages
log.Printf("[%s] IDLE connected to %s (%d messages)", accountName, folder, prevCount)
// Run IDLE with periodic refresh
for {
idleCmd, err := client.Idle()
if err != nil {
return fmt.Errorf("idle start: %w", err)
}
// Use a goroutine to wait for IDLE to complete or get interrupted
idleDone := make(chan error, 1)
go func() {
idleDone <- idleCmd.Wait()
}()
// Wait for either: context cancel, timeout, or IDLE response
select {
case <-ctx.Done():
idleCmd.Close()
return nil
case <-time.After(4 * time.Minute):
// Periodic refresh - break IDLE to check for changes
idleCmd.Close()
case err := <-idleDone:
// IDLE ended (server sent something)
if err != nil {
log.Printf("[%s] IDLE ended with error: %v", accountName, err)
}
}
// Re-select folder to get fresh message count (cached Mailbox() may be stale)
mbox, err = client.Select(folder, nil).Wait()
if err != nil {
log.Printf("[%s] Failed to re-select %s: %v", accountName, folder, err)
return fmt.Errorf("re-select %s: %w", folder, err)
}
// Check for new messages
if mbox.NumMessages > prevCount {
log.Printf("[%s] New mail in %s: %d -> %d", accountName, folder, prevCount, mbox.NumMessages)
go handleNewMail(accountName, acc, folder)
prevCount = mbox.NumMessages
} else {
prevCount = mbox.NumMessages
}
}
}
func handleNewMail(accountName string, acc AccountConfig, folder string) {
if !config.Webhook.Enabled {
return
}
// Get the newest message
client, err := connect(acc)
if err != nil {
log.Printf("[%s] Failed to connect for new mail: %v", accountName, err)
return
}
defer client.Close()
mbox, err := client.Select(folder, nil).Wait()
if err != nil {
log.Printf("[%s] Failed to select %s: %v", accountName, folder, err)
return
}
if mbox.NumMessages == 0 {
return
}
// Fetch the newest message by sequence number
seqSet := imap.SeqSetNum(mbox.NumMessages)
msgs, err := fetchMessages(client, seqSet, folder, true)
if err != nil || len(msgs) == 0 {
log.Printf("[%s] Failed to fetch new message: %v", accountName, err)
return
}
msg := msgs[0]
log.Printf("[%s] Webhook: from=%q subject=%q", accountName, msg.From, msg.Subject)
// Send webhook with flattened fields for easy template access
payload := WebhookPayload{
Event: "new_mail",
Account: accountName,
Folder: folder,
UID: msg.UID,
From: msg.From,
Subject: msg.Subject,
Message: msg,
}
sendWebhook(payload)
}
func sendWebhook(payload WebhookPayload) {
data, err := json.Marshal(payload)
if err != nil {
log.Printf("Webhook marshal error: %v", err)
return
}
req, err := http.NewRequest("POST", config.Webhook.URL, bytes.NewReader(data))
if err != nil {
log.Printf("Webhook request error: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
if config.Webhook.Token != "" {
req.Header.Set("Authorization", "Bearer "+config.Webhook.Token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("Webhook send error: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
log.Printf("Webhook error %d: %s", resp.StatusCode, string(body))
} else {
log.Printf("Webhook sent: %s %s", payload.Event, payload.Account)
}
}
// IMAP connection
func connect(acc AccountConfig) (*imapclient.Client, error) {
addr := fmt.Sprintf("%s:%d", acc.Host, acc.Port)
var client *imapclient.Client
var err error
switch acc.TLS {
case "ssl":
client, err = imapclient.DialTLS(addr, nil)
case "starttls":
client, err = imapclient.DialStartTLS(addr, nil)
default:
client, err = imapclient.DialInsecure(addr, nil)
}
if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
if err := client.Login(acc.Username, acc.Password).Wait(); err != nil {
client.Close()
return nil, fmt.Errorf("login: %w", err)
}
return client, nil
}
// HTTP handlers
func handleHealth(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
}
func handleListAccounts(w http.ResponseWriter, r *http.Request) {
accounts := make([]string, 0, len(config.Accounts))
for name := range config.Accounts {
accounts = append(accounts, name)
}
json.NewEncoder(w).Encode(accounts)
}
func handleAccountRoutes(w http.ResponseWriter, r *http.Request) {
// Parse path: /accounts/{account}/...
parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
if len(parts) < 2 {
http.Error(w, "Invalid path", http.StatusBadRequest)
return
}
accountName := parts[1]
acc, ok := config.Accounts[accountName]
if !ok {
http.Error(w, "Account not found", http.StatusNotFound)
return
}
if len(parts) == 2 {
// /accounts/{account} - account info
json.NewEncoder(w).Encode(map[string]interface{}{
"name": accountName,
"host": acc.Host,
"watch": acc.Watch,
})
return
}
switch parts[2] {
case "mailboxes":
handleMailboxes(w, r, accountName, acc)
case "messages":
handleMessages(w, r, accountName, acc, parts[3:])
default:
http.Error(w, "Not found", http.StatusNotFound)
}
}
func handleMailboxes(w http.ResponseWriter, r *http.Request, accountName string, acc AccountConfig) {
client, err := connect(acc)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer client.Close()
switch r.Method {
case "GET":
// List mailboxes
mailboxes, err := client.List("", "*", nil).Collect()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result := make([]map[string]interface{}, 0, len(mailboxes))
for _, mbox := range mailboxes {
result = append(result, map[string]interface{}{
"name": mbox.Mailbox,
"delimiter": string(mbox.Delim),
})
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
case "POST":
// Create mailbox
var req struct {
Name string `json:"name"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if req.Name == "" {
http.Error(w, "name is required", http.StatusBadRequest)
return
}
if err := client.Create(req.Name, nil).Wait(); err != nil {
http.Error(w, fmt.Sprintf("Failed to create mailbox: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "created", "name": req.Name})
case "DELETE":
// Delete mailbox
name := r.URL.Query().Get("name")
if name == "" {
http.Error(w, "name query param is required", http.StatusBadRequest)
return
}
if err := client.Delete(name).Wait(); err != nil {
http.Error(w, fmt.Sprintf("Failed to delete mailbox: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "deleted", "name": name})
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func handleMessages(w http.ResponseWriter, r *http.Request, accountName string, acc AccountConfig, pathParts []string) {
folder := r.URL.Query().Get("folder")
if folder == "" {
folder = "INBOX"
}
client, err := connect(acc)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer client.Close()
// Select folder
mbox, err := client.Select(folder, nil).Wait()
if err != nil {
http.Error(w, fmt.Sprintf("Failed to select folder: %v", err), http.StatusInternalServerError)
return
}
if len(pathParts) == 0 {
// List messages
handleListMessages(w, r, client, mbox, folder)
return
}
// Single message operations
var uid uint32
if _, err := fmt.Sscanf(pathParts[0], "%d", &uid); err != nil {
http.Error(w, "Invalid UID", http.StatusBadRequest)
return
}
// Check for /messages/{uid}/attachments
if len(pathParts) > 1 && pathParts[1] == "attachments" {
handleGetAttachments(w, r, client, folder, uid)
return
}
// Check for /messages/{uid}/ingest
if len(pathParts) > 1 && pathParts[1] == "ingest" {
handleIngestAttachments(w, r, client, folder, uid)
return
}
switch r.Method {
case "GET":
handleGetMessage(w, r, client, folder, uid)
case "PATCH":
handleUpdateMessage(w, r, client, folder, uid)
case "DELETE":
handleDeleteMessage(w, r, client, folder, uid)
default:
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
}
}
func handleListMessages(w http.ResponseWriter, r *http.Request, client *imapclient.Client, mbox *imap.SelectData, folder string) {
limit := 50
if l := r.URL.Query().Get("limit"); l != "" {
fmt.Sscanf(l, "%d", &limit)
if limit > 2000 {
limit = 2000
}
}
if mbox.NumMessages == 0 {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]Message{})
return
}
// Build search criteria - support year filter for bulk operations
searchCriteria := &imap.SearchCriteria{}
if year := r.URL.Query().Get("year"); year != "" {
var y int
fmt.Sscanf(year, "%d", &y)
if y >= 2000 && y <= 2100 {
searchCriteria.Since = time.Date(y, 1, 1, 0, 0, 0, 0, time.UTC)
searchCriteria.Before = time.Date(y+1, 1, 1, 0, 0, 0, 0, time.UTC)
}
}
searchCmd := client.Search(searchCriteria, nil)
searchData, err := searchCmd.Wait()
if err != nil {
http.Error(w, fmt.Sprintf("Search failed: %v", err), http.StatusInternalServerError)
return
}
if len(searchData.AllSeqNums()) == 0 {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]Message{})
return
}
// Convert to SeqSet
seqNums := searchData.AllSeqNums()
var seqSet imap.SeqSet
for _, num := range seqNums {
seqSet.AddNum(num)
}
msgs, err := fetchMessages(client, seqSet, folder, false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Sort by date descending (newest first)
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Date.After(msgs[j].Date)
})
// Apply limit after sort
if len(msgs) > limit {
msgs = msgs[:limit]
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(msgs)
}
func fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string, withBody bool) ([]*Message, error) {
options := &imap.FetchOptions{
Envelope: true,
Flags: true,
UID: true,
// BodyStructure disabled - causes parser errors on malformed MIME
// BodyStructure: &imap.FetchItemBodyStructure{},
}
if withBody {
options.BodySection = []*imap.FetchItemBodySection{{}}
}
fetchCmd := client.Fetch(seqSet, options)
var messages []*Message
for {
msgData := fetchCmd.Next()
if msgData == nil {
break
}
buf, err := msgData.Collect()
if err != nil {
continue
}
m := &Message{
UID: uint32(buf.UID),
Folder: folder,
Flags: make([]string, 0),
Date: buf.InternalDate,
}
if env := buf.Envelope; env != nil {
m.Subject = env.Subject
m.MessageID = env.MessageID
if !env.Date.IsZero() {
m.Date = env.Date
}
if len(env.From) > 0 {
from := env.From[0]
if from.Name != "" {
m.From = fmt.Sprintf("%s <%s@%s>", from.Name, from.Mailbox, from.Host)
} else {
m.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host)
}
}
for _, to := range env.To {
m.To = append(m.To, fmt.Sprintf("%s@%s", to.Mailbox, to.Host))
}
}
for _, f := range buf.Flags {
m.Flags = append(m.Flags, string(f))
}
// Check for attachments
if buf.BodyStructure != nil {
m.HasAttach, m.AttachNames = checkAttachments(buf.BodyStructure)
}
// Get body if requested
if withBody && len(buf.BodySection) > 0 {
raw := buf.BodySection[0].Bytes
parsed := ParseMIMEBody(raw)
m.BodyText = parsed.Text
m.BodyHTML = parsed.HTML
m.BodyPreview = truncate(m.BodyText, 500)
}
messages = append(messages, m)
}
if err := fetchCmd.Close(); err != nil {
return nil, err
}
return messages, nil
}
func checkAttachments(bs imap.BodyStructure) (bool, []string) {
var names []string
switch s := bs.(type) {
case *imap.BodyStructureSinglePart:
disp := s.Disposition()
if disp != nil && strings.EqualFold(disp.Value, "attachment") {
name := disp.Params["filename"]
if name == "" {
name = disp.Params["name"]
}
if name == "" {
name = s.Filename()
}
if name != "" {
names = append(names, name)
}
return true, names
}
case *imap.BodyStructureMultiPart:
hasAttach := false
for _, child := range s.Children {
if childHas, childNames := checkAttachments(child); childHas {
hasAttach = true
names = append(names, childNames...)
}
}
return hasAttach, names
}
return false, nil
}
func handleGetMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) {
uidSet := imap.UIDSetNum(imap.UID(uid))
options := &imap.FetchOptions{
Envelope: true,
Flags: true,
UID: true,
BodyStructure: &imap.FetchItemBodyStructure{},
BodySection: []*imap.FetchItemBodySection{{}},
}
fetchCmd := client.Fetch(uidSet, options)
msgData := fetchCmd.Next()
if msgData == nil {
fetchCmd.Close()
http.Error(w, "Message not found", http.StatusNotFound)
return
}
buf, err := msgData.Collect()
if err != nil {
fetchCmd.Close()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
m := &Message{
UID: uint32(buf.UID),
Folder: folder,
Flags: make([]string, 0),
Date: buf.InternalDate,
}
if env := buf.Envelope; env != nil {
m.Subject = env.Subject
m.MessageID = env.MessageID
if !env.Date.IsZero() {
m.Date = env.Date
}
if len(env.From) > 0 {
from := env.From[0]
if from.Name != "" {
m.From = fmt.Sprintf("%s <%s@%s>", from.Name, from.Mailbox, from.Host)
} else {
m.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host)
}
}
}
for _, f := range buf.Flags {
m.Flags = append(m.Flags, string(f))
}
if len(buf.BodySection) > 0 {
raw := buf.BodySection[0].Bytes
parsed := ParseMIMEBody(raw)
m.BodyText = parsed.Text
m.BodyHTML = parsed.HTML
m.BodyPreview = truncate(m.BodyText, 500)
}
fetchCmd.Close()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(m)
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen] + "..."
}
func handleUpdateMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) {
var update struct {
Seen *bool `json:"seen"`
Flagged *bool `json:"flagged"`
MoveTo *string `json:"move_to"`
}
if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
uidSet := imap.UIDSetNum(imap.UID(uid))
if update.Seen != nil {
var op imap.StoreFlagsOp
if *update.Seen {
op = imap.StoreFlagsAdd
} else {
op = imap.StoreFlagsDel
}
storeCmd := client.Store(uidSet, &imap.StoreFlags{
Op: op,
Silent: true,
Flags: []imap.Flag{imap.FlagSeen},
}, nil)
storeCmd.Close()
}
if update.Flagged != nil {
var op imap.StoreFlagsOp
if *update.Flagged {
op = imap.StoreFlagsAdd
} else {
op = imap.StoreFlagsDel
}
storeCmd := client.Store(uidSet, &imap.StoreFlags{
Op: op,
Silent: true,
Flags: []imap.Flag{imap.FlagFlagged},
}, nil)
storeCmd.Close()
}
if update.MoveTo != nil {
// Try move first
moveCmd := client.Move(uidSet, *update.MoveTo)
if _, err := moveCmd.Wait(); err != nil {
// If folder doesn't exist, create it and retry
if strings.Contains(err.Error(), "TRYCREATE") || strings.Contains(err.Error(), "no such mailbox") {
createCmd := client.Create(*update.MoveTo, nil)
if cerr := createCmd.Wait(); cerr != nil {
http.Error(w, fmt.Sprintf("Create folder failed: %v", cerr), http.StatusInternalServerError)
return
}
// Retry move
moveCmd2 := client.Move(uidSet, *update.MoveTo)
if _, err2 := moveCmd2.Wait(); err2 != nil {
http.Error(w, fmt.Sprintf("Move failed after create: %v", err2), http.StatusInternalServerError)
return
}
} else {
http.Error(w, fmt.Sprintf("Move failed: %v", err), http.StatusInternalServerError)
return
}
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "updated"})
}
func handleDeleteMessage(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) {
uidSet := imap.UIDSetNum(imap.UID(uid))
// Mark deleted and expunge
storeCmd := client.Store(uidSet, &imap.StoreFlags{
Op: imap.StoreFlagsAdd,
Silent: true,
Flags: []imap.Flag{imap.FlagDeleted},
}, nil)
storeCmd.Close()
expungeCmd := client.Expunge()
expungeCmd.Close()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "deleted"})
}
func handleGetAttachments(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) {
uidSet := imap.UIDSetNum(imap.UID(uid))
options := &imap.FetchOptions{
UID: true,
BodySection: []*imap.FetchItemBodySection{{}},
}
fetchCmd := client.Fetch(uidSet, options)
msgData := fetchCmd.Next()
if msgData == nil {
fetchCmd.Close()
http.Error(w, "Message not found", http.StatusNotFound)
return
}
buf, err := msgData.Collect()
if err != nil {
fetchCmd.Close()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fetchCmd.Close()
if len(buf.BodySection) == 0 {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]Attachment{})
return
}
raw := buf.BodySection[0].Bytes
attachments := ExtractAttachments(raw)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(attachments)
}
// IngestRequest specifies which attachments to forward to DocSys
type IngestRequest struct {
Attachments []string `json:"attachments"` // filenames to ingest (empty = all)
}
// IngestResult reports the outcome for each attachment
type IngestResult struct {
Filename string `json:"filename"`
Status string `json:"status"` // "success" or "error"
Message string `json:"message,omitempty"`
}
func handleIngestAttachments(w http.ResponseWriter, r *http.Request, client *imapclient.Client, folder string, uid uint32) {
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Parse request
var req IngestRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil && err != io.EOF {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// Fetch message with body
uidSet := imap.UIDSetNum(imap.UID(uid))
options := &imap.FetchOptions{
UID: true,
Envelope: true,
BodySection: []*imap.FetchItemBodySection{{}},
}
fetchCmd := client.Fetch(uidSet, options)
msgData := fetchCmd.Next()
if msgData == nil {
fetchCmd.Close()
http.Error(w, "Message not found", http.StatusNotFound)
return
}
buf, err := msgData.Collect()
if err != nil {
fetchCmd.Close()
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
fetchCmd.Close()
if len(buf.BodySection) == 0 {
http.Error(w, "No message body", http.StatusInternalServerError)
return
}
// Get email metadata for DocSys
var from, subject string
if env := buf.Envelope; env != nil {
subject = env.Subject
if len(env.From) > 0 {
f := env.From[0]
if f.Name != "" {
from = fmt.Sprintf("%s <%s@%s>", f.Name, f.Mailbox, f.Host)
} else {
from = fmt.Sprintf("%s@%s", f.Mailbox, f.Host)
}
}
}
// Extract attachments
raw := buf.BodySection[0].Bytes
attachments := ExtractAttachments(raw)
if len(attachments) == 0 {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode([]IngestResult{})
return
}
// Filter to requested attachments (or all if none specified)
wantedSet := make(map[string]bool)
for _, name := range req.Attachments {
wantedSet[name] = true
}
var results []IngestResult
for _, att := range attachments {
// Skip if not in requested list (when list is specified)
if len(req.Attachments) > 0 && !wantedSet[att.Filename] {
continue
}
// POST to DocSys
payload := map[string]string{
"filename": att.Filename,
"content": att.Content,
"source": "email",
"subject": subject,
"from": from,
}
payloadBytes, _ := json.Marshal(payload)
resp, err := http.Post("http://localhost:9201/api/ingest", "application/json", bytes.NewReader(payloadBytes))
result := IngestResult{Filename: att.Filename}
if err != nil {
result.Status = "error"
result.Message = err.Error()
} else {
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
result.Status = "error"
result.Message = string(body)
} else {
result.Status = "success"
}
}
results = append(results, result)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(results)
}