Add SQLite orchestration tracking for message state

- Track first_seen, last_action, acked_by per message
- Actions: archive, delete, reply, to-docs
- /messages/new now filters by orchestration state
- Persists across restarts (unlike in-memory tracking)
This commit is contained in:
Johan Jongsma 2026-02-02 22:40:30 +00:00
parent f4e372be84
commit e017ee4cdd
4 changed files with 240 additions and 1 deletions

1
go.mod
View File

@ -10,5 +10,6 @@ require (
require (
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
github.com/mattn/go-sqlite3 v1.14.33 // indirect
golang.org/x/text v0.14.0 // indirect
)

2
go.sum
View File

@ -4,6 +4,8 @@ github.com/emersion/go-message v0.18.2 h1:rl55SQdjd9oJcIoQNhubD2Acs1E6IzlZISRTK7
github.com/emersion/go-message v0.18.2/go.mod h1:XpJyL70LwRvq2a8rVbHXikPgKj8+aI0kGdHlg16ibYA=
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 h1:hH4PQfOndHDlpzYfLAAfl63E8Le6F2+EL/cdhlkyRJY=
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0=
github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=

44
main.go
View File

@ -66,6 +66,7 @@ type WebhookConfig struct {
var (
config Config
store *MessageStore
orch *OrchestrationDB
)
func main() {
@ -100,6 +101,14 @@ func main() {
// Initialize store
store = NewMessageStore(config.DataDir)
// Initialize orchestration DB
orchPath := filepath.Join(config.DataDir, "orchestration.db")
orch, err = NewOrchestrationDB(orchPath)
if err != nil {
log.Fatalf("Failed to open orchestration DB: %v", err)
}
defer orch.Close()
// Register email connectors
for name, acc := range config.Accounts {
smtpConfig := SMTPConfig{
@ -285,7 +294,20 @@ func handleMessagesNew(w http.ResponseWriter, r *http.Request) {
return
}
json.NewEncoder(w).Encode(messages)
// Filter by orchestration state and record new messages
var filtered []UnifiedMessage
for _, msg := range messages {
// Record that we've seen this message
source, sourceID, _ := parseMessageID(msg.ID)
orch.RecordSeen(msg.ID, source, sourceID, "INBOX")
// Only include if no action has been taken
if !orch.HasAction(msg.ID) {
filtered = append(filtered, msg)
}
}
json.NewEncoder(w).Encode(filtered)
}
func handleMessagesAck(w http.ResponseWriter, r *http.Request) {
@ -297,6 +319,7 @@ func handleMessagesAck(w http.ResponseWriter, r *http.Request) {
var req struct {
Consumer string `json:"consumer"`
Timestamp time.Time `json:"timestamp"`
IDs []string `json:"ids"` // Optional: specific message IDs to ack
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@ -313,15 +336,22 @@ func handleMessagesAck(w http.ResponseWriter, r *http.Request) {
req.Timestamp = time.Now()
}
// Ack cursor-based
if err := store.Ack(req.Consumer, req.Timestamp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Also record per-message acks in orchestration DB
for _, id := range req.IDs {
orch.RecordAck(id, req.Consumer)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"consumer": req.Consumer,
"timestamp": req.Timestamp,
"acked": len(req.IDs),
})
}
@ -392,6 +422,9 @@ func handleArchive(w http.ResponseWriter, r *http.Request, id string) {
return
}
// Record action in orchestration DB
orch.RecordAction(id, "archive")
json.NewEncoder(w).Encode(map[string]string{"status": "archived"})
}
@ -406,6 +439,9 @@ func handleDelete(w http.ResponseWriter, r *http.Request, id string) {
return
}
// Record action in orchestration DB
orch.RecordAction(id, "delete")
json.NewEncoder(w).Encode(map[string]string{"status": "deleted"})
}
@ -435,6 +471,9 @@ func handleReply(w http.ResponseWriter, r *http.Request, id string) {
return
}
// Record action in orchestration DB
orch.RecordAction(id, "reply")
json.NewEncoder(w).Encode(map[string]string{"status": "sent"})
}
@ -504,6 +543,9 @@ func handleToDocs(w http.ResponseWriter, r *http.Request, id string) {
if len(errors) > 0 && len(saved) == 0 {
w.WriteHeader(http.StatusInternalServerError)
} else if len(saved) > 0 {
// Record action in orchestration DB (only if we saved something)
orch.RecordAction(id, "to-docs")
}
json.NewEncoder(w).Encode(result)

194
orchestration.go Normal file
View File

@ -0,0 +1,194 @@
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/mattn/go-sqlite3"
)
// OrchestrationDB tracks message state without storing content
type OrchestrationDB struct {
db *sql.DB
}
// MessageState represents orchestration state for a message
type MessageState struct {
ID string `json:"id"` // "whatsapp:3A6034A970A5C44D17F9"
Source string `json:"source"` // "whatsapp", "tj_jongsma_me", etc.
SourceID string `json:"source_id"` // original ID at source
Mailbox string `json:"mailbox"` // "INBOX", null for WA
FirstSeenAt time.Time `json:"first_seen_at"`
LastAction string `json:"last_action,omitempty"` // "archive", "delete", "reply", "to-docs"
LastActionAt *time.Time `json:"last_action_at,omitempty"`
AckedBy string `json:"acked_by,omitempty"`
AckedAt *time.Time `json:"acked_at,omitempty"`
}
// NewOrchestrationDB opens or creates the orchestration database
func NewOrchestrationDB(path string) (*OrchestrationDB, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, fmt.Errorf("open db: %w", err)
}
// Create schema
schema := `
CREATE TABLE IF NOT EXISTS message_state (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
source_id TEXT NOT NULL,
mailbox TEXT,
first_seen_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_action TEXT,
last_action_at TIMESTAMP,
acked_by TEXT,
acked_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_source ON message_state(source, source_id);
CREATE INDEX IF NOT EXISTS idx_pending ON message_state(last_action) WHERE last_action IS NULL;
`
if _, err := db.Exec(schema); err != nil {
return nil, fmt.Errorf("create schema: %w", err)
}
return &OrchestrationDB{db: db}, nil
}
// Close closes the database
func (o *OrchestrationDB) Close() error {
return o.db.Close()
}
// RecordSeen marks a message as seen (first encounter)
func (o *OrchestrationDB) RecordSeen(id, source, sourceID, mailbox string) error {
_, err := o.db.Exec(`
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
`, id, source, sourceID, mailbox, time.Now())
return err
}
// RecordAction records an action taken on a message
func (o *OrchestrationDB) RecordAction(id, action string) error {
now := time.Now()
result, err := o.db.Exec(`
UPDATE message_state
SET last_action = ?, last_action_at = ?
WHERE id = ?
`, action, now, id)
if err != nil {
return err
}
rows, _ := result.RowsAffected()
if rows == 0 {
// Message wasn't seen yet, insert with action
source, sourceID, _ := parseMessageID(id)
_, err = o.db.Exec(`
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, last_action, last_action_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, id, source, sourceID, "INBOX", time.Now(), action, now)
}
return err
}
// RecordAck records that a consumer acknowledged a message
func (o *OrchestrationDB) RecordAck(id, consumer string) error {
now := time.Now()
result, err := o.db.Exec(`
UPDATE message_state
SET acked_by = ?, acked_at = ?
WHERE id = ?
`, consumer, now, id)
if err != nil {
return err
}
rows, _ := result.RowsAffected()
if rows == 0 {
source, sourceID, _ := parseMessageID(id)
_, err = o.db.Exec(`
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, acked_by, acked_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, id, source, sourceID, "INBOX", time.Now(), consumer, now)
}
return err
}
// GetState returns the state for a message
func (o *OrchestrationDB) GetState(id string) (*MessageState, error) {
row := o.db.QueryRow(`
SELECT id, source, source_id, mailbox, first_seen_at, last_action, last_action_at, acked_by, acked_at
FROM message_state WHERE id = ?
`, id)
var state MessageState
var lastAction, ackedBy sql.NullString
var lastActionAt, ackedAt sql.NullTime
var mailbox sql.NullString
err := row.Scan(&state.ID, &state.Source, &state.SourceID, &mailbox,
&state.FirstSeenAt, &lastAction, &lastActionAt, &ackedBy, &ackedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
if mailbox.Valid {
state.Mailbox = mailbox.String
}
if lastAction.Valid {
state.LastAction = lastAction.String
}
if lastActionAt.Valid {
state.LastActionAt = &lastActionAt.Time
}
if ackedBy.Valid {
state.AckedBy = ackedBy.String
}
if ackedAt.Valid {
state.AckedAt = &ackedAt.Time
}
return &state, nil
}
// HasAction checks if a message has been actioned (archived, deleted, etc.)
func (o *OrchestrationDB) HasAction(id string) bool {
var count int
o.db.QueryRow(`
SELECT COUNT(*) FROM message_state
WHERE id = ? AND last_action IS NOT NULL
`, id).Scan(&count)
return count > 0
}
// GetPendingIDs returns message IDs that have no action recorded
func (o *OrchestrationDB) GetPendingIDs() ([]string, error) {
rows, err := o.db.Query(`
SELECT id FROM message_state
WHERE last_action IS NULL
ORDER BY first_seen_at DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var ids []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, nil
}