From 5d9e58f5aed4e0e711fcab68ab0b2b18c5282497 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 31 Jan 2026 01:09:00 +0000 Subject: [PATCH] Initial mail-agent implementation Full IMAP-based email triage system with multi-tier escalation. Components: - IMAP client with IDLE push notifications - Email parsing with RFC 2047 header decoding - FastAPI REST API (accounts, messages, SSE events) - Triage pipeline (L1/L2/L3) with configurable rules - Shipping dashboard integration - Unsubscribe link detection Pre-configured for Proton Bridge (tj@jongsma.me). --- .venv/bin/python | 1 + .venv/bin/python3 | 1 + .venv/bin/python3.12 | 1 + .venv/lib64 | 1 + .venv/pyvenv.cfg | 5 + README.md | 152 ++++++++++++++++ SPEC.md | 353 +++++++++++++++++++++++++++++++++++++ config.yaml | 39 ++++ requirements.txt | 22 +++ src/__init__.py | 1 + src/actions/__init__.py | 4 + src/actions/unsubscribe.py | 125 +++++++++++++ src/api/__init__.py | 6 + src/api/accounts.py | 101 +++++++++++ src/api/events.py | 76 ++++++++ src/api/messages.py | 135 ++++++++++++++ src/config.py | 137 ++++++++++++++ src/imap/__init__.py | 5 + src/imap/client.py | 265 ++++++++++++++++++++++++++++ src/imap/idle.py | 219 +++++++++++++++++++++++ src/imap/parser.py | 240 +++++++++++++++++++++++++ src/main.py | 211 ++++++++++++++++++++++ src/models.py | 114 ++++++++++++ src/triage/__init__.py | 7 + src/triage/l1.py | 160 +++++++++++++++++ src/triage/l2.py | 116 ++++++++++++ src/triage/l3.py | 132 ++++++++++++++ src/triage/pipeline.py | 237 +++++++++++++++++++++++++ systemd/mail-agent.service | 16 ++ 29 files changed, 2882 insertions(+) create mode 120000 .venv/bin/python create mode 120000 .venv/bin/python3 create mode 120000 .venv/bin/python3.12 create mode 120000 .venv/lib64 create mode 100644 .venv/pyvenv.cfg create mode 100644 README.md create mode 100644 SPEC.md create mode 100644 config.yaml create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/actions/__init__.py create mode 100644 src/actions/unsubscribe.py create mode 100644 src/api/__init__.py create mode 100644 src/api/accounts.py create mode 100644 src/api/events.py create mode 100644 src/api/messages.py create mode 100644 src/config.py create mode 100644 src/imap/__init__.py create mode 100644 src/imap/client.py create mode 100644 src/imap/idle.py create mode 100644 src/imap/parser.py create mode 100644 src/main.py create mode 100644 src/models.py create mode 100644 src/triage/__init__.py create mode 100644 src/triage/l1.py create mode 100644 src/triage/l2.py create mode 100644 src/triage/l3.py create mode 100644 src/triage/pipeline.py create mode 100644 systemd/mail-agent.service diff --git a/.venv/bin/python b/.venv/bin/python new file mode 120000 index 0000000..b8a0adb --- /dev/null +++ b/.venv/bin/python @@ -0,0 +1 @@ +python3 \ No newline at end of file diff --git a/.venv/bin/python3 b/.venv/bin/python3 new file mode 120000 index 0000000..ae65fda --- /dev/null +++ b/.venv/bin/python3 @@ -0,0 +1 @@ +/usr/bin/python3 \ No newline at end of file diff --git a/.venv/bin/python3.12 b/.venv/bin/python3.12 new file mode 120000 index 0000000..b8a0adb --- /dev/null +++ b/.venv/bin/python3.12 @@ -0,0 +1 @@ +python3 \ No newline at end of file diff --git a/.venv/lib64 b/.venv/lib64 new file mode 120000 index 0000000..7951405 --- /dev/null +++ b/.venv/lib64 @@ -0,0 +1 @@ +lib \ No newline at end of file diff --git a/.venv/pyvenv.cfg b/.venv/pyvenv.cfg new file mode 100644 index 0000000..95b5cdd --- /dev/null +++ b/.venv/pyvenv.cfg @@ -0,0 +1,5 @@ +home = /usr/bin +include-system-site-packages = false +version = 3.12.3 +executable = /usr/bin/python3.12 +command = /usr/bin/python3 -m venv /home/johan/dev/mail-agent/.venv diff --git a/README.md b/README.md new file mode 100644 index 0000000..50095e0 --- /dev/null +++ b/README.md @@ -0,0 +1,152 @@ +# Mail Agent + +IMAP-based email triage with multi-tier escalation. + +## Overview + +Mail Agent monitors IMAP accounts for new mail and automatically triages messages: + +- **L1 (Cheap Model):** Fast classification using Fireworks llama-v3p1-8b + - Spam → delete + - Newsletter/receipt → archive + - Shipping → dashboard + archive + - Uncertain → escalate to L2 + +- **L2 (James/Opus):** Review via Clawdbot Gateway + - Full context review + - Can draft replies or escalate + +- **L3 (Johan):** Signal notification + - Important stuff only + - Human decision required + +## Quick Start + +```bash +# Create virtual environment +python3 -m venv .venv +source .venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Set environment variables +export FIREWORKS_API_KEY=your-api-key +export PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g + +# Run +python -m src.main +``` + +## Configuration + +Edit `config.yaml`: + +```yaml +server: + host: 127.0.0.1 + port: 8025 + +accounts: + proton: + host: 127.0.0.1 + port: 1143 + username: tj@jongsma.me + password: ${PROTON_BRIDGE_PASSWORD} + tls: starttls + folders: + watch: [INBOX] + archive: Archive + +triage: + enabled: true + l1: + provider: fireworks + model: accounts/fireworks/models/llama-v3p1-8b-instruct + api_key: ${FIREWORKS_API_KEY} +``` + +## API Endpoints + +### Accounts +- `GET /accounts` - List all configured accounts +- `GET /accounts/{id}` - Get account details +- `GET /accounts/{id}/mailboxes` - List folders + +### Messages +- `GET /accounts/{id}/messages?folder=INBOX&unread=true` - List messages +- `GET /accounts/{id}/messages/{uid}?folder=INBOX` - Get full message +- `PATCH /accounts/{id}/messages/{uid}` - Update flags/move +- `DELETE /accounts/{id}/messages/{uid}` - Delete message + +### Events (SSE) +- `GET /accounts/{id}/events?folder=INBOX` - Subscribe to new mail events + +## Systemd Service + +```bash +# Copy service file +cp systemd/mail-agent.service ~/.config/systemd/user/ + +# Edit to add FIREWORKS_API_KEY +systemctl --user edit mail-agent.service + +# Enable and start +systemctl --user enable mail-agent +systemctl --user start mail-agent + +# Check status +systemctl --user status mail-agent +journalctl --user -u mail-agent -f +``` + +## Architecture + +``` +New Mail (IMAP IDLE) + │ + ▼ +┌─────────────────────┐ +│ L1: Cheap Model │ ~$0.20/1M tokens +│ Fast classification│ +└─────────────────────┘ + │ + ▼ (uncertain/important) +┌─────────────────────┐ +│ L2: James (Opus) │ via Gateway +│ Context review │ +└─────────────────────┘ + │ + ▼ (needs human) +┌─────────────────────┐ +│ L3: Johan │ Signal notification +└─────────────────────┘ +``` + +## Shipping Dashboard + +Shipping notifications are automatically posted to the James Dashboard: + +``` +POST http://100.123.216.65:9200/api/news +{ + "title": "📦 E3-1275 Server", + "body": "Picked up by UPS. Expected Feb 3rd.", + "type": "info", + "source": "shipping" +} +``` + +## Development + +```bash +# Run with auto-reload +uvicorn src.main:app --reload --host 127.0.0.1 --port 8025 + +# Test IMAP connection +python -c "from src.imap import ImapClient; from src.config import get_config; c=get_config(); client=ImapClient('proton', c.accounts['proton']); client.connect(); print(client.list_mailboxes())" +``` + +## License + +Proprietary - Johan Jongsma diff --git a/SPEC.md b/SPEC.md new file mode 100644 index 0000000..aaf951b --- /dev/null +++ b/SPEC.md @@ -0,0 +1,353 @@ +# Mail Agent Specification + +## Overview +IMAP-based email triage agent with multi-tier escalation. Runs as a service, processes incoming mail, auto-handles obvious cases, escalates uncertain ones. + +## Architecture + +``` +New Mail (IMAP IDLE push) + │ + ▼ +┌─────────────────────────┐ +│ L1: Cheap Model │ Fireworks llama-v3p1-8b-instruct +│ - Spam → delete │ +│ - Newsletter → archive │ +│ - Receipt → archive │ +│ - Obvious junk → gone │ +│ - Uncertain → L2 │ +└─────────────────────────┘ + │ + ▼ +┌─────────────────────────┐ +│ L2: James (Opus) │ via Gateway +│ - Review context │ +│ - Draft reply? │ +│ - Handle or escalate │ +└─────────────────────────┘ + │ + ▼ +┌─────────────────────────┐ +│ L3: Johan │ Signal notification +│ - Important stuff │ +│ - Needs human decision │ +└─────────────────────────┘ +``` + +## Accounts (Multi-account support) + +First account: +- **ID:** proton +- **Host:** 127.0.0.1 +- **IMAP Port:** 1143 +- **Username:** tj@jongsma.me +- **Password:** BlcMCKtNDfqv0cq1LmGR9g +- **TLS:** STARTTLS + +## REST API + +### Accounts +``` +GET /accounts # List all configured accounts +POST /accounts # Add account +DELETE /accounts/{id} # Remove account +``` + +### Mailboxes +``` +GET /accounts/{id}/mailboxes # List folders +``` + +### Messages +``` +GET /accounts/{id}/messages + ?folder=INBOX + &unread=true + &from=sender@example.com # Search by sender + &limit=50 + &offset=0 + +GET /accounts/{id}/messages/{uid}?folder=INBOX + # Returns full message with body, attachments list + +PATCH /accounts/{id}/messages/{uid}?folder=INBOX + Body: {"seen": true} # Mark as read + Body: {"seen": false} # Mark as unread + Body: {"folder": "Archive"} # Move to folder + +DELETE /accounts/{id}/messages/{uid}?folder=INBOX + # Delete message +``` + +### Actions +``` +POST /accounts/{id}/unsubscribe/{uid}?folder=INBOX + # Find unsubscribe link in email, execute it +``` + +### Push Events (IMAP IDLE) +``` +GET /accounts/{id}/events?folder=INBOX + # SSE stream, emits on new mail: + # data: {"type": "new", "uid": 123, "from": "...", "subject": "..."} +``` + +## Triage Pipeline + +### L1 Triage (Cheap Model) +- **Model:** Fireworks `accounts/fireworks/models/llama-v3p1-8b-instruct` +- **Cost:** ~$0.20/1M tokens (very cheap) + +**L1 Prompt:** +``` +Classify this email. Respond with JSON only. + +From: {from} +Subject: {subject} +Preview: {first 500 chars} + +Categories: +- spam: Obvious spam, phishing, scams +- newsletter: Marketing, newsletters, promotions +- receipt: Order confirmations, invoices (not shipping) +- shipping: Shipping/delivery updates (picked up, in transit, delivered) +- notification: Automated notifications (GitHub, services) +- personal: From a real person, needs attention +- important: Urgent, financial, legal, medical +- uncertain: Not sure, needs human review + +For shipping emails, also extract: +- carrier: UPS, FedEx, USPS, DHL, etc. +- status: ordered, picked_up, in_transit, out_for_delivery, delivered +- item: Brief description of what's being shipped +- expected_date: Expected delivery date (if available) + +Response format: +{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"} + +For shipping: +{"category": "shipping", "confidence": 0.9, "reason": "...", + "shipping": {"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}} +``` + +**L1 Actions:** +| Category | Confidence > 0.8 | Confidence < 0.8 | +|----------|------------------|------------------| +| spam | Delete | → L2 | +| newsletter | Archive | → L2 | +| receipt | Archive + label | → L2 | +| shipping | → Dashboard + Archive | → L2 | +| notification | Archive | → L2 | +| personal | → L2 | → L2 | +| important | → L2 + flag | → L2 + flag | +| uncertain | → L2 | → L2 | + +### Shipping Dashboard Integration + +**Dashboard API:** http://100.123.216.65:9200 (James Dashboard) + +When shipping email detected: +1. POST to `/api/news` with shipping update +2. Archive the email +3. Track status in local state + +**Dashboard payload:** +```json +POST /api/news +{ + "title": "📦 E3-1275 Server", + "body": "Picked up by UPS. Expected Feb 3rd.", + "type": "info", + "source": "shipping", + "url": null +} +``` + +**Status progression:** +- `ordered` → "Order confirmed" +- `picked_up` → "Picked up by {carrier}" +- `in_transit` → "In transit" +- `out_for_delivery` → "Out for delivery" +- `delivered` → "Delivered ✓" + +**Auto-cleanup:** +- When status = `delivered`, set a flag +- Next day, DELETE the news item from dashboard +- Track shipments in local JSON: `~/.config/mail-agent/shipments.json` + +### L2 Triage (James/Opus) +- Receives escalated emails via Gateway hook +- Full context review +- Can: archive, delete, draft reply, escalate to Johan + +### L3 Escalation (Johan) +- Uses existing Clawdbot Signal integration (no hardcoded number needed) +- POST to gateway, let it route to Johan via Signal +- Summary of what needs attention +- Only for actually important stuff + +**Escalation via Gateway:** +``` +POST to gateway → routes to Johan's Signal via existing channel config +``` + +## Configuration + +```yaml +# config.yaml +server: + host: 127.0.0.1 + port: 8025 + +accounts: + proton: + host: 127.0.0.1 + port: 1143 + username: tj@jongsma.me + password: BlcMCKtNDfqv0cq1LmGR9g + tls: starttls + folders: + watch: [INBOX] + archive: Archive + spam: Spam + +triage: + enabled: true + l1: + provider: fireworks + model: accounts/fireworks/models/llama-v3p1-8b-instruct + api_key: ${FIREWORKS_API_KEY} + l2: + gateway_url: http://localhost:18080 # or however gateway is reached + # Hook mechanism TBD + l3: + gateway_url: http://localhost:18080 # Uses existing Signal integration + + shipping: + dashboard_url: http://100.123.216.65:9200 + auto_cleanup_days: 1 # Remove from dashboard 1 day after delivered + + rules: + always_escalate_from: + - "*@inou.com" + - "*@kaseya.com" + auto_archive_from: + - "*@github.com" + - "noreply@*" + auto_delete_from: + - known-spam-domains.txt +``` + +## Tech Stack +- **Language:** Python 3.11+ +- **Framework:** FastAPI +- **IMAP:** imapclient (IDLE support) +- **Async:** asyncio + anyio +- **LLM:** httpx for Fireworks API + +## Files Structure +``` +mail-agent/ +├── SPEC.md # This file +├── README.md # Usage docs +├── config.yaml # Configuration +├── requirements.txt # Dependencies +├── src/ +│ ├── __init__.py +│ ├── main.py # FastAPI app entry +│ ├── config.py # Config loading +│ ├── models.py # Pydantic models +│ ├── imap/ +│ │ ├── __init__.py +│ │ ├── client.py # IMAP connection +│ │ ├── idle.py # IDLE push handler +│ │ └── parser.py # Email parsing +│ ├── api/ +│ │ ├── __init__.py +│ │ ├── accounts.py # Account endpoints +│ │ ├── messages.py # Message endpoints +│ │ └── events.py # SSE endpoint +│ ├── triage/ +│ │ ├── __init__.py +│ │ ├── l1.py # L1 cheap model triage +│ │ ├── l2.py # L2 escalation to James +│ │ └── l3.py # L3 escalation to Johan +│ └── actions/ +│ ├── __init__.py +│ └── unsubscribe.py # Unsubscribe handler +├── systemd/ +│ └── mail-agent.service +└── tests/ + └── ... +``` + +## Systemd Service +```ini +[Unit] +Description=Mail Agent +After=network.target protonmail-bridge.service + +[Service] +Type=simple +User=johan +WorkingDirectory=/home/johan/dev/mail-agent +ExecStart=/home/johan/dev/mail-agent/.venv/bin/python -m src.main +Restart=always +RestartSec=10 +Environment=FIREWORKS_API_KEY=... + +[Install] +WantedBy=default.target +``` + +## Environment Variables +- `FIREWORKS_API_KEY` — For L1 model +- `MAIL_AGENT_CONFIG` — Path to config.yaml (default: ./config.yaml) + +## Shipment Tracking State +Stored in `~/.config/mail-agent/shipments.json`: +```json +{ + "shipments": [ + { + "id": "ups-1234567890", + "carrier": "UPS", + "item": "E3-1275 Server", + "status": "picked_up", + "expected_date": "2026-02-03", + "dashboard_news_id": "abc123", + "last_updated": "2026-01-30T22:00:00Z", + "delivered_at": null + } + ] +} +``` +- On new shipping email: upsert shipment, update dashboard +- On delivered: set `delivered_at`, schedule cleanup +- Cleanup job: delete from dashboard after 1 day + +## First Account (Pre-configured) +The Proton Bridge account is already running: +- Service: `systemctl --user status protonmail-bridge` +- IMAP: 127.0.0.1:1143 +- Account: tj@jongsma.me +- Bridge password: BlcMCKtNDfqv0cq1LmGR9g + +## Open Questions +1. **Gateway hook mechanism:** How does L2/L3 escalation reach James/Johan? POST to gateway? Will check gateway code for webhook/hook endpoint. +2. ~~**Johan's Signal number:**~~ RESOLVED: Use existing Clawdbot Signal integration via gateway +3. ~~**Fireworks API key:**~~ RESOLVED: Available in env + +## Build Checklist +- [ ] Project scaffold +- [ ] Config loading +- [ ] IMAP client with IDLE +- [ ] REST API endpoints +- [ ] L1 triage with Fireworks +- [ ] Shipping detection + dashboard integration +- [ ] Shipment tracking state + auto-cleanup +- [ ] L2 escalation hook (gateway) +- [ ] L3 escalation via gateway → Signal +- [ ] Systemd service +- [ ] Test with Proton account +- [ ] README documentation diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..9228074 --- /dev/null +++ b/config.yaml @@ -0,0 +1,39 @@ +server: + host: 127.0.0.1 + port: 8025 + +accounts: + proton: + host: 127.0.0.1 + port: 1143 + username: tj@jongsma.me + password: ${PROTON_BRIDGE_PASSWORD} + tls: starttls + folders: + watch: [INBOX] + archive: Archive + spam: Spam + +triage: + enabled: true + l1: + provider: fireworks + model: accounts/fireworks/models/llama-v3p1-8b-instruct + api_key: ${FIREWORKS_API_KEY} + l2: + gateway_url: http://localhost:18080 + l3: + gateway_url: http://localhost:18080 + + shipping: + dashboard_url: http://100.123.216.65:9200 + auto_cleanup_days: 1 + + rules: + always_escalate_from: + - "*@inou.com" + - "*@kaseya.com" + auto_archive_from: + - "*@github.com" + - "noreply@*" + auto_delete_from: [] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..10b7088 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,22 @@ +# Core +fastapi>=0.109.0 +uvicorn[standard]>=0.27.0 +pydantic>=2.5.0 +pydantic-settings>=2.1.0 +pyyaml>=6.0 + +# IMAP +imapclient>=3.0.0 +python-dotenv>=1.0.0 + +# HTTP client (for LLM API calls) +httpx>=0.26.0 + +# SSE support +sse-starlette>=2.0.0 + +# Email parsing +email-validator>=2.1.0 + +# Async +anyio>=4.2.0 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..8ea4359 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +# Mail Agent - IMAP-based email triage with multi-tier escalation diff --git a/src/actions/__init__.py b/src/actions/__init__.py new file mode 100644 index 0000000..c6ec4b7 --- /dev/null +++ b/src/actions/__init__.py @@ -0,0 +1,4 @@ +"""Action handlers.""" +from .unsubscribe import find_unsubscribe_link, execute_unsubscribe + +__all__ = ["find_unsubscribe_link", "execute_unsubscribe"] diff --git a/src/actions/unsubscribe.py b/src/actions/unsubscribe.py new file mode 100644 index 0000000..09982ae --- /dev/null +++ b/src/actions/unsubscribe.py @@ -0,0 +1,125 @@ +"""Unsubscribe action handler.""" +import logging +import re +from typing import Optional +from urllib.parse import urlparse + +import httpx + +from ..models import Message + +logger = logging.getLogger(__name__) + + +def find_unsubscribe_link(message: Message) -> Optional[str]: + """Find unsubscribe link in an email message. + + Checks: + 1. List-Unsubscribe header (TODO: needs raw headers) + 2. HTML body for common unsubscribe patterns + 3. Text body for unsubscribe URLs + """ + # Search patterns + patterns = [ + r'href=["\']?(https?://[^"\'>\s]*unsubscribe[^"\'>\s]*)["\']?', + r'href=["\']?(https?://[^"\'>\s]*optout[^"\'>\s]*)["\']?', + r'href=["\']?(https?://[^"\'>\s]*opt-out[^"\'>\s]*)["\']?', + r'href=["\']?(https?://[^"\'>\s]*remove[^"\'>\s]*)["\']?', + r'(https?://[^\s<>"]*unsubscribe[^\s<>"]*)', + r'(https?://[^\s<>"]*optout[^\s<>"]*)', + ] + + # Search in HTML body first + if message.body_html: + for pattern in patterns: + matches = re.findall(pattern, message.body_html, re.IGNORECASE) + if matches: + url = matches[0] + if _is_valid_unsubscribe_url(url): + return url + + # Search in text body + if message.body_text: + for pattern in patterns: + matches = re.findall(pattern, message.body_text, re.IGNORECASE) + if matches: + url = matches[0] + if _is_valid_unsubscribe_url(url): + return url + + return None + + +def _is_valid_unsubscribe_url(url: str) -> bool: + """Validate that a URL looks like a legitimate unsubscribe link.""" + try: + parsed = urlparse(url) + + # Must be HTTP(S) + if parsed.scheme not in ("http", "https"): + return False + + # Must have a host + if not parsed.netloc: + return False + + # Reject obvious non-unsubscribe URLs + suspicious = ["login", "password", "account", "download"] + for term in suspicious: + if term in url.lower() and "unsubscribe" not in url.lower(): + return False + + return True + except Exception: + return False + + +async def execute_unsubscribe(url: str) -> tuple[bool, str]: + """Execute an unsubscribe action by visiting the URL. + + Returns (success, message). + """ + try: + async with httpx.AsyncClient( + timeout=30.0, + follow_redirects=True, + headers={ + "User-Agent": "Mozilla/5.0 (compatible; MailAgent/1.0)", + }, + ) as client: + response = await client.get(url) + + # Check for success indicators + if response.status_code == 200: + content = response.text.lower() + + # Look for success messages + success_indicators = [ + "unsubscribed", + "removed", + "successfully", + "you have been", + "no longer", + ] + + for indicator in success_indicators: + if indicator in content: + logger.info(f"Unsubscribe successful: {url}") + return True, "Successfully unsubscribed" + + # If we got 200 but no clear success message, assume it worked + # (many unsubscribe pages just say "done" or redirect) + logger.info(f"Unsubscribe completed (no confirmation): {url}") + return True, "Unsubscribe request sent" + + else: + logger.warning(f"Unsubscribe failed: {response.status_code} for {url}") + return False, f"HTTP {response.status_code}" + + except httpx.TimeoutException: + logger.error(f"Unsubscribe timeout: {url}") + return False, "Request timed out" + + except Exception as e: + logger.error(f"Unsubscribe error: {e}") + return False, str(e) diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..81843cf --- /dev/null +++ b/src/api/__init__.py @@ -0,0 +1,6 @@ +"""API module.""" +from .accounts import router as accounts_router +from .messages import router as messages_router +from .events import router as events_router + +__all__ = ["accounts_router", "messages_router", "events_router"] diff --git a/src/api/accounts.py b/src/api/accounts.py new file mode 100644 index 0000000..b083499 --- /dev/null +++ b/src/api/accounts.py @@ -0,0 +1,101 @@ +"""Account management API endpoints.""" +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel +from typing import Optional + +from ..config import get_config, AccountConfig +from ..models import Account, Mailbox +from ..imap import ImapClient + +router = APIRouter(prefix="/accounts", tags=["accounts"]) + + +class AccountCreate(BaseModel): + id: str + host: str + port: int = 993 + username: str + password: str + tls: str = "ssl" + + +@router.get("") +async def list_accounts() -> list[Account]: + """List all configured accounts.""" + config = get_config() + accounts = [] + + for account_id, acc_config in config.accounts.items(): + # Try to connect to check status + connected = False + last_error = None + + try: + client = ImapClient(account_id, acc_config) + client.connect() + client.disconnect() + connected = True + except Exception as e: + last_error = str(e) + + accounts.append(Account( + id=account_id, + host=acc_config.host, + port=acc_config.port, + username=acc_config.username, + tls=acc_config.tls, + connected=connected, + last_error=last_error, + )) + + return accounts + + +@router.get("/{account_id}") +async def get_account(account_id: str) -> Account: + """Get account details.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + connected = False + last_error = None + + try: + client = ImapClient(account_id, acc_config) + client.connect() + client.disconnect() + connected = True + except Exception as e: + last_error = str(e) + + return Account( + id=account_id, + host=acc_config.host, + port=acc_config.port, + username=acc_config.username, + tls=acc_config.tls, + connected=connected, + last_error=last_error, + ) + + +@router.get("/{account_id}/mailboxes") +async def list_mailboxes(account_id: str) -> list[Mailbox]: + """List mailboxes/folders for an account.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + try: + client = ImapClient(account_id, acc_config) + with client.session(): + return client.list_mailboxes() + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/api/events.py b/src/api/events.py new file mode 100644 index 0000000..10f5ef6 --- /dev/null +++ b/src/api/events.py @@ -0,0 +1,76 @@ +"""Server-Sent Events for real-time mail notifications.""" +import asyncio +import json +import logging +from typing import AsyncGenerator + +from fastapi import APIRouter, HTTPException, Query +from sse_starlette.sse import EventSourceResponse + +from ..config import get_config +from ..models import NewMailEvent +from ..imap.idle import IdleHandler + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/accounts/{account_id}/events", tags=["events"]) + + +@router.get("") +async def subscribe_events( + account_id: str, + folder: str = Query(default="INBOX"), +) -> EventSourceResponse: + """Subscribe to real-time mail events via SSE.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + async def event_generator() -> AsyncGenerator[dict, None]: + """Generate SSE events from IMAP IDLE.""" + queue: asyncio.Queue[NewMailEvent] = asyncio.Queue() + + def on_new_mail(acc_id: str, event: NewMailEvent) -> None: + if acc_id == account_id: + try: + queue.put_nowait(event) + except asyncio.QueueFull: + pass + + handler = IdleHandler(account_id, acc_config, on_new_mail) + + try: + await handler.start(folder) + + # Send initial connection event + yield { + "event": "connected", + "data": json.dumps({ + "account": account_id, + "folder": folder, + }), + } + + # Stream events + while True: + try: + # Wait for events with timeout (keepalive) + event = await asyncio.wait_for(queue.get(), timeout=30.0) + yield { + "event": "message", + "data": event.model_dump_json(by_alias=True), + } + except asyncio.TimeoutError: + # Send keepalive + yield { + "event": "ping", + "data": "keepalive", + } + + finally: + await handler.stop() + + return EventSourceResponse(event_generator()) diff --git a/src/api/messages.py b/src/api/messages.py new file mode 100644 index 0000000..2a628b6 --- /dev/null +++ b/src/api/messages.py @@ -0,0 +1,135 @@ +"""Message management API endpoints.""" +from fastapi import APIRouter, HTTPException, Query +from typing import Optional + +from ..config import get_config +from ..models import Message, MessageUpdate +from ..imap import ImapClient + +router = APIRouter(prefix="/accounts/{account_id}/messages", tags=["messages"]) + + +@router.get("") +async def list_messages( + account_id: str, + folder: str = Query(default="INBOX"), + unread: bool = Query(default=False), + from_addr: Optional[str] = Query(default=None, alias="from"), + limit: int = Query(default=50, le=200), + offset: int = Query(default=0), +) -> list[Message]: + """List messages in a folder.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + try: + client = ImapClient(account_id, acc_config) + with client.session(): + # Search for messages + uids = client.search( + folder=folder, + unread_only=unread, + from_addr=from_addr, + ) + + # Apply pagination + total = len(uids) + uids = uids[offset:offset + limit] + + if not uids: + return [] + + # Fetch messages (envelope only) + return client.fetch_messages(uids, folder, body=False) + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/{uid}") +async def get_message( + account_id: str, + uid: int, + folder: str = Query(default="INBOX"), +) -> Message: + """Get a single message with full body.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + try: + client = ImapClient(account_id, acc_config) + with client.session(): + msg = client.fetch_message(uid, folder, body=True) + if msg is None: + raise HTTPException(status_code=404, detail="Message not found") + return msg + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.patch("/{uid}") +async def update_message( + account_id: str, + uid: int, + update: MessageUpdate, + folder: str = Query(default="INBOX"), +) -> dict: + """Update message flags or move to another folder.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + try: + client = ImapClient(account_id, acc_config) + with client.session(): + if update.seen is not None: + client.mark_seen(uid, folder, update.seen) + + if update.flagged is not None: + client.mark_flagged(uid, folder, update.flagged) + + if update.folder is not None and update.folder != folder: + client.move_message(uid, folder, update.folder) + + return {"status": "updated"} + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/{uid}") +async def delete_message( + account_id: str, + uid: int, + folder: str = Query(default="INBOX"), +) -> dict: + """Delete a message.""" + config = get_config() + + if account_id not in config.accounts: + raise HTTPException(status_code=404, detail="Account not found") + + acc_config = config.accounts[account_id] + + try: + client = ImapClient(account_id, acc_config) + with client.session(): + client.delete_message(uid, folder) + return {"status": "deleted"} + + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..af528db --- /dev/null +++ b/src/config.py @@ -0,0 +1,137 @@ +"""Configuration loading and validation.""" +import os +import re +from pathlib import Path +from typing import Optional + +import yaml +from pydantic import BaseModel, Field + + +class ServerConfig(BaseModel): + host: str = "127.0.0.1" + port: int = 8025 + + +class ImapFolders(BaseModel): + watch: list[str] = Field(default_factory=lambda: ["INBOX"]) + archive: str = "Archive" + spam: str = "Spam" + + +class AccountConfig(BaseModel): + host: str + port: int = 993 + username: str + password: str + tls: str = "ssl" # ssl, starttls, none + folders: ImapFolders = Field(default_factory=ImapFolders) + + +class L1Config(BaseModel): + provider: str = "fireworks" + model: str = "accounts/fireworks/models/llama-v3p1-8b-instruct" + api_key: str = "" + + +class L2Config(BaseModel): + gateway_url: str = "http://localhost:18080" + + +class L3Config(BaseModel): + gateway_url: str = "http://localhost:18080" + + +class ShippingConfig(BaseModel): + dashboard_url: str = "http://100.123.216.65:9200" + auto_cleanup_days: int = 1 + + +class TriageRules(BaseModel): + always_escalate_from: list[str] = Field(default_factory=list) + auto_archive_from: list[str] = Field(default_factory=list) + auto_delete_from: list[str] = Field(default_factory=list) + + +class TriageConfig(BaseModel): + enabled: bool = True + l1: L1Config = Field(default_factory=L1Config) + l2: L2Config = Field(default_factory=L2Config) + l3: L3Config = Field(default_factory=L3Config) + shipping: ShippingConfig = Field(default_factory=ShippingConfig) + rules: TriageRules = Field(default_factory=TriageRules) + + +class Config(BaseModel): + server: ServerConfig = Field(default_factory=ServerConfig) + accounts: dict[str, AccountConfig] = Field(default_factory=dict) + triage: TriageConfig = Field(default_factory=TriageConfig) + + +def expand_env_vars(text: str) -> str: + """Expand ${VAR} and $VAR patterns in text.""" + def replacer(match): + var_name = match.group(1) or match.group(2) + return os.environ.get(var_name, match.group(0)) + + # Match ${VAR} or $VAR (not inside quotes for simplicity) + pattern = r'\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)' + return re.sub(pattern, replacer, text) + + +def load_config(path: Optional[str] = None) -> Config: + """Load configuration from YAML file. + + Searches in order: + 1. Explicit path argument + 2. MAIL_AGENT_CONFIG env var + 3. ./config.yaml + 4. ~/.config/mail-agent/config.yaml + """ + search_paths = [] + + if path: + search_paths.append(Path(path)) + + if env_path := os.environ.get("MAIL_AGENT_CONFIG"): + search_paths.append(Path(env_path)) + + search_paths.extend([ + Path("config.yaml"), + Path.home() / ".config" / "mail-agent" / "config.yaml", + ]) + + config_path = None + for p in search_paths: + if p.exists(): + config_path = p + break + + if config_path is None: + # Return default config + return Config() + + # Load and expand env vars + raw = config_path.read_text() + expanded = expand_env_vars(raw) + data = yaml.safe_load(expanded) + + return Config.model_validate(data) + + +# Global config instance +_config: Optional[Config] = None + + +def get_config() -> Config: + """Get the global config instance.""" + global _config + if _config is None: + _config = load_config() + return _config + + +def set_config(config: Config) -> None: + """Set the global config instance (for testing).""" + global _config + _config = config diff --git a/src/imap/__init__.py b/src/imap/__init__.py new file mode 100644 index 0000000..93898d2 --- /dev/null +++ b/src/imap/__init__.py @@ -0,0 +1,5 @@ +"""IMAP client module.""" +from .client import ImapClient +from .parser import parse_message + +__all__ = ["ImapClient", "parse_message"] diff --git a/src/imap/client.py b/src/imap/client.py new file mode 100644 index 0000000..cbe8fc8 --- /dev/null +++ b/src/imap/client.py @@ -0,0 +1,265 @@ +"""IMAP client wrapper with connection management.""" +import logging +import ssl +from contextlib import contextmanager +from typing import Optional, Generator + +from imapclient import IMAPClient + +from ..config import AccountConfig +from ..models import Mailbox, Message, MessageFlags +from .parser import parse_message, parse_envelope + +logger = logging.getLogger(__name__) + + +class ImapClient: + """Thread-safe IMAP client wrapper.""" + + def __init__(self, account_id: str, config: AccountConfig): + self.account_id = account_id + self.config = config + self._client: Optional[IMAPClient] = None + self._current_folder: Optional[str] = None + + def connect(self) -> None: + """Establish IMAP connection.""" + if self._client is not None: + return + + # Determine SSL context + ssl_context = None + use_ssl = self.config.tls == "ssl" + + if self.config.tls in ("ssl", "starttls"): + ssl_context = ssl.create_default_context() + # Allow self-signed certs for local Proton Bridge + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + logger.info(f"Connecting to {self.config.host}:{self.config.port} (SSL={use_ssl})") + + self._client = IMAPClient( + self.config.host, + port=self.config.port, + ssl=use_ssl, + ssl_context=ssl_context if use_ssl else None, + ) + + if self.config.tls == "starttls": + self._client.starttls(ssl_context) + + self._client.login(self.config.username, self.config.password) + logger.info(f"Logged in as {self.config.username}") + + def disconnect(self) -> None: + """Close IMAP connection.""" + if self._client is not None: + try: + self._client.logout() + except Exception: + pass + self._client = None + self._current_folder = None + + def is_connected(self) -> bool: + """Check if connected.""" + return self._client is not None + + @contextmanager + def session(self) -> Generator["ImapClient", None, None]: + """Context manager for connection lifecycle.""" + try: + self.connect() + yield self + finally: + self.disconnect() + + def _ensure_connected(self) -> IMAPClient: + """Ensure we have an active connection.""" + if self._client is None: + self.connect() + return self._client + + def list_mailboxes(self) -> list[Mailbox]: + """List all mailboxes/folders.""" + client = self._ensure_connected() + folders = client.list_folders() + + result = [] + for flags, delimiter, name in folders: + # Get message counts if possible + status = {} + try: + status = client.folder_status(name, ["MESSAGES", "UNSEEN"]) + except Exception: + pass + + result.append(Mailbox( + name=name, + delimiter=delimiter or "/", + flags=[str(f) for f in flags], + message_count=status.get(b"MESSAGES", 0), + unread_count=status.get(b"UNSEEN", 0), + )) + + return result + + def select_folder(self, folder: str = "INBOX") -> dict: + """Select a folder for operations.""" + client = self._ensure_connected() + self._current_folder = folder + return client.select_folder(folder) + + def search( + self, + folder: str = "INBOX", + unread_only: bool = False, + from_addr: Optional[str] = None, + subject: Optional[str] = None, + since: Optional[str] = None, + ) -> list[int]: + """Search for messages matching criteria.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + criteria = ["ALL"] + + if unread_only: + criteria = ["UNSEEN"] + + if from_addr: + criteria.extend(["FROM", from_addr]) + + if subject: + criteria.extend(["SUBJECT", subject]) + + if since: + criteria.extend(["SINCE", since]) + + return client.search(criteria) + + def fetch_message( + self, + uid: int, + folder: str = "INBOX", + body: bool = True, + ) -> Optional[Message]: + """Fetch a single message by UID.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + # Determine what to fetch + if body: + data_items = ["ENVELOPE", "FLAGS", "BODY[]", "BODYSTRUCTURE"] + else: + data_items = ["ENVELOPE", "FLAGS"] + + response = client.fetch([uid], data_items) + + if uid not in response: + return None + + msg_data = response[uid] + return parse_message(uid, folder, msg_data) + + def fetch_messages( + self, + uids: list[int], + folder: str = "INBOX", + body: bool = False, + ) -> list[Message]: + """Fetch multiple messages (envelope only by default).""" + if not uids: + return [] + + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + data_items = ["ENVELOPE", "FLAGS"] + if body: + data_items.extend(["BODY[]", "BODYSTRUCTURE"]) + + response = client.fetch(uids, data_items) + + messages = [] + for uid, msg_data in response.items(): + msg = parse_message(uid, folder, msg_data) + if msg: + messages.append(msg) + + return messages + + def mark_seen(self, uid: int, folder: str = "INBOX", seen: bool = True) -> None: + """Mark a message as seen/unseen.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + if seen: + client.add_flags([uid], [b"\\Seen"]) + else: + client.remove_flags([uid], [b"\\Seen"]) + + def mark_flagged(self, uid: int, folder: str = "INBOX", flagged: bool = True) -> None: + """Mark a message as flagged/unflagged.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + if flagged: + client.add_flags([uid], [b"\\Flagged"]) + else: + client.remove_flags([uid], [b"\\Flagged"]) + + def move_message(self, uid: int, from_folder: str, to_folder: str) -> None: + """Move a message to another folder.""" + client = self._ensure_connected() + + if self._current_folder != from_folder: + self.select_folder(from_folder) + + # Try MOVE command first, fall back to copy+delete + try: + client.move([uid], to_folder) + except Exception: + client.copy([uid], to_folder) + client.delete_messages([uid]) + client.expunge() + + def delete_message(self, uid: int, folder: str = "INBOX") -> None: + """Delete a message permanently.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + client.delete_messages([uid]) + client.expunge() + + def idle_start(self, folder: str = "INBOX", timeout: int = 300) -> None: + """Start IDLE mode for push notifications.""" + client = self._ensure_connected() + + if self._current_folder != folder: + self.select_folder(folder) + + client.idle() + + def idle_check(self, timeout: Optional[int] = None) -> list: + """Check for IDLE responses.""" + client = self._ensure_connected() + return client.idle_check(timeout=timeout) + + def idle_done(self) -> list: + """Exit IDLE mode and get final responses.""" + client = self._ensure_connected() + return client.idle_done() diff --git a/src/imap/idle.py b/src/imap/idle.py new file mode 100644 index 0000000..d2323ab --- /dev/null +++ b/src/imap/idle.py @@ -0,0 +1,219 @@ +"""IMAP IDLE handler for push notifications.""" +import asyncio +import logging +from typing import Callable, Optional + +from ..config import AccountConfig +from ..models import NewMailEvent +from .client import ImapClient + +logger = logging.getLogger(__name__) + + +class IdleHandler: + """Handles IMAP IDLE for real-time mail notifications.""" + + def __init__( + self, + account_id: str, + config: AccountConfig, + on_new_mail: Callable[[str, NewMailEvent], None], + ): + self.account_id = account_id + self.config = config + self.on_new_mail = on_new_mail + self._client: Optional[ImapClient] = None + self._running = False + self._task: Optional[asyncio.Task] = None + + async def start(self, folder: str = "INBOX") -> None: + """Start IDLE monitoring in background.""" + if self._running: + return + + self._running = True + self._task = asyncio.create_task(self._idle_loop(folder)) + logger.info(f"Started IDLE handler for {self.account_id}/{folder}") + + async def stop(self) -> None: + """Stop IDLE monitoring.""" + self._running = False + + if self._client: + try: + self._client.idle_done() + self._client.disconnect() + except Exception: + pass + self._client = None + + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + logger.info(f"Stopped IDLE handler for {self.account_id}") + + async def _idle_loop(self, folder: str) -> None: + """Main IDLE loop - reconnects on errors.""" + reconnect_delay = 5 + max_delay = 300 + + while self._running: + try: + await self._do_idle(folder) + reconnect_delay = 5 # Reset on success + except asyncio.CancelledError: + raise + except Exception as e: + logger.error(f"IDLE error for {self.account_id}: {e}") + + if self._client: + try: + self._client.disconnect() + except Exception: + pass + self._client = None + + if self._running: + logger.info(f"Reconnecting in {reconnect_delay}s...") + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, max_delay) + + async def _do_idle(self, folder: str) -> None: + """Perform IDLE and process responses.""" + self._client = ImapClient(self.account_id, self.config) + self._client.connect() + + # Get initial message count + folder_info = self._client.select_folder(folder) + last_count = folder_info.get(b"EXISTS", 0) + logger.info(f"IDLE: {self.account_id}/{folder} has {last_count} messages") + + while self._running: + # Start IDLE + self._client.idle_start(folder) + + try: + # Wait for responses (with timeout for periodic reconnect) + responses = await asyncio.to_thread( + self._client.idle_check, + timeout=300 # 5 minute timeout + ) + + # Process responses + for response in responses: + if isinstance(response, tuple) and len(response) >= 2: + count, event = response + if event == b"EXISTS" and isinstance(count, int): + if count > last_count: + # New message(s) arrived + await self._handle_new_messages( + folder, last_count + 1, count + ) + last_count = count + finally: + # Always exit IDLE mode + try: + self._client.idle_done() + except Exception: + pass + + async def _handle_new_messages( + self, + folder: str, + start_seq: int, + end_seq: int, + ) -> None: + """Handle newly arrived messages.""" + logger.info(f"New messages: {start_seq}-{end_seq} in {folder}") + + # Fetch the new messages + # Note: sequence numbers, not UIDs + try: + # Search for recent messages + uids = self._client.search(folder, unread_only=True) + + # Fetch just the envelopes + if uids: + messages = self._client.fetch_messages( + uids[-10:], # Last 10 unread + folder, + body=False, + ) + + for msg in messages: + event = NewMailEvent( + type="new", + uid=msg.uid, + **{"from": msg.from_}, + subject=msg.subject, + folder=folder, + ) + self.on_new_mail(self.account_id, event) + except Exception as e: + logger.error(f"Error fetching new messages: {e}") + + +class IdleManager: + """Manages IDLE handlers for multiple accounts/folders.""" + + def __init__(self): + self._handlers: dict[str, IdleHandler] = {} + self._callbacks: list[Callable[[str, NewMailEvent], None]] = [] + + def add_callback(self, callback: Callable[[str, NewMailEvent], None]) -> None: + """Register a callback for new mail events.""" + self._callbacks.append(callback) + + def _on_new_mail(self, account_id: str, event: NewMailEvent) -> None: + """Dispatch new mail event to all callbacks.""" + for callback in self._callbacks: + try: + callback(account_id, event) + except Exception as e: + logger.error(f"Callback error: {e}") + + async def start_watching( + self, + account_id: str, + config: AccountConfig, + folders: list[str], + ) -> None: + """Start watching folders on an account.""" + for folder in folders: + key = f"{account_id}:{folder}" + if key not in self._handlers: + handler = IdleHandler( + account_id, + config, + self._on_new_mail, + ) + self._handlers[key] = handler + await handler.start(folder) + + async def stop_watching(self, account_id: str, folder: Optional[str] = None) -> None: + """Stop watching an account or specific folder.""" + keys_to_remove = [] + + for key, handler in self._handlers.items(): + if folder: + if key == f"{account_id}:{folder}": + await handler.stop() + keys_to_remove.append(key) + else: + if key.startswith(f"{account_id}:"): + await handler.stop() + keys_to_remove.append(key) + + for key in keys_to_remove: + del self._handlers[key] + + async def stop_all(self) -> None: + """Stop all IDLE handlers.""" + for handler in self._handlers.values(): + await handler.stop() + self._handlers.clear() diff --git a/src/imap/parser.py b/src/imap/parser.py new file mode 100644 index 0000000..b9d6dd6 --- /dev/null +++ b/src/imap/parser.py @@ -0,0 +1,240 @@ +"""Email message parsing utilities.""" +import email +import logging +import re +from datetime import datetime +from email.header import decode_header +from email.utils import parsedate_to_datetime +from typing import Optional, Any + +from ..models import Message, MessageFlags + +logger = logging.getLogger(__name__) + + +def decode_header_value(value: Any) -> str: + """Decode an email header value.""" + if value is None: + return "" + + if isinstance(value, bytes): + value = value.decode("utf-8", errors="replace") + + if isinstance(value, str): + # Try to decode RFC 2047 encoded words + try: + decoded_parts = decode_header(value) + result_parts = [] + for data, charset in decoded_parts: + if isinstance(data, bytes): + charset = charset or "utf-8" + result_parts.append(data.decode(charset, errors="replace")) + else: + result_parts.append(data) + return "".join(result_parts) + except Exception: + return value + + return str(value) + + +def parse_address(addr: Any) -> str: + """Parse an email address from ENVELOPE structure.""" + if addr is None: + return "" + + # ENVELOPE address format: (name, route, mailbox, host) + if isinstance(addr, (list, tuple)) and len(addr) >= 4: + name = decode_header_value(addr[0]) + mailbox = decode_header_value(addr[2]) + host = decode_header_value(addr[3]) + + if mailbox and host: + email_addr = f"{mailbox}@{host}" + if name: + return f"{name} <{email_addr}>" + return email_addr + + return str(addr) + + +def parse_address_list(addrs: Any) -> list[str]: + """Parse a list of addresses.""" + if addrs is None: + return [] + + if isinstance(addrs, (list, tuple)): + return [parse_address(a) for a in addrs if a] + + return [] + + +def parse_envelope(envelope: Any) -> dict: + """Parse an IMAP ENVELOPE structure.""" + if envelope is None: + return {} + + # ENVELOPE format: + # (date, subject, from, sender, reply-to, to, cc, bcc, in-reply-to, message-id) + result = {} + + try: + if len(envelope) >= 1: + result["date"] = decode_header_value(envelope[0]) + if len(envelope) >= 2: + result["subject"] = decode_header_value(envelope[1]) + if len(envelope) >= 3: + from_list = parse_address_list(envelope[2]) + result["from"] = from_list[0] if from_list else "" + if len(envelope) >= 6: + result["to"] = parse_address_list(envelope[5]) + if len(envelope) >= 7: + result["cc"] = parse_address_list(envelope[6]) + if len(envelope) >= 10: + result["message_id"] = decode_header_value(envelope[9]) + except Exception as e: + logger.warning(f"Error parsing envelope: {e}") + + return result + + +def parse_flags(flags: Any) -> MessageFlags: + """Parse IMAP FLAGS.""" + if flags is None: + return MessageFlags() + + flags_set = {str(f).lower() for f in flags} + + return MessageFlags( + seen=b"\\seen" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\seen" in flags_set, + flagged=b"\\flagged" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\flagged" in flags_set, + answered=b"\\answered" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\answered" in flags_set, + deleted=b"\\deleted" in {f.lower() if isinstance(f, bytes) else f.lower().encode() for f in flags} or "\\deleted" in flags_set, + ) + + +def extract_text_body(msg: email.message.Message, max_length: int = 5000) -> tuple[str, str]: + """Extract text and HTML bodies from an email message.""" + text_body = "" + html_body = "" + + if msg.is_multipart(): + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + # Skip attachments + if "attachment" in content_disposition: + continue + + if content_type == "text/plain" and not text_body: + try: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or "utf-8" + text_body = payload.decode(charset, errors="replace")[:max_length] + except Exception: + pass + + elif content_type == "text/html" and not html_body: + try: + payload = part.get_payload(decode=True) + if payload: + charset = part.get_content_charset() or "utf-8" + html_body = payload.decode(charset, errors="replace")[:max_length] + except Exception: + pass + else: + content_type = msg.get_content_type() + try: + payload = msg.get_payload(decode=True) + if payload: + charset = msg.get_content_charset() or "utf-8" + decoded = payload.decode(charset, errors="replace")[:max_length] + if content_type == "text/html": + html_body = decoded + else: + text_body = decoded + except Exception: + pass + + return text_body, html_body + + +def extract_attachments(msg: email.message.Message) -> list[str]: + """Extract attachment filenames.""" + attachments = [] + + if not msg.is_multipart(): + return attachments + + for part in msg.walk(): + content_disposition = str(part.get("Content-Disposition", "")) + + if "attachment" in content_disposition: + filename = part.get_filename() + if filename: + attachments.append(decode_header_value(filename)) + + return attachments + + +def parse_message(uid: int, folder: str, msg_data: dict) -> Optional[Message]: + """Parse IMAP fetch response into a Message object.""" + try: + # Parse envelope + envelope = msg_data.get(b"ENVELOPE") + env_data = parse_envelope(envelope) + + # Parse flags + flags = parse_flags(msg_data.get(b"FLAGS")) + + # Parse date + msg_date = None + if date_str := env_data.get("date"): + try: + msg_date = parsedate_to_datetime(date_str) + except Exception: + pass + + # Parse body if present + body_text = "" + body_html = "" + body_preview = "" + attachments = [] + + if raw_body := msg_data.get(b"BODY[]"): + try: + msg = email.message_from_bytes(raw_body) + body_text, body_html = extract_text_body(msg) + attachments = extract_attachments(msg) + + # Create preview from text body + if body_text: + body_preview = re.sub(r"\s+", " ", body_text[:500]).strip() + elif body_html: + # Strip HTML tags for preview + clean = re.sub(r"<[^>]+>", " ", body_html) + body_preview = re.sub(r"\s+", " ", clean[:500]).strip() + except Exception as e: + logger.warning(f"Error parsing body: {e}") + + return Message( + uid=uid, + folder=folder, + message_id=env_data.get("message_id"), + **{"from": env_data.get("from", "")}, + to=env_data.get("to", []), + cc=env_data.get("cc", []), + subject=env_data.get("subject"), + date=msg_date, + flags=flags, + body_preview=body_preview, + body_text=body_text, + body_html=body_html, + has_attachments=bool(attachments), + attachment_names=attachments, + ) + except Exception as e: + logger.error(f"Error parsing message {uid}: {e}") + return None diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..e8f776f --- /dev/null +++ b/src/main.py @@ -0,0 +1,211 @@ +"""Mail Agent - IMAP-based email triage with multi-tier escalation.""" +import asyncio +import logging +import sys +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from .config import load_config, get_config, Config +from .api import accounts_router, messages_router, events_router +from .imap import ImapClient +from .imap.idle import IdleManager +from .triage import TriagePipeline +from .models import NewMailEvent + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Global state +idle_manager: Optional[IdleManager] = None +triage_pipeline: Optional[TriagePipeline] = None +imap_clients: dict[str, ImapClient] = {} + + +def get_imap_client(account_id: str) -> ImapClient: + """Get or create an IMAP client for an account.""" + if account_id not in imap_clients: + config = get_config() + if account_id not in config.accounts: + raise ValueError(f"Unknown account: {account_id}") + imap_clients[account_id] = ImapClient(account_id, config.accounts[account_id]) + return imap_clients[account_id] + + +def on_archive(account_id: str, uid: int, folder: str) -> None: + """Archive a message.""" + try: + config = get_config() + archive_folder = config.accounts[account_id].folders.archive + + client = get_imap_client(account_id) + client.connect() + client.move_message(uid, folder, archive_folder) + logger.info(f"Archived message {uid} to {archive_folder}") + except Exception as e: + logger.error(f"Archive failed: {e}") + + +def on_delete(account_id: str, uid: int, folder: str) -> None: + """Delete a message.""" + try: + client = get_imap_client(account_id) + client.connect() + client.delete_message(uid, folder) + logger.info(f"Deleted message {uid}") + except Exception as e: + logger.error(f"Delete failed: {e}") + + +async def on_new_mail(account_id: str, event: NewMailEvent) -> None: + """Handle new mail event from IDLE.""" + global triage_pipeline + + logger.info(f"New mail: {event.from_} - {event.subject}") + + if triage_pipeline is None: + return + + try: + # Fetch full message + client = get_imap_client(account_id) + client.connect() + message = client.fetch_message(event.uid, event.folder, body=True) + + if message: + # Run through triage pipeline + await triage_pipeline.process(account_id, message) + + except Exception as e: + logger.error(f"Triage error: {e}") + + +def on_new_mail_sync(account_id: str, event: NewMailEvent) -> None: + """Sync wrapper for async on_new_mail.""" + try: + loop = asyncio.get_running_loop() + asyncio.create_task(on_new_mail(account_id, event)) + except RuntimeError: + # No running loop, run synchronously + asyncio.run(on_new_mail(account_id, event)) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan handler.""" + global idle_manager, triage_pipeline + + logger.info("Starting Mail Agent...") + config = get_config() + + # Initialize triage pipeline if enabled + if config.triage.enabled: + triage_pipeline = TriagePipeline( + config.triage, + on_archive=on_archive, + on_delete=on_delete, + ) + logger.info("Triage pipeline initialized") + + # Initialize IDLE manager + idle_manager = IdleManager() + idle_manager.add_callback(on_new_mail_sync) + + # Start watching configured accounts + for account_id, acc_config in config.accounts.items(): + try: + await idle_manager.start_watching( + account_id, + acc_config, + acc_config.folders.watch, + ) + logger.info(f"Watching {account_id}: {acc_config.folders.watch}") + except Exception as e: + logger.error(f"Failed to watch {account_id}: {e}") + + yield + + # Cleanup + logger.info("Shutting down Mail Agent...") + + if idle_manager: + await idle_manager.stop_all() + + if triage_pipeline: + await triage_pipeline.close() + + for client in imap_clients.values(): + try: + client.disconnect() + except Exception: + pass + + logger.info("Mail Agent stopped") + + +# Create FastAPI app +app = FastAPI( + title="Mail Agent", + description="IMAP-based email triage with multi-tier escalation", + version="0.1.0", + lifespan=lifespan, +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Include routers +app.include_router(accounts_router) +app.include_router(messages_router) +app.include_router(events_router) + + +@app.get("/") +async def root(): + """Root endpoint.""" + return { + "name": "Mail Agent", + "version": "0.1.0", + "status": "running", + } + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + config = get_config() + return { + "status": "healthy", + "accounts": list(config.accounts.keys()), + "triage_enabled": config.triage.enabled, + } + + +def main(): + """Entry point for running as a module.""" + import uvicorn + + config = load_config() + + uvicorn.run( + "src.main:app", + host=config.server.host, + port=config.server.port, + reload=False, + ) + + +if __name__ == "__main__": + main() diff --git a/src/models.py b/src/models.py new file mode 100644 index 0000000..ede36eb --- /dev/null +++ b/src/models.py @@ -0,0 +1,114 @@ +"""Pydantic models for the mail agent.""" +from datetime import datetime +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class EmailCategory(str, Enum): + SPAM = "spam" + NEWSLETTER = "newsletter" + RECEIPT = "receipt" + SHIPPING = "shipping" + NOTIFICATION = "notification" + PERSONAL = "personal" + IMPORTANT = "important" + UNCERTAIN = "uncertain" + + +class ShippingStatus(str, Enum): + ORDERED = "ordered" + PICKED_UP = "picked_up" + IN_TRANSIT = "in_transit" + OUT_FOR_DELIVERY = "out_for_delivery" + DELIVERED = "delivered" + + +class ShippingInfo(BaseModel): + carrier: Optional[str] = None + status: Optional[ShippingStatus] = None + item: Optional[str] = None + expected_date: Optional[str] = None + tracking_number: Optional[str] = None + + +class TriageResult(BaseModel): + category: EmailCategory + confidence: float = Field(ge=0.0, le=1.0) + reason: str + shipping: Optional[ShippingInfo] = None + + +class MessageFlags(BaseModel): + seen: bool = False + flagged: bool = False + answered: bool = False + deleted: bool = False + + +class Message(BaseModel): + uid: int + folder: str + message_id: Optional[str] = None + from_: str = Field(alias="from") + to: list[str] = Field(default_factory=list) + cc: list[str] = Field(default_factory=list) + subject: Optional[str] = None + date: Optional[datetime] = None + flags: MessageFlags = Field(default_factory=MessageFlags) + body_preview: Optional[str] = None + body_text: Optional[str] = None + body_html: Optional[str] = None + has_attachments: bool = False + attachment_names: list[str] = Field(default_factory=list) + + class Config: + populate_by_name = True + + +class MessageUpdate(BaseModel): + seen: Optional[bool] = None + flagged: Optional[bool] = None + folder: Optional[str] = None + + +class Account(BaseModel): + id: str + host: str + port: int + username: str + tls: str + connected: bool = False + last_error: Optional[str] = None + + +class Mailbox(BaseModel): + name: str + delimiter: str = "/" + flags: list[str] = Field(default_factory=list) + message_count: int = 0 + unread_count: int = 0 + + +class NewMailEvent(BaseModel): + type: str = "new" + uid: int + from_: str = Field(alias="from") + subject: Optional[str] = None + folder: str = "INBOX" + + class Config: + populate_by_name = True + + +class Shipment(BaseModel): + id: str + carrier: str + item: str + status: ShippingStatus + expected_date: Optional[str] = None + tracking_number: Optional[str] = None + dashboard_news_id: Optional[str] = None + last_updated: datetime = Field(default_factory=datetime.utcnow) + delivered_at: Optional[datetime] = None diff --git a/src/triage/__init__.py b/src/triage/__init__.py new file mode 100644 index 0000000..6bd306f --- /dev/null +++ b/src/triage/__init__.py @@ -0,0 +1,7 @@ +"""Email triage module.""" +from .l1 import L1Triage +from .l2 import L2Escalation +from .l3 import L3Escalation +from .pipeline import TriagePipeline + +__all__ = ["L1Triage", "L2Escalation", "L3Escalation", "TriagePipeline"] diff --git a/src/triage/l1.py b/src/triage/l1.py new file mode 100644 index 0000000..ffcb335 --- /dev/null +++ b/src/triage/l1.py @@ -0,0 +1,160 @@ +"""L1 Triage - Cheap model classification.""" +import json +import logging +from typing import Optional + +import httpx + +from ..config import L1Config +from ..models import EmailCategory, Message, ShippingInfo, ShippingStatus, TriageResult + +logger = logging.getLogger(__name__) + +L1_PROMPT = """Classify this email. Respond with JSON only. + +From: {from_addr} +Subject: {subject} +Preview: {preview} + +Categories: +- spam: Obvious spam, phishing, scams +- newsletter: Marketing, newsletters, promotions +- receipt: Order confirmations, invoices (not shipping) +- shipping: Shipping/delivery updates (picked up, in transit, delivered) +- notification: Automated notifications (GitHub, services) +- personal: From a real person, needs attention +- important: Urgent, financial, legal, medical +- uncertain: Not sure, needs human review + +For shipping emails, also extract: +- carrier: UPS, FedEx, USPS, DHL, etc. +- status: ordered, picked_up, in_transit, out_for_delivery, delivered +- item: Brief description of what's being shipped +- expected_date: Expected delivery date (if available) + +Response format: +{{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"}} + +For shipping: +{{"category": "shipping", "confidence": 0.9, "reason": "...", + "shipping": {{"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}}}}""" + + +class L1Triage: + """Level 1 triage using cheap LLM.""" + + def __init__(self, config: L1Config): + self.config = config + self._client: Optional[httpx.AsyncClient] = None + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=30.0) + return self._client + + async def close(self) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def classify(self, message: Message) -> TriageResult: + """Classify a message using the L1 model.""" + prompt = L1_PROMPT.format( + from_addr=message.from_, + subject=message.subject or "(no subject)", + preview=message.body_preview[:500] if message.body_preview else "(no preview)", + ) + + try: + response = await self._call_llm(prompt) + return self._parse_response(response) + except Exception as e: + logger.error(f"L1 classification error: {e}") + return TriageResult( + category=EmailCategory.UNCERTAIN, + confidence=0.0, + reason=f"Classification failed: {e}", + ) + + async def _call_llm(self, prompt: str) -> str: + """Call the Fireworks API.""" + client = await self._get_client() + + url = "https://api.fireworks.ai/inference/v1/chat/completions" + headers = { + "Authorization": f"Bearer {self.config.api_key}", + "Content-Type": "application/json", + } + payload = { + "model": self.config.model, + "messages": [ + {"role": "user", "content": prompt} + ], + "max_tokens": 500, + "temperature": 0.1, # Low temp for consistent classification + } + + response = await client.post(url, headers=headers, json=payload) + response.raise_for_status() + + data = response.json() + return data["choices"][0]["message"]["content"] + + def _parse_response(self, response: str) -> TriageResult: + """Parse the LLM response into a TriageResult.""" + # Try to extract JSON from response + try: + # Find JSON in response (may have extra text) + start = response.find("{") + end = response.rfind("}") + 1 + if start >= 0 and end > start: + json_str = response[start:end] + data = json.loads(json_str) + else: + raise ValueError("No JSON found in response") + + # Parse category + category_str = data.get("category", "uncertain").lower() + try: + category = EmailCategory(category_str) + except ValueError: + category = EmailCategory.UNCERTAIN + + # Parse confidence + confidence = float(data.get("confidence", 0.5)) + confidence = max(0.0, min(1.0, confidence)) + + # Parse reason + reason = data.get("reason", "") + + # Parse shipping info if present + shipping = None + if shipping_data := data.get("shipping"): + status_str = shipping_data.get("status", "").lower() + try: + status = ShippingStatus(status_str) if status_str else None + except ValueError: + status = None + + shipping = ShippingInfo( + carrier=shipping_data.get("carrier"), + status=status, + item=shipping_data.get("item"), + expected_date=shipping_data.get("expected_date"), + tracking_number=shipping_data.get("tracking_number"), + ) + + return TriageResult( + category=category, + confidence=confidence, + reason=reason, + shipping=shipping, + ) + + except Exception as e: + logger.warning(f"Failed to parse L1 response: {e}\nResponse: {response}") + return TriageResult( + category=EmailCategory.UNCERTAIN, + confidence=0.3, + reason=f"Parse error: {e}", + ) diff --git a/src/triage/l2.py b/src/triage/l2.py new file mode 100644 index 0000000..035d442 --- /dev/null +++ b/src/triage/l2.py @@ -0,0 +1,116 @@ +"""L2 Escalation - Send to James (Opus) via Gateway.""" +import logging +from typing import Optional + +import httpx + +from ..config import L2Config +from ..models import Message, TriageResult + +logger = logging.getLogger(__name__) + + +class L2Escalation: + """Level 2 escalation to James via Clawdbot Gateway.""" + + def __init__(self, config: L2Config): + self.config = config + self._client: Optional[httpx.AsyncClient] = None + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=60.0) + return self._client + + async def close(self) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def escalate( + self, + message: Message, + triage_result: TriageResult, + account_id: str, + ) -> bool: + """Escalate a message to James for review. + + Returns True if escalation was successful. + """ + # Format the escalation message + escalation_text = self._format_escalation(message, triage_result, account_id) + + try: + # Send to gateway + client = await self._get_client() + + # Gateway internal message API + # This hooks into an existing session or creates one + url = f"{self.config.gateway_url}/api/message" + + payload = { + "text": escalation_text, + "source": "mail-agent", + "metadata": { + "type": "email_escalation", + "account": account_id, + "uid": message.uid, + "folder": message.folder, + "l1_category": triage_result.category.value, + "l1_confidence": triage_result.confidence, + }, + } + + response = await client.post(url, json=payload) + + if response.status_code in (200, 201, 202): + logger.info(f"L2 escalation sent for message {message.uid}") + return True + else: + logger.warning( + f"L2 escalation failed: {response.status_code} {response.text}" + ) + return False + + except Exception as e: + logger.error(f"L2 escalation error: {e}") + return False + + def _format_escalation( + self, + message: Message, + triage_result: TriageResult, + account_id: str, + ) -> str: + """Format the escalation message for James.""" + lines = [ + "📧 **Email Review Request**", + "", + f"**From:** {message.from_}", + f"**Subject:** {message.subject or '(no subject)'}", + f"**Account:** {account_id}", + f"**Folder:** {message.folder}", + "", + f"**L1 Triage:** {triage_result.category.value} ({triage_result.confidence:.0%})", + f"**Reason:** {triage_result.reason}", + "", + "**Preview:**", + "```", + (message.body_preview or "(no preview)")[:500], + "```", + "", + "**Actions:**", + "- Reply: Draft a response", + "- Archive: Move to archive", + "- Delete: Delete the message", + "- Escalate: Send to Johan", + ] + + if triage_result.shipping: + ship = triage_result.shipping + lines.insert(8, "") + lines.insert(9, f"**Shipping:** {ship.carrier} - {ship.status}") + if ship.item: + lines.insert(10, f"**Item:** {ship.item}") + + return "\n".join(lines) diff --git a/src/triage/l3.py b/src/triage/l3.py new file mode 100644 index 0000000..19c3292 --- /dev/null +++ b/src/triage/l3.py @@ -0,0 +1,132 @@ +"""L3 Escalation - Send to Johan via Gateway/Signal.""" +import logging +from typing import Optional + +import httpx + +from ..config import L3Config +from ..models import Message, TriageResult + +logger = logging.getLogger(__name__) + + +class L3Escalation: + """Level 3 escalation to Johan via Signal.""" + + def __init__(self, config: L3Config): + self.config = config + self._client: Optional[httpx.AsyncClient] = None + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=30.0) + return self._client + + async def close(self) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def escalate( + self, + message: Message, + triage_result: TriageResult, + account_id: str, + james_notes: Optional[str] = None, + ) -> bool: + """Escalate a message to Johan via Signal. + + Returns True if escalation was successful. + """ + # Format the escalation message + escalation_text = self._format_escalation( + message, triage_result, account_id, james_notes + ) + + try: + client = await self._get_client() + + # Use gateway message API to send via Signal + # Gateway routes to the configured Signal channel + url = f"{self.config.gateway_url}/api/message/send" + + payload = { + "channel": "signal", + "message": escalation_text, + # Target: main user (Johan) - gateway knows this + } + + response = await client.post(url, json=payload) + + if response.status_code in (200, 201, 202): + logger.info(f"L3 escalation sent for message {message.uid}") + return True + else: + logger.warning( + f"L3 escalation failed: {response.status_code} {response.text}" + ) + return False + + except Exception as e: + logger.error(f"L3 escalation error: {e}") + return False + + def _format_escalation( + self, + message: Message, + triage_result: TriageResult, + account_id: str, + james_notes: Optional[str], + ) -> str: + """Format the escalation message for Johan.""" + lines = [ + "🚨 **Email Needs Your Attention**", + "", + f"**From:** {message.from_}", + f"**Subject:** {message.subject or '(no subject)'}", + "", + ] + + # Add James's notes if present + if james_notes: + lines.extend([ + "**James says:**", + james_notes, + "", + ]) + + # Add preview + preview = (message.body_preview or "")[:300] + if preview: + lines.extend([ + "**Preview:**", + preview, + "", + ]) + + # Add context + lines.extend([ + f"_Category: {triage_result.category.value} | " + f"Account: {account_id} | " + f"UID: {message.uid}_", + ]) + + return "\n".join(lines) + + async def send_notification(self, text: str) -> bool: + """Send a generic notification to Johan.""" + try: + client = await self._get_client() + url = f"{self.config.gateway_url}/api/message/send" + + payload = { + "channel": "signal", + "message": text, + } + + response = await client.post(url, json=payload) + return response.status_code in (200, 201, 202) + + except Exception as e: + logger.error(f"Notification error: {e}") + return False diff --git a/src/triage/pipeline.py b/src/triage/pipeline.py new file mode 100644 index 0000000..cc7537e --- /dev/null +++ b/src/triage/pipeline.py @@ -0,0 +1,237 @@ +"""Triage pipeline orchestration.""" +import fnmatch +import logging +from typing import Callable, Optional + +import httpx + +from ..config import TriageConfig, ShippingConfig +from ..models import EmailCategory, Message, Shipment, ShippingStatus, TriageResult +from .l1 import L1Triage +from .l2 import L2Escalation +from .l3 import L3Escalation + +logger = logging.getLogger(__name__) + + +class TriagePipeline: + """Orchestrates the multi-tier triage process.""" + + def __init__( + self, + config: TriageConfig, + on_archive: Optional[Callable[[str, int, str], None]] = None, + on_delete: Optional[Callable[[str, int, str], None]] = None, + ): + self.config = config + self.on_archive = on_archive + self.on_delete = on_delete + + self.l1 = L1Triage(config.l1) + self.l2 = L2Escalation(config.l2) + self.l3 = L3Escalation(config.l3) + + self._shipping_client: Optional[httpx.AsyncClient] = None + + async def close(self) -> None: + await self.l1.close() + await self.l2.close() + await self.l3.close() + if self._shipping_client: + await self._shipping_client.aclose() + + async def process(self, account_id: str, message: Message) -> TriageResult: + """Process a message through the triage pipeline.""" + # Check rules first + action = self._check_rules(message) + if action: + return await self._execute_rule_action(account_id, message, action) + + # L1 classification + result = await self.l1.classify(message) + logger.info( + f"L1: {message.uid} -> {result.category.value} " + f"({result.confidence:.0%}): {result.reason}" + ) + + # Decide action based on category and confidence + await self._handle_result(account_id, message, result) + + return result + + def _check_rules(self, message: Message) -> Optional[str]: + """Check if any rules apply to this message.""" + from_addr = message.from_.lower() + + # Always escalate from specific domains + for pattern in self.config.rules.always_escalate_from: + if self._matches_pattern(from_addr, pattern): + return "escalate" + + # Auto-archive from specific senders + for pattern in self.config.rules.auto_archive_from: + if self._matches_pattern(from_addr, pattern): + return "archive" + + # Auto-delete from specific senders + for pattern in self.config.rules.auto_delete_from: + if self._matches_pattern(from_addr, pattern): + return "delete" + + return None + + def _matches_pattern(self, address: str, pattern: str) -> bool: + """Check if an email address matches a pattern.""" + pattern = pattern.lower() + + # Extract just the email part if it has a name + if "<" in address and ">" in address: + start = address.find("<") + 1 + end = address.find(">") + address = address[start:end] + + return fnmatch.fnmatch(address, pattern) + + async def _execute_rule_action( + self, + account_id: str, + message: Message, + action: str, + ) -> TriageResult: + """Execute a rule-based action.""" + result = TriageResult( + category=EmailCategory.NOTIFICATION, + confidence=1.0, + reason=f"Rule: auto-{action}", + ) + + if action == "archive": + if self.on_archive: + self.on_archive(account_id, message.uid, message.folder) + logger.info(f"Rule: archived {message.uid}") + + elif action == "delete": + if self.on_delete: + self.on_delete(account_id, message.uid, message.folder) + logger.info(f"Rule: deleted {message.uid}") + + elif action == "escalate": + result.category = EmailCategory.IMPORTANT + result.reason = "Rule: always escalate from this sender" + await self.l2.escalate(message, result, account_id) + + return result + + async def _handle_result( + self, + account_id: str, + message: Message, + result: TriageResult, + ) -> None: + """Handle the L1 classification result.""" + category = result.category + confidence = result.confidence + + # High confidence actions + if confidence >= 0.8: + if category == EmailCategory.SPAM: + if self.on_delete: + self.on_delete(account_id, message.uid, message.folder) + logger.info(f"Deleted spam: {message.uid}") + return + + elif category == EmailCategory.NEWSLETTER: + if self.on_archive: + self.on_archive(account_id, message.uid, message.folder) + logger.info(f"Archived newsletter: {message.uid}") + return + + elif category == EmailCategory.RECEIPT: + if self.on_archive: + self.on_archive(account_id, message.uid, message.folder) + logger.info(f"Archived receipt: {message.uid}") + return + + elif category == EmailCategory.SHIPPING: + await self._handle_shipping(account_id, message, result) + if self.on_archive: + self.on_archive(account_id, message.uid, message.folder) + logger.info(f"Processed shipping: {message.uid}") + return + + elif category == EmailCategory.NOTIFICATION: + if self.on_archive: + self.on_archive(account_id, message.uid, message.folder) + logger.info(f"Archived notification: {message.uid}") + return + + # Escalate to L2 for: + # - Personal/important messages + # - Low confidence classifications + # - Uncertain category + if category in (EmailCategory.PERSONAL, EmailCategory.IMPORTANT): + await self.l2.escalate(message, result, account_id) + logger.info(f"Escalated to L2: {message.uid} ({category.value})") + + elif category == EmailCategory.UNCERTAIN or confidence < 0.8: + await self.l2.escalate(message, result, account_id) + logger.info(f"Escalated to L2 (uncertain): {message.uid}") + + # Flag important messages + if category == EmailCategory.IMPORTANT: + # TODO: Flag the message in IMAP + pass + + async def _handle_shipping( + self, + account_id: str, + message: Message, + result: TriageResult, + ) -> None: + """Handle a shipping notification.""" + if not result.shipping: + return + + ship = result.shipping + + # Post to dashboard + await self._post_shipping_to_dashboard(ship) + + # Track shipment state (TODO: implement state file) + logger.info( + f"Shipping: {ship.carrier} - {ship.status} - {ship.item}" + ) + + async def _post_shipping_to_dashboard(self, ship) -> None: + """Post shipping update to James Dashboard.""" + if not self._shipping_client: + self._shipping_client = httpx.AsyncClient(timeout=10.0) + + try: + url = f"{self.config.shipping.dashboard_url}/api/news" + + # Format status message + status_text = { + ShippingStatus.ORDERED: "Order confirmed", + ShippingStatus.PICKED_UP: f"Picked up by {ship.carrier}", + ShippingStatus.IN_TRANSIT: "In transit", + ShippingStatus.OUT_FOR_DELIVERY: "Out for delivery", + ShippingStatus.DELIVERED: "Delivered ✓", + }.get(ship.status, str(ship.status)) + + if ship.expected_date: + status_text += f". Expected {ship.expected_date}" + + payload = { + "title": f"📦 {ship.item or 'Package'}", + "body": status_text, + "type": "info" if ship.status != ShippingStatus.DELIVERED else "success", + "source": "shipping", + } + + response = await self._shipping_client.post(url, json=payload) + if response.status_code not in (200, 201): + logger.warning(f"Dashboard post failed: {response.status_code}") + + except Exception as e: + logger.error(f"Dashboard post error: {e}") diff --git a/systemd/mail-agent.service b/systemd/mail-agent.service new file mode 100644 index 0000000..9ce6720 --- /dev/null +++ b/systemd/mail-agent.service @@ -0,0 +1,16 @@ +[Unit] +Description=Mail Agent - IMAP triage service +After=network.target protonmail-bridge.service + +[Service] +Type=simple +User=johan +WorkingDirectory=/home/johan/dev/mail-agent +ExecStart=/home/johan/dev/mail-agent/.venv/bin/python -m src.main +Restart=always +RestartSec=10 +Environment=FIREWORKS_API_KEY= +Environment=PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g + +[Install] +WantedBy=default.target