From 34e06fed8e11e4e33c074fa236f41ef3902033d4 Mon Sep 17 00:00:00 2001 From: Johan Jongsma Date: Mon, 2 Feb 2026 21:34:34 +0000 Subject: [PATCH] v0.1: Foundation - Go module with chi router, sqlite3, uuid deps - Directory structure per spec - SQLite store with embedded migrations - Core types: Message, Attachment, Command, Contact, Channel - REST API: - GET /health, GET /ready (no auth) - POST /oauth/token (client_credentials grant) - GET /api/v1/messages (list with filters) - GET /api/v1/messages/:id - POST /api/v1/messages (stub, creates pending) - POST /api/v1/commands (stub, creates pending) - GET /api/v1/channels (stub, lists configured) - OAuth 2.0 with Bearer token validation - YAML config with env var expansion (supports ${VAR:-default}) - Ready for v0.2 (email adapter) --- .gitignore | 22 + README.md | 74 ++++ SPEC.md | 473 ++++++++++++++++++++++ cmd/mc/main.go | 71 ++++ config.yaml | 61 +++ go.mod | 10 + go.sum | 10 + internal/api/channels.go | 45 ++ internal/api/commands.go | 87 ++++ internal/api/context.go | 20 + internal/api/messages.go | 156 +++++++ internal/api/oauth.go | 220 ++++++++++ internal/api/router.go | 90 ++++ internal/core/config.go | 160 ++++++++ internal/core/types.go | 73 ++++ internal/store/migrations/001_initial.sql | 52 +++ internal/store/sqlite.go | 305 ++++++++++++++ 17 files changed, 1929 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 SPEC.md create mode 100644 cmd/mc/main.go create mode 100644 config.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/api/channels.go create mode 100644 internal/api/commands.go create mode 100644 internal/api/context.go create mode 100644 internal/api/messages.go create mode 100644 internal/api/oauth.go create mode 100644 internal/api/router.go create mode 100644 internal/core/config.go create mode 100644 internal/core/types.go create mode 100644 internal/store/migrations/001_initial.sql create mode 100644 internal/store/sqlite.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ae3ed2d --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Binaries +/mc +*.exe + +# Data +data/ +*.db +*.db-journal +*.db-shm +*.db-wal + +# IDE +.idea/ +.vscode/ +*.swp + +# OS +.DS_Store + +# Local config overrides +config.local.yaml +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..7795f1f --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +# Messaging Center + +Unified messaging hub that aggregates multiple communication channels into a single, normalized interface. + +## Status + +**v0.1 — Foundation** + +- [x] Project structure +- [x] SQLite store with migrations +- [x] Core types (Message, Attachment, Command, Contact) +- [x] REST API scaffold with chi +- [x] OAuth 2.0 (client_credentials) +- [x] Health/ready endpoints +- [ ] Adapters (email, whatsapp, signal) — v0.2+ +- [ ] Web GUI — v0.5 +- [ ] Webhooks — v0.6 + +## Quick Start + +```bash +# Build +go build -o mc ./cmd/mc + +# Run +./mc -config config.yaml + +# Or with go run +go run ./cmd/mc -config config.yaml +``` + +The server starts on `http://localhost:8040` by default. + +## API + +### Authentication + +Get an access token: + +```bash +curl -X POST http://localhost:8040/oauth/token \ + -d "grant_type=client_credentials" \ + -d "client_id=admin" \ + -d "client_secret=dev-secret-admin" +``` + +### Endpoints + +| Method | Endpoint | Auth | Description | +|--------|----------|------|-------------| +| GET | /health | No | Health check | +| GET | /ready | No | Readiness check | +| POST | /oauth/token | No | Get access token | +| GET | /api/v1/messages | Yes | List messages | +| GET | /api/v1/messages/:id | Yes | Get message | +| POST | /api/v1/messages | Yes | Send message (stub) | +| POST | /api/v1/commands | Yes | Execute command (stub) | +| GET | /api/v1/channels | Yes (admin) | List channels | + +### Example: List Messages + +```bash +TOKEN="your-access-token" +curl -H "Authorization: Bearer $TOKEN" \ + "http://localhost:8040/api/v1/messages?source=whatsapp&limit=10" +``` + +## Configuration + +See `config.yaml` for all options. Environment variables are expanded (e.g., `${MC_SIGNING_KEY}`). + +## License + +MIT diff --git a/SPEC.md b/SPEC.md new file mode 100644 index 0000000..392abb2 --- /dev/null +++ b/SPEC.md @@ -0,0 +1,473 @@ +# Messaging Center — Specification + +**Version:** 0.1.0 +**Status:** Draft +**Author:** James ⚡ + +## Overview + +Unified messaging hub that aggregates multiple communication channels into a single, normalized interface. OpenClaw (or any client) receives metadata via webhook, makes decisions, and sends commands back via REST API. + +**Key principle:** Files never transit through OpenClaw. MC stores attachments locally, sends metadata only, executes routing commands. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ MESSAGING CENTER │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ +│ │ Email │ │WhatsApp │ │ Signal │ │ SMS │ │Voicemail│ │ +│ │ (IMAP) │ │(meow) │ │ (cli) │ │ (future)│ │(future) │ │ +│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ +│ │ │ │ │ │ │ +│ └───────────┴───────────┼───────────┴───────────┘ │ +│ ▼ │ +│ ┌───────────────────┐ │ +│ │ Adapter Layer │ │ +│ └─────────┬─────────┘ │ +│ ▼ │ +│ ┌───────────────────────────────────────────────────────────┐ │ +│ │ Core Engine │ │ +│ │ - Normalize messages │ │ +│ │ - Store attachments (local) │ │ +│ │ - Transcribe voice (Fireworks Whisper) │ │ +│ │ - Queue outbound messages │ │ +│ │ - Execute commands │ │ +│ └───────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ┌───────────────┼───────────────┐ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ REST API │ │ Webhooks │ │ Web GUI │ │ +│ │ (commands) │ │ (events) │ │ (oversight) │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ OPENCLAW │ +│ - Receives message metadata (no files) │ +│ - Makes decisions │ +│ - Sends commands back to MC │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Tech Stack + +| Component | Technology | +|-----------|------------| +| Language | Go 1.25 | +| Database | SQLite (embedded, simple) | +| Web GUI | Go + templ + htmx | +| Auth | OAuth 2.0 (local provider, or external like Authentik) | +| Transcription | Fireworks Whisper API | +| Email | go-imap v2 | +| WhatsApp | whatsmeow | +| Signal | signal-cli JSON-RPC (external daemon) | + +## Data Model + +### Message + +```go +type Message struct { + ID string `json:"id"` + Source string `json:"source"` // email, whatsapp, signal, sms, voicemail + Direction string `json:"direction"` // inbound, outbound + From Contact `json:"from"` + To Contact `json:"to"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` // text, voice, image, document, email + Subject string `json:"subject,omitempty"` + Body string `json:"body"` // text content or transcription + BodyHTML string `json:"body_html,omitempty"` + Attachments []Attachment `json:"attachments,omitempty"` + Status string `json:"status"` // received, processing, delivered, failed + Raw json.RawMessage `json:"raw,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type Contact struct { + ID string `json:"id"` // phone number, email, etc. + Name string `json:"name,omitempty"` + Type string `json:"type"` // phone, email, username +} + +type Attachment struct { + ID string `json:"id"` + MessageID string `json:"message_id"` + Type string `json:"type"` // mime type + Filename string `json:"filename,omitempty"` + Size int64 `json:"size"` + LocalPath string `json:"-"` // not exposed via API + Transcription string `json:"transcription,omitempty"` + Status string `json:"status"` // stored, processing, routed, deleted +} +``` + +### Command + +```go +type Command struct { + ID string `json:"id"` + Type string `json:"type"` // send, route, delete, archive, forward + Payload json.RawMessage `json:"payload"` + Status string `json:"status"` // pending, executing, completed, failed + Error string `json:"error,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} +``` + +## REST API + +Base URL: `http://localhost:8040/api/v1` + +### Authentication + +OAuth 2.0 Bearer tokens. All API endpoints require `Authorization: Bearer `. + +Token endpoint: `POST /oauth/token` +- Grant types: `client_credentials`, `authorization_code` +- Scopes: `messages:read`, `messages:write`, `commands:write`, `admin` + +### Endpoints + +#### Messages + +| Method | Endpoint | Description | Scope | +|--------|----------|-------------|-------| +| GET | /messages | List messages (paginated, filterable) | messages:read | +| GET | /messages/:id | Get single message | messages:read | +| GET | /messages/:id/attachments/:att_id | Download attachment | messages:read | +| POST | /messages | Send new message | messages:write | + +**Query parameters for GET /messages:** +- `source` - filter by source (email, whatsapp, signal) +- `direction` - inbound, outbound +- `type` - text, voice, image, document, email +- `since` - ISO8601 timestamp +- `until` - ISO8601 timestamp +- `limit` - max results (default 50, max 200) +- `offset` - pagination offset + +**POST /messages (send):** +```json +{ + "channel": "whatsapp", + "to": "+17272252475", + "body": "Hello!", + "attachments": [ + {"id": "att_xyz"} // reference existing attachment + ] +} +``` + +#### Commands + +| Method | Endpoint | Description | Scope | +|--------|----------|-------------|-------| +| POST | /commands | Execute command | commands:write | +| GET | /commands/:id | Get command status | commands:write | + +**Command types:** + +**route** - Send attachment to external system +```json +{ + "type": "route", + "payload": { + "attachment_id": "att_xyz", + "destination": "docsys", + "options": {} + } +} +``` + +**send** - Send message (alias for POST /messages) +```json +{ + "type": "send", + "payload": { + "channel": "signal", + "to": "+17272252475", + "body": "Got it!" + } +} +``` + +**delete** - Delete message (email) +```json +{ + "type": "delete", + "payload": { + "message_id": "msg_abc" + } +} +``` + +**archive** - Archive message (email) +```json +{ + "type": "archive", + "payload": { + "message_id": "msg_abc" + } +} +``` + +**forward** - Forward attachment to chat +```json +{ + "type": "forward", + "payload": { + "attachment_id": "att_xyz", + "channel": "whatsapp", + "to": "+17272252475", + "caption": "Here's that file" + } +} +``` + +#### Channels + +| Method | Endpoint | Description | Scope | +|--------|----------|-------------|-------| +| GET | /channels | List configured channels | admin | +| GET | /channels/:name/status | Get channel status | admin | +| POST | /channels/:name/reconnect | Force reconnect | admin | + +#### Admin + +| Method | Endpoint | Description | Scope | +|--------|----------|-------------|-------| +| GET | /health | Health check (no auth) | - | +| GET | /ready | Readiness check (no auth) | - | +| GET | /stats | System statistics | admin | + +## Webhooks + +MC sends webhooks to configured endpoints on events. + +**Webhook payload:** +```json +{ + "event": "message.received", + "timestamp": "2026-02-02T21:30:00Z", + "data": { + "id": "msg_abc123", + "source": "whatsapp", + "from": {"id": "+17272253810", "name": "Tanya"}, + "type": "voice", + "body": "Hey, can you check on Sophia's appointment?", + "attachments": [ + {"id": "att_xyz", "type": "audio/ogg", "size": 8420, "transcription": "Hey, can you check on Sophia's appointment?"} + ] + } +} +``` + +**Events:** +- `message.received` - New inbound message +- `message.sent` - Outbound message delivered +- `message.failed` - Outbound message failed +- `channel.connected` - Channel connected +- `channel.disconnected` - Channel disconnected +- `command.completed` - Command finished +- `command.failed` - Command failed + +**Webhook auth:** HMAC-SHA256 signature in `X-MC-Signature` header. + +## Web GUI + +Dashboard for human oversight at `http://localhost:8040/` + +### Pages + +1. **Dashboard** - Overview of all channels, recent messages, system health +2. **Messages** - Searchable/filterable message list with details +3. **Channels** - Channel status, configuration, reconnect buttons +4. **Commands** - Command history and status +5. **Settings** - OAuth clients, webhooks, routing rules + +### Features + +- Real-time updates via SSE (Server-Sent Events) +- Mobile responsive (Tailwind CSS) +- Dark mode +- Message search and filters +- Attachment preview (images, PDFs) +- Audio playback for voice messages + +## Configuration + +```yaml +# config.yaml +server: + host: 0.0.0.0 + port: 8040 + +database: + path: /var/lib/messaging-center/mc.db + +storage: + path: /var/lib/messaging-center/attachments + +oauth: + issuer: http://localhost:8040 + signing_key: ${MC_SIGNING_KEY} + access_token_ttl: 1h + clients: + - id: openclaw + secret: ${MC_OPENCLAW_SECRET} + scopes: [messages:read, messages:write, commands:write] + - id: admin + secret: ${MC_ADMIN_SECRET} + scopes: [messages:read, messages:write, commands:write, admin] + +transcription: + provider: fireworks + api_key: ${FIREWORKS_API_KEY} + model: whisper-v3-turbo + +webhooks: + - url: http://localhost:18789/hooks/message + secret: ${MC_WEBHOOK_SECRET} + events: [message.received, message.sent] + +routing: + docsys: + url: http://localhost:8050/ingest + auth: bearer ${DOCSYS_TOKEN} + +channels: + email: + enabled: true + imap: + host: 127.0.0.1 + port: 1143 + username: tj@jongsma.me + password: ${PROTON_BRIDGE_PASSWORD} + tls: starttls + smtp: + host: 127.0.0.1 + port: 1025 + username: tj@jongsma.me + password: ${PROTON_BRIDGE_PASSWORD} + tls: starttls + + whatsapp: + enabled: true + data_dir: /var/lib/messaging-center/whatsapp + + signal: + enabled: true + api_url: http://localhost:8080 + account: +31634481877 +``` + +## Directory Structure + +``` +messaging-center/ +├── cmd/ +│ └── mc/ +│ └── main.go +├── internal/ +│ ├── adapter/ +│ │ ├── adapter.go # Interface +│ │ ├── email/ +│ │ ├── whatsapp/ +│ │ └── signal/ +│ ├── api/ +│ │ ├── router.go +│ │ ├── messages.go +│ │ ├── commands.go +│ │ ├── channels.go +│ │ └── oauth.go +│ ├── core/ +│ │ ├── engine.go +│ │ ├── message.go +│ │ ├── command.go +│ │ └── transcribe.go +│ ├── store/ +│ │ ├── sqlite.go +│ │ └── migrations/ +│ ├── web/ +│ │ ├── handlers.go +│ │ ├── templates/ +│ │ └── static/ +│ └── webhook/ +│ └── sender.go +├── config.yaml +├── go.mod +├── go.sum +├── Dockerfile +├── docker-compose.yml +└── README.md +``` + +## Milestones + +### v0.1 — Foundation +- [ ] Project structure +- [ ] SQLite store with migrations +- [ ] Core engine (message normalization) +- [ ] REST API scaffold +- [ ] OAuth 2.0 (client_credentials) +- [ ] Health/ready endpoints + +### v0.2 — Email Adapter +- [ ] IMAP inbound (from mail-bridge) +- [ ] SMTP outbound +- [ ] Email-specific commands (delete, archive, move) + +### v0.3 — WhatsApp Adapter +- [ ] whatsmeow integration (from message-bridge) +- [ ] Voice transcription (Fireworks) +- [ ] Media handling + +### v0.4 — Signal Adapter +- [ ] signal-cli JSON-RPC client +- [ ] Bidirectional messaging + +### v0.5 — Web GUI +- [ ] Dashboard +- [ ] Message browser +- [ ] Channel status +- [ ] Settings + +### v0.6 — Webhooks & Routing +- [ ] Webhook sender with HMAC +- [ ] docsys routing +- [ ] Command execution + +### v1.0 — Production Ready +- [ ] Systemd service +- [ ] Documentation +- [ ] Docker image +- [ ] Backup/restore + +--- + +## Open Questions + +1. **OAuth provider:** Build simple local provider, or integrate with external (Authentik)? + - *Recommendation:* Start with simple local provider, extract later if needed + +2. **Signal-cli:** Keep as external daemon, or embed? + - *Recommendation:* Keep external (Java, complex), adapter calls JSON-RPC + +3. **Database:** SQLite vs Postgres? + - *Recommendation:* SQLite for now (simpler), abstract for later swap + +4. **Message retention:** How long to keep messages/attachments? + - **Decision:** Forever. Storage is cheap. + +--- + +*Ready to build. Starting with v0.1 foundation.* diff --git a/cmd/mc/main.go b/cmd/mc/main.go new file mode 100644 index 0000000..1f97093 --- /dev/null +++ b/cmd/mc/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "path/filepath" + "syscall" + + "github.com/inou-ai/messaging-center/internal/api" + "github.com/inou-ai/messaging-center/internal/core" + "github.com/inou-ai/messaging-center/internal/store" +) + +func main() { + configPath := flag.String("config", "config.yaml", "Path to config file") + flag.Parse() + + // Load configuration + cfg, err := core.LoadConfig(*configPath) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + + // Ensure storage directory exists + if err := os.MkdirAll(cfg.Storage.Path, 0755); err != nil { + log.Fatalf("Failed to create storage directory: %v", err) + } + + // Ensure database directory exists + dbDir := filepath.Dir(cfg.Database.Path) + if dbDir != "." && dbDir != "" { + if err := os.MkdirAll(dbDir, 0755); err != nil { + log.Fatalf("Failed to create database directory: %v", err) + } + } + + // Initialize store + store, err := store.New(cfg.Database.Path) + if err != nil { + log.Fatalf("Failed to initialize store: %v", err) + } + defer store.Close() + + // Create API server + srv := api.NewServer(cfg, store) + + // Start HTTP server + addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port) + httpServer := &http.Server{ + Addr: addr, + Handler: srv, + } + + // Handle shutdown + done := make(chan os.Signal, 1) + signal.Notify(done, os.Interrupt, syscall.SIGTERM) + + go func() { + log.Printf("Starting messaging-center on %s", addr) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("HTTP server error: %v", err) + } + }() + + <-done + log.Println("Shutting down...") +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..9f0ac7a --- /dev/null +++ b/config.yaml @@ -0,0 +1,61 @@ +# Messaging Center Configuration + +server: + host: 0.0.0.0 + port: 8040 + +database: + path: ./data/mc.db + +storage: + path: ./data/attachments + +oauth: + issuer: http://localhost:8040 + signing_key: ${MC_SIGNING_KEY:-change-me-in-production} + access_token_ttl: 1h + clients: + - id: openclaw + secret: ${MC_OPENCLAW_SECRET:-dev-secret-openclaw} + scopes: [messages:read, messages:write, commands:write] + - id: admin + secret: ${MC_ADMIN_SECRET:-dev-secret-admin} + scopes: [messages:read, messages:write, commands:write, admin] + +# Transcription (not used in v0.1) +transcription: + provider: fireworks + api_key: ${FIREWORKS_API_KEY:-} + model: whisper-v3-turbo + +# Webhooks (not implemented in v0.1) +webhooks: + - url: http://localhost:18789/hooks/message + secret: ${MC_WEBHOOK_SECRET:-} + events: [message.received, message.sent] + +# Channels (not implemented in v0.1) +channels: + email: + enabled: false + imap: + host: 127.0.0.1 + port: 1143 + username: ${EMAIL_USERNAME:-} + password: ${EMAIL_PASSWORD:-} + tls: starttls + smtp: + host: 127.0.0.1 + port: 1025 + username: ${EMAIL_USERNAME:-} + password: ${EMAIL_PASSWORD:-} + tls: starttls + + whatsapp: + enabled: false + data_dir: ./data/whatsapp + + signal: + enabled: false + api_url: http://localhost:8080 + account: ${SIGNAL_ACCOUNT:-} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3b61091 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/inou-ai/messaging-center + +go 1.23 + +require ( + github.com/go-chi/chi/v5 v5.2.1 + github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.24 + gopkg.in/yaml.v3 v3.0.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0d8ae8d --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= +github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/channels.go b/internal/api/channels.go new file mode 100644 index 0000000..0e23fc9 --- /dev/null +++ b/internal/api/channels.go @@ -0,0 +1,45 @@ +package api + +import ( + "net/http" + + "github.com/inou-ai/messaging-center/internal/core" +) + +// ChannelsResponse wraps a list of channels for API response. +type ChannelsResponse struct { + Channels []core.Channel `json:"channels"` +} + +// handleListChannels handles GET /api/v1/channels (stub). +func (s *Server) handleListChannels(w http.ResponseWriter, r *http.Request) { + // For now, return configured channels with unknown status + // Actual status will come from adapters once implemented + var channels []core.Channel + + if s.config.Channels.Email != nil && s.config.Channels.Email.Enabled { + channels = append(channels, core.Channel{ + Name: "email", + Enabled: true, + Status: "not_implemented", + }) + } + + if s.config.Channels.WhatsApp != nil && s.config.Channels.WhatsApp.Enabled { + channels = append(channels, core.Channel{ + Name: "whatsapp", + Enabled: true, + Status: "not_implemented", + }) + } + + if s.config.Channels.Signal != nil && s.config.Channels.Signal.Enabled { + channels = append(channels, core.Channel{ + Name: "signal", + Enabled: true, + Status: "not_implemented", + }) + } + + writeJSON(w, http.StatusOK, ChannelsResponse{Channels: channels}) +} diff --git a/internal/api/commands.go b/internal/api/commands.go new file mode 100644 index 0000000..fa6bc8a --- /dev/null +++ b/internal/api/commands.go @@ -0,0 +1,87 @@ +package api + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + "github.com/inou-ai/messaging-center/internal/core" +) + +// CommandResponse wraps a command for API response. +type CommandResponse struct { + Command *core.Command `json:"command"` +} + +// CreateCommandRequest is the request body for creating a command. +type CreateCommandRequest struct { + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +var validCommandTypes = map[string]bool{ + "send": true, + "route": true, + "delete": true, + "archive": true, + "forward": true, +} + +// handleCreateCommand handles POST /api/v1/commands (stub). +func (s *Server) handleCreateCommand(w http.ResponseWriter, r *http.Request) { + var req CreateCommandRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"}) + return + } + + if req.Type == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "type is required"}) + return + } + + if !validCommandTypes[req.Type] { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid command type"}) + return + } + + now := time.Now() + cmd := &core.Command{ + ID: "cmd_" + uuid.New().String()[:8], + Type: req.Type, + Payload: req.Payload, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + } + + if err := s.store.CreateCommand(cmd); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + + // TODO: Actually execute the command via adapters + // For now, just mark it as pending + + writeJSON(w, http.StatusAccepted, CommandResponse{Command: cmd}) +} + +// handleGetCommand handles GET /api/v1/commands/:id. +func (s *Server) handleGetCommand(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + cmd, err := s.store.GetCommand(id) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + + if cmd == nil { + writeJSON(w, http.StatusNotFound, map[string]string{"error": "command not found"}) + return + } + + writeJSON(w, http.StatusOK, CommandResponse{Command: cmd}) +} diff --git a/internal/api/context.go b/internal/api/context.go new file mode 100644 index 0000000..c38b647 --- /dev/null +++ b/internal/api/context.go @@ -0,0 +1,20 @@ +package api + +import "context" + +type contextKey string + +const tokenInfoKey contextKey = "tokenInfo" + +// withTokenInfo stores token info in context. +func withTokenInfo(ctx context.Context, info *TokenInfo) context.Context { + return context.WithValue(ctx, tokenInfoKey, info) +} + +// GetTokenInfo retrieves token info from context. +func GetTokenInfo(ctx context.Context) *TokenInfo { + if v := ctx.Value(tokenInfoKey); v != nil { + return v.(*TokenInfo) + } + return nil +} diff --git a/internal/api/messages.go b/internal/api/messages.go new file mode 100644 index 0000000..8c3b559 --- /dev/null +++ b/internal/api/messages.go @@ -0,0 +1,156 @@ +package api + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + "github.com/inou-ai/messaging-center/internal/core" +) + +// MessageResponse wraps a message for API response. +type MessageResponse struct { + Message *core.Message `json:"message"` +} + +// MessagesResponse wraps a list of messages for API response. +type MessagesResponse struct { + Messages []core.Message `json:"messages"` + Count int `json:"count"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} + +// CreateMessageRequest is the request body for creating a message. +type CreateMessageRequest struct { + Channel string `json:"channel"` + To string `json:"to"` + Body string `json:"body"` + Subject string `json:"subject,omitempty"` + Attachments []string `json:"attachments,omitempty"` // attachment IDs +} + +// handleListMessages handles GET /api/v1/messages. +func (s *Server) handleListMessages(w http.ResponseWriter, r *http.Request) { + filter := core.MessageFilter{ + Source: r.URL.Query().Get("source"), + Direction: r.URL.Query().Get("direction"), + Type: r.URL.Query().Get("type"), + } + + if since := r.URL.Query().Get("since"); since != "" { + if t, err := time.Parse(time.RFC3339, since); err == nil { + filter.Since = &t + } + } + + if until := r.URL.Query().Get("until"); until != "" { + if t, err := time.Parse(time.RFC3339, until); err == nil { + filter.Until = &t + } + } + + if limit := r.URL.Query().Get("limit"); limit != "" { + if l, err := strconv.Atoi(limit); err == nil { + if l > 200 { + l = 200 + } + filter.Limit = l + } + } + + if offset := r.URL.Query().Get("offset"); offset != "" { + if o, err := strconv.Atoi(offset); err == nil { + filter.Offset = o + } + } + + messages, err := s.store.ListMessages(filter) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + + if messages == nil { + messages = []core.Message{} + } + + writeJSON(w, http.StatusOK, MessagesResponse{ + Messages: messages, + Count: len(messages), + Limit: filter.Limit, + Offset: filter.Offset, + }) +} + +// handleGetMessage handles GET /api/v1/messages/:id. +func (s *Server) handleGetMessage(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + msg, err := s.store.GetMessage(id) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + + if msg == nil { + writeJSON(w, http.StatusNotFound, map[string]string{"error": "message not found"}) + return + } + + writeJSON(w, http.StatusOK, MessageResponse{Message: msg}) +} + +// handleCreateMessage handles POST /api/v1/messages (stub). +func (s *Server) handleCreateMessage(w http.ResponseWriter, r *http.Request) { + var req CreateMessageRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"}) + return + } + + if req.Channel == "" || req.To == "" || req.Body == "" { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "channel, to, and body are required"}) + return + } + + // For now, just create a pending message record + // Actual sending will be implemented with adapters + now := time.Now() + msg := &core.Message{ + ID: "msg_" + uuid.New().String()[:8], + Source: req.Channel, + Direction: "outbound", + From: core.Contact{ID: "system", Type: "system"}, + To: core.Contact{ID: req.To, Type: contactType(req.Channel)}, + Timestamp: now, + Type: "text", + Subject: req.Subject, + Body: req.Body, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + } + + if err := s.store.CreateMessage(msg); err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + + writeJSON(w, http.StatusAccepted, MessageResponse{Message: msg}) +} + +// contactType returns the contact type for a channel. +func contactType(channel string) string { + switch channel { + case "email": + return "email" + case "whatsapp", "signal", "sms": + return "phone" + default: + return "unknown" + } +} diff --git a/internal/api/oauth.go b/internal/api/oauth.go new file mode 100644 index 0000000..7cd5213 --- /dev/null +++ b/internal/api/oauth.go @@ -0,0 +1,220 @@ +package api + +import ( + "crypto/hmac" + "crypto/sha256" + "crypto/subtle" + "encoding/base64" + "encoding/json" + "net/http" + "strings" + "sync" + "time" + + "github.com/google/uuid" + "github.com/inou-ai/messaging-center/internal/core" +) + +// TokenInfo represents an issued access token. +type TokenInfo struct { + ClientID string + Scopes []string + ExpiresAt time.Time +} + +// OAuthProvider handles OAuth 2.0 authentication. +type OAuthProvider struct { + config *core.OAuthConfig + tokens map[string]*TokenInfo + mu sync.RWMutex +} + +// NewOAuthProvider creates a new OAuth provider. +func NewOAuthProvider(cfg *core.OAuthConfig) *OAuthProvider { + return &OAuthProvider{ + config: cfg, + tokens: make(map[string]*TokenInfo), + } +} + +// TokenResponse is the OAuth token endpoint response. +type TokenResponse struct { + AccessToken string `json:"access_token"` + TokenType string `json:"token_type"` + ExpiresIn int `json:"expires_in"` + Scope string `json:"scope,omitempty"` +} + +// TokenError is the OAuth error response. +type TokenError struct { + Error string `json:"error"` + Description string `json:"error_description,omitempty"` +} + +// HandleToken handles POST /oauth/token. +func (p *OAuthProvider) HandleToken(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeJSON(w, http.StatusMethodNotAllowed, TokenError{Error: "method_not_allowed"}) + return + } + + if err := r.ParseForm(); err != nil { + writeJSON(w, http.StatusBadRequest, TokenError{Error: "invalid_request"}) + return + } + + grantType := r.FormValue("grant_type") + if grantType != "client_credentials" { + writeJSON(w, http.StatusBadRequest, TokenError{ + Error: "unsupported_grant_type", + Description: "Only client_credentials is supported", + }) + return + } + + // Get client credentials from Basic auth or form + clientID, clientSecret, ok := r.BasicAuth() + if !ok { + clientID = r.FormValue("client_id") + clientSecret = r.FormValue("client_secret") + } + + if clientID == "" || clientSecret == "" { + writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_client"}) + return + } + + // Find and validate client + var client *core.OAuthClient + for i := range p.config.Clients { + if p.config.Clients[i].ID == clientID { + client = &p.config.Clients[i] + break + } + } + + if client == nil || !secureCompare(client.Secret, clientSecret) { + writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_client"}) + return + } + + // Parse requested scopes + requestedScopes := strings.Fields(r.FormValue("scope")) + var grantedScopes []string + + if len(requestedScopes) == 0 { + // Grant all client scopes + grantedScopes = client.Scopes + } else { + // Grant only requested scopes that client has + for _, s := range requestedScopes { + if client.HasScope(s) { + grantedScopes = append(grantedScopes, s) + } + } + } + + // Generate token + token := generateToken() + expiresAt := time.Now().Add(p.config.AccessTokenTTL) + + p.mu.Lock() + p.tokens[token] = &TokenInfo{ + ClientID: clientID, + Scopes: grantedScopes, + ExpiresAt: expiresAt, + } + p.mu.Unlock() + + writeJSON(w, http.StatusOK, TokenResponse{ + AccessToken: token, + TokenType: "Bearer", + ExpiresIn: int(p.config.AccessTokenTTL.Seconds()), + Scope: strings.Join(grantedScopes, " "), + }) +} + +// ValidateToken validates a bearer token and returns its info. +func (p *OAuthProvider) ValidateToken(token string) *TokenInfo { + p.mu.RLock() + defer p.mu.RUnlock() + + info, ok := p.tokens[token] + if !ok { + return nil + } + + if time.Now().After(info.ExpiresAt) { + return nil + } + + return info +} + +// RequireAuth returns middleware that requires authentication. +func (p *OAuthProvider) RequireAuth(scopes ...string) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + auth := r.Header.Get("Authorization") + if auth == "" { + w.Header().Set("WWW-Authenticate", "Bearer") + writeJSON(w, http.StatusUnauthorized, TokenError{Error: "unauthorized"}) + return + } + + parts := strings.SplitN(auth, " ", 2) + if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") { + writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_token"}) + return + } + + info := p.ValidateToken(parts[1]) + if info == nil { + writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_token"}) + return + } + + // Check required scopes + for _, required := range scopes { + hasScope := false + for _, s := range info.Scopes { + if s == required || s == "admin" { + hasScope = true + break + } + } + if !hasScope { + writeJSON(w, http.StatusForbidden, TokenError{ + Error: "insufficient_scope", + Description: "Required scope: " + required, + }) + return + } + } + + // Store token info in context + ctx := r.Context() + ctx = withTokenInfo(ctx, info) + next.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// generateToken creates a secure random token. +func generateToken() string { + id := uuid.New() + h := hmac.New(sha256.New, []byte("mc-token-salt")) + h.Write([]byte(id.String())) + return base64.RawURLEncoding.EncodeToString(h.Sum(nil)) +} + +// secureCompare compares two strings in constant time. +func secureCompare(a, b string) bool { + return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1 +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(v) +} diff --git a/internal/api/router.go b/internal/api/router.go new file mode 100644 index 0000000..429585d --- /dev/null +++ b/internal/api/router.go @@ -0,0 +1,90 @@ +package api + +import ( + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/inou-ai/messaging-center/internal/core" + "github.com/inou-ai/messaging-center/internal/store" +) + +// Server is the HTTP API server. +type Server struct { + config *core.Config + store *store.Store + oauth *OAuthProvider + router chi.Router +} + +// NewServer creates a new API server. +func NewServer(cfg *core.Config, s *store.Store) *Server { + srv := &Server{ + config: cfg, + store: s, + oauth: NewOAuthProvider(&cfg.OAuth), + } + srv.setupRoutes() + return srv +} + +// ServeHTTP implements http.Handler. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.router.ServeHTTP(w, r) +} + +func (s *Server) setupRoutes() { + r := chi.NewRouter() + + // Middleware + r.Use(middleware.Logger) + r.Use(middleware.Recoverer) + r.Use(middleware.RealIP) + + // Health endpoints (no auth) + r.Get("/health", s.handleHealth) + r.Get("/ready", s.handleReady) + + // OAuth token endpoint + r.Post("/oauth/token", s.oauth.HandleToken) + + // API v1 + r.Route("/api/v1", func(r chi.Router) { + // Messages + r.Route("/messages", func(r chi.Router) { + r.With(s.oauth.RequireAuth("messages:read")).Get("/", s.handleListMessages) + r.With(s.oauth.RequireAuth("messages:read")).Get("/{id}", s.handleGetMessage) + r.With(s.oauth.RequireAuth("messages:write")).Post("/", s.handleCreateMessage) + }) + + // Commands + r.Route("/commands", func(r chi.Router) { + r.With(s.oauth.RequireAuth("commands:write")).Post("/", s.handleCreateCommand) + r.With(s.oauth.RequireAuth("commands:write")).Get("/{id}", s.handleGetCommand) + }) + + // Channels + r.Route("/channels", func(r chi.Router) { + r.With(s.oauth.RequireAuth("admin")).Get("/", s.handleListChannels) + }) + }) + + s.router = r +} + +// handleHealth returns 200 OK if the server is running. +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} + +// handleReady returns 200 OK if the server is ready to accept traffic. +func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { + if err := s.store.Ping(); err != nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]string{ + "status": "not ready", + "error": err.Error(), + }) + return + } + writeJSON(w, http.StatusOK, map[string]string{"status": "ready"}) +} diff --git a/internal/core/config.go b/internal/core/config.go new file mode 100644 index 0000000..69c51ae --- /dev/null +++ b/internal/core/config.go @@ -0,0 +1,160 @@ +package core + +import ( + "os" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Config represents the application configuration. +type Config struct { + Server ServerConfig `yaml:"server"` + Database DatabaseConfig `yaml:"database"` + Storage StorageConfig `yaml:"storage"` + OAuth OAuthConfig `yaml:"oauth"` + Transcription TranscriptionConfig `yaml:"transcription"` + Webhooks []WebhookConfig `yaml:"webhooks"` + Channels ChannelsConfig `yaml:"channels"` +} + +type ServerConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` +} + +type DatabaseConfig struct { + Path string `yaml:"path"` +} + +type StorageConfig struct { + Path string `yaml:"path"` +} + +type OAuthConfig struct { + Issuer string `yaml:"issuer"` + SigningKey string `yaml:"signing_key"` + AccessTokenTTL time.Duration `yaml:"access_token_ttl"` + Clients []OAuthClient `yaml:"clients"` +} + +type OAuthClient struct { + ID string `yaml:"id"` + Secret string `yaml:"secret"` + Scopes []string `yaml:"scopes"` +} + +type TranscriptionConfig struct { + Provider string `yaml:"provider"` + APIKey string `yaml:"api_key"` + Model string `yaml:"model"` +} + +type WebhookConfig struct { + URL string `yaml:"url"` + Secret string `yaml:"secret"` + Events []string `yaml:"events"` +} + +type ChannelsConfig struct { + Email *EmailChannelConfig `yaml:"email"` + WhatsApp *WhatsAppChannelConfig `yaml:"whatsapp"` + Signal *SignalChannelConfig `yaml:"signal"` +} + +type EmailChannelConfig struct { + Enabled bool `yaml:"enabled"` + IMAP IMAPConfig `yaml:"imap"` + SMTP SMTPConfig `yaml:"smtp"` +} + +type IMAPConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Username string `yaml:"username"` + Password string `yaml:"password"` + TLS string `yaml:"tls"` +} + +type SMTPConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Username string `yaml:"username"` + Password string `yaml:"password"` + TLS string `yaml:"tls"` +} + +type WhatsAppChannelConfig struct { + Enabled bool `yaml:"enabled"` + DataDir string `yaml:"data_dir"` +} + +type SignalChannelConfig struct { + Enabled bool `yaml:"enabled"` + APIURL string `yaml:"api_url"` + Account string `yaml:"account"` +} + +// LoadConfig reads and parses the configuration file. +func LoadConfig(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + // Expand environment variables with default value support (${VAR:-default}) + expanded := os.Expand(string(data), func(key string) string { + // Handle ${VAR:-default} syntax + if idx := strings.Index(key, ":-"); idx != -1 { + envKey := key[:idx] + defaultVal := key[idx+2:] + if val := os.Getenv(envKey); val != "" { + return val + } + return defaultVal + } + return os.Getenv(key) + }) + + var cfg Config + if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil { + return nil, err + } + + // Set defaults + if cfg.Server.Host == "" { + cfg.Server.Host = "0.0.0.0" + } + if cfg.Server.Port == 0 { + cfg.Server.Port = 8040 + } + if cfg.Database.Path == "" { + cfg.Database.Path = "./mc.db" + } + if cfg.Storage.Path == "" { + cfg.Storage.Path = "./attachments" + } + if cfg.OAuth.AccessTokenTTL == 0 { + cfg.OAuth.AccessTokenTTL = time.Hour + } + + return &cfg, nil +} + +// HasScope checks if a client has a specific scope. +func (c *OAuthClient) HasScope(scope string) bool { + for _, s := range c.Scopes { + if s == scope || s == "admin" { + return true + } + // Check wildcard (e.g., "messages:*" matches "messages:read") + if strings.HasSuffix(s, ":*") { + prefix := strings.TrimSuffix(s, "*") + if strings.HasPrefix(scope, prefix) { + return true + } + } + } + return false +} diff --git a/internal/core/types.go b/internal/core/types.go new file mode 100644 index 0000000..52a8326 --- /dev/null +++ b/internal/core/types.go @@ -0,0 +1,73 @@ +package core + +import ( + "encoding/json" + "time" +) + +// Message represents a normalized message from any channel. +type Message struct { + ID string `json:"id"` + Source string `json:"source"` // email, whatsapp, signal, sms, voicemail + Direction string `json:"direction"` // inbound, outbound + From Contact `json:"from"` + To Contact `json:"to"` + Timestamp time.Time `json:"timestamp"` + Type string `json:"type"` // text, voice, image, document, email + Subject string `json:"subject,omitempty"` + Body string `json:"body"` + BodyHTML string `json:"body_html,omitempty"` + Attachments []Attachment `json:"attachments,omitempty"` + Status string `json:"status"` // received, processing, delivered, failed + Raw json.RawMessage `json:"raw,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// Contact represents a sender or recipient. +type Contact struct { + ID string `json:"id"` // phone number, email, etc. + Name string `json:"name,omitempty"` + Type string `json:"type"` // phone, email, username +} + +// Attachment represents a file attached to a message. +type Attachment struct { + ID string `json:"id"` + MessageID string `json:"message_id"` + Type string `json:"type"` // mime type + Filename string `json:"filename,omitempty"` + Size int64 `json:"size"` + LocalPath string `json:"-"` // not exposed via API + Transcription string `json:"transcription,omitempty"` + Status string `json:"status"` // stored, processing, routed, deleted +} + +// Command represents an action to execute. +type Command struct { + ID string `json:"id"` + Type string `json:"type"` // send, route, delete, archive, forward + Payload json.RawMessage `json:"payload"` + Status string `json:"status"` // pending, executing, completed, failed + Error string `json:"error,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// Channel represents a configured messaging channel. +type Channel struct { + Name string `json:"name"` + Enabled bool `json:"enabled"` + Status string `json:"status"` // connected, disconnected, error +} + +// MessageFilter for querying messages. +type MessageFilter struct { + Source string + Direction string + Type string + Since *time.Time + Until *time.Time + Limit int + Offset int +} diff --git a/internal/store/migrations/001_initial.sql b/internal/store/migrations/001_initial.sql new file mode 100644 index 0000000..bb8bdb0 --- /dev/null +++ b/internal/store/migrations/001_initial.sql @@ -0,0 +1,52 @@ +-- Initial schema for messaging-center + +CREATE TABLE messages ( + id TEXT PRIMARY KEY, + source TEXT NOT NULL, -- email, whatsapp, signal, sms, voicemail + direction TEXT NOT NULL, -- inbound, outbound + from_contact TEXT NOT NULL, -- JSON: {id, name, type} + to_contact TEXT NOT NULL, -- JSON: {id, name, type} + timestamp DATETIME NOT NULL, + type TEXT NOT NULL, -- text, voice, image, document, email + subject TEXT, + body TEXT, + body_html TEXT, + status TEXT NOT NULL, -- received, processing, delivered, failed + raw TEXT, -- original message JSON + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); + +CREATE INDEX idx_messages_source ON messages(source); +CREATE INDEX idx_messages_direction ON messages(direction); +CREATE INDEX idx_messages_timestamp ON messages(timestamp); +CREATE INDEX idx_messages_type ON messages(type); +CREATE INDEX idx_messages_status ON messages(status); + +CREATE TABLE attachments ( + id TEXT PRIMARY KEY, + message_id TEXT NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + type TEXT NOT NULL, -- mime type + filename TEXT, + size INTEGER NOT NULL, + local_path TEXT, + transcription TEXT, + status TEXT NOT NULL -- stored, processing, routed, deleted +); + +CREATE INDEX idx_attachments_message_id ON attachments(message_id); +CREATE INDEX idx_attachments_status ON attachments(status); + +CREATE TABLE commands ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, -- send, route, delete, archive, forward + payload TEXT, -- JSON + status TEXT NOT NULL, -- pending, executing, completed, failed + error TEXT, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); + +CREATE INDEX idx_commands_type ON commands(type); +CREATE INDEX idx_commands_status ON commands(status); +CREATE INDEX idx_commands_created_at ON commands(created_at); diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go new file mode 100644 index 0000000..dcb2284 --- /dev/null +++ b/internal/store/sqlite.go @@ -0,0 +1,305 @@ +package store + +import ( + "database/sql" + "embed" + "encoding/json" + "fmt" + "io/fs" + "sort" + "strings" + "time" + + "github.com/inou-ai/messaging-center/internal/core" + _ "github.com/mattn/go-sqlite3" +) + +//go:embed migrations/*.sql +var migrations embed.FS + +// Store provides database operations. +type Store struct { + db *sql.DB +} + +// New creates a new Store and runs migrations. +func New(dbPath string) (*Store, error) { + db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_foreign_keys=on") + if err != nil { + return nil, fmt.Errorf("open database: %w", err) + } + + s := &Store{db: db} + if err := s.migrate(); err != nil { + db.Close() + return nil, fmt.Errorf("migrate: %w", err) + } + + return s, nil +} + +// Close closes the database connection. +func (s *Store) Close() error { + return s.db.Close() +} + +// migrate runs all SQL migrations in order. +func (s *Store) migrate() error { + // Create migrations table + _, err := s.db.Exec(` + CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + applied_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) + `) + if err != nil { + return fmt.Errorf("create migrations table: %w", err) + } + + // Get applied migrations + rows, err := s.db.Query("SELECT name FROM migrations") + if err != nil { + return fmt.Errorf("query migrations: %w", err) + } + applied := make(map[string]bool) + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + rows.Close() + return err + } + applied[name] = true + } + rows.Close() + + // Get migration files + entries, err := fs.ReadDir(migrations, "migrations") + if err != nil { + return fmt.Errorf("read migrations dir: %w", err) + } + + // Sort by name + var files []string + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") { + files = append(files, e.Name()) + } + } + sort.Strings(files) + + // Apply unapplied migrations + for _, name := range files { + if applied[name] { + continue + } + + data, err := migrations.ReadFile("migrations/" + name) + if err != nil { + return fmt.Errorf("read migration %s: %w", name, err) + } + + tx, err := s.db.Begin() + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + + if _, err := tx.Exec(string(data)); err != nil { + tx.Rollback() + return fmt.Errorf("execute migration %s: %w", name, err) + } + + if _, err := tx.Exec("INSERT INTO migrations (name) VALUES (?)", name); err != nil { + tx.Rollback() + return fmt.Errorf("record migration %s: %w", name, err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("commit migration %s: %w", name, err) + } + } + + return nil +} + +// DB returns the underlying database connection. +func (s *Store) DB() *sql.DB { + return s.db +} + +// CreateMessage inserts a new message. +func (s *Store) CreateMessage(m *core.Message) error { + raw, _ := json.Marshal(m.Raw) + from, _ := json.Marshal(m.From) + to, _ := json.Marshal(m.To) + + _, err := s.db.Exec(` + INSERT INTO messages (id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, m.ID, m.Source, m.Direction, from, to, m.Timestamp, m.Type, m.Subject, m.Body, m.BodyHTML, m.Status, raw, m.CreatedAt, m.UpdatedAt) + return err +} + +// GetMessage retrieves a message by ID. +func (s *Store) GetMessage(id string) (*core.Message, error) { + var m core.Message + var from, to, raw []byte + + err := s.db.QueryRow(` + SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at + FROM messages WHERE id = ? + `, id).Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + json.Unmarshal(from, &m.From) + json.Unmarshal(to, &m.To) + m.Raw = raw + + // Load attachments + attachments, err := s.GetAttachmentsByMessage(id) + if err != nil { + return nil, err + } + m.Attachments = attachments + + return &m, nil +} + +// ListMessages retrieves messages with optional filters. +func (s *Store) ListMessages(f core.MessageFilter) ([]core.Message, error) { + query := `SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE 1=1` + args := []any{} + + if f.Source != "" { + query += " AND source = ?" + args = append(args, f.Source) + } + if f.Direction != "" { + query += " AND direction = ?" + args = append(args, f.Direction) + } + if f.Type != "" { + query += " AND type = ?" + args = append(args, f.Type) + } + if f.Since != nil { + query += " AND timestamp >= ?" + args = append(args, *f.Since) + } + if f.Until != nil { + query += " AND timestamp <= ?" + args = append(args, *f.Until) + } + + query += " ORDER BY timestamp DESC" + + if f.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", f.Limit) + } else { + query += " LIMIT 50" + } + if f.Offset > 0 { + query += fmt.Sprintf(" OFFSET %d", f.Offset) + } + + rows, err := s.db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var messages []core.Message + for rows.Next() { + var m core.Message + var from, to, raw []byte + + if err := rows.Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt); err != nil { + return nil, err + } + + json.Unmarshal(from, &m.From) + json.Unmarshal(to, &m.To) + m.Raw = raw + + messages = append(messages, m) + } + + return messages, nil +} + +// CreateAttachment inserts a new attachment. +func (s *Store) CreateAttachment(a *core.Attachment) error { + _, err := s.db.Exec(` + INSERT INTO attachments (id, message_id, type, filename, size, local_path, transcription, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `, a.ID, a.MessageID, a.Type, a.Filename, a.Size, a.LocalPath, a.Transcription, a.Status) + return err +} + +// GetAttachmentsByMessage retrieves all attachments for a message. +func (s *Store) GetAttachmentsByMessage(messageID string) ([]core.Attachment, error) { + rows, err := s.db.Query(` + SELECT id, message_id, type, filename, size, local_path, transcription, status + FROM attachments WHERE message_id = ? + `, messageID) + if err != nil { + return nil, err + } + defer rows.Close() + + var attachments []core.Attachment + for rows.Next() { + var a core.Attachment + if err := rows.Scan(&a.ID, &a.MessageID, &a.Type, &a.Filename, &a.Size, &a.LocalPath, &a.Transcription, &a.Status); err != nil { + return nil, err + } + attachments = append(attachments, a) + } + + return attachments, nil +} + +// CreateCommand inserts a new command. +func (s *Store) CreateCommand(c *core.Command) error { + _, err := s.db.Exec(` + INSERT INTO commands (id, type, payload, status, error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, c.ID, c.Type, c.Payload, c.Status, c.Error, c.CreatedAt, c.UpdatedAt) + return err +} + +// GetCommand retrieves a command by ID. +func (s *Store) GetCommand(id string) (*core.Command, error) { + var c core.Command + + err := s.db.QueryRow(` + SELECT id, type, payload, status, error, created_at, updated_at + FROM commands WHERE id = ? + `, id).Scan(&c.ID, &c.Type, &c.Payload, &c.Status, &c.Error, &c.CreatedAt, &c.UpdatedAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + return &c, nil +} + +// UpdateCommandStatus updates a command's status and error. +func (s *Store) UpdateCommandStatus(id, status, errMsg string) error { + _, err := s.db.Exec(` + UPDATE commands SET status = ?, error = ?, updated_at = ? WHERE id = ? + `, status, errMsg, time.Now(), id) + return err +} + +// Ping checks database connectivity. +func (s *Store) Ping() error { + return s.db.Ping() +}