306 lines
7.6 KiB
Go
306 lines
7.6 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/fs"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/inou-ai/messaging-center/internal/core"
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
//go:embed migrations/*.sql
|
|
var migrations embed.FS
|
|
|
|
// Store provides database operations.
|
|
type Store struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// New creates a new Store and runs migrations.
|
|
func New(dbPath string) (*Store, error) {
|
|
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_foreign_keys=on")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open database: %w", err)
|
|
}
|
|
|
|
s := &Store{db: db}
|
|
if err := s.migrate(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("migrate: %w", err)
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Close closes the database connection.
|
|
func (s *Store) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
// migrate runs all SQL migrations in order.
|
|
func (s *Store) migrate() error {
|
|
// Create migrations table
|
|
_, err := s.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS migrations (
|
|
id INTEGER PRIMARY KEY,
|
|
name TEXT NOT NULL UNIQUE,
|
|
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("create migrations table: %w", err)
|
|
}
|
|
|
|
// Get applied migrations
|
|
rows, err := s.db.Query("SELECT name FROM migrations")
|
|
if err != nil {
|
|
return fmt.Errorf("query migrations: %w", err)
|
|
}
|
|
applied := make(map[string]bool)
|
|
for rows.Next() {
|
|
var name string
|
|
if err := rows.Scan(&name); err != nil {
|
|
rows.Close()
|
|
return err
|
|
}
|
|
applied[name] = true
|
|
}
|
|
rows.Close()
|
|
|
|
// Get migration files
|
|
entries, err := fs.ReadDir(migrations, "migrations")
|
|
if err != nil {
|
|
return fmt.Errorf("read migrations dir: %w", err)
|
|
}
|
|
|
|
// Sort by name
|
|
var files []string
|
|
for _, e := range entries {
|
|
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
|
|
files = append(files, e.Name())
|
|
}
|
|
}
|
|
sort.Strings(files)
|
|
|
|
// Apply unapplied migrations
|
|
for _, name := range files {
|
|
if applied[name] {
|
|
continue
|
|
}
|
|
|
|
data, err := migrations.ReadFile("migrations/" + name)
|
|
if err != nil {
|
|
return fmt.Errorf("read migration %s: %w", name, err)
|
|
}
|
|
|
|
tx, err := s.db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("begin transaction: %w", err)
|
|
}
|
|
|
|
if _, err := tx.Exec(string(data)); err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("execute migration %s: %w", name, err)
|
|
}
|
|
|
|
if _, err := tx.Exec("INSERT INTO migrations (name) VALUES (?)", name); err != nil {
|
|
tx.Rollback()
|
|
return fmt.Errorf("record migration %s: %w", name, err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("commit migration %s: %w", name, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DB returns the underlying database connection.
|
|
func (s *Store) DB() *sql.DB {
|
|
return s.db
|
|
}
|
|
|
|
// CreateMessage inserts a new message.
|
|
func (s *Store) CreateMessage(m *core.Message) error {
|
|
raw, _ := json.Marshal(m.Raw)
|
|
from, _ := json.Marshal(m.From)
|
|
to, _ := json.Marshal(m.To)
|
|
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO messages (id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, m.ID, m.Source, m.Direction, from, to, m.Timestamp, m.Type, m.Subject, m.Body, m.BodyHTML, m.Status, raw, m.CreatedAt, m.UpdatedAt)
|
|
return err
|
|
}
|
|
|
|
// GetMessage retrieves a message by ID.
|
|
func (s *Store) GetMessage(id string) (*core.Message, error) {
|
|
var m core.Message
|
|
var from, to, raw []byte
|
|
|
|
err := s.db.QueryRow(`
|
|
SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at
|
|
FROM messages WHERE id = ?
|
|
`, id).Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json.Unmarshal(from, &m.From)
|
|
json.Unmarshal(to, &m.To)
|
|
m.Raw = raw
|
|
|
|
// Load attachments
|
|
attachments, err := s.GetAttachmentsByMessage(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.Attachments = attachments
|
|
|
|
return &m, nil
|
|
}
|
|
|
|
// ListMessages retrieves messages with optional filters.
|
|
func (s *Store) ListMessages(f core.MessageFilter) ([]core.Message, error) {
|
|
query := `SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE 1=1`
|
|
args := []any{}
|
|
|
|
if f.Source != "" {
|
|
query += " AND source = ?"
|
|
args = append(args, f.Source)
|
|
}
|
|
if f.Direction != "" {
|
|
query += " AND direction = ?"
|
|
args = append(args, f.Direction)
|
|
}
|
|
if f.Type != "" {
|
|
query += " AND type = ?"
|
|
args = append(args, f.Type)
|
|
}
|
|
if f.Since != nil {
|
|
query += " AND timestamp >= ?"
|
|
args = append(args, *f.Since)
|
|
}
|
|
if f.Until != nil {
|
|
query += " AND timestamp <= ?"
|
|
args = append(args, *f.Until)
|
|
}
|
|
|
|
query += " ORDER BY timestamp DESC"
|
|
|
|
if f.Limit > 0 {
|
|
query += fmt.Sprintf(" LIMIT %d", f.Limit)
|
|
} else {
|
|
query += " LIMIT 50"
|
|
}
|
|
if f.Offset > 0 {
|
|
query += fmt.Sprintf(" OFFSET %d", f.Offset)
|
|
}
|
|
|
|
rows, err := s.db.Query(query, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var messages []core.Message
|
|
for rows.Next() {
|
|
var m core.Message
|
|
var from, to, raw []byte
|
|
|
|
if err := rows.Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json.Unmarshal(from, &m.From)
|
|
json.Unmarshal(to, &m.To)
|
|
m.Raw = raw
|
|
|
|
messages = append(messages, m)
|
|
}
|
|
|
|
return messages, nil
|
|
}
|
|
|
|
// CreateAttachment inserts a new attachment.
|
|
func (s *Store) CreateAttachment(a *core.Attachment) error {
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO attachments (id, message_id, type, filename, size, local_path, transcription, status)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, a.ID, a.MessageID, a.Type, a.Filename, a.Size, a.LocalPath, a.Transcription, a.Status)
|
|
return err
|
|
}
|
|
|
|
// GetAttachmentsByMessage retrieves all attachments for a message.
|
|
func (s *Store) GetAttachmentsByMessage(messageID string) ([]core.Attachment, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT id, message_id, type, filename, size, local_path, transcription, status
|
|
FROM attachments WHERE message_id = ?
|
|
`, messageID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var attachments []core.Attachment
|
|
for rows.Next() {
|
|
var a core.Attachment
|
|
if err := rows.Scan(&a.ID, &a.MessageID, &a.Type, &a.Filename, &a.Size, &a.LocalPath, &a.Transcription, &a.Status); err != nil {
|
|
return nil, err
|
|
}
|
|
attachments = append(attachments, a)
|
|
}
|
|
|
|
return attachments, nil
|
|
}
|
|
|
|
// CreateCommand inserts a new command.
|
|
func (s *Store) CreateCommand(c *core.Command) error {
|
|
_, err := s.db.Exec(`
|
|
INSERT INTO commands (id, type, payload, status, error, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
`, c.ID, c.Type, c.Payload, c.Status, c.Error, c.CreatedAt, c.UpdatedAt)
|
|
return err
|
|
}
|
|
|
|
// GetCommand retrieves a command by ID.
|
|
func (s *Store) GetCommand(id string) (*core.Command, error) {
|
|
var c core.Command
|
|
|
|
err := s.db.QueryRow(`
|
|
SELECT id, type, payload, status, error, created_at, updated_at
|
|
FROM commands WHERE id = ?
|
|
`, id).Scan(&c.ID, &c.Type, &c.Payload, &c.Status, &c.Error, &c.CreatedAt, &c.UpdatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &c, nil
|
|
}
|
|
|
|
// UpdateCommandStatus updates a command's status and error.
|
|
func (s *Store) UpdateCommandStatus(id, status, errMsg string) error {
|
|
_, err := s.db.Exec(`
|
|
UPDATE commands SET status = ?, error = ?, updated_at = ? WHERE id = ?
|
|
`, status, errMsg, time.Now(), id)
|
|
return err
|
|
}
|
|
|
|
// Ping checks database connectivity.
|
|
func (s *Store) Ping() error {
|
|
return s.db.Ping()
|
|
}
|