messaging-center/internal/store/sqlite.go

306 lines
7.6 KiB
Go

package store
import (
"database/sql"
"embed"
"encoding/json"
"fmt"
"io/fs"
"sort"
"strings"
"time"
"github.com/inou-ai/messaging-center/internal/core"
_ "github.com/mattn/go-sqlite3"
)
// migrations holds the SQL files matched by migrations/*.sql, embedded into
// the binary at build time. migrate reads them from the "migrations"
// directory of this file system.
//
//go:embed migrations/*.sql
var migrations embed.FS
// Store provides database operations on top of a single *sql.DB handle.
// Values are created with New, which also runs the embedded migrations.
type Store struct {
// db is the handle every Store method issues its statements through.
db *sql.DB
}
// New opens the SQLite database at dbPath and applies any pending migrations.
//
// The DSN options _journal_mode=WAL and _foreign_keys=on are appended to
// dbPath. When dbPath already carries a query string they are joined with "&"
// instead of "?", so parameters supplied by the caller are not corrupted.
//
// If the migrations fail, the database handle is closed and the migration
// error is returned.
func New(dbPath string) (*Store, error) {
	// A second "?" would be folded into the value of the caller's last
	// parameter, so pick the separator based on what dbPath already contains.
	sep := "?"
	if strings.Contains(dbPath, "?") {
		sep = "&"
	}

	db, err := sql.Open("sqlite3", dbPath+sep+"_journal_mode=WAL&_foreign_keys=on")
	if err != nil {
		return nil, fmt.Errorf("open database: %w", err)
	}

	s := &Store{db: db}
	if err := s.migrate(); err != nil {
		// Best-effort cleanup: the migration failure is the error worth
		// reporting, so a secondary Close error is intentionally dropped.
		_ = db.Close()
		return nil, fmt.Errorf("migrate: %w", err)
	}
	return s, nil
}
// Close shuts down the Store's database handle and reports the error, if any,
// returned by the underlying *sql.DB.
func (s *Store) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// migrate brings the schema up to date by applying every embedded migration
// that is not yet recorded in the migrations table.
//
// Only regular files ending in ".sql" are considered, and they are applied in
// lexical filename order. Each migration runs in its own transaction together
// with the bookkeeping row that marks it as applied, so the two are committed
// or rolled back as a unit.
func (s *Store) migrate() error {
	// The bookkeeping table has to exist before it can be queried.
	_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`)
	if err != nil {
		return fmt.Errorf("create migrations table: %w", err)
	}

	applied, err := s.appliedMigrations()
	if err != nil {
		return err
	}

	entries, err := fs.ReadDir(migrations, "migrations")
	if err != nil {
		return fmt.Errorf("read migrations dir: %w", err)
	}

	files := make([]string, 0, len(entries))
	for _, e := range entries {
		if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
			files = append(files, e.Name())
		}
	}
	// Explicit sort so the application order does not depend on the order in
	// which the directory listing was returned.
	sort.Strings(files)

	for _, name := range files {
		if applied[name] {
			continue
		}
		if err := s.applyMigration(name); err != nil {
			return err
		}
	}
	return nil
}

// appliedMigrations returns the set of migration names already recorded in
// the migrations table.
func (s *Store) appliedMigrations() (map[string]bool, error) {
	rows, err := s.db.Query("SELECT name FROM migrations")
	if err != nil {
		return nil, fmt.Errorf("query migrations: %w", err)
	}
	// Scoped to this helper so the result set is closed before migrate starts
	// any migration transaction.
	defer rows.Close()

	applied := make(map[string]bool)
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, fmt.Errorf("scan migration name: %w", err)
		}
		applied[name] = true
	}
	// Next returns false both when the rows are exhausted and when iteration
	// failed; without this check a failure would look like "nothing applied".
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate migrations: %w", err)
	}
	return applied, nil
}

// applyMigration executes the embedded migration file called name and records
// it in the migrations table, both inside one transaction.
func (s *Store) applyMigration(name string) error {
	// Embedded file systems always use forward slashes, so plain
	// concatenation is the correct way to build the path.
	data, err := migrations.ReadFile("migrations/" + name)
	if err != nil {
		return fmt.Errorf("read migration %s: %w", name, err)
	}

	tx, err := s.db.Begin()
	if err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	// Covers every early return below. After a successful Commit this only
	// yields sql.ErrTxDone, which is safe to ignore.
	defer tx.Rollback()

	if _, err := tx.Exec(string(data)); err != nil {
		return fmt.Errorf("execute migration %s: %w", name, err)
	}
	if _, err := tx.Exec("INSERT INTO migrations (name) VALUES (?)", name); err != nil {
		return fmt.Errorf("record migration %s: %w", name, err)
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit migration %s: %w", name, err)
	}
	return nil
}
// DB exposes the *sql.DB handle that backs the Store.
func (s *Store) DB() *sql.DB {
	handle := s.db
	return handle
}
// CreateMessage inserts m as a new row in the messages table.
//
// The From, To and Raw fields are JSON-encoded before being stored. An
// encoding failure is returned to the caller instead of being discarded, so a
// message is never persisted with a silently missing column.
//
// Errors coming from the database are returned unwrapped, so callers can keep
// inspecting the driver's error value directly.
func (s *Store) CreateMessage(m *core.Message) error {
	raw, err := json.Marshal(m.Raw)
	if err != nil {
		return fmt.Errorf("encode raw payload of message %v: %w", m.ID, err)
	}
	from, err := json.Marshal(m.From)
	if err != nil {
		return fmt.Errorf("encode from contact of message %v: %w", m.ID, err)
	}
	to, err := json.Marshal(m.To)
	if err != nil {
		return fmt.Errorf("encode to contact of message %v: %w", m.ID, err)
	}

	_, err = s.db.Exec(`
INSERT INTO messages (id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, m.ID, m.Source, m.Direction, from, to, m.Timestamp, m.Type, m.Subject, m.Body, m.BodyHTML, m.Status, raw, m.CreatedAt, m.UpdatedAt)
	return err
}
// GetMessage loads the message with the given id together with its
// attachments.
//
// It returns (nil, nil) when no row matches id. The from_contact and
// to_contact columns are decoded from JSON; a NULL or empty column leaves the
// corresponding field at its zero value, while malformed JSON is reported as
// an error rather than being ignored. The raw column is assigned to Raw
// without decoding.
func (s *Store) GetMessage(id string) (*core.Message, error) {
	var (
		m             core.Message
		from, to, raw []byte
	)
	err := s.db.QueryRow(`
SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at
FROM messages WHERE id = ?
`, id).Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt)
	if err == sql.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	// Unmarshal rejects empty input, so absent contacts are skipped
	// explicitly; only non-empty data that fails to parse is an error.
	if len(from) > 0 {
		if err := json.Unmarshal(from, &m.From); err != nil {
			return nil, fmt.Errorf("decode from_contact of message %s: %w", id, err)
		}
	}
	if len(to) > 0 {
		if err := json.Unmarshal(to, &m.To); err != nil {
			return nil, fmt.Errorf("decode to_contact of message %s: %w", id, err)
		}
	}
	m.Raw = raw

	attachments, err := s.GetAttachmentsByMessage(id)
	if err != nil {
		return nil, err
	}
	m.Attachments = attachments
	return &m, nil
}
// ListMessages returns the messages matching f, ordered by timestamp with the
// newest first.
//
// Source, Direction and Type filter by equality when non-empty; Since and
// Until bound the timestamp inclusively when non-nil. At most f.Limit rows are
// returned, or 50 when f.Limit is not positive, and f.Offset rows are skipped
// when it is positive. Attachments are not loaded by this method.
//
// Contact columns that are NULL or empty leave the field at its zero value;
// malformed contact JSON and errors hit while iterating the result set are
// returned instead of yielding a silently incomplete list. The result is a
// nil slice when nothing matches.
func (s *Store) ListMessages(f core.MessageFilter) ([]core.Message, error) {
	query := `SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE 1=1`
	args := []any{}

	if f.Source != "" {
		query += " AND source = ?"
		args = append(args, f.Source)
	}
	if f.Direction != "" {
		query += " AND direction = ?"
		args = append(args, f.Direction)
	}
	if f.Type != "" {
		query += " AND type = ?"
		args = append(args, f.Type)
	}
	if f.Since != nil {
		query += " AND timestamp >= ?"
		args = append(args, *f.Since)
	}
	if f.Until != nil {
		query += " AND timestamp <= ?"
		args = append(args, *f.Until)
	}

	query += " ORDER BY timestamp DESC"
	// A LIMIT clause is always emitted, which also keeps the optional OFFSET
	// clause valid SQL.
	if f.Limit > 0 {
		query += fmt.Sprintf(" LIMIT %d", f.Limit)
	} else {
		query += " LIMIT 50"
	}
	if f.Offset > 0 {
		query += fmt.Sprintf(" OFFSET %d", f.Offset)
	}

	rows, err := s.db.Query(query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var messages []core.Message
	for rows.Next() {
		var (
			m             core.Message
			from, to, raw []byte
		)
		if err := rows.Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt); err != nil {
			return nil, err
		}
		// Unmarshal rejects empty input, so absent contacts are skipped
		// explicitly; only non-empty data that fails to parse is an error.
		if len(from) > 0 {
			if err := json.Unmarshal(from, &m.From); err != nil {
				return nil, fmt.Errorf("decode from_contact of message %v: %w", m.ID, err)
			}
		}
		if len(to) > 0 {
			if err := json.Unmarshal(to, &m.To); err != nil {
				return nil, fmt.Errorf("decode to_contact of message %v: %w", m.ID, err)
			}
		}
		m.Raw = raw
		messages = append(messages, m)
	}
	// Next returns false both on exhaustion and on failure; Err distinguishes
	// a complete listing from a truncated one.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return messages, nil
}
// CreateAttachment writes a as a new row in the attachments table and returns
// the error produced by the insert, if any.
func (s *Store) CreateAttachment(a *core.Attachment) error {
	const insertStmt = `
INSERT INTO attachments (id, message_id, type, filename, size, local_path, transcription, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`
	values := []any{a.ID, a.MessageID, a.Type, a.Filename, a.Size, a.LocalPath, a.Transcription, a.Status}
	if _, err := s.db.Exec(insertStmt, values...); err != nil {
		return err
	}
	return nil
}
// GetAttachmentsByMessage returns every attachment row whose message_id
// equals messageID.
//
// The result is a nil slice when the message has no attachments. Errors hit
// while iterating the result set are returned rather than producing a
// silently truncated list.
func (s *Store) GetAttachmentsByMessage(messageID string) ([]core.Attachment, error) {
	rows, err := s.db.Query(`
SELECT id, message_id, type, filename, size, local_path, transcription, status
FROM attachments WHERE message_id = ?
`, messageID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var attachments []core.Attachment
	for rows.Next() {
		var a core.Attachment
		if err := rows.Scan(&a.ID, &a.MessageID, &a.Type, &a.Filename, &a.Size, &a.LocalPath, &a.Transcription, &a.Status); err != nil {
			return nil, err
		}
		attachments = append(attachments, a)
	}
	// Next returns false both on exhaustion and on failure; Err distinguishes
	// a complete listing from a truncated one.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return attachments, nil
}
// CreateCommand writes c as a new row in the commands table and returns the
// error produced by the insert, if any.
func (s *Store) CreateCommand(c *core.Command) error {
	const insertStmt = `
INSERT INTO commands (id, type, payload, status, error, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`
	values := []any{c.ID, c.Type, c.Payload, c.Status, c.Error, c.CreatedAt, c.UpdatedAt}
	if _, err := s.db.Exec(insertStmt, values...); err != nil {
		return err
	}
	return nil
}
// GetCommand looks up the command stored under id. A missing row is reported
// as (nil, nil); any other scan failure is returned as the error.
func (s *Store) GetCommand(id string) (*core.Command, error) {
	const selectStmt = `
SELECT id, type, payload, status, error, created_at, updated_at
FROM commands WHERE id = ?
`
	var cmd core.Command
	row := s.db.QueryRow(selectStmt, id)
	switch err := row.Scan(&cmd.ID, &cmd.Type, &cmd.Payload, &cmd.Status, &cmd.Error, &cmd.CreatedAt, &cmd.UpdatedAt); {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &cmd, nil
}
// UpdateCommandStatus sets the status and error columns of the command
// identified by id and stamps updated_at with the current time. Only the
// error from executing the statement is returned.
func (s *Store) UpdateCommandStatus(id, status, errMsg string) error {
	const updateStmt = `
UPDATE commands SET status = ?, error = ?, updated_at = ? WHERE id = ?
`
	now := time.Now()
	if _, err := s.db.Exec(updateStmt, status, errMsg, now, id); err != nil {
		return err
	}
	return nil
}
// Ping forwards to the underlying *sql.DB's Ping and returns its result.
func (s *Store) Ping() error {
	pingErr := s.db.Ping()
	return pingErr
}