package store import ( "database/sql" "embed" "encoding/json" "fmt" "io/fs" "sort" "strings" "time" "github.com/inou-ai/messaging-center/internal/core" _ "github.com/mattn/go-sqlite3" ) //go:embed migrations/*.sql var migrations embed.FS // Store provides database operations. type Store struct { db *sql.DB } // New creates a new Store and runs migrations. func New(dbPath string) (*Store, error) { db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_foreign_keys=on") if err != nil { return nil, fmt.Errorf("open database: %w", err) } s := &Store{db: db} if err := s.migrate(); err != nil { db.Close() return nil, fmt.Errorf("migrate: %w", err) } return s, nil } // Close closes the database connection. func (s *Store) Close() error { return s.db.Close() } // migrate runs all SQL migrations in order. func (s *Store) migrate() error { // Create migrations table _, err := s.db.Exec(` CREATE TABLE IF NOT EXISTS migrations ( id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, applied_at DATETIME DEFAULT CURRENT_TIMESTAMP ) `) if err != nil { return fmt.Errorf("create migrations table: %w", err) } // Get applied migrations rows, err := s.db.Query("SELECT name FROM migrations") if err != nil { return fmt.Errorf("query migrations: %w", err) } applied := make(map[string]bool) for rows.Next() { var name string if err := rows.Scan(&name); err != nil { rows.Close() return err } applied[name] = true } rows.Close() // Get migration files entries, err := fs.ReadDir(migrations, "migrations") if err != nil { return fmt.Errorf("read migrations dir: %w", err) } // Sort by name var files []string for _, e := range entries { if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { files = append(files, e.Name()) } } sort.Strings(files) // Apply unapplied migrations for _, name := range files { if applied[name] { continue } data, err := migrations.ReadFile("migrations/" + name) if err != nil { return fmt.Errorf("read migration %s: %w", name, err) } tx, err := s.db.Begin() if err != nil { return fmt.Errorf("begin transaction: %w", err) } if _, err := tx.Exec(string(data)); err != nil { tx.Rollback() return fmt.Errorf("execute migration %s: %w", name, err) } if _, err := tx.Exec("INSERT INTO migrations (name) VALUES (?)", name); err != nil { tx.Rollback() return fmt.Errorf("record migration %s: %w", name, err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commit migration %s: %w", name, err) } } return nil } // DB returns the underlying database connection. func (s *Store) DB() *sql.DB { return s.db } // CreateMessage inserts a new message. func (s *Store) CreateMessage(m *core.Message) error { raw, _ := json.Marshal(m.Raw) from, _ := json.Marshal(m.From) to, _ := json.Marshal(m.To) _, err := s.db.Exec(` INSERT INTO messages (id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, m.ID, m.Source, m.Direction, from, to, m.Timestamp, m.Type, m.Subject, m.Body, m.BodyHTML, m.Status, raw, m.CreatedAt, m.UpdatedAt) return err } // GetMessage retrieves a message by ID. func (s *Store) GetMessage(id string) (*core.Message, error) { var m core.Message var from, to, raw []byte err := s.db.QueryRow(` SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE id = ? `, id).Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } json.Unmarshal(from, &m.From) json.Unmarshal(to, &m.To) m.Raw = raw // Load attachments attachments, err := s.GetAttachmentsByMessage(id) if err != nil { return nil, err } m.Attachments = attachments return &m, nil } // ListMessages retrieves messages with optional filters. func (s *Store) ListMessages(f core.MessageFilter) ([]core.Message, error) { query := `SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE 1=1` args := []any{} if f.Source != "" { query += " AND source = ?" args = append(args, f.Source) } if f.Direction != "" { query += " AND direction = ?" args = append(args, f.Direction) } if f.Type != "" { query += " AND type = ?" args = append(args, f.Type) } if f.Since != nil { query += " AND timestamp >= ?" args = append(args, *f.Since) } if f.Until != nil { query += " AND timestamp <= ?" args = append(args, *f.Until) } query += " ORDER BY timestamp DESC" if f.Limit > 0 { query += fmt.Sprintf(" LIMIT %d", f.Limit) } else { query += " LIMIT 50" } if f.Offset > 0 { query += fmt.Sprintf(" OFFSET %d", f.Offset) } rows, err := s.db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() var messages []core.Message for rows.Next() { var m core.Message var from, to, raw []byte if err := rows.Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt); err != nil { return nil, err } json.Unmarshal(from, &m.From) json.Unmarshal(to, &m.To) m.Raw = raw messages = append(messages, m) } return messages, nil } // CreateAttachment inserts a new attachment. func (s *Store) CreateAttachment(a *core.Attachment) error { _, err := s.db.Exec(` INSERT INTO attachments (id, message_id, type, filename, size, local_path, transcription, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?) `, a.ID, a.MessageID, a.Type, a.Filename, a.Size, a.LocalPath, a.Transcription, a.Status) return err } // GetAttachmentsByMessage retrieves all attachments for a message. func (s *Store) GetAttachmentsByMessage(messageID string) ([]core.Attachment, error) { rows, err := s.db.Query(` SELECT id, message_id, type, filename, size, local_path, transcription, status FROM attachments WHERE message_id = ? `, messageID) if err != nil { return nil, err } defer rows.Close() var attachments []core.Attachment for rows.Next() { var a core.Attachment if err := rows.Scan(&a.ID, &a.MessageID, &a.Type, &a.Filename, &a.Size, &a.LocalPath, &a.Transcription, &a.Status); err != nil { return nil, err } attachments = append(attachments, a) } return attachments, nil } // CreateCommand inserts a new command. func (s *Store) CreateCommand(c *core.Command) error { _, err := s.db.Exec(` INSERT INTO commands (id, type, payload, status, error, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) `, c.ID, c.Type, c.Payload, c.Status, c.Error, c.CreatedAt, c.UpdatedAt) return err } // GetCommand retrieves a command by ID. func (s *Store) GetCommand(id string) (*core.Command, error) { var c core.Command err := s.db.QueryRow(` SELECT id, type, payload, status, error, created_at, updated_at FROM commands WHERE id = ? `, id).Scan(&c.ID, &c.Type, &c.Payload, &c.Status, &c.Error, &c.CreatedAt, &c.UpdatedAt) if err == sql.ErrNoRows { return nil, nil } if err != nil { return nil, err } return &c, nil } // UpdateCommandStatus updates a command's status and error. func (s *Store) UpdateCommandStatus(id, status, errMsg string) error { _, err := s.db.Exec(` UPDATE commands SET status = ?, error = ?, updated_at = ? WHERE id = ? `, status, errMsg, time.Now(), id) return err } // Ping checks database connectivity. func (s *Store) Ping() error { return s.db.Ping() }