package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

// MessageStore fans requests out to the registered connectors and keeps a
// per-consumer acknowledgement cursor that is persisted under dataDir.
type MessageStore struct {
	connectors map[string]Connector // keyed by Connector.Name()
	cursors    map[string]time.Time // consumer -> high-water mark
	mu         sync.RWMutex         // guards connectors and cursors
	dataDir    string               // directory holding cursors.json; "" disables persistence
}

// CursorFile is the on-disk JSON representation of the cursor map.
type CursorFile struct {
	Cursors map[string]time.Time `json:"cursors"`
}

// NewMessageStore creates a store with no connectors and loads any cursors
// previously saved in dataDir. A missing or unreadable cursor file is not an
// error; the store simply starts with no cursors.
func NewMessageStore(dataDir string) *MessageStore {
	store := &MessageStore{
		connectors: make(map[string]Connector),
		cursors:    make(map[string]time.Time),
		dataDir:    dataDir,
	}
	store.loadCursors()
	return store
}

// cursorsPath returns the location of the cursor file inside dataDir.
func (s *MessageStore) cursorsPath() string {
	return filepath.Join(s.dataDir, "cursors.json")
}

// loadCursors replaces the in-memory cursors with the contents of the cursor
// file, if there is one. It is called from NewMessageStore before the store is
// shared, so it does not take the lock.
func (s *MessageStore) loadCursors() {
	if s.dataDir == "" {
		// Persistence is disabled (saveCursors is a no-op too), so do not
		// pick up a stray cursors.json from the working directory.
		return
	}
	data, err := os.ReadFile(s.cursorsPath())
	if err != nil {
		return // File doesn't exist yet (or cannot be read): keep the empty map.
	}
	var cf CursorFile
	if err := json.Unmarshal(data, &cf); err != nil {
		return // Corrupt file: keep the empty map.
	}
	// A file such as `{}` or `{"cursors": null}` decodes to a nil map; keeping
	// the map allocated by the constructor avoids a nil-map write in Ack.
	if cf.Cursors != nil {
		s.cursors = cf.Cursors
	}
}

// saveCursors writes the cursor map to the cursor file. It does nothing when
// dataDir is empty. The caller must hold s.mu.
//
// The data directory is created if needed, and the file is written to a
// temporary sibling and renamed into place so an interrupted write does not
// leave a truncated cursors.json behind.
func (s *MessageStore) saveCursors() error {
	if s.dataDir == "" {
		return nil
	}
	data, err := json.MarshalIndent(CursorFile{Cursors: s.cursors}, "", " ")
	if err != nil {
		return err
	}
	if err := os.MkdirAll(s.dataDir, 0755); err != nil {
		return fmt.Errorf("creating data dir: %w", err)
	}
	path := s.cursorsPath()
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return fmt.Errorf("writing cursors: %w", err)
	}
	if err := os.Rename(tmp, path); err != nil {
		_ = os.Remove(tmp) // best-effort cleanup; the rename error is what matters
		return fmt.Errorf("replacing cursors file: %w", err)
	}
	return nil
}

// RegisterConnector adds a connector to the store under c.Name(), replacing
// any connector already registered under that name.
func (s *MessageStore) RegisterConnector(c Connector) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.connectors[c.Name()] = c
}

// GetConnector returns the connector registered under name, if any.
func (s *MessageStore) GetConnector(name string) (Connector, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	c, ok := s.connectors[name]
	return c, ok
}

// connectorSnapshot returns the currently registered connectors as a slice so
// callers can invoke connector methods without holding s.mu.
func (s *MessageStore) connectorSnapshot() []Connector {
	s.mu.RLock()
	defer s.mu.RUnlock()
	list := make([]Connector, 0, len(s.connectors))
	for _, c := range s.connectors {
		list = append(list, c)
	}
	return list
}

// fetchAll runs fetch against every registered connector and returns the
// combined messages sorted newest first. A connector whose fetch fails is
// skipped so that one broken source does not hide the others.
func (s *MessageStore) fetchAll(fetch func(Connector) ([]UnifiedMessage, error)) []UnifiedMessage {
	var all []UnifiedMessage
	for _, c := range s.connectorSnapshot() {
		msgs, err := fetch(c)
		if err != nil {
			continue // deliberate best effort: carry on with the other connectors
		}
		all = append(all, msgs...)
	}
	// Stable sort keeps each connector's own ordering for equal timestamps.
	sort.SliceStable(all, func(i, j int) bool {
		return all[i].Timestamp.After(all[j].Timestamp)
	})
	return all
}

// FetchNew returns the messages reported by FetchNew on every connector,
// newest first. Connector errors are skipped; the returned error is always
// nil.
func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) {
	return s.fetchAll(func(c Connector) ([]UnifiedMessage, error) {
		return c.FetchNew()
	}), nil
}

// FetchSince returns messages from the last duration, measured back from
// time.Now(). See FetchSinceTime.
func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) {
	return s.FetchSinceTime(time.Now().Add(-duration))
}

// FetchSinceTime returns the messages reported by FetchSince(since) on every
// connector, newest first. Connector errors are skipped; the returned error
// is always nil.
func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) {
	return s.fetchAll(func(c Connector) ([]UnifiedMessage, error) {
		return c.FetchSince(since)
	}), nil
}

// resolve splits a "source:uid" message ID and looks up the connector
// registered for the source part. It returns the connector together with the
// connector-local ID.
func (s *MessageStore) resolve(id string) (Connector, string, error) {
	source, sourceID, err := parseMessageID(id)
	if err != nil {
		return nil, "", err
	}
	c, ok := s.GetConnector(source)
	if !ok {
		return nil, "", fmt.Errorf("unknown source: %s", source)
	}
	return c, sourceID, nil
}

// FetchOne returns a single message by ID (source:uid format).
func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return nil, err
	}
	return c.FetchOne(sourceID)
}

// Ack sets the cursor for consumer to timestamp and persists the cursor map.
// The in-memory cursor is updated even when persisting fails; the persistence
// error is returned.
func (s *MessageStore) Ack(consumer string, timestamp time.Time) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cursors == nil {
		s.cursors = make(map[string]time.Time)
	}
	s.cursors[consumer] = timestamp
	return s.saveCursors()
}

// GetCursor returns the cursor position for consumer, or the zero time.Time
// when the consumer has never acknowledged anything.
func (s *MessageStore) GetCursor(consumer string) time.Time {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.cursors[consumer]
}

// Archive archives the message with the given ID (source:uid format).
func (s *MessageStore) Archive(id string) error {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return err
	}
	return c.Archive(sourceID)
}

// Delete deletes the message with the given ID (source:uid format).
func (s *MessageStore) Delete(id string) error {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return err
	}
	return c.Delete(sourceID)
}

// Reply sends a reply to the message with the given ID (source:uid format).
func (s *MessageStore) Reply(id string, body string, attachments []string) error {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return err
	}
	return c.Reply(sourceID, body, attachments)
}

// MarkSeen marks the message with the given ID (source:uid format) as seen.
func (s *MessageStore) MarkSeen(id string) error {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return err
	}
	return c.MarkSeen(sourceID)
}

// GetAttachment retrieves the content of the named attachment of the message
// with the given ID (source:uid format).
func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) {
	c, sourceID, err := s.resolve(id)
	if err != nil {
		return nil, err
	}
	return c.GetAttachment(sourceID, filename)
}

// StartAll starts every registered connector with the same callback. It stops
// at the first connector that fails to start and returns that error;
// connectors started before the failure are left running.
//
// The store lock is not held while Start runs, so a connector or callback may
// call back into the store.
func (s *MessageStore) StartAll(callback func()) error {
	for _, c := range s.connectorSnapshot() {
		if err := c.Start(callback); err != nil {
			return fmt.Errorf("start %s: %w", c.Name(), err)
		}
	}
	return nil
}

// StartAllWithCallbacks starts every registered connector, passing
// emailCallback to *EmailConnector values and otherCallback to all others.
// It stops at the first connector that fails to start and returns that error;
// connectors started before the failure are left running.
func (s *MessageStore) StartAllWithCallbacks(emailCallback, otherCallback func()) error {
	for _, c := range s.connectorSnapshot() {
		cb := otherCallback
		if _, isEmail := c.(*EmailConnector); isEmail {
			cb = emailCallback
		}
		if err := c.Start(cb); err != nil {
			return fmt.Errorf("start %s: %w", c.Name(), err)
		}
	}
	return nil
}

// StopAll stops every registered connector. The store lock is not held while
// Stop runs.
func (s *MessageStore) StopAll() {
	for _, c := range s.connectorSnapshot() {
		c.Stop()
	}
}

// parseMessageID splits "source:uid" at the first colon, so the uid part may
// itself contain colons. An ID without any colon is an error.
func parseMessageID(id string) (source, sourceID string, err error) {
	i := strings.IndexByte(id, ':')
	if i < 0 {
		return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id)
	}
	return id[:i], id[i+1:], nil
}

// ParseDuration parses duration strings like "24h", "7d", "1w".
//
// A string that is a whole number followed by 'd' (days, 24h each) or 'w'
// (weeks, 7 days each) is handled here; everything else is passed to
// time.ParseDuration. An empty string, or a day/week count too large for a
// time.Duration, is an error.
func ParseDuration(s string) (time.Duration, error) {
	if len(s) == 0 {
		return 0, fmt.Errorf("empty duration")
	}

	var unit time.Duration
	switch s[len(s)-1] {
	case 'd':
		unit = 24 * time.Hour
	case 'w':
		unit = 7 * 24 * time.Hour
	default:
		return time.ParseDuration(s)
	}

	// The whole prefix must be an integer: "3dd" or "1h2d" must not be read
	// as a day count, so let the standard parser report the error.
	n, err := strconv.ParseInt(s[:len(s)-1], 10, 64)
	if err != nil {
		return time.ParseDuration(s)
	}

	d := time.Duration(n) * unit
	if d/unit != time.Duration(n) {
		// The multiplication wrapped around int64.
		return 0, fmt.Errorf("duration out of range: %s", s)
	}
	return d, nil
}