Initial mail-agent implementation

Full IMAP-based email triage system with multi-tier escalation.

Components:
- IMAP client with IDLE push notifications
- Email parsing with RFC 2047 header decoding
- FastAPI REST API (accounts, messages, SSE events)
- Triage pipeline (L1/L2/L3) with configurable rules
- Shipping dashboard integration
- Unsubscribe link detection

Pre-configured for Proton Bridge (tj@jongsma.me).
This commit is contained in:
James 2026-01-31 01:09:00 +00:00
commit 5d9e58f5ae
29 changed files with 2882 additions and 0 deletions

1
.venv/bin/python Symbolic link
View File

@ -0,0 +1 @@
python3

1
.venv/bin/python3 Symbolic link
View File

@ -0,0 +1 @@
/usr/bin/python3

1
.venv/bin/python3.12 Symbolic link
View File

@ -0,0 +1 @@
python3

1
.venv/lib64 Symbolic link
View File

@ -0,0 +1 @@
lib

5
.venv/pyvenv.cfg Normal file
View File

@ -0,0 +1,5 @@
home = /usr/bin
include-system-site-packages = false
version = 3.12.3
executable = /usr/bin/python3.12
command = /usr/bin/python3 -m venv /home/johan/dev/mail-agent/.venv

152
README.md Normal file
View File

@ -0,0 +1,152 @@
# Mail Agent
IMAP-based email triage with multi-tier escalation.
## Overview
Mail Agent monitors IMAP accounts for new mail and automatically triages messages:
- **L1 (Cheap Model):** Fast classification using Fireworks llama-v3p1-8b
- Spam → delete
- Newsletter/receipt → archive
- Shipping → dashboard + archive
- Uncertain → escalate to L2
- **L2 (James/Opus):** Review via Clawdbot Gateway
- Full context review
- Can draft replies or escalate
- **L3 (Johan):** Signal notification
- Important stuff only
- Human decision required
## Quick Start
```bash
# Create virtual environment
python3 -m venv .venv
source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export FIREWORKS_API_KEY=your-api-key
export PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g
# Run
python -m src.main
```
## Configuration
Edit `config.yaml`:
```yaml
server:
host: 127.0.0.1
port: 8025
accounts:
proton:
host: 127.0.0.1
port: 1143
username: tj@jongsma.me
password: ${PROTON_BRIDGE_PASSWORD}
tls: starttls
folders:
watch: [INBOX]
archive: Archive
triage:
enabled: true
l1:
provider: fireworks
model: accounts/fireworks/models/llama-v3p1-8b-instruct
api_key: ${FIREWORKS_API_KEY}
```
## API Endpoints
### Accounts
- `GET /accounts` - List all configured accounts
- `GET /accounts/{id}` - Get account details
- `GET /accounts/{id}/mailboxes` - List folders
### Messages
- `GET /accounts/{id}/messages?folder=INBOX&unread=true` - List messages
- `GET /accounts/{id}/messages/{uid}?folder=INBOX` - Get full message
- `PATCH /accounts/{id}/messages/{uid}` - Update flags/move
- `DELETE /accounts/{id}/messages/{uid}` - Delete message
### Events (SSE)
- `GET /accounts/{id}/events?folder=INBOX` - Subscribe to new mail events
## Systemd Service
```bash
# Copy service file
cp systemd/mail-agent.service ~/.config/systemd/user/
# Edit to add FIREWORKS_API_KEY
systemctl --user edit mail-agent.service
# Enable and start
systemctl --user enable mail-agent
systemctl --user start mail-agent
# Check status
systemctl --user status mail-agent
journalctl --user -u mail-agent -f
```
## Architecture
```
New Mail (IMAP IDLE)
┌─────────────────────┐
│ L1: Cheap Model │ ~$0.20/1M tokens
│ Fast classification│
└─────────────────────┘
▼ (uncertain/important)
┌─────────────────────┐
│ L2: James (Opus) │ via Gateway
│ Context review │
└─────────────────────┘
▼ (needs human)
┌─────────────────────┐
│ L3: Johan │ Signal notification
└─────────────────────┘
```
## Shipping Dashboard
Shipping notifications are automatically posted to the James Dashboard:
```
POST http://100.123.216.65:9200/api/news
{
"title": "📦 E3-1275 Server",
"body": "Picked up by UPS. Expected Feb 3rd.",
"type": "info",
"source": "shipping"
}
```
## Development
```bash
# Run with auto-reload
uvicorn src.main:app --reload --host 127.0.0.1 --port 8025
# Test IMAP connection
python -c "from src.imap import ImapClient; from src.config import get_config; c=get_config(); client=ImapClient('proton', c.accounts['proton']); client.connect(); print(client.list_mailboxes())"
```
## License
Proprietary - Johan Jongsma

353
SPEC.md Normal file
View File

@ -0,0 +1,353 @@
# Mail Agent Specification
## Overview
IMAP-based email triage agent with multi-tier escalation. Runs as a service, processes incoming mail, auto-handles obvious cases, escalates uncertain ones.
## Architecture
```
New Mail (IMAP IDLE push)
┌─────────────────────────┐
│ L1: Cheap Model │ Fireworks llama-v3p1-8b-instruct
│ - Spam → delete │
│ - Newsletter → archive │
│ - Receipt → archive │
│ - Obvious junk → gone │
│ - Uncertain → L2 │
└─────────────────────────┘
┌─────────────────────────┐
│ L2: James (Opus) │ via Gateway
│ - Review context │
│ - Draft reply? │
│ - Handle or escalate │
└─────────────────────────┘
┌─────────────────────────┐
│ L3: Johan │ Signal notification
│ - Important stuff │
│ - Needs human decision │
└─────────────────────────┘
```
## Accounts (Multi-account support)
First account:
- **ID:** proton
- **Host:** 127.0.0.1
- **IMAP Port:** 1143
- **Username:** tj@jongsma.me
- **Password:** BlcMCKtNDfqv0cq1LmGR9g
- **TLS:** STARTTLS
## REST API
### Accounts
```
GET /accounts # List all configured accounts
POST /accounts # Add account
DELETE /accounts/{id} # Remove account
```
### Mailboxes
```
GET /accounts/{id}/mailboxes # List folders
```
### Messages
```
GET /accounts/{id}/messages
?folder=INBOX
&unread=true
&from=sender@example.com # Search by sender
&limit=50
&offset=0
GET /accounts/{id}/messages/{uid}?folder=INBOX
# Returns full message with body, attachments list
PATCH /accounts/{id}/messages/{uid}?folder=INBOX
Body: {"seen": true} # Mark as read
Body: {"seen": false} # Mark as unread
Body: {"folder": "Archive"} # Move to folder
DELETE /accounts/{id}/messages/{uid}?folder=INBOX
# Delete message
```
### Actions
```
POST /accounts/{id}/unsubscribe/{uid}?folder=INBOX
# Find unsubscribe link in email, execute it
```
### Push Events (IMAP IDLE)
```
GET /accounts/{id}/events?folder=INBOX
# SSE stream, emits on new mail:
# data: {"type": "new", "uid": 123, "from": "...", "subject": "..."}
```
## Triage Pipeline
### L1 Triage (Cheap Model)
- **Model:** Fireworks `accounts/fireworks/models/llama-v3p1-8b-instruct`
- **Cost:** ~$0.20/1M tokens (very cheap)
**L1 Prompt:**
```
Classify this email. Respond with JSON only.
From: {from}
Subject: {subject}
Preview: {first 500 chars}
Categories:
- spam: Obvious spam, phishing, scams
- newsletter: Marketing, newsletters, promotions
- receipt: Order confirmations, invoices (not shipping)
- shipping: Shipping/delivery updates (picked up, in transit, delivered)
- notification: Automated notifications (GitHub, services)
- personal: From a real person, needs attention
- important: Urgent, financial, legal, medical
- uncertain: Not sure, needs human review
For shipping emails, also extract:
- carrier: UPS, FedEx, USPS, DHL, etc.
- status: ordered, picked_up, in_transit, out_for_delivery, delivered
- item: Brief description of what's being shipped
- expected_date: Expected delivery date (if available)
Response format:
{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"}
For shipping:
{"category": "shipping", "confidence": 0.9, "reason": "...",
"shipping": {"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}}
```
**L1 Actions:**
| Category | Confidence > 0.8 | Confidence < 0.8 |
|----------|------------------|------------------|
| spam | Delete | → L2 |
| newsletter | Archive | → L2 |
| receipt | Archive + label | → L2 |
| shipping | → Dashboard + Archive | → L2 |
| notification | Archive | → L2 |
| personal | → L2 | → L2 |
| important | → L2 + flag | → L2 + flag |
| uncertain | → L2 | → L2 |
### Shipping Dashboard Integration
**Dashboard API:** http://100.123.216.65:9200 (James Dashboard)
When shipping email detected:
1. POST to `/api/news` with shipping update
2. Archive the email
3. Track status in local state
**Dashboard payload:**
```json
POST /api/news
{
"title": "📦 E3-1275 Server",
"body": "Picked up by UPS. Expected Feb 3rd.",
"type": "info",
"source": "shipping",
"url": null
}
```
**Status progression:**
- `ordered` → "Order confirmed"
- `picked_up` → "Picked up by {carrier}"
- `in_transit` → "In transit"
- `out_for_delivery` → "Out for delivery"
- `delivered` → "Delivered ✓"
**Auto-cleanup:**
- When status = `delivered`, set a flag
- Next day, DELETE the news item from dashboard
- Track shipments in local JSON: `~/.config/mail-agent/shipments.json`
### L2 Triage (James/Opus)
- Receives escalated emails via Gateway hook
- Full context review
- Can: archive, delete, draft reply, escalate to Johan
### L3 Escalation (Johan)
- Uses existing Clawdbot Signal integration (no hardcoded number needed)
- POST to gateway, let it route to Johan via Signal
- Summary of what needs attention
- Only for actually important stuff
**Escalation via Gateway:**
```
POST to gateway → routes to Johan's Signal via existing channel config
```
## Configuration
```yaml
# config.yaml
server:
host: 127.0.0.1
port: 8025
accounts:
proton:
host: 127.0.0.1
port: 1143
username: tj@jongsma.me
password: BlcMCKtNDfqv0cq1LmGR9g
tls: starttls
folders:
watch: [INBOX]
archive: Archive
spam: Spam
triage:
enabled: true
l1:
provider: fireworks
model: accounts/fireworks/models/llama-v3p1-8b-instruct
api_key: ${FIREWORKS_API_KEY}
l2:
gateway_url: http://localhost:18080 # or however gateway is reached
# Hook mechanism TBD
l3:
gateway_url: http://localhost:18080 # Uses existing Signal integration
shipping:
dashboard_url: http://100.123.216.65:9200
auto_cleanup_days: 1 # Remove from dashboard 1 day after delivered
rules:
always_escalate_from:
- "*@inou.com"
- "*@kaseya.com"
auto_archive_from:
- "*@github.com"
- "noreply@*"
auto_delete_from:
- known-spam-domains.txt
```
## Tech Stack
- **Language:** Python 3.11+
- **Framework:** FastAPI
- **IMAP:** imapclient (IDLE support)
- **Async:** asyncio + anyio
- **LLM:** httpx for Fireworks API
## Files Structure
```
mail-agent/
├── SPEC.md # This file
├── README.md # Usage docs
├── config.yaml # Configuration
├── requirements.txt # Dependencies
├── src/
│ ├── __init__.py
│ ├── main.py # FastAPI app entry
│ ├── config.py # Config loading
│ ├── models.py # Pydantic models
│ ├── imap/
│ │ ├── __init__.py
│ │ ├── client.py # IMAP connection
│ │ ├── idle.py # IDLE push handler
│ │ └── parser.py # Email parsing
│ ├── api/
│ │ ├── __init__.py
│ │ ├── accounts.py # Account endpoints
│ │ ├── messages.py # Message endpoints
│ │ └── events.py # SSE endpoint
│ ├── triage/
│ │ ├── __init__.py
│ │ ├── l1.py # L1 cheap model triage
│ │ ├── l2.py # L2 escalation to James
│ │ └── l3.py # L3 escalation to Johan
│ └── actions/
│ ├── __init__.py
│ └── unsubscribe.py # Unsubscribe handler
├── systemd/
│ └── mail-agent.service
└── tests/
└── ...
```
## Systemd Service
```ini
[Unit]
Description=Mail Agent
After=network.target protonmail-bridge.service
[Service]
Type=simple
User=johan
WorkingDirectory=/home/johan/dev/mail-agent
ExecStart=/home/johan/dev/mail-agent/.venv/bin/python -m src.main
Restart=always
RestartSec=10
Environment=FIREWORKS_API_KEY=...
[Install]
WantedBy=default.target
```
## Environment Variables
- `FIREWORKS_API_KEY` — For L1 model
- `MAIL_AGENT_CONFIG` — Path to config.yaml (default: ./config.yaml)
## Shipment Tracking State
Stored in `~/.config/mail-agent/shipments.json`:
```json
{
"shipments": [
{
"id": "ups-1234567890",
"carrier": "UPS",
"item": "E3-1275 Server",
"status": "picked_up",
"expected_date": "2026-02-03",
"dashboard_news_id": "abc123",
"last_updated": "2026-01-30T22:00:00Z",
"delivered_at": null
}
]
}
```
- On new shipping email: upsert shipment, update dashboard
- On delivered: set `delivered_at`, schedule cleanup
- Cleanup job: delete from dashboard after 1 day
## First Account (Pre-configured)
The Proton Bridge account is already running:
- Service: `systemctl --user status protonmail-bridge`
- IMAP: 127.0.0.1:1143
- Account: tj@jongsma.me
- Bridge password: BlcMCKtNDfqv0cq1LmGR9g
## Open Questions
1. **Gateway hook mechanism:** How does L2/L3 escalation reach James/Johan? POST to gateway? Will check gateway code for webhook/hook endpoint.
2. ~~**Johan's Signal number:**~~ RESOLVED: Use existing Clawdbot Signal integration via gateway
3. ~~**Fireworks API key:**~~ RESOLVED: Available in env
## Build Checklist
- [ ] Project scaffold
- [ ] Config loading
- [ ] IMAP client with IDLE
- [ ] REST API endpoints
- [ ] L1 triage with Fireworks
- [ ] Shipping detection + dashboard integration
- [ ] Shipment tracking state + auto-cleanup
- [ ] L2 escalation hook (gateway)
- [ ] L3 escalation via gateway → Signal
- [ ] Systemd service
- [ ] Test with Proton account
- [ ] README documentation

39
config.yaml Normal file
View File

@ -0,0 +1,39 @@
server:
host: 127.0.0.1
port: 8025
accounts:
proton:
host: 127.0.0.1
port: 1143
username: tj@jongsma.me
password: ${PROTON_BRIDGE_PASSWORD}
tls: starttls
folders:
watch: [INBOX]
archive: Archive
spam: Spam
triage:
enabled: true
l1:
provider: fireworks
model: accounts/fireworks/models/llama-v3p1-8b-instruct
api_key: ${FIREWORKS_API_KEY}
l2:
gateway_url: http://localhost:18080
l3:
gateway_url: http://localhost:18080
shipping:
dashboard_url: http://100.123.216.65:9200
auto_cleanup_days: 1
rules:
always_escalate_from:
- "*@inou.com"
- "*@kaseya.com"
auto_archive_from:
- "*@github.com"
- "noreply@*"
auto_delete_from: []

22
requirements.txt Normal file
View File

@ -0,0 +1,22 @@
# Core
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
pydantic>=2.5.0
pydantic-settings>=2.1.0
pyyaml>=6.0
# IMAP
imapclient>=3.0.0
python-dotenv>=1.0.0
# HTTP client (for LLM API calls)
httpx>=0.26.0
# SSE support
sse-starlette>=2.0.0
# Email parsing
email-validator>=2.1.0
# Async
anyio>=4.2.0

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
# Mail Agent - IMAP-based email triage with multi-tier escalation

4
src/actions/__init__.py Normal file
View File

@ -0,0 +1,4 @@
"""Action handlers."""
from .unsubscribe import find_unsubscribe_link, execute_unsubscribe
__all__ = ["find_unsubscribe_link", "execute_unsubscribe"]

125
src/actions/unsubscribe.py Normal file
View File

@ -0,0 +1,125 @@
"""Unsubscribe action handler."""
import logging
import re
from typing import Optional
from urllib.parse import urlparse
import httpx
from ..models import Message
logger = logging.getLogger(__name__)
def find_unsubscribe_link(message: Message) -> Optional[str]:
"""Find unsubscribe link in an email message.
Checks:
1. List-Unsubscribe header (TODO: needs raw headers)
2. HTML body for common unsubscribe patterns
3. Text body for unsubscribe URLs
"""
# Search patterns
patterns = [
r'href=["\']?(https?://[^"\'>\s]*unsubscribe[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*optout[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*opt-out[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*remove[^"\'>\s]*)["\']?',
r'(https?://[^\s<>"]*unsubscribe[^\s<>"]*)',
r'(https?://[^\s<>"]*optout[^\s<>"]*)',
]
# Search in HTML body first
if message.body_html:
for pattern in patterns:
matches = re.findall(pattern, message.body_html, re.IGNORECASE)
if matches:
url = matches[0]
if _is_valid_unsubscribe_url(url):
return url
# Search in text body
if message.body_text:
for pattern in patterns:
matches = re.findall(pattern, message.body_text, re.IGNORECASE)
if matches:
url = matches[0]
if _is_valid_unsubscribe_url(url):
return url
return None
def _is_valid_unsubscribe_url(url: str) -> bool:
"""Validate that a URL looks like a legitimate unsubscribe link."""
try:
parsed = urlparse(url)
# Must be HTTP(S)
if parsed.scheme not in ("http", "https"):
return False
# Must have a host
if not parsed.netloc:
return False
# Reject obvious non-unsubscribe URLs
suspicious = ["login", "password", "account", "download"]
for term in suspicious:
if term in url.lower() and "unsubscribe" not in url.lower():
return False
return True
except Exception:
return False
async def execute_unsubscribe(url: str) -> tuple[bool, str]:
"""Execute an unsubscribe action by visiting the URL.
Returns (success, message).
"""
try:
async with httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (compatible; MailAgent/1.0)",
},
) as client:
response = await client.get(url)
# Check for success indicators
if response.status_code == 200:
content = response.text.lower()
# Look for success messages
success_indicators = [
"unsubscribed",
"removed",
"successfully",
"you have been",
"no longer",
]
for indicator in success_indicators:
if indicator in content:
logger.info(f"Unsubscribe successful: {url}")
return True, "Successfully unsubscribed"
# If we got 200 but no clear success message, assume it worked
# (many unsubscribe pages just say "done" or redirect)
logger.info(f"Unsubscribe completed (no confirmation): {url}")
return True, "Unsubscribe request sent"
else:
logger.warning(f"Unsubscribe failed: {response.status_code} for {url}")
return False, f"HTTP {response.status_code}"
except httpx.TimeoutException:
logger.error(f"Unsubscribe timeout: {url}")
return False, "Request timed out"
except Exception as e:
logger.error(f"Unsubscribe error: {e}")
return False, str(e)

6
src/api/__init__.py Normal file
View File

@ -0,0 +1,6 @@
"""API module."""
from .accounts import router as accounts_router
from .messages import router as messages_router
from .events import router as events_router
__all__ = ["accounts_router", "messages_router", "events_router"]

101
src/api/accounts.py Normal file
View File

@ -0,0 +1,101 @@
"""Account management API endpoints."""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional
from ..config import get_config, AccountConfig
from ..models import Account, Mailbox
from ..imap import ImapClient
router = APIRouter(prefix="/accounts", tags=["accounts"])
class AccountCreate(BaseModel):
id: str
host: str
port: int = 993
username: str
password: str
tls: str = "ssl"
@router.get("")
async def list_accounts() -> list[Account]:
"""List all configured accounts."""
config = get_config()
accounts = []
for account_id, acc_config in config.accounts.items():
# Try to connect to check status
connected = False
last_error = None
try:
client = ImapClient(account_id, acc_config)
client.connect()
client.disconnect()
connected = True
except Exception as e:
last_error = str(e)
accounts.append(Account(
id=account_id,
host=acc_config.host,
port=acc_config.port,
username=acc_config.username,
tls=acc_config.tls,
connected=connected,
last_error=last_error,
))
return accounts
@router.get("/{account_id}")
async def get_account(account_id: str) -> Account:
"""Get account details."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
connected = False
last_error = None
try:
client = ImapClient(account_id, acc_config)
client.connect()
client.disconnect()
connected = True
except Exception as e:
last_error = str(e)
return Account(
id=account_id,
host=acc_config.host,
port=acc_config.port,
username=acc_config.username,
tls=acc_config.tls,
connected=connected,
last_error=last_error,
)
@router.get("/{account_id}/mailboxes")
async def list_mailboxes(account_id: str) -> list[Mailbox]:
"""List mailboxes/folders for an account."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
try:
client = ImapClient(account_id, acc_config)
with client.session():
return client.list_mailboxes()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

76
src/api/events.py Normal file
View File

@ -0,0 +1,76 @@
"""Server-Sent Events for real-time mail notifications."""
import asyncio
import json
import logging
from typing import AsyncGenerator
from fastapi import APIRouter, HTTPException, Query
from sse_starlette.sse import EventSourceResponse
from ..config import get_config
from ..models import NewMailEvent
from ..imap.idle import IdleHandler
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/accounts/{account_id}/events", tags=["events"])
@router.get("")
async def subscribe_events(
account_id: str,
folder: str = Query(default="INBOX"),
) -> EventSourceResponse:
"""Subscribe to real-time mail events via SSE."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
async def event_generator() -> AsyncGenerator[dict, None]:
"""Generate SSE events from IMAP IDLE."""
queue: asyncio.Queue[NewMailEvent] = asyncio.Queue()
def on_new_mail(acc_id: str, event: NewMailEvent) -> None:
if acc_id == account_id:
try:
queue.put_nowait(event)
except asyncio.QueueFull:
pass
handler = IdleHandler(account_id, acc_config, on_new_mail)
try:
await handler.start(folder)
# Send initial connection event
yield {
"event": "connected",
"data": json.dumps({
"account": account_id,
"folder": folder,
}),
}
# Stream events
while True:
try:
# Wait for events with timeout (keepalive)
event = await asyncio.wait_for(queue.get(), timeout=30.0)
yield {
"event": "message",
"data": event.model_dump_json(by_alias=True),
}
except asyncio.TimeoutError:
# Send keepalive
yield {
"event": "ping",
"data": "keepalive",
}
finally:
await handler.stop()
return EventSourceResponse(event_generator())

135
src/api/messages.py Normal file
View File

@ -0,0 +1,135 @@
"""Message management API endpoints."""
from fastapi import APIRouter, HTTPException, Query
from typing import Optional
from ..config import get_config
from ..models import Message, MessageUpdate
from ..imap import ImapClient
router = APIRouter(prefix="/accounts/{account_id}/messages", tags=["messages"])
@router.get("")
async def list_messages(
account_id: str,
folder: str = Query(default="INBOX"),
unread: bool = Query(default=False),
from_addr: Optional[str] = Query(default=None, alias="from"),
limit: int = Query(default=50, le=200),
offset: int = Query(default=0),
) -> list[Message]:
"""List messages in a folder."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
try:
client = ImapClient(account_id, acc_config)
with client.session():
# Search for messages
uids = client.search(
folder=folder,
unread_only=unread,
from_addr=from_addr,
)
# Apply pagination
total = len(uids)
uids = uids[offset:offset + limit]
if not uids:
return []
# Fetch messages (envelope only)
return client.fetch_messages(uids, folder, body=False)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{uid}")
async def get_message(
account_id: str,
uid: int,
folder: str = Query(default="INBOX"),
) -> Message:
"""Get a single message with full body."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
try:
client = ImapClient(account_id, acc_config)
with client.session():
msg = client.fetch_message(uid, folder, body=True)
if msg is None:
raise HTTPException(status_code=404, detail="Message not found")
return msg
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.patch("/{uid}")
async def update_message(
account_id: str,
uid: int,
update: MessageUpdate,
folder: str = Query(default="INBOX"),
) -> dict:
"""Update message flags or move to another folder."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
try:
client = ImapClient(account_id, acc_config)
with client.session():
if update.seen is not None:
client.mark_seen(uid, folder, update.seen)
if update.flagged is not None:
client.mark_flagged(uid, folder, update.flagged)
if update.folder is not None and update.folder != folder:
client.move_message(uid, folder, update.folder)
return {"status": "updated"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/{uid}")
async def delete_message(
account_id: str,
uid: int,
folder: str = Query(default="INBOX"),
) -> dict:
"""Delete a message."""
config = get_config()
if account_id not in config.accounts:
raise HTTPException(status_code=404, detail="Account not found")
acc_config = config.accounts[account_id]
try:
client = ImapClient(account_id, acc_config)
with client.session():
client.delete_message(uid, folder)
return {"status": "deleted"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

137
src/config.py Normal file
View File

@ -0,0 +1,137 @@
"""Configuration loading and validation."""
import os
import re
from pathlib import Path
from typing import Optional
import yaml
from pydantic import BaseModel, Field
class ServerConfig(BaseModel):
host: str = "127.0.0.1"
port: int = 8025
class ImapFolders(BaseModel):
watch: list[str] = Field(default_factory=lambda: ["INBOX"])
archive: str = "Archive"
spam: str = "Spam"
class AccountConfig(BaseModel):
host: str
port: int = 993
username: str
password: str
tls: str = "ssl" # ssl, starttls, none
folders: ImapFolders = Field(default_factory=ImapFolders)
class L1Config(BaseModel):
provider: str = "fireworks"
model: str = "accounts/fireworks/models/llama-v3p1-8b-instruct"
api_key: str = ""
class L2Config(BaseModel):
gateway_url: str = "http://localhost:18080"
class L3Config(BaseModel):
gateway_url: str = "http://localhost:18080"
class ShippingConfig(BaseModel):
dashboard_url: str = "http://100.123.216.65:9200"
auto_cleanup_days: int = 1
class TriageRules(BaseModel):
always_escalate_from: list[str] = Field(default_factory=list)
auto_archive_from: list[str] = Field(default_factory=list)
auto_delete_from: list[str] = Field(default_factory=list)
class TriageConfig(BaseModel):
enabled: bool = True
l1: L1Config = Field(default_factory=L1Config)
l2: L2Config = Field(default_factory=L2Config)
l3: L3Config = Field(default_factory=L3Config)
shipping: ShippingConfig = Field(default_factory=ShippingConfig)
rules: TriageRules = Field(default_factory=TriageRules)
class Config(BaseModel):
server: ServerConfig = Field(default_factory=ServerConfig)
accounts: dict[str, AccountConfig] = Field(default_factory=dict)
triage: TriageConfig = Field(default_factory=TriageConfig)
def expand_env_vars(text: str) -> str:
"""Expand ${VAR} and $VAR patterns in text."""
def replacer(match):
var_name = match.group(1) or match.group(2)
return os.environ.get(var_name, match.group(0))
# Match ${VAR} or $VAR (not inside quotes for simplicity)
pattern = r'\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)'
return re.sub(pattern, replacer, text)
def load_config(path: Optional[str] = None) -> Config:
"""Load configuration from YAML file.
Searches in order:
1. Explicit path argument
2. MAIL_AGENT_CONFIG env var
3. ./config.yaml
4. ~/.config/mail-agent/config.yaml
"""
search_paths = []
if path:
search_paths.append(Path(path))
if env_path := os.environ.get("MAIL_AGENT_CONFIG"):
search_paths.append(Path(env_path))
search_paths.extend([
Path("config.yaml"),
Path.home() / ".config" / "mail-agent" / "config.yaml",
])
config_path = None
for p in search_paths:
if p.exists():
config_path = p
break
if config_path is None:
# Return default config
return Config()
# Load and expand env vars
raw = config_path.read_text()
expanded = expand_env_vars(raw)
data = yaml.safe_load(expanded)
return Config.model_validate(data)
# Global config instance
_config: Optional[Config] = None
def get_config() -> Config:
"""Get the global config instance."""
global _config
if _config is None:
_config = load_config()
return _config
def set_config(config: Config) -> None:
"""Set the global config instance (for testing)."""
global _config
_config = config

5
src/imap/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""IMAP client module."""
from .client import ImapClient
from .parser import parse_message
__all__ = ["ImapClient", "parse_message"]

265
src/imap/client.py Normal file
View File

@ -0,0 +1,265 @@
"""IMAP client wrapper with connection management."""
import logging
import ssl
from contextlib import contextmanager
from typing import Optional, Generator
from imapclient import IMAPClient
from ..config import AccountConfig
from ..models import Mailbox, Message, MessageFlags
from .parser import parse_message, parse_envelope
logger = logging.getLogger(__name__)
class ImapClient:
"""Thread-safe IMAP client wrapper."""
def __init__(self, account_id: str, config: AccountConfig):
self.account_id = account_id
self.config = config
self._client: Optional[IMAPClient] = None
self._current_folder: Optional[str] = None
def connect(self) -> None:
"""Establish IMAP connection."""
if self._client is not None:
return
# Determine SSL context
ssl_context = None
use_ssl = self.config.tls == "ssl"
if self.config.tls in ("ssl", "starttls"):
ssl_context = ssl.create_default_context()
# Allow self-signed certs for local Proton Bridge
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
logger.info(f"Connecting to {self.config.host}:{self.config.port} (SSL={use_ssl})")
self._client = IMAPClient(
self.config.host,
port=self.config.port,
ssl=use_ssl,
ssl_context=ssl_context if use_ssl else None,
)
if self.config.tls == "starttls":
self._client.starttls(ssl_context)
self._client.login(self.config.username, self.config.password)
logger.info(f"Logged in as {self.config.username}")
def disconnect(self) -> None:
"""Close IMAP connection."""
if self._client is not None:
try:
self._client.logout()
except Exception:
pass
self._client = None
self._current_folder = None
def is_connected(self) -> bool:
"""Check if connected."""
return self._client is not None
@contextmanager
def session(self) -> Generator["ImapClient", None, None]:
"""Context manager for connection lifecycle."""
try:
self.connect()
yield self
finally:
self.disconnect()
def _ensure_connected(self) -> IMAPClient:
"""Ensure we have an active connection."""
if self._client is None:
self.connect()
return self._client
def list_mailboxes(self) -> list[Mailbox]:
"""List all mailboxes/folders."""
client = self._ensure_connected()
folders = client.list_folders()
result = []
for flags, delimiter, name in folders:
# Get message counts if possible
status = {}
try:
status = client.folder_status(name, ["MESSAGES", "UNSEEN"])
except Exception:
pass
result.append(Mailbox(
name=name,
delimiter=delimiter or "/",
flags=[str(f) for f in flags],
message_count=status.get(b"MESSAGES", 0),
unread_count=status.get(b"UNSEEN", 0),
))
return result
def select_folder(self, folder: str = "INBOX") -> dict:
"""Select a folder for operations."""
client = self._ensure_connected()
self._current_folder = folder
return client.select_folder(folder)
def search(
self,
folder: str = "INBOX",
unread_only: bool = False,
from_addr: Optional[str] = None,
subject: Optional[str] = None,
since: Optional[str] = None,
) -> list[int]:
"""Search for messages matching criteria."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
criteria = ["ALL"]
if unread_only:
criteria = ["UNSEEN"]
if from_addr:
criteria.extend(["FROM", from_addr])
if subject:
criteria.extend(["SUBJECT", subject])
if since:
criteria.extend(["SINCE", since])
return client.search(criteria)
def fetch_message(
self,
uid: int,
folder: str = "INBOX",
body: bool = True,
) -> Optional[Message]:
"""Fetch a single message by UID."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
# Determine what to fetch
if body:
data_items = ["ENVELOPE", "FLAGS", "BODY[]", "BODYSTRUCTURE"]
else:
data_items = ["ENVELOPE", "FLAGS"]
response = client.fetch([uid], data_items)
if uid not in response:
return None
msg_data = response[uid]
return parse_message(uid, folder, msg_data)
def fetch_messages(
self,
uids: list[int],
folder: str = "INBOX",
body: bool = False,
) -> list[Message]:
"""Fetch multiple messages (envelope only by default)."""
if not uids:
return []
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
data_items = ["ENVELOPE", "FLAGS"]
if body:
data_items.extend(["BODY[]", "BODYSTRUCTURE"])
response = client.fetch(uids, data_items)
messages = []
for uid, msg_data in response.items():
msg = parse_message(uid, folder, msg_data)
if msg:
messages.append(msg)
return messages
def mark_seen(self, uid: int, folder: str = "INBOX", seen: bool = True) -> None:
"""Mark a message as seen/unseen."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
if seen:
client.add_flags([uid], [b"\\Seen"])
else:
client.remove_flags([uid], [b"\\Seen"])
def mark_flagged(self, uid: int, folder: str = "INBOX", flagged: bool = True) -> None:
"""Mark a message as flagged/unflagged."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
if flagged:
client.add_flags([uid], [b"\\Flagged"])
else:
client.remove_flags([uid], [b"\\Flagged"])
def move_message(self, uid: int, from_folder: str, to_folder: str) -> None:
"""Move a message to another folder."""
client = self._ensure_connected()
if self._current_folder != from_folder:
self.select_folder(from_folder)
# Try MOVE command first, fall back to copy+delete
try:
client.move([uid], to_folder)
except Exception:
client.copy([uid], to_folder)
client.delete_messages([uid])
client.expunge()
def delete_message(self, uid: int, folder: str = "INBOX") -> None:
"""Delete a message permanently."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
client.delete_messages([uid])
client.expunge()
def idle_start(self, folder: str = "INBOX", timeout: int = 300) -> None:
"""Start IDLE mode for push notifications."""
client = self._ensure_connected()
if self._current_folder != folder:
self.select_folder(folder)
client.idle()
def idle_check(self, timeout: Optional[int] = None) -> list:
"""Check for IDLE responses."""
client = self._ensure_connected()
return client.idle_check(timeout=timeout)
def idle_done(self) -> list:
"""Exit IDLE mode and get final responses."""
client = self._ensure_connected()
return client.idle_done()

219
src/imap/idle.py Normal file
View File

@ -0,0 +1,219 @@
"""IMAP IDLE handler for push notifications."""
import asyncio
import logging
from typing import Callable, Optional
from ..config import AccountConfig
from ..models import NewMailEvent
from .client import ImapClient
logger = logging.getLogger(__name__)
class IdleHandler:
"""Handles IMAP IDLE for real-time mail notifications."""
def __init__(
self,
account_id: str,
config: AccountConfig,
on_new_mail: Callable[[str, NewMailEvent], None],
):
self.account_id = account_id
self.config = config
self.on_new_mail = on_new_mail
self._client: Optional[ImapClient] = None
self._running = False
self._task: Optional[asyncio.Task] = None
async def start(self, folder: str = "INBOX") -> None:
"""Start IDLE monitoring in background."""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._idle_loop(folder))
logger.info(f"Started IDLE handler for {self.account_id}/{folder}")
async def stop(self) -> None:
"""Stop IDLE monitoring."""
self._running = False
if self._client:
try:
self._client.idle_done()
self._client.disconnect()
except Exception:
pass
self._client = None
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info(f"Stopped IDLE handler for {self.account_id}")
async def _idle_loop(self, folder: str) -> None:
"""Main IDLE loop - reconnects on errors."""
reconnect_delay = 5
max_delay = 300
while self._running:
try:
await self._do_idle(folder)
reconnect_delay = 5 # Reset on success
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"IDLE error for {self.account_id}: {e}")
if self._client:
try:
self._client.disconnect()
except Exception:
pass
self._client = None
if self._running:
logger.info(f"Reconnecting in {reconnect_delay}s...")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 2, max_delay)
async def _do_idle(self, folder: str) -> None:
"""Perform IDLE and process responses."""
self._client = ImapClient(self.account_id, self.config)
self._client.connect()
# Get initial message count
folder_info = self._client.select_folder(folder)
last_count = folder_info.get(b"EXISTS", 0)
logger.info(f"IDLE: {self.account_id}/{folder} has {last_count} messages")
while self._running:
# Start IDLE
self._client.idle_start(folder)
try:
# Wait for responses (with timeout for periodic reconnect)
responses = await asyncio.to_thread(
self._client.idle_check,
timeout=300 # 5 minute timeout
)
# Process responses
for response in responses:
if isinstance(response, tuple) and len(response) >= 2:
count, event = response
if event == b"EXISTS" and isinstance(count, int):
if count > last_count:
# New message(s) arrived
await self._handle_new_messages(
folder, last_count + 1, count
)
last_count = count
finally:
# Always exit IDLE mode
try:
self._client.idle_done()
except Exception:
pass
async def _handle_new_messages(
self,
folder: str,
start_seq: int,
end_seq: int,
) -> None:
"""Handle newly arrived messages."""
logger.info(f"New messages: {start_seq}-{end_seq} in {folder}")
# Fetch the new messages
# Note: sequence numbers, not UIDs
try:
# Search for recent messages
uids = self._client.search(folder, unread_only=True)
# Fetch just the envelopes
if uids:
messages = self._client.fetch_messages(
uids[-10:], # Last 10 unread
folder,
body=False,
)
for msg in messages:
event = NewMailEvent(
type="new",
uid=msg.uid,
**{"from": msg.from_},
subject=msg.subject,
folder=folder,
)
self.on_new_mail(self.account_id, event)
except Exception as e:
logger.error(f"Error fetching new messages: {e}")
class IdleManager:
"""Manages IDLE handlers for multiple accounts/folders."""
def __init__(self):
self._handlers: dict[str, IdleHandler] = {}
self._callbacks: list[Callable[[str, NewMailEvent], None]] = []
def add_callback(self, callback: Callable[[str, NewMailEvent], None]) -> None:
"""Register a callback for new mail events."""
self._callbacks.append(callback)
def _on_new_mail(self, account_id: str, event: NewMailEvent) -> None:
"""Dispatch new mail event to all callbacks."""
for callback in self._callbacks:
try:
callback(account_id, event)
except Exception as e:
logger.error(f"Callback error: {e}")
async def start_watching(
self,
account_id: str,
config: AccountConfig,
folders: list[str],
) -> None:
"""Start watching folders on an account."""
for folder in folders:
key = f"{account_id}:{folder}"
if key not in self._handlers:
handler = IdleHandler(
account_id,
config,
self._on_new_mail,
)
self._handlers[key] = handler
await handler.start(folder)
async def stop_watching(self, account_id: str, folder: Optional[str] = None) -> None:
"""Stop watching an account or specific folder."""
keys_to_remove = []
for key, handler in self._handlers.items():
if folder:
if key == f"{account_id}:{folder}":
await handler.stop()
keys_to_remove.append(key)
else:
if key.startswith(f"{account_id}:"):
await handler.stop()
keys_to_remove.append(key)
for key in keys_to_remove:
del self._handlers[key]
async def stop_all(self) -> None:
"""Stop all IDLE handlers."""
for handler in self._handlers.values():
await handler.stop()
self._handlers.clear()

240
src/imap/parser.py Normal file
View File

@ -0,0 +1,240 @@
"""Email message parsing utilities."""
import email
import logging
import re
from datetime import datetime
from email.header import decode_header
from email.utils import parsedate_to_datetime
from typing import Optional, Any
from ..models import Message, MessageFlags
logger = logging.getLogger(__name__)
def decode_header_value(value: Any) -> str:
"""Decode an email header value."""
if value is None:
return ""
if isinstance(value, bytes):
value = value.decode("utf-8", errors="replace")
if isinstance(value, str):
# Try to decode RFC 2047 encoded words
try:
decoded_parts = decode_header(value)
result_parts = []
for data, charset in decoded_parts:
if isinstance(data, bytes):
charset = charset or "utf-8"
result_parts.append(data.decode(charset, errors="replace"))
else:
result_parts.append(data)
return "".join(result_parts)
except Exception:
return value
return str(value)
def parse_address(addr: Any) -> str:
"""Parse an email address from ENVELOPE structure."""
if addr is None:
return ""
# ENVELOPE address format: (name, route, mailbox, host)
if isinstance(addr, (list, tuple)) and len(addr) >= 4:
name = decode_header_value(addr[0])
mailbox = decode_header_value(addr[2])
host = decode_header_value(addr[3])
if mailbox and host:
email_addr = f"{mailbox}@{host}"
if name:
return f"{name} <{email_addr}>"
return email_addr
return str(addr)
def parse_address_list(addrs: Any) -> list[str]:
"""Parse a list of addresses."""
if addrs is None:
return []
if isinstance(addrs, (list, tuple)):
return [parse_address(a) for a in addrs if a]
return []
def parse_envelope(envelope: Any) -> dict:
"""Parse an IMAP ENVELOPE structure."""
if envelope is None:
return {}
# ENVELOPE format:
# (date, subject, from, sender, reply-to, to, cc, bcc, in-reply-to, message-id)
result = {}
try:
if len(envelope) >= 1:
result["date"] = decode_header_value(envelope[0])
if len(envelope) >= 2:
result["subject"] = decode_header_value(envelope[1])
if len(envelope) >= 3:
from_list = parse_address_list(envelope[2])
result["from"] = from_list[0] if from_list else ""
if len(envelope) >= 6:
result["to"] = parse_address_list(envelope[5])
if len(envelope) >= 7:
result["cc"] = parse_address_list(envelope[6])
if len(envelope) >= 10:
result["message_id"] = decode_header_value(envelope[9])
except Exception as e:
logger.warning(f"Error parsing envelope: {e}")
return result
def parse_flags(flags: Any) -> MessageFlags:
"""Parse IMAP FLAGS."""
if flags is None:
return MessageFlags()
flags_set = {str(f).lower() for f in flags}
return MessageFlags(
seen=b"\\seen" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\seen" in flags_set,
flagged=b"\\flagged" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\flagged" in flags_set,
answered=b"\\answered" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\answered" in flags_set,
deleted=b"\\deleted" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\deleted" in flags_set,
)
def extract_text_body(msg: email.message.Message, max_length: int = 5000) -> tuple[str, str]:
"""Extract text and HTML bodies from an email message."""
text_body = ""
html_body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition", ""))
# Skip attachments
if "attachment" in content_disposition:
continue
if content_type == "text/plain" and not text_body:
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
text_body = payload.decode(charset, errors="replace")[:max_length]
except Exception:
pass
elif content_type == "text/html" and not html_body:
try:
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
html_body = payload.decode(charset, errors="replace")[:max_length]
except Exception:
pass
else:
content_type = msg.get_content_type()
try:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
decoded = payload.decode(charset, errors="replace")[:max_length]
if content_type == "text/html":
html_body = decoded
else:
text_body = decoded
except Exception:
pass
return text_body, html_body
def extract_attachments(msg: email.message.Message) -> list[str]:
"""Extract attachment filenames."""
attachments = []
if not msg.is_multipart():
return attachments
for part in msg.walk():
content_disposition = str(part.get("Content-Disposition", ""))
if "attachment" in content_disposition:
filename = part.get_filename()
if filename:
attachments.append(decode_header_value(filename))
return attachments
def parse_message(uid: int, folder: str, msg_data: dict) -> Optional[Message]:
"""Parse IMAP fetch response into a Message object."""
try:
# Parse envelope
envelope = msg_data.get(b"ENVELOPE")
env_data = parse_envelope(envelope)
# Parse flags
flags = parse_flags(msg_data.get(b"FLAGS"))
# Parse date
msg_date = None
if date_str := env_data.get("date"):
try:
msg_date = parsedate_to_datetime(date_str)
except Exception:
pass
# Parse body if present
body_text = ""
body_html = ""
body_preview = ""
attachments = []
if raw_body := msg_data.get(b"BODY[]"):
try:
msg = email.message_from_bytes(raw_body)
body_text, body_html = extract_text_body(msg)
attachments = extract_attachments(msg)
# Create preview from text body
if body_text:
body_preview = re.sub(r"\s+", " ", body_text[:500]).strip()
elif body_html:
# Strip HTML tags for preview
clean = re.sub(r"<[^>]+>", " ", body_html)
body_preview = re.sub(r"\s+", " ", clean[:500]).strip()
except Exception as e:
logger.warning(f"Error parsing body: {e}")
return Message(
uid=uid,
folder=folder,
message_id=env_data.get("message_id"),
**{"from": env_data.get("from", "")},
to=env_data.get("to", []),
cc=env_data.get("cc", []),
subject=env_data.get("subject"),
date=msg_date,
flags=flags,
body_preview=body_preview,
body_text=body_text,
body_html=body_html,
has_attachments=bool(attachments),
attachment_names=attachments,
)
except Exception as e:
logger.error(f"Error parsing message {uid}: {e}")
return None

211
src/main.py Normal file
View File

@ -0,0 +1,211 @@
"""Mail Agent - IMAP-based email triage with multi-tier escalation."""
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import load_config, get_config, Config
from .api import accounts_router, messages_router, events_router
from .imap import ImapClient
from .imap.idle import IdleManager
from .triage import TriagePipeline
from .models import NewMailEvent
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Global state
idle_manager: Optional[IdleManager] = None
triage_pipeline: Optional[TriagePipeline] = None
imap_clients: dict[str, ImapClient] = {}
def get_imap_client(account_id: str) -> ImapClient:
"""Get or create an IMAP client for an account."""
if account_id not in imap_clients:
config = get_config()
if account_id not in config.accounts:
raise ValueError(f"Unknown account: {account_id}")
imap_clients[account_id] = ImapClient(account_id, config.accounts[account_id])
return imap_clients[account_id]
def on_archive(account_id: str, uid: int, folder: str) -> None:
"""Archive a message."""
try:
config = get_config()
archive_folder = config.accounts[account_id].folders.archive
client = get_imap_client(account_id)
client.connect()
client.move_message(uid, folder, archive_folder)
logger.info(f"Archived message {uid} to {archive_folder}")
except Exception as e:
logger.error(f"Archive failed: {e}")
def on_delete(account_id: str, uid: int, folder: str) -> None:
"""Delete a message."""
try:
client = get_imap_client(account_id)
client.connect()
client.delete_message(uid, folder)
logger.info(f"Deleted message {uid}")
except Exception as e:
logger.error(f"Delete failed: {e}")
async def on_new_mail(account_id: str, event: NewMailEvent) -> None:
"""Handle new mail event from IDLE."""
global triage_pipeline
logger.info(f"New mail: {event.from_} - {event.subject}")
if triage_pipeline is None:
return
try:
# Fetch full message
client = get_imap_client(account_id)
client.connect()
message = client.fetch_message(event.uid, event.folder, body=True)
if message:
# Run through triage pipeline
await triage_pipeline.process(account_id, message)
except Exception as e:
logger.error(f"Triage error: {e}")
def on_new_mail_sync(account_id: str, event: NewMailEvent) -> None:
"""Sync wrapper for async on_new_mail."""
try:
loop = asyncio.get_running_loop()
asyncio.create_task(on_new_mail(account_id, event))
except RuntimeError:
# No running loop, run synchronously
asyncio.run(on_new_mail(account_id, event))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
global idle_manager, triage_pipeline
logger.info("Starting Mail Agent...")
config = get_config()
# Initialize triage pipeline if enabled
if config.triage.enabled:
triage_pipeline = TriagePipeline(
config.triage,
on_archive=on_archive,
on_delete=on_delete,
)
logger.info("Triage pipeline initialized")
# Initialize IDLE manager
idle_manager = IdleManager()
idle_manager.add_callback(on_new_mail_sync)
# Start watching configured accounts
for account_id, acc_config in config.accounts.items():
try:
await idle_manager.start_watching(
account_id,
acc_config,
acc_config.folders.watch,
)
logger.info(f"Watching {account_id}: {acc_config.folders.watch}")
except Exception as e:
logger.error(f"Failed to watch {account_id}: {e}")
yield
# Cleanup
logger.info("Shutting down Mail Agent...")
if idle_manager:
await idle_manager.stop_all()
if triage_pipeline:
await triage_pipeline.close()
for client in imap_clients.values():
try:
client.disconnect()
except Exception:
pass
logger.info("Mail Agent stopped")
# Create FastAPI app
app = FastAPI(
title="Mail Agent",
description="IMAP-based email triage with multi-tier escalation",
version="0.1.0",
lifespan=lifespan,
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(accounts_router)
app.include_router(messages_router)
app.include_router(events_router)
@app.get("/")
async def root():
"""Root endpoint."""
return {
"name": "Mail Agent",
"version": "0.1.0",
"status": "running",
}
@app.get("/health")
async def health():
"""Health check endpoint."""
config = get_config()
return {
"status": "healthy",
"accounts": list(config.accounts.keys()),
"triage_enabled": config.triage.enabled,
}
def main():
"""Entry point for running as a module."""
import uvicorn
config = load_config()
uvicorn.run(
"src.main:app",
host=config.server.host,
port=config.server.port,
reload=False,
)
if __name__ == "__main__":
main()

114
src/models.py Normal file
View File

@ -0,0 +1,114 @@
"""Pydantic models for the mail agent."""
from datetime import datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class EmailCategory(str, Enum):
SPAM = "spam"
NEWSLETTER = "newsletter"
RECEIPT = "receipt"
SHIPPING = "shipping"
NOTIFICATION = "notification"
PERSONAL = "personal"
IMPORTANT = "important"
UNCERTAIN = "uncertain"
class ShippingStatus(str, Enum):
ORDERED = "ordered"
PICKED_UP = "picked_up"
IN_TRANSIT = "in_transit"
OUT_FOR_DELIVERY = "out_for_delivery"
DELIVERED = "delivered"
class ShippingInfo(BaseModel):
carrier: Optional[str] = None
status: Optional[ShippingStatus] = None
item: Optional[str] = None
expected_date: Optional[str] = None
tracking_number: Optional[str] = None
class TriageResult(BaseModel):
category: EmailCategory
confidence: float = Field(ge=0.0, le=1.0)
reason: str
shipping: Optional[ShippingInfo] = None
class MessageFlags(BaseModel):
seen: bool = False
flagged: bool = False
answered: bool = False
deleted: bool = False
class Message(BaseModel):
uid: int
folder: str
message_id: Optional[str] = None
from_: str = Field(alias="from")
to: list[str] = Field(default_factory=list)
cc: list[str] = Field(default_factory=list)
subject: Optional[str] = None
date: Optional[datetime] = None
flags: MessageFlags = Field(default_factory=MessageFlags)
body_preview: Optional[str] = None
body_text: Optional[str] = None
body_html: Optional[str] = None
has_attachments: bool = False
attachment_names: list[str] = Field(default_factory=list)
class Config:
populate_by_name = True
class MessageUpdate(BaseModel):
seen: Optional[bool] = None
flagged: Optional[bool] = None
folder: Optional[str] = None
class Account(BaseModel):
id: str
host: str
port: int
username: str
tls: str
connected: bool = False
last_error: Optional[str] = None
class Mailbox(BaseModel):
name: str
delimiter: str = "/"
flags: list[str] = Field(default_factory=list)
message_count: int = 0
unread_count: int = 0
class NewMailEvent(BaseModel):
type: str = "new"
uid: int
from_: str = Field(alias="from")
subject: Optional[str] = None
folder: str = "INBOX"
class Config:
populate_by_name = True
class Shipment(BaseModel):
id: str
carrier: str
item: str
status: ShippingStatus
expected_date: Optional[str] = None
tracking_number: Optional[str] = None
dashboard_news_id: Optional[str] = None
last_updated: datetime = Field(default_factory=datetime.utcnow)
delivered_at: Optional[datetime] = None

7
src/triage/__init__.py Normal file
View File

@ -0,0 +1,7 @@
"""Email triage module."""
from .l1 import L1Triage
from .l2 import L2Escalation
from .l3 import L3Escalation
from .pipeline import TriagePipeline
__all__ = ["L1Triage", "L2Escalation", "L3Escalation", "TriagePipeline"]

160
src/triage/l1.py Normal file
View File

@ -0,0 +1,160 @@
"""L1 Triage - Cheap model classification."""
import json
import logging
from typing import Optional
import httpx
from ..config import L1Config
from ..models import EmailCategory, Message, ShippingInfo, ShippingStatus, TriageResult
logger = logging.getLogger(__name__)
L1_PROMPT = """Classify this email. Respond with JSON only.
From: {from_addr}
Subject: {subject}
Preview: {preview}
Categories:
- spam: Obvious spam, phishing, scams
- newsletter: Marketing, newsletters, promotions
- receipt: Order confirmations, invoices (not shipping)
- shipping: Shipping/delivery updates (picked up, in transit, delivered)
- notification: Automated notifications (GitHub, services)
- personal: From a real person, needs attention
- important: Urgent, financial, legal, medical
- uncertain: Not sure, needs human review
For shipping emails, also extract:
- carrier: UPS, FedEx, USPS, DHL, etc.
- status: ordered, picked_up, in_transit, out_for_delivery, delivered
- item: Brief description of what's being shipped
- expected_date: Expected delivery date (if available)
Response format:
{{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"}}
For shipping:
{{"category": "shipping", "confidence": 0.9, "reason": "...",
"shipping": {{"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}}}}"""
class L1Triage:
"""Level 1 triage using cheap LLM."""
def __init__(self, config: L1Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=30.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def classify(self, message: Message) -> TriageResult:
"""Classify a message using the L1 model."""
prompt = L1_PROMPT.format(
from_addr=message.from_,
subject=message.subject or "(no subject)",
preview=message.body_preview[:500] if message.body_preview else "(no preview)",
)
try:
response = await self._call_llm(prompt)
return self._parse_response(response)
except Exception as e:
logger.error(f"L1 classification error: {e}")
return TriageResult(
category=EmailCategory.UNCERTAIN,
confidence=0.0,
reason=f"Classification failed: {e}",
)
async def _call_llm(self, prompt: str) -> str:
"""Call the Fireworks API."""
client = await self._get_client()
url = "https://api.fireworks.ai/inference/v1/chat/completions"
headers = {
"Authorization": f"Bearer {self.config.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.config.model,
"messages": [
{"role": "user", "content": prompt}
],
"max_tokens": 500,
"temperature": 0.1, # Low temp for consistent classification
}
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
def _parse_response(self, response: str) -> TriageResult:
"""Parse the LLM response into a TriageResult."""
# Try to extract JSON from response
try:
# Find JSON in response (may have extra text)
start = response.find("{")
end = response.rfind("}") + 1
if start >= 0 and end > start:
json_str = response[start:end]
data = json.loads(json_str)
else:
raise ValueError("No JSON found in response")
# Parse category
category_str = data.get("category", "uncertain").lower()
try:
category = EmailCategory(category_str)
except ValueError:
category = EmailCategory.UNCERTAIN
# Parse confidence
confidence = float(data.get("confidence", 0.5))
confidence = max(0.0, min(1.0, confidence))
# Parse reason
reason = data.get("reason", "")
# Parse shipping info if present
shipping = None
if shipping_data := data.get("shipping"):
status_str = shipping_data.get("status", "").lower()
try:
status = ShippingStatus(status_str) if status_str else None
except ValueError:
status = None
shipping = ShippingInfo(
carrier=shipping_data.get("carrier"),
status=status,
item=shipping_data.get("item"),
expected_date=shipping_data.get("expected_date"),
tracking_number=shipping_data.get("tracking_number"),
)
return TriageResult(
category=category,
confidence=confidence,
reason=reason,
shipping=shipping,
)
except Exception as e:
logger.warning(f"Failed to parse L1 response: {e}\nResponse: {response}")
return TriageResult(
category=EmailCategory.UNCERTAIN,
confidence=0.3,
reason=f"Parse error: {e}",
)

116
src/triage/l2.py Normal file
View File

@ -0,0 +1,116 @@
"""L2 Escalation - Send to James (Opus) via Gateway."""
import logging
from typing import Optional
import httpx
from ..config import L2Config
from ..models import Message, TriageResult
logger = logging.getLogger(__name__)
class L2Escalation:
"""Level 2 escalation to James via Clawdbot Gateway."""
def __init__(self, config: L2Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=60.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def escalate(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
) -> bool:
"""Escalate a message to James for review.
Returns True if escalation was successful.
"""
# Format the escalation message
escalation_text = self._format_escalation(message, triage_result, account_id)
try:
# Send to gateway
client = await self._get_client()
# Gateway internal message API
# This hooks into an existing session or creates one
url = f"{self.config.gateway_url}/api/message"
payload = {
"text": escalation_text,
"source": "mail-agent",
"metadata": {
"type": "email_escalation",
"account": account_id,
"uid": message.uid,
"folder": message.folder,
"l1_category": triage_result.category.value,
"l1_confidence": triage_result.confidence,
},
}
response = await client.post(url, json=payload)
if response.status_code in (200, 201, 202):
logger.info(f"L2 escalation sent for message {message.uid}")
return True
else:
logger.warning(
f"L2 escalation failed: {response.status_code} {response.text}"
)
return False
except Exception as e:
logger.error(f"L2 escalation error: {e}")
return False
def _format_escalation(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
) -> str:
"""Format the escalation message for James."""
lines = [
"📧 **Email Review Request**",
"",
f"**From:** {message.from_}",
f"**Subject:** {message.subject or '(no subject)'}",
f"**Account:** {account_id}",
f"**Folder:** {message.folder}",
"",
f"**L1 Triage:** {triage_result.category.value} ({triage_result.confidence:.0%})",
f"**Reason:** {triage_result.reason}",
"",
"**Preview:**",
"```",
(message.body_preview or "(no preview)")[:500],
"```",
"",
"**Actions:**",
"- Reply: Draft a response",
"- Archive: Move to archive",
"- Delete: Delete the message",
"- Escalate: Send to Johan",
]
if triage_result.shipping:
ship = triage_result.shipping
lines.insert(8, "")
lines.insert(9, f"**Shipping:** {ship.carrier} - {ship.status}")
if ship.item:
lines.insert(10, f"**Item:** {ship.item}")
return "\n".join(lines)

132
src/triage/l3.py Normal file
View File

@ -0,0 +1,132 @@
"""L3 Escalation - Send to Johan via Gateway/Signal."""
import logging
from typing import Optional
import httpx
from ..config import L3Config
from ..models import Message, TriageResult
logger = logging.getLogger(__name__)
class L3Escalation:
"""Level 3 escalation to Johan via Signal."""
def __init__(self, config: L3Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=30.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def escalate(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
james_notes: Optional[str] = None,
) -> bool:
"""Escalate a message to Johan via Signal.
Returns True if escalation was successful.
"""
# Format the escalation message
escalation_text = self._format_escalation(
message, triage_result, account_id, james_notes
)
try:
client = await self._get_client()
# Use gateway message API to send via Signal
# Gateway routes to the configured Signal channel
url = f"{self.config.gateway_url}/api/message/send"
payload = {
"channel": "signal",
"message": escalation_text,
# Target: main user (Johan) - gateway knows this
}
response = await client.post(url, json=payload)
if response.status_code in (200, 201, 202):
logger.info(f"L3 escalation sent for message {message.uid}")
return True
else:
logger.warning(
f"L3 escalation failed: {response.status_code} {response.text}"
)
return False
except Exception as e:
logger.error(f"L3 escalation error: {e}")
return False
def _format_escalation(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
james_notes: Optional[str],
) -> str:
"""Format the escalation message for Johan."""
lines = [
"🚨 **Email Needs Your Attention**",
"",
f"**From:** {message.from_}",
f"**Subject:** {message.subject or '(no subject)'}",
"",
]
# Add James's notes if present
if james_notes:
lines.extend([
"**James says:**",
james_notes,
"",
])
# Add preview
preview = (message.body_preview or "")[:300]
if preview:
lines.extend([
"**Preview:**",
preview,
"",
])
# Add context
lines.extend([
f"_Category: {triage_result.category.value} | "
f"Account: {account_id} | "
f"UID: {message.uid}_",
])
return "\n".join(lines)
async def send_notification(self, text: str) -> bool:
"""Send a generic notification to Johan."""
try:
client = await self._get_client()
url = f"{self.config.gateway_url}/api/message/send"
payload = {
"channel": "signal",
"message": text,
}
response = await client.post(url, json=payload)
return response.status_code in (200, 201, 202)
except Exception as e:
logger.error(f"Notification error: {e}")
return False

237
src/triage/pipeline.py Normal file
View File

@ -0,0 +1,237 @@
"""Triage pipeline orchestration."""
import fnmatch
import logging
from typing import Callable, Optional
import httpx
from ..config import TriageConfig, ShippingConfig
from ..models import EmailCategory, Message, Shipment, ShippingStatus, TriageResult
from .l1 import L1Triage
from .l2 import L2Escalation
from .l3 import L3Escalation
logger = logging.getLogger(__name__)
class TriagePipeline:
"""Orchestrates the multi-tier triage process."""
def __init__(
self,
config: TriageConfig,
on_archive: Optional[Callable[[str, int, str], None]] = None,
on_delete: Optional[Callable[[str, int, str], None]] = None,
):
self.config = config
self.on_archive = on_archive
self.on_delete = on_delete
self.l1 = L1Triage(config.l1)
self.l2 = L2Escalation(config.l2)
self.l3 = L3Escalation(config.l3)
self._shipping_client: Optional[httpx.AsyncClient] = None
async def close(self) -> None:
await self.l1.close()
await self.l2.close()
await self.l3.close()
if self._shipping_client:
await self._shipping_client.aclose()
async def process(self, account_id: str, message: Message) -> TriageResult:
"""Process a message through the triage pipeline."""
# Check rules first
action = self._check_rules(message)
if action:
return await self._execute_rule_action(account_id, message, action)
# L1 classification
result = await self.l1.classify(message)
logger.info(
f"L1: {message.uid} -> {result.category.value} "
f"({result.confidence:.0%}): {result.reason}"
)
# Decide action based on category and confidence
await self._handle_result(account_id, message, result)
return result
def _check_rules(self, message: Message) -> Optional[str]:
"""Check if any rules apply to this message."""
from_addr = message.from_.lower()
# Always escalate from specific domains
for pattern in self.config.rules.always_escalate_from:
if self._matches_pattern(from_addr, pattern):
return "escalate"
# Auto-archive from specific senders
for pattern in self.config.rules.auto_archive_from:
if self._matches_pattern(from_addr, pattern):
return "archive"
# Auto-delete from specific senders
for pattern in self.config.rules.auto_delete_from:
if self._matches_pattern(from_addr, pattern):
return "delete"
return None
def _matches_pattern(self, address: str, pattern: str) -> bool:
"""Check if an email address matches a pattern."""
pattern = pattern.lower()
# Extract just the email part if it has a name
if "<" in address and ">" in address:
start = address.find("<") + 1
end = address.find(">")
address = address[start:end]
return fnmatch.fnmatch(address, pattern)
async def _execute_rule_action(
self,
account_id: str,
message: Message,
action: str,
) -> TriageResult:
"""Execute a rule-based action."""
result = TriageResult(
category=EmailCategory.NOTIFICATION,
confidence=1.0,
reason=f"Rule: auto-{action}",
)
if action == "archive":
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Rule: archived {message.uid}")
elif action == "delete":
if self.on_delete:
self.on_delete(account_id, message.uid, message.folder)
logger.info(f"Rule: deleted {message.uid}")
elif action == "escalate":
result.category = EmailCategory.IMPORTANT
result.reason = "Rule: always escalate from this sender"
await self.l2.escalate(message, result, account_id)
return result
async def _handle_result(
self,
account_id: str,
message: Message,
result: TriageResult,
) -> None:
"""Handle the L1 classification result."""
category = result.category
confidence = result.confidence
# High confidence actions
if confidence >= 0.8:
if category == EmailCategory.SPAM:
if self.on_delete:
self.on_delete(account_id, message.uid, message.folder)
logger.info(f"Deleted spam: {message.uid}")
return
elif category == EmailCategory.NEWSLETTER:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived newsletter: {message.uid}")
return
elif category == EmailCategory.RECEIPT:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived receipt: {message.uid}")
return
elif category == EmailCategory.SHIPPING:
await self._handle_shipping(account_id, message, result)
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Processed shipping: {message.uid}")
return
elif category == EmailCategory.NOTIFICATION:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived notification: {message.uid}")
return
# Escalate to L2 for:
# - Personal/important messages
# - Low confidence classifications
# - Uncertain category
if category in (EmailCategory.PERSONAL, EmailCategory.IMPORTANT):
await self.l2.escalate(message, result, account_id)
logger.info(f"Escalated to L2: {message.uid} ({category.value})")
elif category == EmailCategory.UNCERTAIN or confidence < 0.8:
await self.l2.escalate(message, result, account_id)
logger.info(f"Escalated to L2 (uncertain): {message.uid}")
# Flag important messages
if category == EmailCategory.IMPORTANT:
# TODO: Flag the message in IMAP
pass
async def _handle_shipping(
self,
account_id: str,
message: Message,
result: TriageResult,
) -> None:
"""Handle a shipping notification."""
if not result.shipping:
return
ship = result.shipping
# Post to dashboard
await self._post_shipping_to_dashboard(ship)
# Track shipment state (TODO: implement state file)
logger.info(
f"Shipping: {ship.carrier} - {ship.status} - {ship.item}"
)
async def _post_shipping_to_dashboard(self, ship) -> None:
"""Post shipping update to James Dashboard."""
if not self._shipping_client:
self._shipping_client = httpx.AsyncClient(timeout=10.0)
try:
url = f"{self.config.shipping.dashboard_url}/api/news"
# Format status message
status_text = {
ShippingStatus.ORDERED: "Order confirmed",
ShippingStatus.PICKED_UP: f"Picked up by {ship.carrier}",
ShippingStatus.IN_TRANSIT: "In transit",
ShippingStatus.OUT_FOR_DELIVERY: "Out for delivery",
ShippingStatus.DELIVERED: "Delivered ✓",
}.get(ship.status, str(ship.status))
if ship.expected_date:
status_text += f". Expected {ship.expected_date}"
payload = {
"title": f"📦 {ship.item or 'Package'}",
"body": status_text,
"type": "info" if ship.status != ShippingStatus.DELIVERED else "success",
"source": "shipping",
}
response = await self._shipping_client.post(url, json=payload)
if response.status_code not in (200, 201):
logger.warning(f"Dashboard post failed: {response.status_code}")
except Exception as e:
logger.error(f"Dashboard post error: {e}")

View File

@ -0,0 +1,16 @@
[Unit]
Description=Mail Agent - IMAP triage service
After=network.target protonmail-bridge.service
[Service]
Type=simple
User=johan
WorkingDirectory=/home/johan/dev/mail-agent
ExecStart=/home/johan/dev/mail-agent/.venv/bin/python -m src.main
Restart=always
RestartSec=10
Environment=FIREWORKS_API_KEY=
Environment=PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g
[Install]
WantedBy=default.target