323 lines
6.9 KiB
Go
323 lines
6.9 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// MessageStore manages unified messages and cursor tracking
|
|
type MessageStore struct {
|
|
connectors map[string]Connector
|
|
cursors map[string]time.Time // consumer -> high-water mark
|
|
mu sync.RWMutex
|
|
dataDir string
|
|
}
|
|
|
|
// CursorFile is the JSON document used to persist cursor positions on disk.
type CursorFile struct {
	// Cursors maps a consumer name to its cursor timestamp; it is encoded
	// under the "cursors" key.
	Cursors map[string]time.Time `json:"cursors"`
}
|
|
|
|
// NewMessageStore creates a new message store
|
|
func NewMessageStore(dataDir string) *MessageStore {
|
|
store := &MessageStore{
|
|
connectors: make(map[string]Connector),
|
|
cursors: make(map[string]time.Time),
|
|
dataDir: dataDir,
|
|
}
|
|
store.loadCursors()
|
|
return store
|
|
}
|
|
|
|
func (s *MessageStore) loadCursors() {
|
|
path := filepath.Join(s.dataDir, "cursors.json")
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return // File doesn't exist yet
|
|
}
|
|
|
|
var cf CursorFile
|
|
if err := json.Unmarshal(data, &cf); err != nil {
|
|
return
|
|
}
|
|
|
|
s.cursors = cf.Cursors
|
|
}
|
|
|
|
func (s *MessageStore) saveCursors() error {
|
|
if s.dataDir == "" {
|
|
return nil
|
|
}
|
|
|
|
path := filepath.Join(s.dataDir, "cursors.json")
|
|
cf := CursorFile{Cursors: s.cursors}
|
|
|
|
data, err := json.MarshalIndent(cf, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.WriteFile(path, data, 0644)
|
|
}
|
|
|
|
// RegisterConnector adds a connector to the store
|
|
func (s *MessageStore) RegisterConnector(c Connector) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.connectors[c.Name()] = c
|
|
}
|
|
|
|
// GetConnector returns a connector by name
|
|
func (s *MessageStore) GetConnector(name string) (Connector, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
c, ok := s.connectors[name]
|
|
return c, ok
|
|
}
|
|
|
|
// FetchNew returns unseen messages from all connectors
|
|
func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) {
|
|
s.mu.RLock()
|
|
connectors := make([]Connector, 0, len(s.connectors))
|
|
for _, c := range s.connectors {
|
|
connectors = append(connectors, c)
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
var allMessages []UnifiedMessage
|
|
for _, c := range connectors {
|
|
msgs, err := c.FetchNew()
|
|
if err != nil {
|
|
// Log but continue with other connectors
|
|
continue
|
|
}
|
|
allMessages = append(allMessages, msgs...)
|
|
}
|
|
|
|
// Sort by timestamp descending (newest first)
|
|
sort.Slice(allMessages, func(i, j int) bool {
|
|
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
|
})
|
|
|
|
return allMessages, nil
|
|
}
|
|
|
|
// FetchSince returns messages since the given duration
|
|
func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) {
|
|
since := time.Now().Add(-duration)
|
|
return s.FetchSinceTime(since)
|
|
}
|
|
|
|
// FetchSinceTime returns messages since the given time
|
|
func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) {
|
|
s.mu.RLock()
|
|
connectors := make([]Connector, 0, len(s.connectors))
|
|
for _, c := range s.connectors {
|
|
connectors = append(connectors, c)
|
|
}
|
|
s.mu.RUnlock()
|
|
|
|
var allMessages []UnifiedMessage
|
|
for _, c := range connectors {
|
|
msgs, err := c.FetchSince(since)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
allMessages = append(allMessages, msgs...)
|
|
}
|
|
|
|
// Sort by timestamp descending
|
|
sort.Slice(allMessages, func(i, j int) bool {
|
|
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
|
})
|
|
|
|
return allMessages, nil
|
|
}
|
|
|
|
// FetchOne returns a single message by ID (source:uid format)
|
|
func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.FetchOne(sourceID)
|
|
}
|
|
|
|
// Ack advances the cursor for a consumer
|
|
func (s *MessageStore) Ack(consumer string, timestamp time.Time) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.cursors[consumer] = timestamp
|
|
return s.saveCursors()
|
|
}
|
|
|
|
// GetCursor returns the cursor position for a consumer
|
|
func (s *MessageStore) GetCursor(consumer string) time.Time {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.cursors[consumer]
|
|
}
|
|
|
|
// Archive archives a message
|
|
func (s *MessageStore) Archive(id string) error {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.Archive(sourceID)
|
|
}
|
|
|
|
// Delete deletes a message
|
|
func (s *MessageStore) Delete(id string) error {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.Delete(sourceID)
|
|
}
|
|
|
|
// Reply sends a reply to a message
|
|
func (s *MessageStore) Reply(id string, body string, attachments []string) error {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.Reply(sourceID, body, attachments)
|
|
}
|
|
|
|
// MarkSeen marks a message as seen
|
|
func (s *MessageStore) MarkSeen(id string) error {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.MarkSeen(sourceID)
|
|
}
|
|
|
|
// GetAttachment retrieves attachment content
|
|
func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) {
|
|
source, sourceID, err := parseMessageID(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.mu.RLock()
|
|
c, ok := s.connectors[source]
|
|
s.mu.RUnlock()
|
|
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown source: %s", source)
|
|
}
|
|
|
|
return c.GetAttachment(sourceID, filename)
|
|
}
|
|
|
|
// StartAll starts all connectors with a unified callback
|
|
func (s *MessageStore) StartAll(callback func()) error {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, c := range s.connectors {
|
|
if err := c.Start(callback); err != nil {
|
|
return fmt.Errorf("start %s: %w", c.Name(), err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StopAll stops all connectors
|
|
func (s *MessageStore) StopAll() {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
for _, c := range s.connectors {
|
|
c.Stop()
|
|
}
|
|
}
|
|
|
|
// parseMessageID splits an ID of the form "source:uid" at its first colon.
//
// Only the first colon separates the two parts, so the uid may itself contain
// colons ("imap:INBOX:42" yields "imap" and "INBOX:42"). An error is returned
// when id has no colon or when either part is empty.
func parseMessageID(id string) (source, sourceID string, err error) {
	for i := 0; i < len(id); i++ {
		if id[i] != ':' {
			continue
		}
		source, sourceID = id[:i], id[i+1:]
		if source == "" || sourceID == "" {
			// ":uid" and "source:" are as malformed as an ID with no colon.
			break
		}
		return source, sourceID, nil
	}
	return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id)
}
|
|
|
|
// ParseDuration parses a duration string. On top of everything
// time.ParseDuration understands ("24h", "90m", ...), it accepts a whole number
// followed by "d" (days of 24 hours) or "w" (weeks of 7 days), such as "7d" or
// "1w". The number may carry a leading sign.
//
// An error is returned for an empty string, for a day/week value that does not
// fit in a time.Duration, and for anything neither form accepts. In
// particular, input with extra characters around the unit ("7dd", "1d2d") is
// rejected rather than partially parsed.
func ParseDuration(s string) (time.Duration, error) {
	if s == "" {
		return 0, fmt.Errorf("empty duration")
	}

	var unit time.Duration
	switch s[len(s)-1] {
	case 'd':
		unit = 24 * time.Hour
	case 'w':
		unit = 7 * 24 * time.Hour
	default:
		return time.ParseDuration(s)
	}

	count := s[:len(s)-1]
	if !isDecimalInteger(count) {
		// Not "<integer><unit>": the standard parser rejects it with a
		// descriptive error, since it knows neither "d" nor "w".
		return time.ParseDuration(s)
	}

	var n int64
	if _, err := fmt.Sscanf(count, "%d", &n); err != nil {
		return 0, fmt.Errorf("invalid duration %q: %w", s, err)
	}

	// Detect overflow by undoing the multiplication.
	d := time.Duration(n) * unit
	if d/unit != time.Duration(n) {
		return 0, fmt.Errorf("invalid duration %q: value out of range", s)
	}
	return d, nil
}

// isDecimalInteger reports whether num is an optional sign followed by one or
// more ASCII digits and nothing else.
func isDecimalInteger(num string) bool {
	if num != "" && (num[0] == '+' || num[0] == '-') {
		num = num[1:]
	}
	if num == "" {
		return false
	}
	for i := 0; i < len(num); i++ {
		if num[i] < '0' || num[i] > '9' {
			return false
		}
	}
	return true
}
|