v0.1: Foundation

- Go module with chi router, sqlite3, uuid deps
- Directory structure per spec
- SQLite store with embedded migrations
- Core types: Message, Attachment, Command, Contact, Channel
- REST API:
  - GET /health, GET /ready (no auth)
  - POST /oauth/token (client_credentials grant)
  - GET /api/v1/messages (list with filters)
  - GET /api/v1/messages/:id
  - POST /api/v1/messages (stub, creates pending)
  - POST /api/v1/commands (stub, creates pending)
  - GET /api/v1/channels (stub, lists configured)
- OAuth 2.0 with Bearer token validation
- YAML config with env var expansion (supports ${VAR:-default})
- Ready for v0.2 (email adapter)
This commit is contained in:
Johan Jongsma 2026-02-02 21:34:34 +00:00
commit 34e06fed8e
17 changed files with 1929 additions and 0 deletions

22
.gitignore vendored Normal file
View File

@ -0,0 +1,22 @@
# Binaries
/mc
*.exe
# Data
data/
*.db
*.db-journal
*.db-shm
*.db-wal
# IDE
.idea/
.vscode/
*.swp
# OS
.DS_Store
# Local config overrides
config.local.yaml
.env

74
README.md Normal file
View File

@ -0,0 +1,74 @@
# Messaging Center
Unified messaging hub that aggregates multiple communication channels into a single, normalized interface.
## Status
**v0.1 — Foundation**
- [x] Project structure
- [x] SQLite store with migrations
- [x] Core types (Message, Attachment, Command, Contact)
- [x] REST API scaffold with chi
- [x] OAuth 2.0 (client_credentials)
- [x] Health/ready endpoints
- [ ] Adapters (email, whatsapp, signal) — v0.2+
- [ ] Web GUI — v0.5
- [ ] Webhooks — v0.6
## Quick Start
```bash
# Build
go build -o mc ./cmd/mc
# Run
./mc -config config.yaml
# Or with go run
go run ./cmd/mc -config config.yaml
```
The server starts on `http://localhost:8040` by default.
## API
### Authentication
Get an access token:
```bash
curl -X POST http://localhost:8040/oauth/token \
-d "grant_type=client_credentials" \
-d "client_id=admin" \
-d "client_secret=dev-secret-admin"
```
### Endpoints
| Method | Endpoint | Auth | Description |
|--------|----------|------|-------------|
| GET | /health | No | Health check |
| GET | /ready | No | Readiness check |
| POST | /oauth/token | No | Get access token |
| GET | /api/v1/messages | Yes | List messages |
| GET | /api/v1/messages/:id | Yes | Get message |
| POST | /api/v1/messages | Yes | Send message (stub) |
| POST | /api/v1/commands | Yes | Execute command (stub) |
| GET | /api/v1/channels | Yes (admin) | List channels |
### Example: List Messages
```bash
TOKEN="your-access-token"
curl -H "Authorization: Bearer $TOKEN" \
"http://localhost:8040/api/v1/messages?source=whatsapp&limit=10"
```
## Configuration
See `config.yaml` for all options. Environment variables are expanded (e.g., `${MC_SIGNING_KEY}`).
## License
MIT

473
SPEC.md Normal file
View File

@ -0,0 +1,473 @@
# Messaging Center — Specification
**Version:** 0.1.0
**Status:** Draft
**Author:** James ⚡
## Overview
Unified messaging hub that aggregates multiple communication channels into a single, normalized interface. OpenClaw (or any client) receives metadata via webhook, makes decisions, and sends commands back via REST API.
**Key principle:** Files never transit through OpenClaw. MC stores attachments locally, sends metadata only, executes routing commands.
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ MESSAGING CENTER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Email │ │WhatsApp │ │ Signal │ │ SMS │ │Voicemail│ │
│ │ (IMAP) │ │(meow) │ │ (cli) │ │ (future)│ │(future) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │ │
│ └───────────┴───────────┼───────────┴───────────┘ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ Adapter Layer │ │
│ └─────────┬─────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Core Engine │ │
│ │ - Normalize messages │ │
│ │ - Store attachments (local) │ │
│ │ - Transcribe voice (Fireworks Whisper) │ │
│ │ - Queue outbound messages │ │
│ │ - Execute commands │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ REST API │ │ Webhooks │ │ Web GUI │ │
│ │ (commands) │ │ (events) │ │ (oversight) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ OPENCLAW │
│ - Receives message metadata (no files) │
│ - Makes decisions │
│ - Sends commands back to MC │
└─────────────────────────────────────────────────────────────────┘
```
## Tech Stack
| Component | Technology |
|-----------|------------|
| Language | Go 1.25 |
| Database | SQLite (embedded, simple) |
| Web GUI | Go + templ + htmx |
| Auth | OAuth 2.0 (local provider, or external like Authentik) |
| Transcription | Fireworks Whisper API |
| Email | go-imap v2 |
| WhatsApp | whatsmeow |
| Signal | signal-cli JSON-RPC (external daemon) |
## Data Model
### Message
```go
type Message struct {
ID string `json:"id"`
Source string `json:"source"` // email, whatsapp, signal, sms, voicemail
Direction string `json:"direction"` // inbound, outbound
From Contact `json:"from"`
To Contact `json:"to"`
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"` // text, voice, image, document, email
Subject string `json:"subject,omitempty"`
Body string `json:"body"` // text content or transcription
BodyHTML string `json:"body_html,omitempty"`
Attachments []Attachment `json:"attachments,omitempty"`
Status string `json:"status"` // received, processing, delivered, failed
Raw json.RawMessage `json:"raw,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Contact struct {
ID string `json:"id"` // phone number, email, etc.
Name string `json:"name,omitempty"`
Type string `json:"type"` // phone, email, username
}
type Attachment struct {
ID string `json:"id"`
MessageID string `json:"message_id"`
Type string `json:"type"` // mime type
Filename string `json:"filename,omitempty"`
Size int64 `json:"size"`
LocalPath string `json:"-"` // not exposed via API
Transcription string `json:"transcription,omitempty"`
Status string `json:"status"` // stored, processing, routed, deleted
}
```
### Command
```go
type Command struct {
ID string `json:"id"`
Type string `json:"type"` // send, route, delete, archive, forward
Payload json.RawMessage `json:"payload"`
Status string `json:"status"` // pending, executing, completed, failed
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
```
## REST API
Base URL: `http://localhost:8040/api/v1`
### Authentication
OAuth 2.0 Bearer tokens. All API endpoints require `Authorization: Bearer <token>`.
Token endpoint: `POST /oauth/token`
- Grant types: `client_credentials`, `authorization_code`
- Scopes: `messages:read`, `messages:write`, `commands:write`, `admin`
### Endpoints
#### Messages
| Method | Endpoint | Description | Scope |
|--------|----------|-------------|-------|
| GET | /messages | List messages (paginated, filterable) | messages:read |
| GET | /messages/:id | Get single message | messages:read |
| GET | /messages/:id/attachments/:att_id | Download attachment | messages:read |
| POST | /messages | Send new message | messages:write |
**Query parameters for GET /messages:**
- `source` - filter by source (email, whatsapp, signal)
- `direction` - inbound, outbound
- `type` - text, voice, image, document, email
- `since` - ISO8601 timestamp
- `until` - ISO8601 timestamp
- `limit` - max results (default 50, max 200)
- `offset` - pagination offset
**POST /messages (send):**
```json
{
"channel": "whatsapp",
"to": "+17272252475",
"body": "Hello!",
"attachments": [
{"id": "att_xyz"} // reference existing attachment
]
}
```
#### Commands
| Method | Endpoint | Description | Scope |
|--------|----------|-------------|-------|
| POST | /commands | Execute command | commands:write |
| GET | /commands/:id | Get command status | commands:write |
**Command types:**
**route** - Send attachment to external system
```json
{
"type": "route",
"payload": {
"attachment_id": "att_xyz",
"destination": "docsys",
"options": {}
}
}
```
**send** - Send message (alias for POST /messages)
```json
{
"type": "send",
"payload": {
"channel": "signal",
"to": "+17272252475",
"body": "Got it!"
}
}
```
**delete** - Delete message (email)
```json
{
"type": "delete",
"payload": {
"message_id": "msg_abc"
}
}
```
**archive** - Archive message (email)
```json
{
"type": "archive",
"payload": {
"message_id": "msg_abc"
}
}
```
**forward** - Forward attachment to chat
```json
{
"type": "forward",
"payload": {
"attachment_id": "att_xyz",
"channel": "whatsapp",
"to": "+17272252475",
"caption": "Here's that file"
}
}
```
#### Channels
| Method | Endpoint | Description | Scope |
|--------|----------|-------------|-------|
| GET | /channels | List configured channels | admin |
| GET | /channels/:name/status | Get channel status | admin |
| POST | /channels/:name/reconnect | Force reconnect | admin |
#### Admin
| Method | Endpoint | Description | Scope |
|--------|----------|-------------|-------|
| GET | /health | Health check (no auth) | - |
| GET | /ready | Readiness check (no auth) | - |
| GET | /stats | System statistics | admin |
## Webhooks
MC sends webhooks to configured endpoints on events.
**Webhook payload:**
```json
{
"event": "message.received",
"timestamp": "2026-02-02T21:30:00Z",
"data": {
"id": "msg_abc123",
"source": "whatsapp",
"from": {"id": "+17272253810", "name": "Tanya"},
"type": "voice",
"body": "Hey, can you check on Sophia's appointment?",
"attachments": [
{"id": "att_xyz", "type": "audio/ogg", "size": 8420, "transcription": "Hey, can you check on Sophia's appointment?"}
]
}
}
```
**Events:**
- `message.received` - New inbound message
- `message.sent` - Outbound message delivered
- `message.failed` - Outbound message failed
- `channel.connected` - Channel connected
- `channel.disconnected` - Channel disconnected
- `command.completed` - Command finished
- `command.failed` - Command failed
**Webhook auth:** HMAC-SHA256 signature in `X-MC-Signature` header.
## Web GUI
Dashboard for human oversight at `http://localhost:8040/`
### Pages
1. **Dashboard** - Overview of all channels, recent messages, system health
2. **Messages** - Searchable/filterable message list with details
3. **Channels** - Channel status, configuration, reconnect buttons
4. **Commands** - Command history and status
5. **Settings** - OAuth clients, webhooks, routing rules
### Features
- Real-time updates via SSE (Server-Sent Events)
- Mobile responsive (Tailwind CSS)
- Dark mode
- Message search and filters
- Attachment preview (images, PDFs)
- Audio playback for voice messages
## Configuration
```yaml
# config.yaml
server:
host: 0.0.0.0
port: 8040
database:
path: /var/lib/messaging-center/mc.db
storage:
path: /var/lib/messaging-center/attachments
oauth:
issuer: http://localhost:8040
signing_key: ${MC_SIGNING_KEY}
access_token_ttl: 1h
clients:
- id: openclaw
secret: ${MC_OPENCLAW_SECRET}
scopes: [messages:read, messages:write, commands:write]
- id: admin
secret: ${MC_ADMIN_SECRET}
scopes: [messages:read, messages:write, commands:write, admin]
transcription:
provider: fireworks
api_key: ${FIREWORKS_API_KEY}
model: whisper-v3-turbo
webhooks:
- url: http://localhost:18789/hooks/message
secret: ${MC_WEBHOOK_SECRET}
events: [message.received, message.sent]
routing:
docsys:
url: http://localhost:8050/ingest
auth: bearer ${DOCSYS_TOKEN}
channels:
email:
enabled: true
imap:
host: 127.0.0.1
port: 1143
username: tj@jongsma.me
password: ${PROTON_BRIDGE_PASSWORD}
tls: starttls
smtp:
host: 127.0.0.1
port: 1025
username: tj@jongsma.me
password: ${PROTON_BRIDGE_PASSWORD}
tls: starttls
whatsapp:
enabled: true
data_dir: /var/lib/messaging-center/whatsapp
signal:
enabled: true
api_url: http://localhost:8080
account: +31634481877
```
## Directory Structure
```
messaging-center/
├── cmd/
│ └── mc/
│ └── main.go
├── internal/
│ ├── adapter/
│ │ ├── adapter.go # Interface
│ │ ├── email/
│ │ ├── whatsapp/
│ │ └── signal/
│ ├── api/
│ │ ├── router.go
│ │ ├── messages.go
│ │ ├── commands.go
│ │ ├── channels.go
│ │ └── oauth.go
│ ├── core/
│ │ ├── engine.go
│ │ ├── message.go
│ │ ├── command.go
│ │ └── transcribe.go
│ ├── store/
│ │ ├── sqlite.go
│ │ └── migrations/
│ ├── web/
│ │ ├── handlers.go
│ │ ├── templates/
│ │ └── static/
│ └── webhook/
│ └── sender.go
├── config.yaml
├── go.mod
├── go.sum
├── Dockerfile
├── docker-compose.yml
└── README.md
```
## Milestones
### v0.1 — Foundation
- [ ] Project structure
- [ ] SQLite store with migrations
- [ ] Core engine (message normalization)
- [ ] REST API scaffold
- [ ] OAuth 2.0 (client_credentials)
- [ ] Health/ready endpoints
### v0.2 — Email Adapter
- [ ] IMAP inbound (from mail-bridge)
- [ ] SMTP outbound
- [ ] Email-specific commands (delete, archive, move)
### v0.3 — WhatsApp Adapter
- [ ] whatsmeow integration (from message-bridge)
- [ ] Voice transcription (Fireworks)
- [ ] Media handling
### v0.4 — Signal Adapter
- [ ] signal-cli JSON-RPC client
- [ ] Bidirectional messaging
### v0.5 — Web GUI
- [ ] Dashboard
- [ ] Message browser
- [ ] Channel status
- [ ] Settings
### v0.6 — Webhooks & Routing
- [ ] Webhook sender with HMAC
- [ ] docsys routing
- [ ] Command execution
### v1.0 — Production Ready
- [ ] Systemd service
- [ ] Documentation
- [ ] Docker image
- [ ] Backup/restore
---
## Open Questions
1. **OAuth provider:** Build simple local provider, or integrate with external (Authentik)?
- *Recommendation:* Start with simple local provider, extract later if needed
2. **Signal-cli:** Keep as external daemon, or embed?
- *Recommendation:* Keep external (Java, complex), adapter calls JSON-RPC
3. **Database:** SQLite vs Postgres?
- *Recommendation:* SQLite for now (simpler), abstract for later swap
4. **Message retention:** How long to keep messages/attachments?
- **Decision:** Forever. Storage is cheap.
---
*Ready to build. Starting with v0.1 foundation.*

71
cmd/mc/main.go Normal file
View File

@ -0,0 +1,71 @@
package main
import (
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"github.com/inou-ai/messaging-center/internal/api"
"github.com/inou-ai/messaging-center/internal/core"
"github.com/inou-ai/messaging-center/internal/store"
)
func main() {
configPath := flag.String("config", "config.yaml", "Path to config file")
flag.Parse()
// Load configuration
cfg, err := core.LoadConfig(*configPath)
if err != nil {
log.Fatalf("Failed to load config: %v", err)
}
// Ensure storage directory exists
if err := os.MkdirAll(cfg.Storage.Path, 0755); err != nil {
log.Fatalf("Failed to create storage directory: %v", err)
}
// Ensure database directory exists
dbDir := filepath.Dir(cfg.Database.Path)
if dbDir != "." && dbDir != "" {
if err := os.MkdirAll(dbDir, 0755); err != nil {
log.Fatalf("Failed to create database directory: %v", err)
}
}
// Initialize store
store, err := store.New(cfg.Database.Path)
if err != nil {
log.Fatalf("Failed to initialize store: %v", err)
}
defer store.Close()
// Create API server
srv := api.NewServer(cfg, store)
// Start HTTP server
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
httpServer := &http.Server{
Addr: addr,
Handler: srv,
}
// Handle shutdown
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
go func() {
log.Printf("Starting messaging-center on %s", addr)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("HTTP server error: %v", err)
}
}()
<-done
log.Println("Shutting down...")
}

61
config.yaml Normal file
View File

@ -0,0 +1,61 @@
# Messaging Center Configuration
server:
host: 0.0.0.0
port: 8040
database:
path: ./data/mc.db
storage:
path: ./data/attachments
oauth:
issuer: http://localhost:8040
signing_key: ${MC_SIGNING_KEY:-change-me-in-production}
access_token_ttl: 1h
clients:
- id: openclaw
secret: ${MC_OPENCLAW_SECRET:-dev-secret-openclaw}
scopes: [messages:read, messages:write, commands:write]
- id: admin
secret: ${MC_ADMIN_SECRET:-dev-secret-admin}
scopes: [messages:read, messages:write, commands:write, admin]
# Transcription (not used in v0.1)
transcription:
provider: fireworks
api_key: ${FIREWORKS_API_KEY:-}
model: whisper-v3-turbo
# Webhooks (not implemented in v0.1)
webhooks:
- url: http://localhost:18789/hooks/message
secret: ${MC_WEBHOOK_SECRET:-}
events: [message.received, message.sent]
# Channels (not implemented in v0.1)
channels:
email:
enabled: false
imap:
host: 127.0.0.1
port: 1143
username: ${EMAIL_USERNAME:-}
password: ${EMAIL_PASSWORD:-}
tls: starttls
smtp:
host: 127.0.0.1
port: 1025
username: ${EMAIL_USERNAME:-}
password: ${EMAIL_PASSWORD:-}
tls: starttls
whatsapp:
enabled: false
data_dir: ./data/whatsapp
signal:
enabled: false
api_url: http://localhost:8080
account: ${SIGNAL_ACCOUNT:-}

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module github.com/inou-ai/messaging-center
go 1.23
require (
github.com/go-chi/chi/v5 v5.2.1
github.com/google/uuid v1.6.0
github.com/mattn/go-sqlite3 v1.14.24
gopkg.in/yaml.v3 v3.0.1
)

10
go.sum Normal file
View File

@ -0,0 +1,10 @@
github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8=
github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

45
internal/api/channels.go Normal file
View File

@ -0,0 +1,45 @@
package api
import (
"net/http"
"github.com/inou-ai/messaging-center/internal/core"
)
// ChannelsResponse wraps a list of channels for API response.
type ChannelsResponse struct {
Channels []core.Channel `json:"channels"`
}
// handleListChannels handles GET /api/v1/channels (stub).
func (s *Server) handleListChannels(w http.ResponseWriter, r *http.Request) {
// For now, return configured channels with unknown status
// Actual status will come from adapters once implemented
var channels []core.Channel
if s.config.Channels.Email != nil && s.config.Channels.Email.Enabled {
channels = append(channels, core.Channel{
Name: "email",
Enabled: true,
Status: "not_implemented",
})
}
if s.config.Channels.WhatsApp != nil && s.config.Channels.WhatsApp.Enabled {
channels = append(channels, core.Channel{
Name: "whatsapp",
Enabled: true,
Status: "not_implemented",
})
}
if s.config.Channels.Signal != nil && s.config.Channels.Signal.Enabled {
channels = append(channels, core.Channel{
Name: "signal",
Enabled: true,
Status: "not_implemented",
})
}
writeJSON(w, http.StatusOK, ChannelsResponse{Channels: channels})
}

87
internal/api/commands.go Normal file
View File

@ -0,0 +1,87 @@
package api
import (
"encoding/json"
"net/http"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/inou-ai/messaging-center/internal/core"
)
// CommandResponse wraps a command for API response.
type CommandResponse struct {
Command *core.Command `json:"command"`
}
// CreateCommandRequest is the request body for creating a command.
type CreateCommandRequest struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
var validCommandTypes = map[string]bool{
"send": true,
"route": true,
"delete": true,
"archive": true,
"forward": true,
}
// handleCreateCommand handles POST /api/v1/commands (stub).
func (s *Server) handleCreateCommand(w http.ResponseWriter, r *http.Request) {
var req CreateCommandRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"})
return
}
if req.Type == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "type is required"})
return
}
if !validCommandTypes[req.Type] {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid command type"})
return
}
now := time.Now()
cmd := &core.Command{
ID: "cmd_" + uuid.New().String()[:8],
Type: req.Type,
Payload: req.Payload,
Status: "pending",
CreatedAt: now,
UpdatedAt: now,
}
if err := s.store.CreateCommand(cmd); err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
// TODO: Actually execute the command via adapters
// For now, just mark it as pending
writeJSON(w, http.StatusAccepted, CommandResponse{Command: cmd})
}
// handleGetCommand handles GET /api/v1/commands/:id.
func (s *Server) handleGetCommand(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
cmd, err := s.store.GetCommand(id)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
if cmd == nil {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "command not found"})
return
}
writeJSON(w, http.StatusOK, CommandResponse{Command: cmd})
}

20
internal/api/context.go Normal file
View File

@ -0,0 +1,20 @@
package api
import "context"
type contextKey string
const tokenInfoKey contextKey = "tokenInfo"
// withTokenInfo stores token info in context.
func withTokenInfo(ctx context.Context, info *TokenInfo) context.Context {
return context.WithValue(ctx, tokenInfoKey, info)
}
// GetTokenInfo retrieves token info from context.
func GetTokenInfo(ctx context.Context) *TokenInfo {
if v := ctx.Value(tokenInfoKey); v != nil {
return v.(*TokenInfo)
}
return nil
}

156
internal/api/messages.go Normal file
View File

@ -0,0 +1,156 @@
package api
import (
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/inou-ai/messaging-center/internal/core"
)
// MessageResponse wraps a message for API response.
type MessageResponse struct {
Message *core.Message `json:"message"`
}
// MessagesResponse wraps a list of messages for API response.
type MessagesResponse struct {
Messages []core.Message `json:"messages"`
Count int `json:"count"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
// CreateMessageRequest is the request body for creating a message.
type CreateMessageRequest struct {
Channel string `json:"channel"`
To string `json:"to"`
Body string `json:"body"`
Subject string `json:"subject,omitempty"`
Attachments []string `json:"attachments,omitempty"` // attachment IDs
}
// handleListMessages handles GET /api/v1/messages.
func (s *Server) handleListMessages(w http.ResponseWriter, r *http.Request) {
filter := core.MessageFilter{
Source: r.URL.Query().Get("source"),
Direction: r.URL.Query().Get("direction"),
Type: r.URL.Query().Get("type"),
}
if since := r.URL.Query().Get("since"); since != "" {
if t, err := time.Parse(time.RFC3339, since); err == nil {
filter.Since = &t
}
}
if until := r.URL.Query().Get("until"); until != "" {
if t, err := time.Parse(time.RFC3339, until); err == nil {
filter.Until = &t
}
}
if limit := r.URL.Query().Get("limit"); limit != "" {
if l, err := strconv.Atoi(limit); err == nil {
if l > 200 {
l = 200
}
filter.Limit = l
}
}
if offset := r.URL.Query().Get("offset"); offset != "" {
if o, err := strconv.Atoi(offset); err == nil {
filter.Offset = o
}
}
messages, err := s.store.ListMessages(filter)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
if messages == nil {
messages = []core.Message{}
}
writeJSON(w, http.StatusOK, MessagesResponse{
Messages: messages,
Count: len(messages),
Limit: filter.Limit,
Offset: filter.Offset,
})
}
// handleGetMessage handles GET /api/v1/messages/:id.
func (s *Server) handleGetMessage(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
msg, err := s.store.GetMessage(id)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
if msg == nil {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "message not found"})
return
}
writeJSON(w, http.StatusOK, MessageResponse{Message: msg})
}
// handleCreateMessage handles POST /api/v1/messages (stub).
func (s *Server) handleCreateMessage(w http.ResponseWriter, r *http.Request) {
var req CreateMessageRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"})
return
}
if req.Channel == "" || req.To == "" || req.Body == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "channel, to, and body are required"})
return
}
// For now, just create a pending message record
// Actual sending will be implemented with adapters
now := time.Now()
msg := &core.Message{
ID: "msg_" + uuid.New().String()[:8],
Source: req.Channel,
Direction: "outbound",
From: core.Contact{ID: "system", Type: "system"},
To: core.Contact{ID: req.To, Type: contactType(req.Channel)},
Timestamp: now,
Type: "text",
Subject: req.Subject,
Body: req.Body,
Status: "pending",
CreatedAt: now,
UpdatedAt: now,
}
if err := s.store.CreateMessage(msg); err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusAccepted, MessageResponse{Message: msg})
}
// contactType returns the contact type for a channel.
func contactType(channel string) string {
switch channel {
case "email":
return "email"
case "whatsapp", "signal", "sms":
return "phone"
default:
return "unknown"
}
}

220
internal/api/oauth.go Normal file
View File

@ -0,0 +1,220 @@
package api
import (
"crypto/hmac"
"crypto/sha256"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"net/http"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/inou-ai/messaging-center/internal/core"
)
// TokenInfo represents an issued access token.
type TokenInfo struct {
ClientID string
Scopes []string
ExpiresAt time.Time
}
// OAuthProvider handles OAuth 2.0 authentication.
type OAuthProvider struct {
config *core.OAuthConfig
tokens map[string]*TokenInfo
mu sync.RWMutex
}
// NewOAuthProvider creates a new OAuth provider.
func NewOAuthProvider(cfg *core.OAuthConfig) *OAuthProvider {
return &OAuthProvider{
config: cfg,
tokens: make(map[string]*TokenInfo),
}
}
// TokenResponse is the OAuth token endpoint response.
type TokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
Scope string `json:"scope,omitempty"`
}
// TokenError is the OAuth error response.
type TokenError struct {
Error string `json:"error"`
Description string `json:"error_description,omitempty"`
}
// HandleToken handles POST /oauth/token.
func (p *OAuthProvider) HandleToken(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
writeJSON(w, http.StatusMethodNotAllowed, TokenError{Error: "method_not_allowed"})
return
}
if err := r.ParseForm(); err != nil {
writeJSON(w, http.StatusBadRequest, TokenError{Error: "invalid_request"})
return
}
grantType := r.FormValue("grant_type")
if grantType != "client_credentials" {
writeJSON(w, http.StatusBadRequest, TokenError{
Error: "unsupported_grant_type",
Description: "Only client_credentials is supported",
})
return
}
// Get client credentials from Basic auth or form
clientID, clientSecret, ok := r.BasicAuth()
if !ok {
clientID = r.FormValue("client_id")
clientSecret = r.FormValue("client_secret")
}
if clientID == "" || clientSecret == "" {
writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_client"})
return
}
// Find and validate client
var client *core.OAuthClient
for i := range p.config.Clients {
if p.config.Clients[i].ID == clientID {
client = &p.config.Clients[i]
break
}
}
if client == nil || !secureCompare(client.Secret, clientSecret) {
writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_client"})
return
}
// Parse requested scopes
requestedScopes := strings.Fields(r.FormValue("scope"))
var grantedScopes []string
if len(requestedScopes) == 0 {
// Grant all client scopes
grantedScopes = client.Scopes
} else {
// Grant only requested scopes that client has
for _, s := range requestedScopes {
if client.HasScope(s) {
grantedScopes = append(grantedScopes, s)
}
}
}
// Generate token
token := generateToken()
expiresAt := time.Now().Add(p.config.AccessTokenTTL)
p.mu.Lock()
p.tokens[token] = &TokenInfo{
ClientID: clientID,
Scopes: grantedScopes,
ExpiresAt: expiresAt,
}
p.mu.Unlock()
writeJSON(w, http.StatusOK, TokenResponse{
AccessToken: token,
TokenType: "Bearer",
ExpiresIn: int(p.config.AccessTokenTTL.Seconds()),
Scope: strings.Join(grantedScopes, " "),
})
}
// ValidateToken validates a bearer token and returns its info.
func (p *OAuthProvider) ValidateToken(token string) *TokenInfo {
p.mu.RLock()
defer p.mu.RUnlock()
info, ok := p.tokens[token]
if !ok {
return nil
}
if time.Now().After(info.ExpiresAt) {
return nil
}
return info
}
// RequireAuth returns middleware that requires authentication.
func (p *OAuthProvider) RequireAuth(scopes ...string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if auth == "" {
w.Header().Set("WWW-Authenticate", "Bearer")
writeJSON(w, http.StatusUnauthorized, TokenError{Error: "unauthorized"})
return
}
parts := strings.SplitN(auth, " ", 2)
if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_token"})
return
}
info := p.ValidateToken(parts[1])
if info == nil {
writeJSON(w, http.StatusUnauthorized, TokenError{Error: "invalid_token"})
return
}
// Check required scopes
for _, required := range scopes {
hasScope := false
for _, s := range info.Scopes {
if s == required || s == "admin" {
hasScope = true
break
}
}
if !hasScope {
writeJSON(w, http.StatusForbidden, TokenError{
Error: "insufficient_scope",
Description: "Required scope: " + required,
})
return
}
}
// Store token info in context
ctx := r.Context()
ctx = withTokenInfo(ctx, info)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
// generateToken creates a secure random token.
func generateToken() string {
id := uuid.New()
h := hmac.New(sha256.New, []byte("mc-token-salt"))
h.Write([]byte(id.String()))
return base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}
// secureCompare compares two strings in constant time.
func secureCompare(a, b string) bool {
return subtle.ConstantTimeCompare([]byte(a), []byte(b)) == 1
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(v)
}

90
internal/api/router.go Normal file
View File

@ -0,0 +1,90 @@
package api
import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/inou-ai/messaging-center/internal/core"
"github.com/inou-ai/messaging-center/internal/store"
)
// Server is the HTTP API server.
type Server struct {
config *core.Config
store *store.Store
oauth *OAuthProvider
router chi.Router
}
// NewServer creates a new API server.
func NewServer(cfg *core.Config, s *store.Store) *Server {
srv := &Server{
config: cfg,
store: s,
oauth: NewOAuthProvider(&cfg.OAuth),
}
srv.setupRoutes()
return srv
}
// ServeHTTP implements http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.router.ServeHTTP(w, r)
}
func (s *Server) setupRoutes() {
r := chi.NewRouter()
// Middleware
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
r.Use(middleware.RealIP)
// Health endpoints (no auth)
r.Get("/health", s.handleHealth)
r.Get("/ready", s.handleReady)
// OAuth token endpoint
r.Post("/oauth/token", s.oauth.HandleToken)
// API v1
r.Route("/api/v1", func(r chi.Router) {
// Messages
r.Route("/messages", func(r chi.Router) {
r.With(s.oauth.RequireAuth("messages:read")).Get("/", s.handleListMessages)
r.With(s.oauth.RequireAuth("messages:read")).Get("/{id}", s.handleGetMessage)
r.With(s.oauth.RequireAuth("messages:write")).Post("/", s.handleCreateMessage)
})
// Commands
r.Route("/commands", func(r chi.Router) {
r.With(s.oauth.RequireAuth("commands:write")).Post("/", s.handleCreateCommand)
r.With(s.oauth.RequireAuth("commands:write")).Get("/{id}", s.handleGetCommand)
})
// Channels
r.Route("/channels", func(r chi.Router) {
r.With(s.oauth.RequireAuth("admin")).Get("/", s.handleListChannels)
})
})
s.router = r
}
// handleHealth returns 200 OK if the server is running.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// handleReady returns 200 OK if the server is ready to accept traffic.
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
if err := s.store.Ping(); err != nil {
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
"status": "not ready",
"error": err.Error(),
})
return
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ready"})
}

160
internal/core/config.go Normal file
View File

@ -0,0 +1,160 @@
package core
import (
"os"
"strings"
"time"
"gopkg.in/yaml.v3"
)
// Config represents the application configuration.
type Config struct {
Server ServerConfig `yaml:"server"`
Database DatabaseConfig `yaml:"database"`
Storage StorageConfig `yaml:"storage"`
OAuth OAuthConfig `yaml:"oauth"`
Transcription TranscriptionConfig `yaml:"transcription"`
Webhooks []WebhookConfig `yaml:"webhooks"`
Channels ChannelsConfig `yaml:"channels"`
}
type ServerConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
}
type DatabaseConfig struct {
Path string `yaml:"path"`
}
type StorageConfig struct {
Path string `yaml:"path"`
}
type OAuthConfig struct {
Issuer string `yaml:"issuer"`
SigningKey string `yaml:"signing_key"`
AccessTokenTTL time.Duration `yaml:"access_token_ttl"`
Clients []OAuthClient `yaml:"clients"`
}
type OAuthClient struct {
ID string `yaml:"id"`
Secret string `yaml:"secret"`
Scopes []string `yaml:"scopes"`
}
type TranscriptionConfig struct {
Provider string `yaml:"provider"`
APIKey string `yaml:"api_key"`
Model string `yaml:"model"`
}
type WebhookConfig struct {
URL string `yaml:"url"`
Secret string `yaml:"secret"`
Events []string `yaml:"events"`
}
type ChannelsConfig struct {
Email *EmailChannelConfig `yaml:"email"`
WhatsApp *WhatsAppChannelConfig `yaml:"whatsapp"`
Signal *SignalChannelConfig `yaml:"signal"`
}
type EmailChannelConfig struct {
Enabled bool `yaml:"enabled"`
IMAP IMAPConfig `yaml:"imap"`
SMTP SMTPConfig `yaml:"smtp"`
}
type IMAPConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
TLS string `yaml:"tls"`
}
type SMTPConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
TLS string `yaml:"tls"`
}
type WhatsAppChannelConfig struct {
Enabled bool `yaml:"enabled"`
DataDir string `yaml:"data_dir"`
}
type SignalChannelConfig struct {
Enabled bool `yaml:"enabled"`
APIURL string `yaml:"api_url"`
Account string `yaml:"account"`
}
// LoadConfig reads and parses the configuration file.
func LoadConfig(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
// Expand environment variables with default value support (${VAR:-default})
expanded := os.Expand(string(data), func(key string) string {
// Handle ${VAR:-default} syntax
if idx := strings.Index(key, ":-"); idx != -1 {
envKey := key[:idx]
defaultVal := key[idx+2:]
if val := os.Getenv(envKey); val != "" {
return val
}
return defaultVal
}
return os.Getenv(key)
})
var cfg Config
if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil {
return nil, err
}
// Set defaults
if cfg.Server.Host == "" {
cfg.Server.Host = "0.0.0.0"
}
if cfg.Server.Port == 0 {
cfg.Server.Port = 8040
}
if cfg.Database.Path == "" {
cfg.Database.Path = "./mc.db"
}
if cfg.Storage.Path == "" {
cfg.Storage.Path = "./attachments"
}
if cfg.OAuth.AccessTokenTTL == 0 {
cfg.OAuth.AccessTokenTTL = time.Hour
}
return &cfg, nil
}
// HasScope checks if a client has a specific scope.
func (c *OAuthClient) HasScope(scope string) bool {
for _, s := range c.Scopes {
if s == scope || s == "admin" {
return true
}
// Check wildcard (e.g., "messages:*" matches "messages:read")
if strings.HasSuffix(s, ":*") {
prefix := strings.TrimSuffix(s, "*")
if strings.HasPrefix(scope, prefix) {
return true
}
}
}
return false
}

73
internal/core/types.go Normal file
View File

@ -0,0 +1,73 @@
package core
import (
"encoding/json"
"time"
)
// Message represents a normalized message from any channel.
type Message struct {
ID string `json:"id"`
Source string `json:"source"` // email, whatsapp, signal, sms, voicemail
Direction string `json:"direction"` // inbound, outbound
From Contact `json:"from"`
To Contact `json:"to"`
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"` // text, voice, image, document, email
Subject string `json:"subject,omitempty"`
Body string `json:"body"`
BodyHTML string `json:"body_html,omitempty"`
Attachments []Attachment `json:"attachments,omitempty"`
Status string `json:"status"` // received, processing, delivered, failed
Raw json.RawMessage `json:"raw,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// Contact represents a sender or recipient.
type Contact struct {
ID string `json:"id"` // phone number, email, etc.
Name string `json:"name,omitempty"`
Type string `json:"type"` // phone, email, username
}
// Attachment represents a file attached to a message.
type Attachment struct {
ID string `json:"id"`
MessageID string `json:"message_id"`
Type string `json:"type"` // mime type
Filename string `json:"filename,omitempty"`
Size int64 `json:"size"`
LocalPath string `json:"-"` // not exposed via API
Transcription string `json:"transcription,omitempty"`
Status string `json:"status"` // stored, processing, routed, deleted
}
// Command represents an action to execute.
type Command struct {
ID string `json:"id"`
Type string `json:"type"` // send, route, delete, archive, forward
Payload json.RawMessage `json:"payload"`
Status string `json:"status"` // pending, executing, completed, failed
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// Channel represents a configured messaging channel.
type Channel struct {
Name string `json:"name"`
Enabled bool `json:"enabled"`
Status string `json:"status"` // connected, disconnected, error
}
// MessageFilter for querying messages.
type MessageFilter struct {
Source string
Direction string
Type string
Since *time.Time
Until *time.Time
Limit int
Offset int
}

View File

@ -0,0 +1,52 @@
-- Initial schema for messaging-center
CREATE TABLE messages (
id TEXT PRIMARY KEY,
source TEXT NOT NULL, -- email, whatsapp, signal, sms, voicemail
direction TEXT NOT NULL, -- inbound, outbound
from_contact TEXT NOT NULL, -- JSON: {id, name, type}
to_contact TEXT NOT NULL, -- JSON: {id, name, type}
timestamp DATETIME NOT NULL,
type TEXT NOT NULL, -- text, voice, image, document, email
subject TEXT,
body TEXT,
body_html TEXT,
status TEXT NOT NULL, -- received, processing, delivered, failed
raw TEXT, -- original message JSON
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
CREATE INDEX idx_messages_source ON messages(source);
CREATE INDEX idx_messages_direction ON messages(direction);
CREATE INDEX idx_messages_timestamp ON messages(timestamp);
CREATE INDEX idx_messages_type ON messages(type);
CREATE INDEX idx_messages_status ON messages(status);
CREATE TABLE attachments (
id TEXT PRIMARY KEY,
message_id TEXT NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
type TEXT NOT NULL, -- mime type
filename TEXT,
size INTEGER NOT NULL,
local_path TEXT,
transcription TEXT,
status TEXT NOT NULL -- stored, processing, routed, deleted
);
CREATE INDEX idx_attachments_message_id ON attachments(message_id);
CREATE INDEX idx_attachments_status ON attachments(status);
CREATE TABLE commands (
id TEXT PRIMARY KEY,
type TEXT NOT NULL, -- send, route, delete, archive, forward
payload TEXT, -- JSON
status TEXT NOT NULL, -- pending, executing, completed, failed
error TEXT,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
CREATE INDEX idx_commands_type ON commands(type);
CREATE INDEX idx_commands_status ON commands(status);
CREATE INDEX idx_commands_created_at ON commands(created_at);

305
internal/store/sqlite.go Normal file
View File

@ -0,0 +1,305 @@
package store
import (
"database/sql"
"embed"
"encoding/json"
"fmt"
"io/fs"
"sort"
"strings"
"time"
"github.com/inou-ai/messaging-center/internal/core"
_ "github.com/mattn/go-sqlite3"
)
//go:embed migrations/*.sql
var migrations embed.FS
// Store provides database operations.
type Store struct {
db *sql.DB
}
// New creates a new Store and runs migrations.
func New(dbPath string) (*Store, error) {
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_foreign_keys=on")
if err != nil {
return nil, fmt.Errorf("open database: %w", err)
}
s := &Store{db: db}
if err := s.migrate(); err != nil {
db.Close()
return nil, fmt.Errorf("migrate: %w", err)
}
return s, nil
}
// Close closes the database connection.
func (s *Store) Close() error {
return s.db.Close()
}
// migrate runs all SQL migrations in order.
func (s *Store) migrate() error {
// Create migrations table
_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`)
if err != nil {
return fmt.Errorf("create migrations table: %w", err)
}
// Get applied migrations
rows, err := s.db.Query("SELECT name FROM migrations")
if err != nil {
return fmt.Errorf("query migrations: %w", err)
}
applied := make(map[string]bool)
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
rows.Close()
return err
}
applied[name] = true
}
rows.Close()
// Get migration files
entries, err := fs.ReadDir(migrations, "migrations")
if err != nil {
return fmt.Errorf("read migrations dir: %w", err)
}
// Sort by name
var files []string
for _, e := range entries {
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
files = append(files, e.Name())
}
}
sort.Strings(files)
// Apply unapplied migrations
for _, name := range files {
if applied[name] {
continue
}
data, err := migrations.ReadFile("migrations/" + name)
if err != nil {
return fmt.Errorf("read migration %s: %w", name, err)
}
tx, err := s.db.Begin()
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}
if _, err := tx.Exec(string(data)); err != nil {
tx.Rollback()
return fmt.Errorf("execute migration %s: %w", name, err)
}
if _, err := tx.Exec("INSERT INTO migrations (name) VALUES (?)", name); err != nil {
tx.Rollback()
return fmt.Errorf("record migration %s: %w", name, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("commit migration %s: %w", name, err)
}
}
return nil
}
// DB returns the underlying database connection.
func (s *Store) DB() *sql.DB {
return s.db
}
// CreateMessage inserts a new message.
func (s *Store) CreateMessage(m *core.Message) error {
raw, _ := json.Marshal(m.Raw)
from, _ := json.Marshal(m.From)
to, _ := json.Marshal(m.To)
_, err := s.db.Exec(`
INSERT INTO messages (id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, m.ID, m.Source, m.Direction, from, to, m.Timestamp, m.Type, m.Subject, m.Body, m.BodyHTML, m.Status, raw, m.CreatedAt, m.UpdatedAt)
return err
}
// GetMessage retrieves a message by ID.
func (s *Store) GetMessage(id string) (*core.Message, error) {
var m core.Message
var from, to, raw []byte
err := s.db.QueryRow(`
SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at
FROM messages WHERE id = ?
`, id).Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
json.Unmarshal(from, &m.From)
json.Unmarshal(to, &m.To)
m.Raw = raw
// Load attachments
attachments, err := s.GetAttachmentsByMessage(id)
if err != nil {
return nil, err
}
m.Attachments = attachments
return &m, nil
}
// ListMessages retrieves messages with optional filters.
func (s *Store) ListMessages(f core.MessageFilter) ([]core.Message, error) {
query := `SELECT id, source, direction, from_contact, to_contact, timestamp, type, subject, body, body_html, status, raw, created_at, updated_at FROM messages WHERE 1=1`
args := []any{}
if f.Source != "" {
query += " AND source = ?"
args = append(args, f.Source)
}
if f.Direction != "" {
query += " AND direction = ?"
args = append(args, f.Direction)
}
if f.Type != "" {
query += " AND type = ?"
args = append(args, f.Type)
}
if f.Since != nil {
query += " AND timestamp >= ?"
args = append(args, *f.Since)
}
if f.Until != nil {
query += " AND timestamp <= ?"
args = append(args, *f.Until)
}
query += " ORDER BY timestamp DESC"
if f.Limit > 0 {
query += fmt.Sprintf(" LIMIT %d", f.Limit)
} else {
query += " LIMIT 50"
}
if f.Offset > 0 {
query += fmt.Sprintf(" OFFSET %d", f.Offset)
}
rows, err := s.db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var messages []core.Message
for rows.Next() {
var m core.Message
var from, to, raw []byte
if err := rows.Scan(&m.ID, &m.Source, &m.Direction, &from, &to, &m.Timestamp, &m.Type, &m.Subject, &m.Body, &m.BodyHTML, &m.Status, &raw, &m.CreatedAt, &m.UpdatedAt); err != nil {
return nil, err
}
json.Unmarshal(from, &m.From)
json.Unmarshal(to, &m.To)
m.Raw = raw
messages = append(messages, m)
}
return messages, nil
}
// CreateAttachment inserts a new attachment.
func (s *Store) CreateAttachment(a *core.Attachment) error {
_, err := s.db.Exec(`
INSERT INTO attachments (id, message_id, type, filename, size, local_path, transcription, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`, a.ID, a.MessageID, a.Type, a.Filename, a.Size, a.LocalPath, a.Transcription, a.Status)
return err
}
// GetAttachmentsByMessage retrieves all attachments for a message.
func (s *Store) GetAttachmentsByMessage(messageID string) ([]core.Attachment, error) {
rows, err := s.db.Query(`
SELECT id, message_id, type, filename, size, local_path, transcription, status
FROM attachments WHERE message_id = ?
`, messageID)
if err != nil {
return nil, err
}
defer rows.Close()
var attachments []core.Attachment
for rows.Next() {
var a core.Attachment
if err := rows.Scan(&a.ID, &a.MessageID, &a.Type, &a.Filename, &a.Size, &a.LocalPath, &a.Transcription, &a.Status); err != nil {
return nil, err
}
attachments = append(attachments, a)
}
return attachments, nil
}
// CreateCommand inserts a new command.
func (s *Store) CreateCommand(c *core.Command) error {
_, err := s.db.Exec(`
INSERT INTO commands (id, type, payload, status, error, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, c.ID, c.Type, c.Payload, c.Status, c.Error, c.CreatedAt, c.UpdatedAt)
return err
}
// GetCommand retrieves a command by ID.
func (s *Store) GetCommand(id string) (*core.Command, error) {
var c core.Command
err := s.db.QueryRow(`
SELECT id, type, payload, status, error, created_at, updated_at
FROM commands WHERE id = ?
`, id).Scan(&c.ID, &c.Type, &c.Payload, &c.Status, &c.Error, &c.CreatedAt, &c.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
return &c, nil
}
// UpdateCommandStatus updates a command's status and error.
func (s *Store) UpdateCommandStatus(id, status, errMsg string) error {
_, err := s.db.Exec(`
UPDATE commands SET status = ?, error = ?, updated_at = ? WHERE id = ?
`, status, errMsg, time.Now(), id)
return err
}
// Ping checks database connectivity.
func (s *Store) Ping() error {
return s.db.Ping()
}