message-center/store.go

340 lines
7.4 KiB
Go

package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"time"
)
// MessageStore manages unified messages and cursor tracking
type MessageStore struct {
connectors map[string]Connector
cursors map[string]time.Time // consumer -> high-water mark
mu sync.RWMutex
dataDir string
}
// CursorFile stores cursor positions persistently
type CursorFile struct {
Cursors map[string]time.Time `json:"cursors"`
}
// NewMessageStore creates a new message store
func NewMessageStore(dataDir string) *MessageStore {
store := &MessageStore{
connectors: make(map[string]Connector),
cursors: make(map[string]time.Time),
dataDir: dataDir,
}
store.loadCursors()
return store
}
func (s *MessageStore) loadCursors() {
path := filepath.Join(s.dataDir, "cursors.json")
data, err := os.ReadFile(path)
if err != nil {
return // File doesn't exist yet
}
var cf CursorFile
if err := json.Unmarshal(data, &cf); err != nil {
return
}
s.cursors = cf.Cursors
}
func (s *MessageStore) saveCursors() error {
if s.dataDir == "" {
return nil
}
path := filepath.Join(s.dataDir, "cursors.json")
cf := CursorFile{Cursors: s.cursors}
data, err := json.MarshalIndent(cf, "", " ")
if err != nil {
return err
}
return os.WriteFile(path, data, 0644)
}
// RegisterConnector adds a connector to the store
func (s *MessageStore) RegisterConnector(c Connector) {
s.mu.Lock()
defer s.mu.Unlock()
s.connectors[c.Name()] = c
}
// GetConnector returns a connector by name
func (s *MessageStore) GetConnector(name string) (Connector, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
c, ok := s.connectors[name]
return c, ok
}
// FetchNew returns unseen messages from all connectors
func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) {
s.mu.RLock()
connectors := make([]Connector, 0, len(s.connectors))
for _, c := range s.connectors {
connectors = append(connectors, c)
}
s.mu.RUnlock()
var allMessages []UnifiedMessage
for _, c := range connectors {
msgs, err := c.FetchNew()
if err != nil {
// Log but continue with other connectors
continue
}
allMessages = append(allMessages, msgs...)
}
// Sort by timestamp descending (newest first)
sort.Slice(allMessages, func(i, j int) bool {
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
})
return allMessages, nil
}
// FetchSince returns messages since the given duration
func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) {
since := time.Now().Add(-duration)
return s.FetchSinceTime(since)
}
// FetchSinceTime returns messages since the given time
func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) {
s.mu.RLock()
connectors := make([]Connector, 0, len(s.connectors))
for _, c := range s.connectors {
connectors = append(connectors, c)
}
s.mu.RUnlock()
var allMessages []UnifiedMessage
for _, c := range connectors {
msgs, err := c.FetchSince(since)
if err != nil {
continue
}
allMessages = append(allMessages, msgs...)
}
// Sort by timestamp descending
sort.Slice(allMessages, func(i, j int) bool {
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
})
return allMessages, nil
}
// FetchOne returns a single message by ID (source:uid format)
func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) {
source, sourceID, err := parseMessageID(id)
if err != nil {
return nil, err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown source: %s", source)
}
return c.FetchOne(sourceID)
}
// Ack advances the cursor for a consumer
func (s *MessageStore) Ack(consumer string, timestamp time.Time) error {
s.mu.Lock()
defer s.mu.Unlock()
s.cursors[consumer] = timestamp
return s.saveCursors()
}
// GetCursor returns the cursor position for a consumer
func (s *MessageStore) GetCursor(consumer string) time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return s.cursors[consumer]
}
// Archive archives a message
func (s *MessageStore) Archive(id string) error {
source, sourceID, err := parseMessageID(id)
if err != nil {
return err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("unknown source: %s", source)
}
return c.Archive(sourceID)
}
// Delete deletes a message
func (s *MessageStore) Delete(id string) error {
source, sourceID, err := parseMessageID(id)
if err != nil {
return err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("unknown source: %s", source)
}
return c.Delete(sourceID)
}
// Reply sends a reply to a message
func (s *MessageStore) Reply(id string, body string, attachments []string) error {
source, sourceID, err := parseMessageID(id)
if err != nil {
return err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("unknown source: %s", source)
}
return c.Reply(sourceID, body, attachments)
}
// MarkSeen marks a message as seen
func (s *MessageStore) MarkSeen(id string) error {
source, sourceID, err := parseMessageID(id)
if err != nil {
return err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("unknown source: %s", source)
}
return c.MarkSeen(sourceID)
}
// GetAttachment retrieves attachment content
func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) {
source, sourceID, err := parseMessageID(id)
if err != nil {
return nil, err
}
s.mu.RLock()
c, ok := s.connectors[source]
s.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("unknown source: %s", source)
}
return c.GetAttachment(sourceID, filename)
}
// StartAll starts all connectors with a unified callback
func (s *MessageStore) StartAll(callback func()) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.connectors {
if err := c.Start(callback); err != nil {
return fmt.Errorf("start %s: %w", c.Name(), err)
}
}
return nil
}
// StartAllWithCallbacks starts connectors with different callbacks for email vs other connectors
func (s *MessageStore) StartAllWithCallbacks(emailCallback, otherCallback func()) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.connectors {
cb := otherCallback
if _, isEmail := c.(*EmailConnector); isEmail {
cb = emailCallback
}
if err := c.Start(cb); err != nil {
return fmt.Errorf("start %s: %w", c.Name(), err)
}
}
return nil
}
// StopAll stops all connectors
func (s *MessageStore) StopAll() {
s.mu.RLock()
defer s.mu.RUnlock()
for _, c := range s.connectors {
c.Stop()
}
}
// parseMessageID splits "source:uid" into components
func parseMessageID(id string) (source, sourceID string, err error) {
for i := 0; i < len(id); i++ {
if id[i] == ':' {
return id[:i], id[i+1:], nil
}
}
return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id)
}
// ParseDuration parses duration strings like "24h", "7d", "1w"
func ParseDuration(s string) (time.Duration, error) {
if len(s) == 0 {
return 0, fmt.Errorf("empty duration")
}
// Check for special suffixes
last := s[len(s)-1]
switch last {
case 'd': // days
var n int
if _, err := fmt.Sscanf(s, "%dd", &n); err == nil {
return time.Duration(n) * 24 * time.Hour, nil
}
case 'w': // weeks
var n int
if _, err := fmt.Sscanf(s, "%dw", &n); err == nil {
return time.Duration(n) * 7 * 24 * time.Hour, nil
}
}
// Fall back to standard Go duration parsing
return time.ParseDuration(s)
}