Compare commits
34 Commits
master
...
mc-unified
| Author | SHA1 | Date |
|---|---|---|
|
|
53b74b0981 | |
|
|
d31e569d5a | |
|
|
8935329d60 | |
|
|
ecb9fa4a5f | |
|
|
32e51c4cfe | |
|
|
b408ebc2b7 | |
|
|
a76f3677b4 | |
|
|
2005d7521a | |
|
|
3f1259f288 | |
|
|
b69af43840 | |
|
|
8d8a1516bd | |
|
|
3e8b3000fe | |
|
|
adc00c3e6c | |
|
|
78bf7bd68b | |
|
|
6d48e6a826 | |
|
|
90485d381d | |
|
|
ee0bade365 | |
|
|
a5b538563d | |
|
|
a4947516ba | |
|
|
5084e879ef | |
|
|
80bdc5fb6a | |
|
|
8e3845876b | |
|
|
d8a99b5bd2 | |
|
|
7cef0dee97 | |
|
|
eb92f01ef1 | |
|
|
e017ee4cdd | |
|
|
f4e372be84 | |
|
|
24761c77f0 | |
|
|
79cfb0caa6 | |
|
|
05dae4426e | |
|
|
30c4d15b87 | |
|
|
b7a83f2ab5 | |
|
|
5c7c416029 | |
|
|
c6c0ccb9bc |
|
|
@ -4,3 +4,5 @@ node_modules/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
|
||||||
mail-bridge
|
mail-bridge
|
||||||
|
message-center
|
||||||
|
~*
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
BINARY := message-center
|
||||||
|
|
||||||
|
.PHONY: build install clean
|
||||||
|
|
||||||
|
build:
|
||||||
|
/usr/local/go/bin/go build -o $(BINARY) .
|
||||||
|
|
||||||
|
install: build
|
||||||
|
cp $(BINARY) message-center-installed
|
||||||
|
systemctl --user restart mail-bridge
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f $(BINARY) mail-bridge mail-bridge-old mail-bridge-installed
|
||||||
|
|
@ -0,0 +1,138 @@
|
||||||
|
# Message Center
|
||||||
|
|
||||||
|
Unified API for messages from multiple sources (email, WhatsApp).
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- **Unified Message Format**: All messages follow the same schema regardless of source
|
||||||
|
- **Connector Architecture**: Pluggable message sources
|
||||||
|
- **Cursor Tracking**: Track high-water mark per consumer for reliable processing
|
||||||
|
- **Replay Window**: Query messages from any time period with `?since=`
|
||||||
|
- **Actions**: Archive, delete, reply, and forward attachments to documents
|
||||||
|
- **Simple Webhooks**: Just notifies `{"event": "new"}` when new messages arrive
|
||||||
|
|
||||||
|
## API Endpoints
|
||||||
|
|
||||||
|
### Unified Messages
|
||||||
|
|
||||||
|
| Endpoint | Method | Description |
|
||||||
|
|----------|--------|-------------|
|
||||||
|
| `/messages/new` | GET | Unseen messages from all sources |
|
||||||
|
| `/messages?since=24h` | GET | Messages from last 24 hours (supports h/d/w) |
|
||||||
|
| `/messages/{id}` | GET | Single message by ID |
|
||||||
|
| `/messages/ack` | POST | Advance cursor for a consumer |
|
||||||
|
| `/messages/{id}/archive` | POST | Archive message |
|
||||||
|
| `/messages/{id}/delete` | POST | Delete message |
|
||||||
|
| `/messages/{id}/reply` | POST | Reply to message |
|
||||||
|
| `/messages/{id}/to-docs` | POST | Save attachments to ~/documents/inbox/ |
|
||||||
|
| `/messages/{id}/attachments` | GET | List or download attachments |
|
||||||
|
|
||||||
|
### Message Format
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"id": "proton:12345",
|
||||||
|
"source": "proton",
|
||||||
|
"from": "sender@example.com",
|
||||||
|
"from_name": "Sender Name",
|
||||||
|
"to": "recipient@example.com",
|
||||||
|
"timestamp": "2026-02-02T10:30:00Z",
|
||||||
|
"subject": "Hello World",
|
||||||
|
"body": "Message content...",
|
||||||
|
"attachments": [
|
||||||
|
{"name": "doc.pdf", "mime": "application/pdf", "size": 12345}
|
||||||
|
],
|
||||||
|
"seen": false
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Cursor/Acknowledgment
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Acknowledge messages up to a timestamp
|
||||||
|
curl -X POST http://localhost:8025/messages/ack \
|
||||||
|
-d '{"consumer": "james", "timestamp": "2026-02-02T12:00:00Z"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Reply
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST http://localhost:8025/messages/proton:12345/reply \
|
||||||
|
-d '{"body": "Thanks for your message!"}'
|
||||||
|
```
|
||||||
|
|
||||||
|
### Forward to Documents
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# All attachments
|
||||||
|
curl -X POST http://localhost:8025/messages/proton:12345/to-docs
|
||||||
|
|
||||||
|
# Specific attachments
|
||||||
|
curl -X POST http://localhost:8025/messages/proton:12345/to-docs \
|
||||||
|
-d '{"attachments": ["invoice.pdf"]}'
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
server:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 8025
|
||||||
|
|
||||||
|
data_dir: ~/.message-center
|
||||||
|
|
||||||
|
accounts:
|
||||||
|
proton:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 1143
|
||||||
|
username: user@example.com
|
||||||
|
password: ${PROTON_BRIDGE_PASSWORD}
|
||||||
|
tls: starttls
|
||||||
|
watch:
|
||||||
|
- INBOX
|
||||||
|
smtp:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 1025
|
||||||
|
|
||||||
|
connectors:
|
||||||
|
whatsapp:
|
||||||
|
enabled: true
|
||||||
|
name: whatsapp
|
||||||
|
base_url: http://localhost:8030
|
||||||
|
|
||||||
|
webhook:
|
||||||
|
enabled: true
|
||||||
|
url: http://localhost:18789/hooks/messages
|
||||||
|
```
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
./install.sh
|
||||||
|
# Edit ~/.config/message-center.env with your passwords
|
||||||
|
systemctl --user start message-center
|
||||||
|
```
|
||||||
|
|
||||||
|
## Connectors
|
||||||
|
|
||||||
|
### Email (IMAP)
|
||||||
|
|
||||||
|
- Supports multiple accounts
|
||||||
|
- IDLE for real-time notifications
|
||||||
|
- Full message fetch with attachments
|
||||||
|
- Archive, delete, reply actions
|
||||||
|
|
||||||
|
### WhatsApp
|
||||||
|
|
||||||
|
- Wraps message-bridge HTTP API (port 8030)
|
||||||
|
- Polls for new messages (10s interval)
|
||||||
|
- Supports media attachments
|
||||||
|
- Reply via WhatsApp API
|
||||||
|
|
||||||
|
## Legacy Endpoints
|
||||||
|
|
||||||
|
For backwards compatibility with existing mail-bridge clients:
|
||||||
|
|
||||||
|
- `GET /accounts` - List accounts
|
||||||
|
- `GET /accounts/{account}/mailboxes` - List folders
|
||||||
|
- `GET /accounts/{account}/messages` - List messages
|
||||||
41
config.yaml
41
config.yaml
|
|
@ -2,16 +2,24 @@ server:
|
||||||
host: 127.0.0.1
|
host: 127.0.0.1
|
||||||
port: 8025
|
port: 8025
|
||||||
|
|
||||||
|
data_dir: ~/.message-center
|
||||||
|
|
||||||
accounts:
|
accounts:
|
||||||
proton:
|
tj_jongsma_me:
|
||||||
host: 127.0.0.1
|
host: 127.0.0.1
|
||||||
port: 1143
|
port: 1143
|
||||||
username: tj@jongsma.me
|
username: tj@jongsma.me
|
||||||
password: ${PROTON_BRIDGE_PASSWORD}
|
password: ${TANYA_BRIDGE_PASSWORD}
|
||||||
tls: starttls
|
tls: starttls
|
||||||
watch:
|
watch:
|
||||||
- INBOX
|
- INBOX
|
||||||
johan:
|
smtp:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 1025
|
||||||
|
username: tj@jongsma.me
|
||||||
|
password: ${TANYA_BRIDGE_PASSWORD}
|
||||||
|
from: tj@jongsma.me
|
||||||
|
johan_jongsma_me:
|
||||||
host: 127.0.0.1
|
host: 127.0.0.1
|
||||||
port: 1143
|
port: 1143
|
||||||
username: johan@jongsma.me
|
username: johan@jongsma.me
|
||||||
|
|
@ -19,8 +27,33 @@ accounts:
|
||||||
tls: starttls
|
tls: starttls
|
||||||
watch:
|
watch:
|
||||||
- INBOX
|
- INBOX
|
||||||
|
smtp:
|
||||||
|
host: 127.0.0.1
|
||||||
|
port: 1025
|
||||||
|
username: johan@jongsma.me
|
||||||
|
password: ${JOHAN_BRIDGE_PASSWORD}
|
||||||
|
from: johan@jongsma.me
|
||||||
|
|
||||||
|
connectors:
|
||||||
|
whatsapp:
|
||||||
|
enabled: true
|
||||||
|
name: whatsapp
|
||||||
|
base_url: http://localhost:8030
|
||||||
|
sms:
|
||||||
|
enabled: true
|
||||||
|
name: sms
|
||||||
|
gateway_url: http://localhost:9877
|
||||||
|
|
||||||
webhook:
|
webhook:
|
||||||
enabled: true
|
enabled: true
|
||||||
url: http://localhost:18789/hooks/mail
|
url: http://localhost:18789/hooks/messages
|
||||||
token: "kuma-alert-token-2026"
|
token: "kuma-alert-token-2026"
|
||||||
|
|
||||||
|
triage:
|
||||||
|
enabled: true
|
||||||
|
prompt_file: /home/johan/clawd/config/email-triage-prompt.md
|
||||||
|
provider:
|
||||||
|
base_url: https://api.fireworks.ai/inference/v1
|
||||||
|
api_key: ${FIREWORKS_API_KEY}
|
||||||
|
model: accounts/fireworks/models/kimi-k2p5
|
||||||
|
# dashboard_url removed — routing handled by OpenClaw via webhook
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,65 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AttachmentMeta holds attachment metadata (content fetched on demand)
|
||||||
|
type AttachmentMeta struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Mime string `json:"mime"`
|
||||||
|
Size int `json:"size"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnifiedMessage is the canonical message format across all sources
|
||||||
|
type UnifiedMessage struct {
|
||||||
|
ID string `json:"id"` // source:unique_id
|
||||||
|
Source string `json:"source"` // proton, whatsapp, etc.
|
||||||
|
From string `json:"from"`
|
||||||
|
FromName string `json:"from_name,omitempty"`
|
||||||
|
To string `json:"to"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
Subject string `json:"subject,omitempty"`
|
||||||
|
Body string `json:"body"`
|
||||||
|
Attachments []AttachmentMeta `json:"attachments"`
|
||||||
|
Seen bool `json:"seen"`
|
||||||
|
// Internal fields (not exposed in JSON)
|
||||||
|
SourceUID string `json:"-"` // Original UID/ID from source
|
||||||
|
SourceExtra any `json:"-"` // Source-specific data (e.g., folder for email)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connector defines the interface for message sources
|
||||||
|
type Connector interface {
|
||||||
|
// Name returns the connector identifier (e.g., "proton", "whatsapp")
|
||||||
|
Name() string
|
||||||
|
|
||||||
|
// FetchNew returns unseen/new messages
|
||||||
|
FetchNew() ([]UnifiedMessage, error)
|
||||||
|
|
||||||
|
// FetchSince returns messages since the given time
|
||||||
|
FetchSince(since time.Time) ([]UnifiedMessage, error)
|
||||||
|
|
||||||
|
// FetchOne returns a single message by its source-specific ID
|
||||||
|
FetchOne(sourceID string) (*UnifiedMessage, error)
|
||||||
|
|
||||||
|
// Archive marks a message as archived
|
||||||
|
Archive(sourceID string) error
|
||||||
|
|
||||||
|
// Delete permanently removes a message
|
||||||
|
Delete(sourceID string) error
|
||||||
|
|
||||||
|
// Reply sends a reply to a message
|
||||||
|
Reply(sourceID string, body string, attachments []string) error
|
||||||
|
|
||||||
|
// MarkSeen marks a message as seen/read
|
||||||
|
MarkSeen(sourceID string) error
|
||||||
|
|
||||||
|
// GetAttachment retrieves attachment content (base64)
|
||||||
|
GetAttachment(sourceID string, filename string) ([]byte, error)
|
||||||
|
|
||||||
|
// Start begins watching for new messages (calls callback on new)
|
||||||
|
Start(callback func()) error
|
||||||
|
|
||||||
|
// Stop stops the connector
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,614 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"mime"
|
||||||
|
"net/smtp"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/emersion/go-imap/v2"
|
||||||
|
"github.com/emersion/go-imap/v2/imapclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EmailConnector implements Connector for IMAP email accounts
|
||||||
|
type EmailConnector struct {
|
||||||
|
name string
|
||||||
|
config AccountConfig
|
||||||
|
smtpConfig SMTPConfig
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
callback func()
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// SMTPConfig holds SMTP settings for sending replies
|
||||||
|
type SMTPConfig struct {
|
||||||
|
Host string
|
||||||
|
Port int
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
From string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEmailConnector creates a new email connector
|
||||||
|
func NewEmailConnector(name string, config AccountConfig, smtpConfig SMTPConfig) *EmailConnector {
|
||||||
|
return &EmailConnector{
|
||||||
|
name: name,
|
||||||
|
config: config,
|
||||||
|
smtpConfig: smtpConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Name() string {
|
||||||
|
return e.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) connect() (*imapclient.Client, error) {
|
||||||
|
addr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port)
|
||||||
|
|
||||||
|
var client *imapclient.Client
|
||||||
|
var err error
|
||||||
|
|
||||||
|
tlsConfig := &tls.Config{InsecureSkipVerify: true}
|
||||||
|
switch e.config.TLS {
|
||||||
|
case "ssl":
|
||||||
|
client, err = imapclient.DialTLS(addr, &imapclient.Options{TLSConfig: tlsConfig})
|
||||||
|
case "starttls":
|
||||||
|
client, err = imapclient.DialStartTLS(addr, &imapclient.Options{TLSConfig: tlsConfig})
|
||||||
|
default:
|
||||||
|
client, err = imapclient.DialInsecure(addr, nil)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("dial: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := client.Login(e.config.Username, e.config.Password).Wait(); err != nil {
|
||||||
|
client.Close()
|
||||||
|
return nil, fmt.Errorf("login: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) FetchNew() ([]UnifiedMessage, error) {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Get messages from watched folders (default INBOX)
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
|
||||||
|
var messages []UnifiedMessage
|
||||||
|
for _, folder := range folders {
|
||||||
|
msgs, err := e.fetchFromFolder(client, folder, true, time.Time{})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
messages = append(messages, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Search all folders including Archive so we can find recently-processed messages
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
folders = appendIfMissing(folders, "Archive", "Trash", "Folders/Shopping")
|
||||||
|
|
||||||
|
var messages []UnifiedMessage
|
||||||
|
for _, folder := range folders {
|
||||||
|
msgs, err := e.fetchFromFolder(client, folder, false, since)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] Error fetching from %s: %v", e.name, folder, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
messages = append(messages, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) fetchFromFolder(client *imapclient.Client, folder string, unseenOnly bool, since time.Time) ([]UnifiedMessage, error) {
|
||||||
|
_, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("select %s: %w", folder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build search criteria
|
||||||
|
criteria := &imap.SearchCriteria{}
|
||||||
|
if unseenOnly {
|
||||||
|
criteria.NotFlag = []imap.Flag{imap.FlagSeen}
|
||||||
|
}
|
||||||
|
if !since.IsZero() {
|
||||||
|
criteria.Since = since
|
||||||
|
}
|
||||||
|
|
||||||
|
searchData, err := client.Search(criteria, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("search: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
seqNums := searchData.AllSeqNums()
|
||||||
|
if len(seqNums) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var seqSet imap.SeqSet
|
||||||
|
for _, num := range seqNums {
|
||||||
|
seqSet.AddNum(num)
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.fetchMessages(client, seqSet, folder)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) fetchMessages(client *imapclient.Client, seqSet imap.SeqSet, folder string) ([]UnifiedMessage, error) {
|
||||||
|
options := &imap.FetchOptions{
|
||||||
|
Envelope: true,
|
||||||
|
Flags: true,
|
||||||
|
UID: true,
|
||||||
|
BodyStructure: &imap.FetchItemBodyStructure{},
|
||||||
|
BodySection: []*imap.FetchItemBodySection{{}},
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchCmd := client.Fetch(seqSet, options)
|
||||||
|
|
||||||
|
var messages []UnifiedMessage
|
||||||
|
for {
|
||||||
|
msgData := fetchCmd.Next()
|
||||||
|
if msgData == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := msgData.Collect()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := e.convertMessage(*buf, folder)
|
||||||
|
messages = append(messages, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fetchCmd.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) convertMessage(buf imapclient.FetchMessageBuffer, folder string) UnifiedMessage {
|
||||||
|
msg := UnifiedMessage{
|
||||||
|
Source: e.name,
|
||||||
|
SourceUID: fmt.Sprintf("%d", buf.UID),
|
||||||
|
SourceExtra: folder,
|
||||||
|
Seen: false,
|
||||||
|
Attachments: []AttachmentMeta{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate unified ID
|
||||||
|
msg.ID = fmt.Sprintf("%s:%d", e.name, buf.UID)
|
||||||
|
|
||||||
|
// Parse envelope
|
||||||
|
if env := buf.Envelope; env != nil {
|
||||||
|
msg.Subject = env.Subject
|
||||||
|
msg.Timestamp = env.Date
|
||||||
|
if msg.Timestamp.IsZero() {
|
||||||
|
msg.Timestamp = buf.InternalDate
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(env.From) > 0 {
|
||||||
|
from := env.From[0]
|
||||||
|
msg.From = fmt.Sprintf("%s@%s", from.Mailbox, from.Host)
|
||||||
|
if from.Name != "" {
|
||||||
|
msg.FromName = from.Name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(env.To) > 0 {
|
||||||
|
to := env.To[0]
|
||||||
|
msg.To = fmt.Sprintf("%s@%s", to.Mailbox, to.Host)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check seen flag
|
||||||
|
for _, f := range buf.Flags {
|
||||||
|
if f == imap.FlagSeen {
|
||||||
|
msg.Seen = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse body
|
||||||
|
if len(buf.BodySection) > 0 {
|
||||||
|
raw := buf.BodySection[0].Bytes
|
||||||
|
parsed := ParseMIMEBody(raw)
|
||||||
|
msg.Body = parsed.Text
|
||||||
|
if msg.Body == "" {
|
||||||
|
msg.Body = stripHTML(parsed.HTML)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract attachment metadata
|
||||||
|
attachments := ExtractAttachments(raw)
|
||||||
|
for _, att := range attachments {
|
||||||
|
msg.Attachments = append(msg.Attachments, AttachmentMeta{
|
||||||
|
Name: att.Filename,
|
||||||
|
Mime: att.ContentType,
|
||||||
|
Size: att.Size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return msg
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Parse UID from sourceID
|
||||||
|
var uid uint32
|
||||||
|
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid source ID: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try watched folders first, then Archive/Trash/All Mail
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
// Append fallback folders for finding archived/deleted messages
|
||||||
|
folders = appendIfMissing(folders, "Archive", "Trash", "All Mail", "Folders/Shopping")
|
||||||
|
|
||||||
|
for _, folder := range folders {
|
||||||
|
_, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||||
|
options := &imap.FetchOptions{
|
||||||
|
Envelope: true,
|
||||||
|
Flags: true,
|
||||||
|
UID: true,
|
||||||
|
BodyStructure: &imap.FetchItemBodyStructure{},
|
||||||
|
BodySection: []*imap.FetchItemBodySection{{}},
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchCmd := client.Fetch(uidSet, options)
|
||||||
|
msgData := fetchCmd.Next()
|
||||||
|
if msgData != nil {
|
||||||
|
buf, err := msgData.Collect()
|
||||||
|
fetchCmd.Close()
|
||||||
|
if err == nil {
|
||||||
|
msg := e.convertMessage(*buf, folder)
|
||||||
|
return &msg, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fetchCmd.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("message not found: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Archive(sourceID string) error {
|
||||||
|
return e.moveMessage(sourceID, "Archive")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Delete(sourceID string) error {
|
||||||
|
// Move to Trash instead of hard-deleting (expunging)
|
||||||
|
return e.moveMessage(sourceID, "Trash")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) moveMessage(sourceID, destFolder string) error {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var uid uint32
|
||||||
|
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||||
|
return fmt.Errorf("invalid source ID: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, folder := range folders {
|
||||||
|
_, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||||
|
moveCmd := client.Move(uidSet, destFolder)
|
||||||
|
if _, err := moveCmd.Wait(); err != nil {
|
||||||
|
// Try creating folder
|
||||||
|
if strings.Contains(err.Error(), "TRYCREATE") || strings.Contains(err.Error(), "no such mailbox") {
|
||||||
|
client.Create(destFolder, nil).Wait()
|
||||||
|
moveCmd2 := client.Move(uidSet, destFolder)
|
||||||
|
if _, err2 := moveCmd2.Wait(); err2 != nil {
|
||||||
|
return err2
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("message not found: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Reply(sourceID string, body string, attachments []string) error {
|
||||||
|
// Get original message to extract reply-to address
|
||||||
|
msg, err := e.FetchOne(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build email
|
||||||
|
to := msg.From
|
||||||
|
subject := msg.Subject
|
||||||
|
if !strings.HasPrefix(strings.ToLower(subject), "re:") {
|
||||||
|
subject = "Re: " + subject
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simple SMTP send
|
||||||
|
smtpAddr := fmt.Sprintf("%s:%d", e.smtpConfig.Host, e.smtpConfig.Port)
|
||||||
|
|
||||||
|
var msgBuf bytes.Buffer
|
||||||
|
msgBuf.WriteString(fmt.Sprintf("To: %s\r\n", to))
|
||||||
|
msgBuf.WriteString(fmt.Sprintf("From: %s\r\n", e.smtpConfig.From))
|
||||||
|
msgBuf.WriteString(fmt.Sprintf("Subject: %s\r\n", mime.QEncoding.Encode("utf-8", subject)))
|
||||||
|
msgBuf.WriteString("MIME-Version: 1.0\r\n")
|
||||||
|
msgBuf.WriteString("Content-Type: text/plain; charset=utf-8\r\n")
|
||||||
|
msgBuf.WriteString("\r\n")
|
||||||
|
msgBuf.WriteString(body)
|
||||||
|
|
||||||
|
auth := smtp.PlainAuth("", e.smtpConfig.Username, e.smtpConfig.Password, e.smtpConfig.Host)
|
||||||
|
return smtp.SendMail(smtpAddr, auth, e.smtpConfig.From, []string{to}, msgBuf.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) MarkSeen(sourceID string) error {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var uid uint32
|
||||||
|
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||||
|
return fmt.Errorf("invalid source ID: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, folder := range folders {
|
||||||
|
_, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||||
|
storeCmd := client.Store(uidSet, &imap.StoreFlags{
|
||||||
|
Op: imap.StoreFlagsAdd,
|
||||||
|
Silent: true,
|
||||||
|
Flags: []imap.Flag{imap.FlagSeen},
|
||||||
|
}, nil)
|
||||||
|
storeCmd.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("message not found: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var uid uint32
|
||||||
|
if _, err := fmt.Sscanf(sourceID, "%d", &uid); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid source ID: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
folders := e.config.Watch
|
||||||
|
if len(folders) == 0 {
|
||||||
|
folders = []string{"INBOX"}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, folder := range folders {
|
||||||
|
_, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
uidSet := imap.UIDSetNum(imap.UID(uid))
|
||||||
|
options := &imap.FetchOptions{
|
||||||
|
UID: true,
|
||||||
|
BodySection: []*imap.FetchItemBodySection{{}},
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchCmd := client.Fetch(uidSet, options)
|
||||||
|
msgData := fetchCmd.Next()
|
||||||
|
if msgData == nil {
|
||||||
|
fetchCmd.Close()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := msgData.Collect()
|
||||||
|
fetchCmd.Close()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(buf.BodySection) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
raw := buf.BodySection[0].Bytes
|
||||||
|
attachments := ExtractAttachments(raw)
|
||||||
|
for _, att := range attachments {
|
||||||
|
if att.Filename == filename {
|
||||||
|
decoded, err := base64.StdEncoding.DecodeString(att.Content)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return decoded, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("attachment not found: %s", filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Start(callback func()) error {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
e.ctx, e.cancel = context.WithCancel(context.Background())
|
||||||
|
e.callback = callback
|
||||||
|
|
||||||
|
for _, folder := range e.config.Watch {
|
||||||
|
go e.watchFolder(folder)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) watchFolder(folder string) {
|
||||||
|
log.Printf("[%s] Starting IDLE watcher for %s", e.name, folder)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
err := e.runIDLE(folder)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] IDLE error on %s: %v, reconnecting in 10s", e.name, folder, err)
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) runIDLE(folder string) error {
|
||||||
|
client, err := e.connect()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("connect: %w", err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
mbox, err := client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("select %s: %w", folder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prevCount := mbox.NumMessages
|
||||||
|
log.Printf("[%s] IDLE connected to %s (%d messages)", e.name, folder, prevCount)
|
||||||
|
|
||||||
|
for {
|
||||||
|
idleCmd, err := client.Idle()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("idle start: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
idleDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
idleDone <- idleCmd.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
idleCmd.Close()
|
||||||
|
return nil
|
||||||
|
case <-time.After(4 * time.Minute):
|
||||||
|
idleCmd.Close()
|
||||||
|
case err := <-idleDone:
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] IDLE ended with error: %v", e.name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mbox, err = client.Select(folder, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("re-select %s: %w", folder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mbox.NumMessages > prevCount {
|
||||||
|
log.Printf("[%s] New mail in %s: %d -> %d", e.name, folder, prevCount, mbox.NumMessages)
|
||||||
|
if e.callback != nil {
|
||||||
|
go e.callback()
|
||||||
|
}
|
||||||
|
prevCount = mbox.NumMessages
|
||||||
|
} else {
|
||||||
|
prevCount = mbox.NumMessages
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *EmailConnector) Stop() {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
if e.cancel != nil {
|
||||||
|
e.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// appendIfMissing adds folders to the slice if not already present
|
||||||
|
func appendIfMissing(slice []string, items ...string) []string {
|
||||||
|
result := make([]string, len(slice))
|
||||||
|
copy(result, slice)
|
||||||
|
for _, item := range items {
|
||||||
|
found := false
|
||||||
|
for _, s := range result {
|
||||||
|
if s == item {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
result = append(result, item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,200 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SignalConfig holds signal-cli connector configuration
|
||||||
|
type SignalConfig struct {
|
||||||
|
Enabled bool `yaml:"enabled"`
|
||||||
|
APIURL string `yaml:"api_url"` // e.g. http://localhost:8080
|
||||||
|
Number string `yaml:"number"` // bot number, e.g. +31634481877
|
||||||
|
Contacts map[string]string `yaml:"contacts"` // phone -> display name
|
||||||
|
PollInterval int `yaml:"poll_interval"` // seconds, default 5
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignalConnector listens for incoming Signal messages and posts them to Fully.
|
||||||
|
// Outgoing sync messages (Johan replying from his phone) auto-clear the alert.
|
||||||
|
type SignalConnector struct {
|
||||||
|
cfg SignalConfig
|
||||||
|
alerts map[string][]string // sender number -> []Fully alertIDs
|
||||||
|
mu sync.Mutex
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSignalConnector(cfg SignalConfig) *SignalConnector {
|
||||||
|
if cfg.APIURL == "" {
|
||||||
|
cfg.APIURL = "http://localhost:8080"
|
||||||
|
}
|
||||||
|
if cfg.PollInterval <= 0 {
|
||||||
|
cfg.PollInterval = 5
|
||||||
|
}
|
||||||
|
return &SignalConnector{
|
||||||
|
cfg: cfg,
|
||||||
|
alerts: make(map[string][]string),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SignalConnector) Start() {
|
||||||
|
// Incoming Signal messages are consumed by OpenClaw's daemon — we can't poll.
|
||||||
|
// Incoming→Fully flow is handled by James session logic when messages arrive.
|
||||||
|
// This connector is used for outgoing sends only (via Send()).
|
||||||
|
log.Printf("[signal] Started (send-only mode, incoming handled by OpenClaw)")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SignalConnector) Stop() {
|
||||||
|
close(c.stopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- signal-cli envelope types ---
|
||||||
|
|
||||||
|
type signalEnvelope struct {
|
||||||
|
Envelope struct {
|
||||||
|
Source string `json:"source"`
|
||||||
|
SourceName string `json:"sourceName"`
|
||||||
|
// Incoming message
|
||||||
|
DataMessage *signalDataMessage `json:"dataMessage"`
|
||||||
|
// Outgoing sync (Johan replied from his own device)
|
||||||
|
SyncMessage *struct {
|
||||||
|
SentMessage *struct {
|
||||||
|
Destination string `json:"destination"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
} `json:"sentMessage"`
|
||||||
|
} `json:"syncMessage"`
|
||||||
|
} `json:"envelope"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type signalDataMessage struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// receive fetches pending messages from signal-cli HTTP daemon
|
||||||
|
func (c *SignalConnector) receive() []signalEnvelope {
|
||||||
|
url := c.cfg.APIURL + "/v1/receive/" + c.cfg.Number
|
||||||
|
resp, err := http.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[signal] receive error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == 204 {
|
||||||
|
return nil // no messages
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
if len(body) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var envelopes []signalEnvelope
|
||||||
|
if err := json.Unmarshal(body, &envelopes); err != nil {
|
||||||
|
log.Printf("[signal] parse error: %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return envelopes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SignalConnector) processMessages() {
|
||||||
|
envelopes := c.receive()
|
||||||
|
for _, env := range envelopes {
|
||||||
|
e := env.Envelope
|
||||||
|
|
||||||
|
// Outgoing sync — Johan replied from his own phone → clear Fully alert
|
||||||
|
if e.SyncMessage != nil && e.SyncMessage.SentMessage != nil {
|
||||||
|
dest := e.SyncMessage.SentMessage.Destination
|
||||||
|
if dest != "" {
|
||||||
|
c.clearAlerts(dest)
|
||||||
|
log.Printf("[signal] Johan sent to %s — cleared alerts", c.resolveName(dest))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip our own bot number echoing back
|
||||||
|
if e.Source == c.cfg.Number {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Incoming data message
|
||||||
|
if e.DataMessage == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
msg := strings.TrimSpace(e.DataMessage.Message)
|
||||||
|
if msg == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
name := e.SourceName
|
||||||
|
if name == "" {
|
||||||
|
name = c.resolveName(e.Source)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[signal] Incoming from %s (%s): %s", name, e.Source, truncateStr(msg, 80))
|
||||||
|
|
||||||
|
alertText := fmt.Sprintf("📱 %s: %s", name, truncateStr(msg, 200))
|
||||||
|
alertID := postFullyAlert(alertText, "info", "signal:"+e.Source)
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
c.alerts[e.Source] = append(c.alerts[e.Source], alertID)
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SignalConnector) clearAlerts(sender string) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
for _, id := range c.alerts[sender] {
|
||||||
|
if id != "" {
|
||||||
|
removeFullyAlert(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(c.alerts, sender)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SignalConnector) resolveName(number string) string {
|
||||||
|
if name, ok := c.cfg.Contacts[number]; ok {
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
return number
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send sends a Signal message to a recipient number via JSON-RPC
|
||||||
|
func (c *SignalConnector) Send(recipient, message string) error {
|
||||||
|
payload, _ := json.Marshal(map[string]interface{}{
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"method": "send",
|
||||||
|
"params": map[string]interface{}{
|
||||||
|
"recipient": recipient,
|
||||||
|
"message": message,
|
||||||
|
},
|
||||||
|
"id": fmt.Sprintf("%d", time.Now().UnixMilli()),
|
||||||
|
})
|
||||||
|
url := c.cfg.APIURL + "/api/v1/rpc"
|
||||||
|
resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("signal send: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
var rpcResp struct {
|
||||||
|
Error *struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
} `json:"error"`
|
||||||
|
}
|
||||||
|
if json.Unmarshal(body, &rpcResp) == nil && rpcResp.Error != nil {
|
||||||
|
return fmt.Errorf("signal rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,329 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SmsConnector wraps the ClawdNode Gateway HTTP API for SMS access
|
||||||
|
type SmsConnector struct {
|
||||||
|
name string
|
||||||
|
gatewayURL string // e.g., http://localhost:9877
|
||||||
|
wsURL string // e.g., ws://localhost:9878 (for future real-time)
|
||||||
|
ctx chan struct{}
|
||||||
|
callback func()
|
||||||
|
mu sync.Mutex
|
||||||
|
lastSeenID int64 // Track highest SMS ID seen
|
||||||
|
seenIDs map[string]bool // Track actioned message IDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClawdNode SMS message format (from phone)
|
||||||
|
type ClawdNodeSmsMessage struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
Address string `json:"address"`
|
||||||
|
ContactName string `json:"contactName"`
|
||||||
|
Body string `json:"body"`
|
||||||
|
Date int64 `json:"date"`
|
||||||
|
Read bool `json:"read"`
|
||||||
|
Type int `json:"type"` // 1=inbox, 2=sent
|
||||||
|
ThreadID int64 `json:"threadId"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClawdNodeSmsThread struct {
|
||||||
|
ThreadID int64 `json:"threadId"`
|
||||||
|
Address string `json:"address"`
|
||||||
|
ContactName string `json:"contactName"`
|
||||||
|
LastMessage string `json:"lastMessage"`
|
||||||
|
LastDate int64 `json:"lastDate"`
|
||||||
|
MessageCount int `json:"messageCount"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClawdNode command response
|
||||||
|
type CommandResponse struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
CommandID string `json:"commandId"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
Data json.RawMessage `json:"data,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSmsConnector(name, gatewayURL string) *SmsConnector {
|
||||||
|
return &SmsConnector{
|
||||||
|
name: name,
|
||||||
|
gatewayURL: gatewayURL,
|
||||||
|
seenIDs: make(map[string]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Name() string {
|
||||||
|
return s.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendCommand sends a command to the phone via ClawdNode gateway
|
||||||
|
func (s *SmsConnector) sendCommand(command string, params map[string]interface{}) (*CommandResponse, error) {
|
||||||
|
body := map[string]interface{}{
|
||||||
|
"command": command,
|
||||||
|
"params": params,
|
||||||
|
"wait": true,
|
||||||
|
"timeout": 15000,
|
||||||
|
}
|
||||||
|
data, _ := json.Marshal(body)
|
||||||
|
|
||||||
|
resp, err := http.Post(s.gatewayURL+"/command", "application/json", bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("gateway request: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("gateway status %d: %s", resp.StatusCode, string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
var cmdResp CommandResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&cmdResp); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !cmdResp.Success {
|
||||||
|
return &cmdResp, fmt.Errorf("command failed: %s", cmdResp.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &cmdResp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchMessages gets SMS messages from the phone
|
||||||
|
func (s *SmsConnector) fetchMessages(limit int, since int64) ([]ClawdNodeSmsMessage, error) {
|
||||||
|
params := map[string]interface{}{
|
||||||
|
"limit": limit,
|
||||||
|
}
|
||||||
|
if since > 0 {
|
||||||
|
params["since"] = since
|
||||||
|
}
|
||||||
|
// Only inbox messages (type=1)
|
||||||
|
params["type"] = 1
|
||||||
|
|
||||||
|
resp, err := s.sendCommand("sms.list", params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var wrapper struct {
|
||||||
|
Messages []ClawdNodeSmsMessage `json:"messages"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(resp.Data, &wrapper); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return wrapper.Messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) convertMessage(msg ClawdNodeSmsMessage) UnifiedMessage {
|
||||||
|
fromName := msg.ContactName
|
||||||
|
if fromName == "" {
|
||||||
|
fromName = msg.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
return UnifiedMessage{
|
||||||
|
ID: fmt.Sprintf("%s:%d", s.name, msg.ID),
|
||||||
|
Source: s.name,
|
||||||
|
SourceUID: fmt.Sprintf("%d", msg.ID),
|
||||||
|
From: msg.Address,
|
||||||
|
FromName: fromName,
|
||||||
|
To: "me", // Inbox messages are to us
|
||||||
|
Subject: "", // SMS has no subject
|
||||||
|
Timestamp: time.UnixMilli(msg.Date),
|
||||||
|
Body: msg.Body,
|
||||||
|
Seen: msg.Read,
|
||||||
|
Attachments: []AttachmentMeta{},
|
||||||
|
SourceExtra: msg.ThreadID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) FetchNew() ([]UnifiedMessage, error) {
|
||||||
|
// Fetch recent inbox messages (last 48h)
|
||||||
|
since := time.Now().Add(-48 * time.Hour).UnixMilli()
|
||||||
|
messages, err := s.fetchMessages(100, since)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
var result []UnifiedMessage
|
||||||
|
for _, msg := range messages {
|
||||||
|
id := fmt.Sprintf("%s:%d", s.name, msg.ID)
|
||||||
|
if s.seenIDs[id] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Only unread messages count as "new"
|
||||||
|
if !msg.Read {
|
||||||
|
result = append(result, s.convertMessage(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
|
||||||
|
sinceMs := since.UnixMilli()
|
||||||
|
messages, err := s.fetchMessages(100, sinceMs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []UnifiedMessage
|
||||||
|
for _, msg := range messages {
|
||||||
|
if msg.Date >= sinceMs {
|
||||||
|
result = append(result, s.convertMessage(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
|
||||||
|
// sourceID is the SMS ID (numeric string)
|
||||||
|
var id int64
|
||||||
|
fmt.Sscanf(sourceID, "%d", &id)
|
||||||
|
|
||||||
|
resp, err := s.sendCommand("sms.read", map[string]interface{}{
|
||||||
|
"id": id,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg ClawdNodeSmsMessage
|
||||||
|
if err := json.Unmarshal(resp.Data, &msg); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
um := s.convertMessage(msg)
|
||||||
|
return &um, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Archive(sourceID string) error {
|
||||||
|
// SMS doesn't have archive — mark as seen locally
|
||||||
|
s.mu.Lock()
|
||||||
|
s.seenIDs[fmt.Sprintf("%s:%s", s.name, sourceID)] = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Delete(sourceID string) error {
|
||||||
|
var id int64
|
||||||
|
fmt.Sscanf(sourceID, "%d", &id)
|
||||||
|
|
||||||
|
_, err := s.sendCommand("sms.delete", map[string]interface{}{
|
||||||
|
"id": id,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Reply(sourceID string, body string, attachments []string) error {
|
||||||
|
// Get original message to find the phone number
|
||||||
|
msg, err := s.FetchOne(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("fetch original: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.sendCommand("sms.send", map[string]interface{}{
|
||||||
|
"to": msg.From,
|
||||||
|
"body": body,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) MarkSeen(sourceID string) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.seenIDs[fmt.Sprintf("%s:%s", s.name, sourceID)] = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
|
||||||
|
return nil, fmt.Errorf("SMS does not support attachments")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Start(callback func()) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.ctx = make(chan struct{})
|
||||||
|
s.callback = callback
|
||||||
|
|
||||||
|
// Initialize seen set with current messages
|
||||||
|
messages, err := s.fetchMessages(50, 0)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] Initial fetch failed (phone may be disconnected): %v", s.name, err)
|
||||||
|
} else {
|
||||||
|
for _, msg := range messages {
|
||||||
|
s.seenIDs[fmt.Sprintf("%s:%d", s.name, msg.ID)] = true
|
||||||
|
}
|
||||||
|
log.Printf("[%s] Initialized with %d existing messages", s.name, len(messages))
|
||||||
|
}
|
||||||
|
|
||||||
|
go s.pollLoop()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) pollLoop() {
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
log.Printf("[%s] Started polling for new SMS", s.name)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.checkForNew()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) checkForNew() {
|
||||||
|
// Fetch recent unread inbox messages
|
||||||
|
since := time.Now().Add(-1 * time.Hour).UnixMilli()
|
||||||
|
messages, err := s.fetchMessages(20, since)
|
||||||
|
if err != nil {
|
||||||
|
// Don't spam logs if phone is disconnected
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hasNew := false
|
||||||
|
s.mu.Lock()
|
||||||
|
for _, msg := range messages {
|
||||||
|
id := fmt.Sprintf("%s:%d", s.name, msg.ID)
|
||||||
|
if !s.seenIDs[id] && !msg.Read {
|
||||||
|
hasNew = true
|
||||||
|
s.seenIDs[id] = true
|
||||||
|
log.Printf("[%s] New SMS from %s (%s): %s", s.name,
|
||||||
|
msg.ContactName, msg.Address, truncateStr(msg.Body, 50))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback := s.callback
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if hasNew && callback != nil {
|
||||||
|
go callback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SmsConnector) Stop() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.ctx != nil {
|
||||||
|
close(s.ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,359 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WhatsAppConnector wraps the message-bridge HTTP API
|
||||||
|
type WhatsAppConnector struct {
|
||||||
|
name string
|
||||||
|
baseURL string
|
||||||
|
ctx chan struct{}
|
||||||
|
callback func()
|
||||||
|
mu sync.Mutex
|
||||||
|
lastSeen map[string]bool // Track seen message IDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// WhatsAppMessage is the format from message-bridge
|
||||||
|
type WhatsAppMessage struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Platform string `json:"platform"`
|
||||||
|
From string `json:"from"`
|
||||||
|
FromName string `json:"from_name,omitempty"`
|
||||||
|
To string `json:"to,omitempty"`
|
||||||
|
Body string `json:"body"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
IsGroup bool `json:"is_group"`
|
||||||
|
GroupName string `json:"group_name,omitempty"`
|
||||||
|
MediaType string `json:"media_type,omitempty"`
|
||||||
|
MediaURL string `json:"media_url,omitempty"`
|
||||||
|
MediaPath string `json:"media_path,omitempty"`
|
||||||
|
HasMedia bool `json:"has_media"`
|
||||||
|
Transcription string `json:"transcription,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWhatsAppConnector creates a new WhatsApp connector
|
||||||
|
func NewWhatsAppConnector(name, baseURL string) *WhatsAppConnector {
|
||||||
|
return &WhatsAppConnector{
|
||||||
|
name: name,
|
||||||
|
baseURL: baseURL,
|
||||||
|
lastSeen: make(map[string]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Name() string {
|
||||||
|
return w.name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) fetchMessages() ([]WhatsAppMessage, error) {
|
||||||
|
resp, err := http.Get(w.baseURL + "/messages")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fetch messages: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("fetch messages: status %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
var messages []WhatsAppMessage
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&messages); err != nil {
|
||||||
|
return nil, fmt.Errorf("decode messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) convertMessage(msg WhatsAppMessage) UnifiedMessage {
|
||||||
|
um := UnifiedMessage{
|
||||||
|
ID: fmt.Sprintf("%s:%s", w.name, msg.ID),
|
||||||
|
Source: w.name,
|
||||||
|
SourceUID: msg.ID,
|
||||||
|
From: msg.From,
|
||||||
|
FromName: msg.FromName,
|
||||||
|
Timestamp: msg.Timestamp,
|
||||||
|
Body: msg.Body,
|
||||||
|
Seen: false, // WhatsApp doesn't have a seen concept in our bridge
|
||||||
|
Attachments: []AttachmentMeta{},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle group vs direct
|
||||||
|
if msg.IsGroup {
|
||||||
|
um.To = msg.GroupName
|
||||||
|
if um.To == "" {
|
||||||
|
um.To = msg.To
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// For direct messages, "to" is the recipient (us)
|
||||||
|
um.To = msg.To
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use transcription as body for voice messages if available
|
||||||
|
if msg.Transcription != "" && msg.Body == "" {
|
||||||
|
um.Body = msg.Transcription
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle media as attachment
|
||||||
|
if msg.HasMedia && msg.MediaPath != "" {
|
||||||
|
filename := filepath.Base(msg.MediaPath)
|
||||||
|
mimeType := "application/octet-stream"
|
||||||
|
switch msg.MediaType {
|
||||||
|
case "image":
|
||||||
|
mimeType = "image/jpeg"
|
||||||
|
case "video":
|
||||||
|
mimeType = "video/mp4"
|
||||||
|
case "voice", "audio":
|
||||||
|
mimeType = "audio/ogg"
|
||||||
|
case "document":
|
||||||
|
mimeType = "application/pdf"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get file size
|
||||||
|
var size int
|
||||||
|
if fi, err := os.Stat(msg.MediaPath); err == nil {
|
||||||
|
size = int(fi.Size())
|
||||||
|
}
|
||||||
|
|
||||||
|
um.Attachments = append(um.Attachments, AttachmentMeta{
|
||||||
|
Name: filename,
|
||||||
|
Mime: mimeType,
|
||||||
|
Size: size,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Store media path for later retrieval
|
||||||
|
um.SourceExtra = msg.MediaPath
|
||||||
|
}
|
||||||
|
|
||||||
|
return um
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) FetchNew() ([]UnifiedMessage, error) {
|
||||||
|
messages, err := w.fetchMessages()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
var result []UnifiedMessage
|
||||||
|
for _, msg := range messages {
|
||||||
|
// Skip if already marked as seen
|
||||||
|
if w.lastSeen[msg.ID] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Filter to last 24 hours for "new"
|
||||||
|
if time.Since(msg.Timestamp) < 24*time.Hour {
|
||||||
|
result = append(result, w.convertMessage(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) FetchSince(since time.Time) ([]UnifiedMessage, error) {
|
||||||
|
messages, err := w.fetchMessages()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []UnifiedMessage
|
||||||
|
for _, msg := range messages {
|
||||||
|
if msg.Timestamp.After(since) || msg.Timestamp.Equal(since) {
|
||||||
|
result = append(result, w.convertMessage(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) FetchOne(sourceID string) (*UnifiedMessage, error) {
|
||||||
|
messages, err := w.fetchMessages()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range messages {
|
||||||
|
if msg.ID == sourceID {
|
||||||
|
um := w.convertMessage(msg)
|
||||||
|
return &um, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("message not found: %s", sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Archive(sourceID string) error {
|
||||||
|
// WhatsApp doesn't support archiving individual messages
|
||||||
|
// Mark it as seen in our local tracking
|
||||||
|
w.mu.Lock()
|
||||||
|
w.lastSeen[sourceID] = true
|
||||||
|
w.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Delete(sourceID string) error {
|
||||||
|
// WhatsApp doesn't support deleting messages via API
|
||||||
|
return fmt.Errorf("WhatsApp does not support message deletion")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Reply(sourceID string, body string, attachments []string) error {
|
||||||
|
// Get original message to find the chat
|
||||||
|
msg, err := w.FetchOne(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build send request
|
||||||
|
sendReq := map[string]interface{}{
|
||||||
|
"to": msg.From, // Reply to sender
|
||||||
|
"body": body,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle attachments if any
|
||||||
|
if len(attachments) > 0 {
|
||||||
|
sendReq["media"] = attachments[0] // message-bridge only supports one attachment
|
||||||
|
}
|
||||||
|
|
||||||
|
reqBody, _ := json.Marshal(sendReq)
|
||||||
|
resp, err := http.Post(w.baseURL+"/send", "application/json", bytes.NewReader(reqBody))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("send reply: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode >= 400 {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return fmt.Errorf("send reply: status %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) MarkSeen(sourceID string) error {
|
||||||
|
// Track locally
|
||||||
|
w.mu.Lock()
|
||||||
|
w.lastSeen[sourceID] = true
|
||||||
|
w.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) GetAttachment(sourceID string, filename string) ([]byte, error) {
|
||||||
|
// First, find the message to get the media path
|
||||||
|
messages, err := w.fetchMessages()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msg := range messages {
|
||||||
|
if msg.ID == sourceID && msg.MediaPath != "" {
|
||||||
|
// Check if this matches the requested filename
|
||||||
|
if strings.HasSuffix(msg.MediaPath, filename) || filepath.Base(msg.MediaPath) == filename {
|
||||||
|
return os.ReadFile(msg.MediaPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try fetching from message-bridge media endpoint
|
||||||
|
resp, err := http.Get(w.baseURL + "/media/" + filename)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fetch media: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return nil, fmt.Errorf("media not found: %s", filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.ReadAll(resp.Body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Start(callback func()) error {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
w.ctx = make(chan struct{})
|
||||||
|
w.callback = callback
|
||||||
|
|
||||||
|
// Poll for new messages periodically
|
||||||
|
go w.pollLoop()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) pollLoop() {
|
||||||
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Initial fetch to populate seen set
|
||||||
|
msgs, err := w.fetchMessages()
|
||||||
|
if err == nil {
|
||||||
|
w.mu.Lock()
|
||||||
|
for _, msg := range msgs {
|
||||||
|
w.lastSeen[msg.ID] = true
|
||||||
|
}
|
||||||
|
w.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[%s] Started polling for new messages", w.name)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.ctx:
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
w.checkForNew()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) checkForNew() {
|
||||||
|
msgs, err := w.fetchMessages()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] Poll error: %v", w.name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hasNew := false
|
||||||
|
w.mu.Lock()
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if !w.lastSeen[msg.ID] {
|
||||||
|
hasNew = true
|
||||||
|
w.lastSeen[msg.ID] = true
|
||||||
|
log.Printf("[%s] New message from %s: %s", w.name, msg.FromName, truncateStr(msg.Body, 50))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
callback := w.callback
|
||||||
|
w.mu.Unlock()
|
||||||
|
|
||||||
|
if hasNew && callback != nil {
|
||||||
|
go callback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func truncateStr(s string, maxLen int) string {
|
||||||
|
if len(s) <= maxLen {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
return s[:maxLen] + "..."
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WhatsAppConnector) Stop() {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
if w.ctx != nil {
|
||||||
|
close(w.ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
3
go.mod
3
go.mod
|
|
@ -1,4 +1,4 @@
|
||||||
module mail-bridge
|
module message-center
|
||||||
|
|
||||||
go 1.22.0
|
go 1.22.0
|
||||||
|
|
||||||
|
|
@ -10,5 +10,6 @@ require (
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
|
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 // indirect
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.33 // indirect
|
||||||
golang.org/x/text v0.14.0 // indirect
|
golang.org/x/text v0.14.0 // indirect
|
||||||
)
|
)
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -4,6 +4,8 @@ github.com/emersion/go-message v0.18.2 h1:rl55SQdjd9oJcIoQNhubD2Acs1E6IzlZISRTK7
|
||||||
github.com/emersion/go-message v0.18.2/go.mod h1:XpJyL70LwRvq2a8rVbHXikPgKj8+aI0kGdHlg16ibYA=
|
github.com/emersion/go-message v0.18.2/go.mod h1:XpJyL70LwRvq2a8rVbHXikPgKj8+aI0kGdHlg16ibYA=
|
||||||
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 h1:hH4PQfOndHDlpzYfLAAfl63E8Le6F2+EL/cdhlkyRJY=
|
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43 h1:hH4PQfOndHDlpzYfLAAfl63E8Le6F2+EL/cdhlkyRJY=
|
||||||
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
|
github.com/emersion/go-sasl v0.0.0-20231106173351-e73c9f7bad43/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.33 h1:A5blZ5ulQo2AtayQ9/limgHEkFreKj1Dv226a1K73s0=
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.33/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
#!/bin/bash
|
||||||
|
set -e
|
||||||
|
|
||||||
|
# Build
|
||||||
|
echo "Building message-center..."
|
||||||
|
/usr/local/go/bin/go build -o message-center .
|
||||||
|
|
||||||
|
# Install systemd service
|
||||||
|
echo "Installing systemd service..."
|
||||||
|
mkdir -p ~/.config/systemd/user/
|
||||||
|
cp message-center.service ~/.config/systemd/user/
|
||||||
|
|
||||||
|
# Create env file if it doesn't exist
|
||||||
|
if [ ! -f ~/.config/message-center.env ]; then
|
||||||
|
echo "Creating ~/.config/message-center.env..."
|
||||||
|
cat > ~/.config/message-center.env << 'EOF'
|
||||||
|
# Proton Bridge passwords - get from pass store
|
||||||
|
PROTON_BRIDGE_PASSWORD=
|
||||||
|
JOHAN_BRIDGE_PASSWORD=
|
||||||
|
EOF
|
||||||
|
echo "Please edit ~/.config/message-center.env and add your passwords"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Reload and enable
|
||||||
|
systemctl --user daemon-reload
|
||||||
|
systemctl --user enable message-center
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "Installation complete!"
|
||||||
|
echo ""
|
||||||
|
echo "To start: systemctl --user start message-center"
|
||||||
|
echo "To check: systemctl --user status message-center"
|
||||||
|
echo "Logs: journalctl --user -u message-center -f"
|
||||||
BIN
mail-bridge
BIN
mail-bridge
Binary file not shown.
|
|
@ -0,0 +1,15 @@
|
||||||
|
[Unit]
|
||||||
|
Description=Message Center - Unified messaging API (email, WhatsApp)
|
||||||
|
After=network.target protonmail-bridge.service message-bridge.service
|
||||||
|
Wants=protonmail-bridge.service message-bridge.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
WorkingDirectory=/home/johan/dev/mail-bridge
|
||||||
|
EnvironmentFile=/home/johan/.config/message-center.env
|
||||||
|
ExecStart=/home/johan/dev/mail-bridge/message-center -config config.yaml
|
||||||
|
Restart=always
|
||||||
|
RestartSec=5
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=default.target
|
||||||
94
mime.go
94
mime.go
|
|
@ -210,49 +210,77 @@ func extractAttachmentParts(entity *message.Entity, attachments *[]Attachment) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// stripHTML removes HTML tags and returns plain text
|
// stripHTML removes HTML tags and returns plain text
|
||||||
func stripHTML(html string) string {
|
func stripHTML(htmlContent string) string {
|
||||||
var result strings.Builder
|
// Remove style and script blocks first
|
||||||
|
result := htmlContent
|
||||||
|
|
||||||
|
// Remove <style>...</style> blocks
|
||||||
|
for {
|
||||||
|
start := strings.Index(strings.ToLower(result), "<style")
|
||||||
|
if start == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
end := strings.Index(strings.ToLower(result[start:]), "</style>")
|
||||||
|
if end == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
result = result[:start] + result[start+end+8:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove <script>...</script> blocks
|
||||||
|
for {
|
||||||
|
start := strings.Index(strings.ToLower(result), "<script")
|
||||||
|
if start == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
end := strings.Index(strings.ToLower(result[start:]), "</script>")
|
||||||
|
if end == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
result = result[:start] + result[start+end+9:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove <head>...</head> blocks
|
||||||
|
for {
|
||||||
|
start := strings.Index(strings.ToLower(result), "<head")
|
||||||
|
if start == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
end := strings.Index(strings.ToLower(result[start:]), "</head>")
|
||||||
|
if end == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
result = result[:start] + result[start+end+7:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace common block elements with newlines
|
||||||
|
result = strings.ReplaceAll(result, "<br>", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "<br/>", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "<br />", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "</p>", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "</div>", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "</tr>", "\n")
|
||||||
|
result = strings.ReplaceAll(result, "</li>", "\n")
|
||||||
|
|
||||||
|
// Decode HTML entities
|
||||||
|
result = decodeHTMLEntities(result)
|
||||||
|
|
||||||
|
// Remove all remaining tags
|
||||||
|
var builder strings.Builder
|
||||||
inTag := false
|
inTag := false
|
||||||
inStyle := false
|
for _, r := range result {
|
||||||
inScript := false
|
|
||||||
|
|
||||||
html = strings.ReplaceAll(html, "<br>", "\n")
|
|
||||||
html = strings.ReplaceAll(html, "<br/>", "\n")
|
|
||||||
html = strings.ReplaceAll(html, "<br />", "\n")
|
|
||||||
html = strings.ReplaceAll(html, "</p>", "\n")
|
|
||||||
html = strings.ReplaceAll(html, "</div>", "\n")
|
|
||||||
html = decodeHTMLEntities(html)
|
|
||||||
|
|
||||||
for _, r := range html {
|
|
||||||
switch {
|
switch {
|
||||||
case r == '<':
|
case r == '<':
|
||||||
inTag = true
|
inTag = true
|
||||||
case r == '>':
|
case r == '>':
|
||||||
inTag = false
|
inTag = false
|
||||||
case !inTag && !inStyle && !inScript:
|
case !inTag:
|
||||||
result.WriteRune(r)
|
builder.WriteRune(r)
|
||||||
}
|
|
||||||
|
|
||||||
// Track style/script blocks (simplified)
|
|
||||||
if inTag {
|
|
||||||
lower := strings.ToLower(string(r))
|
|
||||||
if strings.Contains(lower, "style") {
|
|
||||||
inStyle = true
|
|
||||||
}
|
|
||||||
if strings.Contains(lower, "/style") {
|
|
||||||
inStyle = false
|
|
||||||
}
|
|
||||||
if strings.Contains(lower, "script") {
|
|
||||||
inScript = true
|
|
||||||
}
|
|
||||||
if strings.Contains(lower, "/script") {
|
|
||||||
inScript = false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up whitespace
|
// Clean up whitespace
|
||||||
text := result.String()
|
text := builder.String()
|
||||||
lines := strings.Split(text, "\n")
|
lines := strings.Split(text, "\n")
|
||||||
var cleaned []string
|
var cleaned []string
|
||||||
for _, line := range lines {
|
for _, line := range lines {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,194 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OrchestrationDB tracks message state without storing content
|
||||||
|
type OrchestrationDB struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// MessageState represents orchestration state for a message
|
||||||
|
type MessageState struct {
|
||||||
|
ID string `json:"id"` // "whatsapp:3A6034A970A5C44D17F9"
|
||||||
|
Source string `json:"source"` // "whatsapp", "tj_jongsma_me", etc.
|
||||||
|
SourceID string `json:"source_id"` // original ID at source
|
||||||
|
Mailbox string `json:"mailbox"` // "INBOX", null for WA
|
||||||
|
FirstSeenAt time.Time `json:"first_seen_at"`
|
||||||
|
LastAction string `json:"last_action,omitempty"` // "archive", "delete", "reply", "to-docs"
|
||||||
|
LastActionAt *time.Time `json:"last_action_at,omitempty"`
|
||||||
|
AckedBy string `json:"acked_by,omitempty"`
|
||||||
|
AckedAt *time.Time `json:"acked_at,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewOrchestrationDB opens or creates the orchestration database
|
||||||
|
func NewOrchestrationDB(path string) (*OrchestrationDB, error) {
|
||||||
|
db, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("open db: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create schema
|
||||||
|
schema := `
|
||||||
|
CREATE TABLE IF NOT EXISTS message_state (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
source_id TEXT NOT NULL,
|
||||||
|
mailbox TEXT,
|
||||||
|
first_seen_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
last_action TEXT,
|
||||||
|
last_action_at TIMESTAMP,
|
||||||
|
acked_by TEXT,
|
||||||
|
acked_at TIMESTAMP
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_source ON message_state(source, source_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_pending ON message_state(last_action) WHERE last_action IS NULL;
|
||||||
|
`
|
||||||
|
|
||||||
|
if _, err := db.Exec(schema); err != nil {
|
||||||
|
return nil, fmt.Errorf("create schema: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &OrchestrationDB{db: db}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the database
|
||||||
|
func (o *OrchestrationDB) Close() error {
|
||||||
|
return o.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSeen marks a message as seen (first encounter)
|
||||||
|
func (o *OrchestrationDB) RecordSeen(id, source, sourceID, mailbox string) error {
|
||||||
|
_, err := o.db.Exec(`
|
||||||
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT(id) DO NOTHING
|
||||||
|
`, id, source, sourceID, mailbox, time.Now())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordAction records an action taken on a message
|
||||||
|
func (o *OrchestrationDB) RecordAction(id, action string) error {
|
||||||
|
now := time.Now()
|
||||||
|
result, err := o.db.Exec(`
|
||||||
|
UPDATE message_state
|
||||||
|
SET last_action = ?, last_action_at = ?
|
||||||
|
WHERE id = ?
|
||||||
|
`, action, now, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, _ := result.RowsAffected()
|
||||||
|
if rows == 0 {
|
||||||
|
// Message wasn't seen yet, insert with action
|
||||||
|
source, sourceID, _ := parseMessageID(id)
|
||||||
|
_, err = o.db.Exec(`
|
||||||
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, last_action, last_action_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
`, id, source, sourceID, "INBOX", time.Now(), action, now)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordAck records that a consumer acknowledged a message
|
||||||
|
func (o *OrchestrationDB) RecordAck(id, consumer string) error {
|
||||||
|
now := time.Now()
|
||||||
|
result, err := o.db.Exec(`
|
||||||
|
UPDATE message_state
|
||||||
|
SET acked_by = ?, acked_at = ?
|
||||||
|
WHERE id = ?
|
||||||
|
`, consumer, now, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, _ := result.RowsAffected()
|
||||||
|
if rows == 0 {
|
||||||
|
source, sourceID, _ := parseMessageID(id)
|
||||||
|
_, err = o.db.Exec(`
|
||||||
|
INSERT INTO message_state (id, source, source_id, mailbox, first_seen_at, acked_by, acked_at)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
`, id, source, sourceID, "INBOX", time.Now(), consumer, now)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetState returns the state for a message
|
||||||
|
func (o *OrchestrationDB) GetState(id string) (*MessageState, error) {
|
||||||
|
row := o.db.QueryRow(`
|
||||||
|
SELECT id, source, source_id, mailbox, first_seen_at, last_action, last_action_at, acked_by, acked_at
|
||||||
|
FROM message_state WHERE id = ?
|
||||||
|
`, id)
|
||||||
|
|
||||||
|
var state MessageState
|
||||||
|
var lastAction, ackedBy sql.NullString
|
||||||
|
var lastActionAt, ackedAt sql.NullTime
|
||||||
|
var mailbox sql.NullString
|
||||||
|
|
||||||
|
err := row.Scan(&state.ID, &state.Source, &state.SourceID, &mailbox,
|
||||||
|
&state.FirstSeenAt, &lastAction, &lastActionAt, &ackedBy, &ackedAt)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if mailbox.Valid {
|
||||||
|
state.Mailbox = mailbox.String
|
||||||
|
}
|
||||||
|
if lastAction.Valid {
|
||||||
|
state.LastAction = lastAction.String
|
||||||
|
}
|
||||||
|
if lastActionAt.Valid {
|
||||||
|
state.LastActionAt = &lastActionAt.Time
|
||||||
|
}
|
||||||
|
if ackedBy.Valid {
|
||||||
|
state.AckedBy = ackedBy.String
|
||||||
|
}
|
||||||
|
if ackedAt.Valid {
|
||||||
|
state.AckedAt = &ackedAt.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
return &state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasAction checks if a message has been actioned (archived, deleted, etc.)
|
||||||
|
func (o *OrchestrationDB) HasAction(id string) bool {
|
||||||
|
var count int
|
||||||
|
o.db.QueryRow(`
|
||||||
|
SELECT COUNT(*) FROM message_state
|
||||||
|
WHERE id = ? AND last_action IS NOT NULL
|
||||||
|
`, id).Scan(&count)
|
||||||
|
return count > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPendingIDs returns message IDs that have no action recorded
|
||||||
|
func (o *OrchestrationDB) GetPendingIDs() ([]string, error) {
|
||||||
|
rows, err := o.db.Query(`
|
||||||
|
SELECT id FROM message_state
|
||||||
|
WHERE last_action IS NULL
|
||||||
|
ORDER BY first_seen_at DESC
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var ids []string
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
if err := rows.Scan(&id); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,148 @@
|
||||||
|
# Outbound Email Follow-Up Tracker
|
||||||
|
|
||||||
|
*Spec — February 14, 2026*
|
||||||
|
*Status: Parked, ready to build*
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
Johan sends emails that require responses. If no reply comes, nobody notices until he manually remembers to check. Conversations also move to phone/WhatsApp/in-person — so "no email reply" doesn't always mean "not handled."
|
||||||
|
|
||||||
|
## Solution
|
||||||
|
|
||||||
|
Triage outgoing emails the same way we triage incoming: K2.5 reads each sent email and decides if it expects a reply. If yes, track it with a deadline. If the deadline passes with no reply, push one alert to Fully. Dismiss via Fully or auto-resolve on inbound reply.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
Everything lives in MC. Fully is just the display layer.
|
||||||
|
|
||||||
|
### IMAP Changes
|
||||||
|
|
||||||
|
Add Sent folder polling to existing connectors:
|
||||||
|
- `tj_jongsma_me` → poll `Sent` alongside `INBOX`
|
||||||
|
- `johan_jongsma_me` → poll `Sent` alongside `INBOX`
|
||||||
|
|
||||||
|
Tag outgoing messages with `direction: "outbound"` in MC's message store (inbound messages get `direction: "inbound"` or remain untagged for backwards compat).
|
||||||
|
|
||||||
|
### Schema: `follow_ups` table (orchestration.db)
|
||||||
|
|
||||||
|
```sql
|
||||||
|
CREATE TABLE follow_ups (
|
||||||
|
id TEXT PRIMARY KEY, -- MC message ID of the outbound email
|
||||||
|
to_address TEXT NOT NULL, -- recipient email
|
||||||
|
to_name TEXT, -- recipient display name
|
||||||
|
subject TEXT, -- email subject
|
||||||
|
sent_at TEXT NOT NULL, -- ISO timestamp
|
||||||
|
deadline_hours INTEGER DEFAULT 48, -- expected response window
|
||||||
|
expects_reply INTEGER DEFAULT 1, -- 1=yes, 0=no (triage decided no)
|
||||||
|
reply_received INTEGER DEFAULT 0, -- auto-set on inbound match
|
||||||
|
reply_message_id TEXT, -- MC message ID of the inbound reply
|
||||||
|
alerted_at TEXT, -- timestamp when Fully alert was pushed (NULL = not yet)
|
||||||
|
dismissed INTEGER DEFAULT 0, -- set via Fully callback
|
||||||
|
dismissed_at TEXT, -- timestamp
|
||||||
|
created_at TEXT DEFAULT (datetime('now'))
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
|
### Triage (K2.5)
|
||||||
|
|
||||||
|
Same triage engine, second pass for outbound messages. Prompt addition:
|
||||||
|
|
||||||
|
```
|
||||||
|
For OUTBOUND emails, determine:
|
||||||
|
1. Does this email expect a reply? (question, request, proposal, scheduling = yes. FYI, acknowledgment, thank you = no)
|
||||||
|
2. How urgent is the expected reply?
|
||||||
|
- urgent: 24h (time-sensitive, medical, financial)
|
||||||
|
- normal: 48h (business correspondence, requests)
|
||||||
|
- low: 7d (informational, low-priority asks)
|
||||||
|
3. Output: { "expects_reply": true/false, "deadline_hours": 24|48|168 }
|
||||||
|
```
|
||||||
|
|
||||||
|
### Alert Flow
|
||||||
|
|
||||||
|
**Check runs every hour (cron or heartbeat):**
|
||||||
|
|
||||||
|
```
|
||||||
|
SELECT * FROM follow_ups
|
||||||
|
WHERE expects_reply = 1
|
||||||
|
AND reply_received = 0
|
||||||
|
AND dismissed = 0
|
||||||
|
AND alerted_at IS NULL
|
||||||
|
AND datetime(sent_at, '+' || deadline_hours || ' hours') < datetime('now')
|
||||||
|
```
|
||||||
|
|
||||||
|
For each result:
|
||||||
|
1. POST to Fully: `{ "message": "🟠 No reply from {to_name} re: {subject} — sent {N} days ago", "priority": "warning" }`
|
||||||
|
2. SET `alerted_at = now()`
|
||||||
|
3. **Never alert again for this follow-up**
|
||||||
|
|
||||||
|
### Auto-Resolve
|
||||||
|
|
||||||
|
When an inbound email arrives, check if sender matches any active follow-up:
|
||||||
|
|
||||||
|
```
|
||||||
|
UPDATE follow_ups
|
||||||
|
SET reply_received = 1, reply_message_id = ?
|
||||||
|
WHERE to_address = ?
|
||||||
|
AND reply_received = 0
|
||||||
|
AND dismissed = 0
|
||||||
|
ORDER BY sent_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
```
|
||||||
|
|
||||||
|
Match by email address. Thread matching (In-Reply-To header) is better but address matching covers 90% of cases and is simpler.
|
||||||
|
|
||||||
|
### Fully Dismiss Callback
|
||||||
|
|
||||||
|
When a follow-up alert is dismissed on Fully (long-press done or ×):
|
||||||
|
|
||||||
|
Alert dashboard POSTs to MC:
|
||||||
|
```
|
||||||
|
POST /api/follow-ups/{id}/dismiss
|
||||||
|
```
|
||||||
|
|
||||||
|
MC sets `dismissed = 1, dismissed_at = now()`.
|
||||||
|
|
||||||
|
This requires:
|
||||||
|
1. Follow-up alerts include the MC follow-up ID in their metadata
|
||||||
|
2. Alert dashboard `removeAlert` / done handler checks for follow-up ID and calls MC
|
||||||
|
|
||||||
|
### Auto-Expire
|
||||||
|
|
||||||
|
Follow-ups that were alerted but not dismissed auto-expire after 14 days:
|
||||||
|
|
||||||
|
```
|
||||||
|
UPDATE follow_ups SET dismissed = 1
|
||||||
|
WHERE alerted_at IS NOT NULL
|
||||||
|
AND dismissed = 0
|
||||||
|
AND datetime(alerted_at, '+14 days') < datetime('now')
|
||||||
|
```
|
||||||
|
|
||||||
|
Runs on the same hourly check.
|
||||||
|
|
||||||
|
## Edge Cases
|
||||||
|
|
||||||
|
- **Reply via WhatsApp/phone:** Johan dismisses the Fully alert. That's the signal.
|
||||||
|
- **Multiple emails to same person:** Each gets its own follow-up. Inbound reply resolves the most recent one.
|
||||||
|
- **CC/BCC recipients:** Only track the TO address. CC'd people rarely owe a reply.
|
||||||
|
- **Auto-replies/OOO:** Don't count as a real reply. Triage can detect these.
|
||||||
|
- **Johan replies again (bump):** If Johan sends a second email to the same person on the same thread, reset the deadline on the existing follow-up.
|
||||||
|
|
||||||
|
## Implementation Order
|
||||||
|
|
||||||
|
1. Add Sent folder to IMAP connectors
|
||||||
|
2. Add `direction` field to MC message model
|
||||||
|
3. Create `follow_ups` table
|
||||||
|
4. Add outbound triage prompt to K2.5
|
||||||
|
5. Add follow-up check to hourly cron / heartbeat
|
||||||
|
6. Add auto-resolve on inbound match
|
||||||
|
7. Add dismiss callback endpoint in MC
|
||||||
|
8. Wire alert dashboard to call MC on dismiss
|
||||||
|
9. Add auto-expire cleanup
|
||||||
|
|
||||||
|
## Not In Scope (Yet)
|
||||||
|
|
||||||
|
- WhatsApp outbound tracking (could add later, same pattern)
|
||||||
|
- Calendar meeting follow-ups (different trigger, same tracker)
|
||||||
|
- Priority escalation (if 2x deadline passes, escalate from 🟠 to 🔴)
|
||||||
|
|
@ -0,0 +1,339 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MessageStore manages unified messages and cursor tracking
|
||||||
|
type MessageStore struct {
|
||||||
|
connectors map[string]Connector
|
||||||
|
cursors map[string]time.Time // consumer -> high-water mark
|
||||||
|
mu sync.RWMutex
|
||||||
|
dataDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
// CursorFile stores cursor positions persistently
|
||||||
|
type CursorFile struct {
|
||||||
|
Cursors map[string]time.Time `json:"cursors"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMessageStore creates a new message store
|
||||||
|
func NewMessageStore(dataDir string) *MessageStore {
|
||||||
|
store := &MessageStore{
|
||||||
|
connectors: make(map[string]Connector),
|
||||||
|
cursors: make(map[string]time.Time),
|
||||||
|
dataDir: dataDir,
|
||||||
|
}
|
||||||
|
store.loadCursors()
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessageStore) loadCursors() {
|
||||||
|
path := filepath.Join(s.dataDir, "cursors.json")
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return // File doesn't exist yet
|
||||||
|
}
|
||||||
|
|
||||||
|
var cf CursorFile
|
||||||
|
if err := json.Unmarshal(data, &cf); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.cursors = cf.Cursors
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessageStore) saveCursors() error {
|
||||||
|
if s.dataDir == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
path := filepath.Join(s.dataDir, "cursors.json")
|
||||||
|
cf := CursorFile{Cursors: s.cursors}
|
||||||
|
|
||||||
|
data, err := json.MarshalIndent(cf, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.WriteFile(path, data, 0644)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterConnector adds a connector to the store
|
||||||
|
func (s *MessageStore) RegisterConnector(c Connector) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.connectors[c.Name()] = c
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConnector returns a connector by name
|
||||||
|
func (s *MessageStore) GetConnector(name string) (Connector, bool) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
c, ok := s.connectors[name]
|
||||||
|
return c, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchNew returns unseen messages from all connectors
|
||||||
|
func (s *MessageStore) FetchNew() ([]UnifiedMessage, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
connectors := make([]Connector, 0, len(s.connectors))
|
||||||
|
for _, c := range s.connectors {
|
||||||
|
connectors = append(connectors, c)
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
var allMessages []UnifiedMessage
|
||||||
|
for _, c := range connectors {
|
||||||
|
msgs, err := c.FetchNew()
|
||||||
|
if err != nil {
|
||||||
|
// Log but continue with other connectors
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allMessages = append(allMessages, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by timestamp descending (newest first)
|
||||||
|
sort.Slice(allMessages, func(i, j int) bool {
|
||||||
|
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
||||||
|
})
|
||||||
|
|
||||||
|
return allMessages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchSince returns messages since the given duration
|
||||||
|
func (s *MessageStore) FetchSince(duration time.Duration) ([]UnifiedMessage, error) {
|
||||||
|
since := time.Now().Add(-duration)
|
||||||
|
return s.FetchSinceTime(since)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchSinceTime returns messages since the given time
|
||||||
|
func (s *MessageStore) FetchSinceTime(since time.Time) ([]UnifiedMessage, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
connectors := make([]Connector, 0, len(s.connectors))
|
||||||
|
for _, c := range s.connectors {
|
||||||
|
connectors = append(connectors, c)
|
||||||
|
}
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
var allMessages []UnifiedMessage
|
||||||
|
for _, c := range connectors {
|
||||||
|
msgs, err := c.FetchSince(since)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allMessages = append(allMessages, msgs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by timestamp descending
|
||||||
|
sort.Slice(allMessages, func(i, j int) bool {
|
||||||
|
return allMessages[i].Timestamp.After(allMessages[j].Timestamp)
|
||||||
|
})
|
||||||
|
|
||||||
|
return allMessages, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchOne returns a single message by ID (source:uid format)
|
||||||
|
func (s *MessageStore) FetchOne(id string) (*UnifiedMessage, error) {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.FetchOne(sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack advances the cursor for a consumer
|
||||||
|
func (s *MessageStore) Ack(consumer string, timestamp time.Time) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
s.cursors[consumer] = timestamp
|
||||||
|
return s.saveCursors()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCursor returns the cursor position for a consumer
|
||||||
|
func (s *MessageStore) GetCursor(consumer string) time.Time {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.cursors[consumer]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Archive archives a message
|
||||||
|
func (s *MessageStore) Archive(id string) error {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Archive(sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete deletes a message
|
||||||
|
func (s *MessageStore) Delete(id string) error {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Delete(sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reply sends a reply to a message
|
||||||
|
func (s *MessageStore) Reply(id string, body string, attachments []string) error {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Reply(sourceID, body, attachments)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkSeen marks a message as seen
|
||||||
|
func (s *MessageStore) MarkSeen(id string) error {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.MarkSeen(sourceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAttachment retrieves attachment content
|
||||||
|
func (s *MessageStore) GetAttachment(id string, filename string) ([]byte, error) {
|
||||||
|
source, sourceID, err := parseMessageID(id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.RLock()
|
||||||
|
c, ok := s.connectors[source]
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown source: %s", source)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.GetAttachment(sourceID, filename)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartAll starts all connectors with a unified callback
|
||||||
|
func (s *MessageStore) StartAll(callback func()) error {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, c := range s.connectors {
|
||||||
|
if err := c.Start(callback); err != nil {
|
||||||
|
return fmt.Errorf("start %s: %w", c.Name(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartAllWithCallbacks starts connectors with different callbacks for email vs other connectors
|
||||||
|
func (s *MessageStore) StartAllWithCallbacks(emailCallback, otherCallback func()) error {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, c := range s.connectors {
|
||||||
|
cb := otherCallback
|
||||||
|
if _, isEmail := c.(*EmailConnector); isEmail {
|
||||||
|
cb = emailCallback
|
||||||
|
}
|
||||||
|
if err := c.Start(cb); err != nil {
|
||||||
|
return fmt.Errorf("start %s: %w", c.Name(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopAll stops all connectors
|
||||||
|
func (s *MessageStore) StopAll() {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, c := range s.connectors {
|
||||||
|
c.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseMessageID splits "source:uid" into components
|
||||||
|
func parseMessageID(id string) (source, sourceID string, err error) {
|
||||||
|
for i := 0; i < len(id); i++ {
|
||||||
|
if id[i] == ':' {
|
||||||
|
return id[:i], id[i+1:], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", "", fmt.Errorf("invalid message ID format: %s (expected source:uid)", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseDuration parses duration strings like "24h", "7d", "1w"
|
||||||
|
func ParseDuration(s string) (time.Duration, error) {
|
||||||
|
if len(s) == 0 {
|
||||||
|
return 0, fmt.Errorf("empty duration")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for special suffixes
|
||||||
|
last := s[len(s)-1]
|
||||||
|
switch last {
|
||||||
|
case 'd': // days
|
||||||
|
var n int
|
||||||
|
if _, err := fmt.Sscanf(s, "%dd", &n); err == nil {
|
||||||
|
return time.Duration(n) * 24 * time.Hour, nil
|
||||||
|
}
|
||||||
|
case 'w': // weeks
|
||||||
|
var n int
|
||||||
|
if _, err := fmt.Sscanf(s, "%dw", &n); err == nil {
|
||||||
|
return time.Duration(n) * 7 * 24 * time.Hour, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to standard Go duration parsing
|
||||||
|
return time.ParseDuration(s)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,56 @@
|
||||||
|
#!/bin/bash
|
||||||
|
# Test script for Message Center API
|
||||||
|
|
||||||
|
BASE_URL="${1:-http://localhost:8025}"
|
||||||
|
|
||||||
|
echo "Testing Message Center at $BASE_URL"
|
||||||
|
echo "=================================="
|
||||||
|
|
||||||
|
# Health check
|
||||||
|
echo -n "Health check: "
|
||||||
|
curl -sf "$BASE_URL/health" && echo " ✓" || echo " ✗"
|
||||||
|
|
||||||
|
# List new messages
|
||||||
|
echo ""
|
||||||
|
echo "GET /messages/new:"
|
||||||
|
curl -sf "$BASE_URL/messages/new" | jq '.[0:2]' 2>/dev/null || echo "No messages or error"
|
||||||
|
|
||||||
|
# List recent messages
|
||||||
|
echo ""
|
||||||
|
echo "GET /messages?since=1h:"
|
||||||
|
curl -sf "$BASE_URL/messages?since=1h" | jq 'length' 2>/dev/null | xargs -I{} echo "{} messages" || echo "Error"
|
||||||
|
|
||||||
|
# Test since with different units
|
||||||
|
echo ""
|
||||||
|
echo "Testing since parameter parsing:"
|
||||||
|
for unit in "1h" "24h" "7d" "1w"; do
|
||||||
|
count=$(curl -sf "$BASE_URL/messages?since=$unit" | jq 'length' 2>/dev/null || echo "err")
|
||||||
|
echo " since=$unit: $count messages"
|
||||||
|
done
|
||||||
|
|
||||||
|
# Get single message (if available)
|
||||||
|
echo ""
|
||||||
|
echo "GET /messages/{id} (first available):"
|
||||||
|
first_id=$(curl -sf "$BASE_URL/messages/new" | jq -r '.[0].id // empty' 2>/dev/null)
|
||||||
|
if [ -n "$first_id" ]; then
|
||||||
|
echo " Fetching: $first_id"
|
||||||
|
curl -sf "$BASE_URL/messages/$first_id" | jq '{id, source, from, subject}' 2>/dev/null || echo " Error"
|
||||||
|
else
|
||||||
|
echo " No messages available"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Test cursor API
|
||||||
|
echo ""
|
||||||
|
echo "POST /messages/ack:"
|
||||||
|
curl -sf -X POST "$BASE_URL/messages/ack" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{"consumer": "test", "timestamp": "2026-02-02T12:00:00Z"}' | jq . 2>/dev/null || echo "Error"
|
||||||
|
|
||||||
|
# Legacy endpoints
|
||||||
|
echo ""
|
||||||
|
echo "GET /accounts (legacy):"
|
||||||
|
curl -sf "$BASE_URL/accounts" | jq . 2>/dev/null || echo "Error"
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=================================="
|
||||||
|
echo "Tests complete"
|
||||||
|
|
@ -0,0 +1,316 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TriageConfig holds LLM triage configuration
|
||||||
|
type TriageConfig struct {
|
||||||
|
Enabled bool `yaml:"enabled"`
|
||||||
|
PromptFile string `yaml:"prompt_file"`
|
||||||
|
Provider ProviderConfig `yaml:"provider"`
|
||||||
|
DashboardURL string `yaml:"dashboard_url"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProviderConfig struct {
|
||||||
|
BaseURL string `yaml:"base_url"`
|
||||||
|
APIKey string `yaml:"api_key"`
|
||||||
|
Model string `yaml:"model"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriageResult is the expected LLM response
|
||||||
|
type TriageResult struct {
|
||||||
|
Action string `json:"action"` // junk, pass
|
||||||
|
Reason string `json:"reason"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// triageLogEntry is a structured log entry for triage decisions
|
||||||
|
type triageLogEntry struct {
|
||||||
|
Ts string `json:"ts"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
From string `json:"from"`
|
||||||
|
Subject string `json:"subject"`
|
||||||
|
Action string `json:"action"`
|
||||||
|
Reason string `json:"reason"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// triageEmail runs LLM triage on an email message.
|
||||||
|
// Returns the triage result, or nil if triage should be skipped (fall through to webhook).
|
||||||
|
func triageEmail(msg UnifiedMessage, cfg TriageConfig) *TriageResult {
|
||||||
|
// Read prompt file fresh every time
|
||||||
|
promptBytes, err := os.ReadFile(cfg.PromptFile)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] Failed to read prompt file %s: %v — falling through to webhook", cfg.PromptFile, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
systemPrompt := string(promptBytes)
|
||||||
|
|
||||||
|
// Build user message with structured email data
|
||||||
|
attachmentNames := make([]string, 0, len(msg.Attachments))
|
||||||
|
for _, a := range msg.Attachments {
|
||||||
|
attachmentNames = append(attachmentNames, a.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
userMsg := fmt.Sprintf(`From: %s (%s)
|
||||||
|
To: %s
|
||||||
|
Account: %s
|
||||||
|
Subject: %s
|
||||||
|
Date: %s
|
||||||
|
Has Attachments: %v
|
||||||
|
Attachment Names: %s
|
||||||
|
|
||||||
|
Body:
|
||||||
|
%s`,
|
||||||
|
msg.From, msg.FromName,
|
||||||
|
msg.To,
|
||||||
|
msg.Source,
|
||||||
|
msg.Subject,
|
||||||
|
msg.Timestamp.Format(time.RFC1123),
|
||||||
|
len(msg.Attachments) > 0,
|
||||||
|
strings.Join(attachmentNames, ", "),
|
||||||
|
msg.Body,
|
||||||
|
)
|
||||||
|
|
||||||
|
// Truncate body to avoid huge payloads
|
||||||
|
if len(userMsg) > 12000 {
|
||||||
|
userMsg = userMsg[:12000] + "\n...[truncated]"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call LLM
|
||||||
|
result, err := callLLM(cfg.Provider, systemPrompt, userMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] LLM call failed: %v — falling through to webhook", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// callLLM calls the OpenAI-compatible API
|
||||||
|
func callLLM(provider ProviderConfig, systemPrompt, userMessage string) (*TriageResult, error) {
|
||||||
|
reqBody := map[string]interface{}{
|
||||||
|
"model": provider.Model,
|
||||||
|
"messages": []map[string]string{
|
||||||
|
{"role": "system", "content": systemPrompt},
|
||||||
|
{"role": "user", "content": userMessage},
|
||||||
|
},
|
||||||
|
"temperature": 0.1,
|
||||||
|
"max_tokens": 256,
|
||||||
|
"response_format": map[string]string{"type": "json_object"},
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(reqBody)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("marshal request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
url := strings.TrimRight(provider.BaseURL, "/") + "/chat/completions"
|
||||||
|
req, err := http.NewRequest("POST", url, bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
req.Header.Set("Authorization", "Bearer "+provider.APIKey)
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("send request: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse OpenAI-compatible response
|
||||||
|
var apiResp struct {
|
||||||
|
Choices []struct {
|
||||||
|
Message struct {
|
||||||
|
Content string `json:"content"`
|
||||||
|
} `json:"message"`
|
||||||
|
} `json:"choices"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(body, &apiResp); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse API response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(apiResp.Choices) == 0 {
|
||||||
|
return nil, fmt.Errorf("no choices in API response")
|
||||||
|
}
|
||||||
|
|
||||||
|
content := apiResp.Choices[0].Message.Content
|
||||||
|
|
||||||
|
// Parse the triage result JSON
|
||||||
|
var result TriageResult
|
||||||
|
if err := json.Unmarshal([]byte(content), &result); err != nil {
|
||||||
|
return nil, fmt.Errorf("parse triage result %q: %w", content, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate action
|
||||||
|
switch result.Action {
|
||||||
|
case "junk", "pass":
|
||||||
|
// valid
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown triage action: %q", result.Action)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// executeTriageAction performs the action decided by the LLM
|
||||||
|
func executeTriageAction(msg UnifiedMessage, result TriageResult) error {
|
||||||
|
switch result.Action {
|
||||||
|
case "junk":
|
||||||
|
if err := store.Delete(msg.ID); err != nil {
|
||||||
|
return fmt.Errorf("junk delete: %w", err)
|
||||||
|
}
|
||||||
|
orch.RecordAction(msg.ID, "delete")
|
||||||
|
case "pass":
|
||||||
|
// Leave for webhook — record as seen so it shows in /messages/new for the agent
|
||||||
|
source, sourceID, _ := parseMessageID(msg.ID)
|
||||||
|
orch.RecordSeen(msg.ID, source, sourceID, "INBOX")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// logTriageDecision appends a structured log entry to the triage log file
|
||||||
|
func logTriageDecision(msg UnifiedMessage, result TriageResult) {
|
||||||
|
logPath := os.ExpandEnv("$HOME") + "/.message-center/triage-log.jsonl"
|
||||||
|
|
||||||
|
sender := msg.From
|
||||||
|
if msg.FromName != "" {
|
||||||
|
sender = msg.FromName + " <" + msg.From + ">"
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := triageLogEntry{
|
||||||
|
Ts: time.Now().UTC().Format(time.RFC3339),
|
||||||
|
ID: msg.ID,
|
||||||
|
From: sender,
|
||||||
|
Subject: msg.Subject,
|
||||||
|
Action: result.Action,
|
||||||
|
Reason: result.Reason,
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(entry)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] Failed to marshal log entry: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] Failed to open triage log %s: %v", logPath, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
f.Write(append(data, '\n'))
|
||||||
|
}
|
||||||
|
|
||||||
|
// logTriageToDashboard posts a log entry to the alert dashboard
|
||||||
|
func logTriageToDashboard(dashboardURL string, msg UnifiedMessage, result TriageResult) {
|
||||||
|
if dashboardURL == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sender := msg.FromName
|
||||||
|
if sender == "" {
|
||||||
|
sender = msg.From
|
||||||
|
}
|
||||||
|
|
||||||
|
priority := "info"
|
||||||
|
if result.Action == "escalate" {
|
||||||
|
priority = "warning"
|
||||||
|
}
|
||||||
|
|
||||||
|
alert := map[string]string{
|
||||||
|
"message": fmt.Sprintf("📧 [%s] %s: %s — %s", result.Action, sender, msg.Subject, result.Reason),
|
||||||
|
"priority": priority,
|
||||||
|
}
|
||||||
|
|
||||||
|
data, _ := json.Marshal(alert)
|
||||||
|
resp, err := http.Post(dashboardURL, "application/json", bytes.NewReader(data))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] Dashboard alert failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// triageNewEmails fetches new emails from a specific account, triages them,
|
||||||
|
// and returns true if any messages should escalate (fire webhook).
|
||||||
|
func triageNewEmails(accountName string, cfg TriageConfig) bool {
|
||||||
|
c, ok := store.GetConnector(accountName)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("[triage] Connector %s not found", accountName)
|
||||||
|
return true // fall through to webhook
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only triage email connectors
|
||||||
|
_, isEmail := c.(*EmailConnector)
|
||||||
|
if !isEmail {
|
||||||
|
return true // non-email connectors always webhook
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := c.FetchNew()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[triage] Failed to fetch new from %s: %v", accountName, err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(msgs) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
shouldEscalate := false
|
||||||
|
|
||||||
|
for _, msg := range msgs {
|
||||||
|
// Skip already-actioned messages
|
||||||
|
if orch.HasAction(msg.ID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[triage] Processing %s: %s from %s", msg.ID, msg.Subject, msg.From)
|
||||||
|
|
||||||
|
result := triageEmail(msg, cfg)
|
||||||
|
if result == nil {
|
||||||
|
// Triage failed — escalate this message
|
||||||
|
shouldEscalate = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[triage] %s → %s: %s", msg.ID, result.Action, result.Reason)
|
||||||
|
|
||||||
|
if err := executeTriageAction(msg, *result); err != nil {
|
||||||
|
log.Printf("[triage] Action failed for %s: %v — escalating", msg.ID, err)
|
||||||
|
shouldEscalate = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logTriageDecision(msg, *result)
|
||||||
|
logTriageToDashboard(cfg.DashboardURL, msg, *result)
|
||||||
|
|
||||||
|
if result.Action == "pass" {
|
||||||
|
shouldEscalate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return shouldEscalate
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue