195 lines
5.1 KiB
Go
195 lines
5.1 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// OrchestrationDB is a thin wrapper around the SQL handle used to track
// per-message orchestration state. Only identifiers, actions and
// acknowledgements are tracked; message content is not stored.
type OrchestrationDB struct {
	// db is the underlying database/sql handle.
	db *sql.DB
}
|
|
|
|
// MessageState is the orchestration record kept for a single message.
// Optional fields are omitted from JSON output when empty.
type MessageState struct {
	// ID is the message key, e.g. "whatsapp:3A6034A970A5C44D17F9".
	ID string `json:"id"`
	// Source names where the message came from, e.g. "whatsapp" or
	// "tj_jongsma_me".
	Source string `json:"source"`
	// SourceID is the original ID of the message at its source.
	SourceID string `json:"source_id"`
	// Mailbox is the mailbox name such as "INBOX"; null for WhatsApp.
	Mailbox string `json:"mailbox"`
	// FirstSeenAt is when the message was first recorded.
	FirstSeenAt time.Time `json:"first_seen_at"`
	// LastAction is the most recent action taken: "archive", "delete",
	// "reply" or "to-docs".
	LastAction string `json:"last_action,omitempty"`
	// LastActionAt is when LastAction was recorded; nil if none.
	LastActionAt *time.Time `json:"last_action_at,omitempty"`
	// AckedBy identifies the consumer that acknowledged the message.
	AckedBy string `json:"acked_by,omitempty"`
	// AckedAt is when the acknowledgement was recorded; nil if none.
	AckedAt *time.Time `json:"acked_at,omitempty"`
}
|
|
|
|
// NewOrchestrationDB opens or creates the orchestration database
|
|
func NewOrchestrationDB(path string) (*OrchestrationDB, error) {
|
|
db, err := sql.Open("sqlite3", path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open db: %w", err)
|
|
}
|
|
|
|
// Create schema
|
|
schema := `
|
|
CREATE TABLE IF NOT EXISTS message_state (
|
|
id TEXT PRIMARY KEY,
|
|
source TEXT NOT NULL,
|
|
source_id TEXT NOT NULL,
|
|
mailbox TEXT,
|
|
first_seen_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
last_action TEXT,
|
|
last_action_at TIMESTAMP,
|
|
acked_by TEXT,
|
|
acked_at TIMESTAMP
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_source ON message_state(source, source_id);
|
|
CREATE INDEX IF NOT EXISTS idx_pending ON message_state(last_action) WHERE last_action IS NULL;
|
|
`
|
|
|
|
if _, err := db.Exec(schema); err != nil {
|
|
return nil, fmt.Errorf("create schema: %w", err)
|
|
}
|
|
|
|
return &OrchestrationDB{db: db}, nil
|
|
}
|
|
|
|
// Close closes the database
|
|
func (o *OrchestrationDB) Close() error {
|
|
return o.db.Close()
|
|
}
|
|
|
|
// RecordSeen marks a message as seen (first encounter)
|
|
func (o *OrchestrationDB) RecordSeen(id, source, sourceID, mailbox string) error {
|
|
_, err := o.db.Exec(`
|
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO NOTHING
|
|
`, id, source, sourceID, mailbox, time.Now())
|
|
return err
|
|
}
|
|
|
|
// RecordAction records an action taken on a message
|
|
func (o *OrchestrationDB) RecordAction(id, action string) error {
|
|
now := time.Now()
|
|
result, err := o.db.Exec(`
|
|
UPDATE message_state
|
|
SET last_action = ?, last_action_at = ?
|
|
WHERE id = ?
|
|
`, action, now, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, _ := result.RowsAffected()
|
|
if rows == 0 {
|
|
// Message wasn't seen yet, insert with action
|
|
source, sourceID, _ := parseMessageID(id)
|
|
_, err = o.db.Exec(`
|
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, last_action, last_action_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
`, id, source, sourceID, "INBOX", time.Now(), action, now)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// RecordAck records that a consumer acknowledged a message
|
|
func (o *OrchestrationDB) RecordAck(id, consumer string) error {
|
|
now := time.Now()
|
|
result, err := o.db.Exec(`
|
|
UPDATE message_state
|
|
SET acked_by = ?, acked_at = ?
|
|
WHERE id = ?
|
|
`, consumer, now, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, _ := result.RowsAffected()
|
|
if rows == 0 {
|
|
source, sourceID, _ := parseMessageID(id)
|
|
_, err = o.db.Exec(`
|
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, acked_by, acked_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
`, id, source, sourceID, "INBOX", time.Now(), consumer, now)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// GetState returns the state for a message
|
|
func (o *OrchestrationDB) GetState(id string) (*MessageState, error) {
|
|
row := o.db.QueryRow(`
|
|
SELECT id, source, source_id, mailbox, first_seen_at, last_action, last_action_at, acked_by, acked_at
|
|
FROM message_state WHERE id = ?
|
|
`, id)
|
|
|
|
var state MessageState
|
|
var lastAction, ackedBy sql.NullString
|
|
var lastActionAt, ackedAt sql.NullTime
|
|
var mailbox sql.NullString
|
|
|
|
err := row.Scan(&state.ID, &state.Source, &state.SourceID, &mailbox,
|
|
&state.FirstSeenAt, &lastAction, &lastActionAt, &ackedBy, &ackedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if mailbox.Valid {
|
|
state.Mailbox = mailbox.String
|
|
}
|
|
if lastAction.Valid {
|
|
state.LastAction = lastAction.String
|
|
}
|
|
if lastActionAt.Valid {
|
|
state.LastActionAt = &lastActionAt.Time
|
|
}
|
|
if ackedBy.Valid {
|
|
state.AckedBy = ackedBy.String
|
|
}
|
|
if ackedAt.Valid {
|
|
state.AckedAt = &ackedAt.Time
|
|
}
|
|
|
|
return &state, nil
|
|
}
|
|
|
|
// HasAction checks if a message has been actioned (archived, deleted, etc.)
|
|
func (o *OrchestrationDB) HasAction(id string) bool {
|
|
var count int
|
|
o.db.QueryRow(`
|
|
SELECT COUNT(*) FROM message_state
|
|
WHERE id = ? AND last_action IS NOT NULL
|
|
`, id).Scan(&count)
|
|
return count > 0
|
|
}
|
|
|
|
// GetPendingIDs returns message IDs that have no action recorded
|
|
func (o *OrchestrationDB) GetPendingIDs() ([]string, error) {
|
|
rows, err := o.db.Query(`
|
|
SELECT id FROM message_state
|
|
WHERE last_action IS NULL
|
|
ORDER BY first_seen_at DESC
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
return nil, err
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
return ids, nil
|
|
}
|