From e017ee4cdd1841b1167b3ed44ed4c43fb563d8ea Mon Sep 17 00:00:00 2001 From: Johan Jongsma Date: Mon, 2 Feb 2026 22:40:30 +0000 Subject: [PATCH] Add SQLite orchestration tracking for message state - Track first_seen, last_action, acked_by per message - Actions: archive, delete, reply, to-docs - /messages/new now filters by orchestration state - Persists across restarts (unlike in-memory tracking) --- go.mod | 1 + go.sum | 2 + main.go | 44 ++++++++++- orchestration.go | 194 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 orchestration.go diff --git a/go.mod b/go.mod index 0a3c491..e4aa977 100644 --- a/go.mod +++ b/go.mod @@ -10,5 +10,6 @@ require ( require ( github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect + github.com/mattn/go-sqlite3 v1.14.33 // indirect golang.org/x/text v0.14.0 // indirect ) diff --git a/go.sum b/go.sum index b012eba..ea98368 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/emersion/go-message v0.18.2 h1:rl55SQdjd9oJcIoQNhubD2Acs1E6IzlZISRTK7 github.com/emersion/go-message v0.18.2/go.mod h1:XpJyL70LwRvq2a8rVbHXikPgKj8+aI0kGdHlg16ibYA= github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 h1:hH4PQfOndHDlpzYfLAAfl63E8Le6F2+EL/cdhlkyRJY= github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ= +github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0= +github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/main.go b/main.go index 99286c3..5d1604c 100644 --- a/main.go +++ b/main.go @@ -66,6 +66,7 @@ type WebhookConfig struct { var ( config Config store *MessageStore + orch *OrchestrationDB ) func main() { @@ -100,6 +101,14 @@ func main() { // Initialize store store = NewMessageStore(config.DataDir) + // Initialize orchestration DB + orchPath := filepath.Join(config.DataDir, "orchestration.db") + orch, err = NewOrchestrationDB(orchPath) + if err != nil { + log.Fatalf("Failed to open orchestration DB: %v", err) + } + defer orch.Close() + // Register email connectors for name, acc := range config.Accounts { smtpConfig := SMTPConfig{ @@ -285,7 +294,20 @@ func handleMessagesNew(w http.ResponseWriter, r *http.Request) { return } - json.NewEncoder(w).Encode(messages) + // Filter by orchestration state and record new messages + var filtered []UnifiedMessage + for _, msg := range messages { + // Record that we've seen this message + source, sourceID, _ := parseMessageID(msg.ID) + orch.RecordSeen(msg.ID, source, sourceID, "INBOX") + + // Only include if no action has been taken + if !orch.HasAction(msg.ID) { + filtered = append(filtered, msg) + } + } + + json.NewEncoder(w).Encode(filtered) } func handleMessagesAck(w http.ResponseWriter, r *http.Request) { @@ -297,6 +319,7 @@ func handleMessagesAck(w http.ResponseWriter, r *http.Request) { var req struct { Consumer string `json:"consumer"` Timestamp time.Time `json:"timestamp"` + IDs []string `json:"ids"` // Optional: specific message IDs to ack } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -313,15 +336,22 @@ func handleMessagesAck(w http.ResponseWriter, r *http.Request) { req.Timestamp = time.Now() } + // Ack cursor-based if err := store.Ack(req.Consumer, req.Timestamp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + // Also record per-message acks in orchestration DB + for _, id := range req.IDs { + orch.RecordAck(id, req.Consumer) + } + json.NewEncoder(w).Encode(map[string]interface{}{ "status": "ok", "consumer": req.Consumer, "timestamp": req.Timestamp, + "acked": len(req.IDs), }) } @@ -392,6 +422,9 @@ func handleArchive(w http.ResponseWriter, r *http.Request, id string) { return } + // Record action in orchestration DB + orch.RecordAction(id, "archive") + json.NewEncoder(w).Encode(map[string]string{"status": "archived"}) } @@ -406,6 +439,9 @@ func handleDelete(w http.ResponseWriter, r *http.Request, id string) { return } + // Record action in orchestration DB + orch.RecordAction(id, "delete") + json.NewEncoder(w).Encode(map[string]string{"status": "deleted"}) } @@ -435,6 +471,9 @@ func handleReply(w http.ResponseWriter, r *http.Request, id string) { return } + // Record action in orchestration DB + orch.RecordAction(id, "reply") + json.NewEncoder(w).Encode(map[string]string{"status": "sent"}) } @@ -504,6 +543,9 @@ func handleToDocs(w http.ResponseWriter, r *http.Request, id string) { if len(errors) > 0 && len(saved) == 0 { w.WriteHeader(http.StatusInternalServerError) + } else if len(saved) > 0 { + // Record action in orchestration DB (only if we saved something) + orch.RecordAction(id, "to-docs") } json.NewEncoder(w).Encode(result) diff --git a/orchestration.go b/orchestration.go new file mode 100644 index 0000000..31bd004 --- /dev/null +++ b/orchestration.go @@ -0,0 +1,194 @@ +package main + +import ( + "database/sql" + "fmt" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// OrchestrationDB tracks message state without storing content +type OrchestrationDB struct { + db *sql.DB +} + +// MessageState represents orchestration state for a message +type MessageState struct { + ID string `json:"id"` // "whatsapp:3A6034A970A5C44D17F9" + Source string `json:"source"` // "whatsapp", "tj_jongsma_me", etc. + SourceID string `json:"source_id"` // original ID at source + Mailbox string `json:"mailbox"` // "INBOX", null for WA + FirstSeenAt time.Time `json:"first_seen_at"` + LastAction string `json:"last_action,omitempty"` // "archive", "delete", "reply", "to-docs" + LastActionAt *time.Time `json:"last_action_at,omitempty"` + AckedBy string `json:"acked_by,omitempty"` + AckedAt *time.Time `json:"acked_at,omitempty"` +} + +// NewOrchestrationDB opens or creates the orchestration database +func NewOrchestrationDB(path string) (*OrchestrationDB, error) { + db, err := sql.Open("sqlite3", path) + if err != nil { + return nil, fmt.Errorf("open db: %w", err) + } + + // Create schema + schema := ` + CREATE TABLE IF NOT EXISTS message_state ( + id TEXT PRIMARY KEY, + source TEXT NOT NULL, + source_id TEXT NOT NULL, + mailbox TEXT, + first_seen_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + last_action TEXT, + last_action_at TIMESTAMP, + acked_by TEXT, + acked_at TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_source ON message_state(source, source_id); + CREATE INDEX IF NOT EXISTS idx_pending ON message_state(last_action) WHERE last_action IS NULL; + ` + + if _, err := db.Exec(schema); err != nil { + return nil, fmt.Errorf("create schema: %w", err) + } + + return &OrchestrationDB{db: db}, nil +} + +// Close closes the database +func (o *OrchestrationDB) Close() error { + return o.db.Close() +} + +// RecordSeen marks a message as seen (first encounter) +func (o *OrchestrationDB) RecordSeen(id, source, sourceID, mailbox string) error { + _, err := o.db.Exec(` + INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(id) DO NOTHING + `, id, source, sourceID, mailbox, time.Now()) + return err +} + +// RecordAction records an action taken on a message +func (o *OrchestrationDB) RecordAction(id, action string) error { + now := time.Now() + result, err := o.db.Exec(` + UPDATE message_state + SET last_action = ?, last_action_at = ? + WHERE id = ? + `, action, now, id) + if err != nil { + return err + } + + rows, _ := result.RowsAffected() + if rows == 0 { + // Message wasn't seen yet, insert with action + source, sourceID, _ := parseMessageID(id) + _, err = o.db.Exec(` + INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, last_action, last_action_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, id, source, sourceID, "INBOX", time.Now(), action, now) + } + return err +} + +// RecordAck records that a consumer acknowledged a message +func (o *OrchestrationDB) RecordAck(id, consumer string) error { + now := time.Now() + result, err := o.db.Exec(` + UPDATE message_state + SET acked_by = ?, acked_at = ? + WHERE id = ? + `, consumer, now, id) + if err != nil { + return err + } + + rows, _ := result.RowsAffected() + if rows == 0 { + source, sourceID, _ := parseMessageID(id) + _, err = o.db.Exec(` + INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, acked_by, acked_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, id, source, sourceID, "INBOX", time.Now(), consumer, now) + } + return err +} + +// GetState returns the state for a message +func (o *OrchestrationDB) GetState(id string) (*MessageState, error) { + row := o.db.QueryRow(` + SELECT id, source, source_id, mailbox, first_seen_at, last_action, last_action_at, acked_by, acked_at + FROM message_state WHERE id = ? + `, id) + + var state MessageState + var lastAction, ackedBy sql.NullString + var lastActionAt, ackedAt sql.NullTime + var mailbox sql.NullString + + err := row.Scan(&state.ID, &state.Source, &state.SourceID, &mailbox, + &state.FirstSeenAt, &lastAction, &lastActionAt, &ackedBy, &ackedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + if mailbox.Valid { + state.Mailbox = mailbox.String + } + if lastAction.Valid { + state.LastAction = lastAction.String + } + if lastActionAt.Valid { + state.LastActionAt = &lastActionAt.Time + } + if ackedBy.Valid { + state.AckedBy = ackedBy.String + } + if ackedAt.Valid { + state.AckedAt = &ackedAt.Time + } + + return &state, nil +} + +// HasAction checks if a message has been actioned (archived, deleted, etc.) +func (o *OrchestrationDB) HasAction(id string) bool { + var count int + o.db.QueryRow(` + SELECT COUNT(*) FROM message_state + WHERE id = ? AND last_action IS NOT NULL + `, id).Scan(&count) + return count > 0 +} + +// GetPendingIDs returns message IDs that have no action recorded +func (o *OrchestrationDB) GetPendingIDs() ([]string, error) { + rows, err := o.db.Query(` + SELECT id FROM message_state + WHERE last_action IS NULL + ORDER BY first_seen_at DESC + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + ids = append(ids, id) + } + return ids, nil +}