Simplify: remove L1/L2/L3 triage, webhook to main session

All intelligence now lives in OpenClaw (James). Mail Agent is just:
- IMAP API (list/read/move/delete)
- IMAP IDLE monitoring
- Webhook POST to /hooks/mail

Removed:
- L1 Fireworks llama triage
- L2/L3 escalation logic
- Hardcoded shipping detection
- Rule-based routing
- All AI logic

974 lines deleted, complexity moved to the right place.
This commit is contained in:
James 2026-01-31 12:18:04 +00:00
parent 7994c4b4a7
commit 36637c0d70
11 changed files with 106 additions and 975 deletions

115
README.md
View File

@ -1,24 +1,15 @@
# Mail Agent
IMAP-based email triage with multi-tier escalation.
Simple IMAP/SMTP API with webhook notifications for OpenClaw.
## Overview
Mail Agent monitors IMAP accounts for new mail and automatically triages messages:
Mail Agent is a thin layer over IMAP that:
1. Monitors mailboxes via IMAP IDLE (push notifications)
2. Provides a REST API for reading/moving/deleting messages
3. POSTs new mail to OpenClaw webhook for processing
- **L1 (Cheap Model):** Fast classification using Fireworks llama-v3p1-8b
- Spam → delete
- Newsletter/receipt → archive
- Shipping → dashboard + archive
- Uncertain → escalate to L2
- **L2 (James/Opus):** Review via Clawdbot Gateway
- Full context review
- Can draft replies or escalate
- **L3 (Johan):** Signal notification
- Important stuff only
- Human decision required
**All intelligence lives in OpenClaw.** Mail Agent is just the pipe.
## Quick Start
@ -30,10 +21,6 @@ source .venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export FIREWORKS_API_KEY=your-api-key
export PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g
# Run
python -m src.main
```
@ -58,44 +45,54 @@ accounts:
watch: [INBOX]
archive: Archive
triage:
webhook:
enabled: true
l1:
provider: fireworks
model: accounts/fireworks/models/llama-v3p1-8b-instruct
api_key: ${FIREWORKS_API_KEY}
url: http://localhost:18789/hooks/mail
token: kuma-alert-token-2026
```
## API Endpoints
### Health
- `GET /health` - Health check
### Accounts
- `GET /accounts` - List all configured accounts
- `GET /accounts/{id}` - Get account details
- `GET /accounts/{id}/mailboxes` - List folders
### Messages
- `GET /accounts/{id}/messages?folder=INBOX&unread=true` - List messages
- `GET /accounts/{id}/messages?folder=INBOX&limit=20` - List messages
- `GET /accounts/{id}/messages/{uid}?folder=INBOX` - Get full message
- `PATCH /accounts/{id}/messages/{uid}` - Update flags/move
- `DELETE /accounts/{id}/messages/{uid}` - Delete message
- `DELETE /accounts/{id}/messages/{uid}?folder=INBOX` - Delete message
### Events (SSE)
- `GET /accounts/{id}/events?folder=INBOX` - Subscribe to new mail events
- `GET /events?accounts=proton` - Subscribe to new mail events
## Webhook Payload
When new mail arrives, POSTs to webhook:
```json
{
"account": "proton",
"uid": 12345,
"folder": "INBOX",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"subject": "Hello",
"date": "2026-01-31T10:00:00Z",
"preview": "First 1000 chars...",
"body": "Full text up to 10KB",
"has_attachments": false,
"attachment_names": []
}
```
## Systemd Service
```bash
# Copy service file
cp systemd/mail-agent.service ~/.config/systemd/user/
# Edit to add FIREWORKS_API_KEY
systemctl --user edit mail-agent.service
# Enable and start
systemctl --user enable mail-agent
systemctl --user start mail-agent
# Check status
systemctl --user restart mail-agent
systemctl --user status mail-agent
journalctl --user -u mail-agent -f
```
@ -107,44 +104,16 @@ New Mail (IMAP IDLE)
┌─────────────────────┐
L1: Cheap Model │ ~$0.20/1M tokens
Fast classification
POST to webhook │
(raw email data)
└─────────────────────┘
(uncertain/important)
┌─────────────────────┐
│ L2: James (Opus) │ via Gateway
│ Context review │
│ OpenClaw (James) │ ← All decisions here
│ Archive/Delete/ │
│ Reply/Escalate │
└─────────────────────┘
▼ (needs human)
┌─────────────────────┐
│ L3: Johan │ Signal notification
└─────────────────────┘
```
## Shipping Dashboard
Shipping notifications are automatically posted to the James Dashboard:
```
POST http://100.123.216.65:9200/api/news
{
"title": "📦 E3-1275 Server",
"body": "Picked up by UPS. Expected Feb 3rd.",
"type": "info",
"source": "shipping"
}
```
## Development
```bash
# Run with auto-reload
uvicorn src.main:app --reload --host 127.0.0.1 --port 8025
# Test IMAP connection
python -c "from src.imap import ImapClient; from src.config import get_config; c=get_config(); client=ImapClient('proton', c.accounts['proton']); client.connect(); print(client.list_mailboxes())"
```
## License

View File

@ -14,26 +14,7 @@ accounts:
archive: Archive
spam: Spam
triage:
webhook:
enabled: true
l1:
provider: fireworks
model: accounts/fireworks/models/llama-v3p1-8b-instruct
api_key: ${FIREWORKS_API_KEY}
l2:
gateway_url: http://localhost:18080
l3:
gateway_url: http://localhost:18080
shipping:
dashboard_url: http://100.123.216.65:9200
auto_cleanup_days: 1
rules:
always_escalate_from:
- "*@inou.com"
- "*@kaseya.com"
auto_archive_from:
- "*@github.com"
- "noreply@*"
auto_delete_from: []
url: http://localhost:18789/hooks/mail
token: kuma-alert-token-2026

View File

@ -1,4 +0,0 @@
"""Action handlers."""
from .unsubscribe import find_unsubscribe_link, execute_unsubscribe
__all__ = ["find_unsubscribe_link", "execute_unsubscribe"]

View File

@ -1,125 +0,0 @@
"""Unsubscribe action handler."""
import logging
import re
from typing import Optional
from urllib.parse import urlparse
import httpx
from ..models import Message
logger = logging.getLogger(__name__)
def find_unsubscribe_link(message: Message) -> Optional[str]:
"""Find unsubscribe link in an email message.
Checks:
1. List-Unsubscribe header (TODO: needs raw headers)
2. HTML body for common unsubscribe patterns
3. Text body for unsubscribe URLs
"""
# Search patterns
patterns = [
r'href=["\']?(https?://[^"\'>\s]*unsubscribe[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*optout[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*opt-out[^"\'>\s]*)["\']?',
r'href=["\']?(https?://[^"\'>\s]*remove[^"\'>\s]*)["\']?',
r'(https?://[^\s<>"]*unsubscribe[^\s<>"]*)',
r'(https?://[^\s<>"]*optout[^\s<>"]*)',
]
# Search in HTML body first
if message.body_html:
for pattern in patterns:
matches = re.findall(pattern, message.body_html, re.IGNORECASE)
if matches:
url = matches[0]
if _is_valid_unsubscribe_url(url):
return url
# Search in text body
if message.body_text:
for pattern in patterns:
matches = re.findall(pattern, message.body_text, re.IGNORECASE)
if matches:
url = matches[0]
if _is_valid_unsubscribe_url(url):
return url
return None
def _is_valid_unsubscribe_url(url: str) -> bool:
"""Validate that a URL looks like a legitimate unsubscribe link."""
try:
parsed = urlparse(url)
# Must be HTTP(S)
if parsed.scheme not in ("http", "https"):
return False
# Must have a host
if not parsed.netloc:
return False
# Reject obvious non-unsubscribe URLs
suspicious = ["login", "password", "account", "download"]
for term in suspicious:
if term in url.lower() and "unsubscribe" not in url.lower():
return False
return True
except Exception:
return False
async def execute_unsubscribe(url: str) -> tuple[bool, str]:
"""Execute an unsubscribe action by visiting the URL.
Returns (success, message).
"""
try:
async with httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
headers={
"User-Agent": "Mozilla/5.0 (compatible; MailAgent/1.0)",
},
) as client:
response = await client.get(url)
# Check for success indicators
if response.status_code == 200:
content = response.text.lower()
# Look for success messages
success_indicators = [
"unsubscribed",
"removed",
"successfully",
"you have been",
"no longer",
]
for indicator in success_indicators:
if indicator in content:
logger.info(f"Unsubscribe successful: {url}")
return True, "Successfully unsubscribed"
# If we got 200 but no clear success message, assume it worked
# (many unsubscribe pages just say "done" or redirect)
logger.info(f"Unsubscribe completed (no confirmation): {url}")
return True, "Unsubscribe request sent"
else:
logger.warning(f"Unsubscribe failed: {response.status_code} for {url}")
return False, f"HTTP {response.status_code}"
except httpx.TimeoutException:
logger.error(f"Unsubscribe timeout: {url}")
return False, "Request timed out"
except Exception as e:
logger.error(f"Unsubscribe error: {e}")
return False, str(e)

View File

@ -28,44 +28,17 @@ class AccountConfig(BaseModel):
folders: ImapFolders = Field(default_factory=ImapFolders)
class L1Config(BaseModel):
provider: str = "fireworks"
model: str = "accounts/fireworks/models/llama-v3p1-8b-instruct"
api_key: str = ""
class L2Config(BaseModel):
gateway_url: str = "http://localhost:18080"
class L3Config(BaseModel):
gateway_url: str = "http://localhost:18080"
class ShippingConfig(BaseModel):
dashboard_url: str = "http://100.123.216.65:9200"
auto_cleanup_days: int = 1
class TriageRules(BaseModel):
always_escalate_from: list[str] = Field(default_factory=list)
auto_archive_from: list[str] = Field(default_factory=list)
auto_delete_from: list[str] = Field(default_factory=list)
class TriageConfig(BaseModel):
class WebhookConfig(BaseModel):
"""Simple webhook config - POST new mail to OpenClaw."""
enabled: bool = True
l1: L1Config = Field(default_factory=L1Config)
l2: L2Config = Field(default_factory=L2Config)
l3: L3Config = Field(default_factory=L3Config)
shipping: ShippingConfig = Field(default_factory=ShippingConfig)
rules: TriageRules = Field(default_factory=TriageRules)
url: str = "http://localhost:18789/hooks/mail"
token: str = "kuma-alert-token-2026"
class Config(BaseModel):
server: ServerConfig = Field(default_factory=ServerConfig)
accounts: dict[str, AccountConfig] = Field(default_factory=dict)
triage: TriageConfig = Field(default_factory=TriageConfig)
webhook: WebhookConfig = Field(default_factory=WebhookConfig)
def expand_env_vars(text: str) -> str:
@ -74,20 +47,12 @@ def expand_env_vars(text: str) -> str:
var_name = match.group(1) or match.group(2)
return os.environ.get(var_name, match.group(0))
# Match ${VAR} or $VAR (not inside quotes for simplicity)
pattern = r'\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)'
return re.sub(pattern, replacer, text)
def load_config(path: Optional[str] = None) -> Config:
"""Load configuration from YAML file.
Searches in order:
1. Explicit path argument
2. MAIL_AGENT_CONFIG env var
3. ./config.yaml
4. ~/.config/mail-agent/config.yaml
"""
"""Load configuration from YAML file."""
search_paths = []
if path:
@ -108,10 +73,8 @@ def load_config(path: Optional[str] = None) -> Config:
break
if config_path is None:
# Return default config
return Config()
# Load and expand env vars
raw = config_path.read_text()
expanded = expand_env_vars(raw)
data = yaml.safe_load(expanded)
@ -119,7 +82,6 @@ def load_config(path: Optional[str] = None) -> Config:
return Config.model_validate(data)
# Global config instance
_config: Optional[Config] = None

View File

@ -1,18 +1,17 @@
"""Mail Agent - IMAP-based email triage with multi-tier escalation."""
"""Mail Agent - Simple IMAP/SMTP API with webhook notifications."""
import asyncio
import logging
import sys
from contextlib import asynccontextmanager
from typing import Optional
import httpx
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .config import load_config, get_config, Config
from .config import load_config, get_config
from .api import accounts_router, messages_router, events_router
from .imap import ImapClient
from .imap.idle import IdleManager
from .triage import TriagePipeline
from .models import NewMailEvent
# Configure logging
@ -24,8 +23,8 @@ logger = logging.getLogger(__name__)
# Global state
idle_manager: Optional[IdleManager] = None
triage_pipeline: Optional[TriagePipeline] = None
imap_clients: dict[str, ImapClient] = {}
http_client: Optional[httpx.AsyncClient] = None
def get_imap_client(account_id: str) -> ImapClient:
@ -38,38 +37,15 @@ def get_imap_client(account_id: str) -> ImapClient:
return imap_clients[account_id]
def on_archive(account_id: str, uid: int, folder: str) -> None:
"""Archive a message."""
try:
config = get_config()
archive_folder = config.accounts[account_id].folders.archive
client = get_imap_client(account_id)
client.connect()
client.move_message(uid, folder, archive_folder)
logger.info(f"Archived message {uid} to {archive_folder}")
except Exception as e:
logger.error(f"Archive failed: {e}")
def on_delete(account_id: str, uid: int, folder: str) -> None:
"""Delete a message."""
try:
client = get_imap_client(account_id)
client.connect()
client.delete_message(uid, folder)
logger.info(f"Deleted message {uid}")
except Exception as e:
logger.error(f"Delete failed: {e}")
async def on_new_mail(account_id: str, event: NewMailEvent) -> None:
"""Handle new mail event from IDLE."""
global triage_pipeline
"""Handle new mail - fetch full message and POST to webhook."""
global http_client
config = get_config()
logger.info(f"New mail: {event.from_} - {event.subject}")
if triage_pipeline is None:
if not config.webhook.enabled:
logger.debug("Webhook disabled, skipping")
return
try:
@ -78,41 +54,62 @@ async def on_new_mail(account_id: str, event: NewMailEvent) -> None:
client.connect()
message = client.fetch_message(event.uid, event.folder, body=True)
if message:
# Run through triage pipeline
await triage_pipeline.process(account_id, message)
if not message:
logger.warning(f"Could not fetch message {event.uid}")
return
# POST to webhook
if http_client is None:
http_client = httpx.AsyncClient(timeout=30.0)
payload = {
"account": account_id,
"uid": message.uid,
"folder": message.folder,
"from": message.from_,
"to": message.to,
"subject": message.subject or "(no subject)",
"date": message.date.isoformat() if message.date else None,
"preview": (message.body_preview or "")[:1000],
"body": message.body_text[:10000] if message.body_text else None,
"has_attachments": len(message.attachments) > 0,
"attachment_names": [a.filename for a in message.attachments] if message.attachments else [],
}
headers = {"X-Hook-Token": config.webhook.token}
response = await http_client.post(
config.webhook.url,
json=payload,
headers=headers,
)
if response.status_code in (200, 201, 202):
logger.info(f"Webhook sent: {message.from_} - {message.subject}")
else:
logger.warning(f"Webhook failed: {response.status_code}")
except Exception as e:
logger.error(f"Triage error: {e}")
logger.error(f"Webhook error: {e}")
def on_new_mail_sync(account_id: str, event: NewMailEvent) -> None:
"""Sync wrapper for async on_new_mail."""
try:
loop = asyncio.get_running_loop()
asyncio.get_running_loop()
asyncio.create_task(on_new_mail(account_id, event))
except RuntimeError:
# No running loop, run synchronously
asyncio.run(on_new_mail(account_id, event))
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
global idle_manager, triage_pipeline
global idle_manager, http_client
logger.info("Starting Mail Agent...")
config = get_config()
# Initialize triage pipeline if enabled
if config.triage.enabled:
triage_pipeline = TriagePipeline(
config.triage,
on_archive=on_archive,
on_delete=on_delete,
)
logger.info("Triage pipeline initialized")
# Initialize IDLE manager
idle_manager = IdleManager()
idle_manager.add_callback(on_new_mail_sync)
@ -129,6 +126,9 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.error(f"Failed to watch {account_id}: {e}")
if config.webhook.enabled:
logger.info(f"Webhook enabled: {config.webhook.url}")
yield
# Cleanup
@ -137,8 +137,8 @@ async def lifespan(app: FastAPI):
if idle_manager:
await idle_manager.stop_all()
if triage_pipeline:
await triage_pipeline.close()
if http_client:
await http_client.aclose()
for client in imap_clients.values():
try:
@ -152,8 +152,8 @@ async def lifespan(app: FastAPI):
# Create FastAPI app
app = FastAPI(
title="Mail Agent",
description="IMAP-based email triage with multi-tier escalation",
version="0.1.0",
description="Simple IMAP/SMTP API with webhook notifications",
version="0.2.0",
lifespan=lifespan,
)
@ -177,7 +177,7 @@ async def root():
"""Root endpoint."""
return {
"name": "Mail Agent",
"version": "0.1.0",
"version": "0.2.0",
"status": "running",
}
@ -189,7 +189,7 @@ async def health():
return {
"status": "healthy",
"accounts": list(config.accounts.keys()),
"triage_enabled": config.triage.enabled,
"webhook_enabled": config.webhook.enabled,
}

View File

@ -1,7 +0,0 @@
"""Email triage module."""
from .l1 import L1Triage
from .l2 import L2Escalation
from .l3 import L3Escalation
from .pipeline import TriagePipeline
__all__ = ["L1Triage", "L2Escalation", "L3Escalation", "TriagePipeline"]

View File

@ -1,160 +0,0 @@
"""L1 Triage - Cheap model classification."""
import json
import logging
from typing import Optional
import httpx
from ..config import L1Config
from ..models import EmailCategory, Message, ShippingInfo, ShippingStatus, TriageResult
logger = logging.getLogger(__name__)
L1_PROMPT = """Classify this email. Respond with JSON only.
From: {from_addr}
Subject: {subject}
Preview: {preview}
Categories:
- spam: Obvious spam, phishing, scams
- newsletter: Marketing, newsletters, promotions
- receipt: Order confirmations, invoices (not shipping)
- shipping: Shipping/delivery updates (picked up, in transit, delivered)
- notification: Automated notifications (GitHub, services)
- personal: From a real person, needs attention
- important: Urgent, financial, legal, medical
- uncertain: Not sure, needs human review
For shipping emails, also extract:
- carrier: UPS, FedEx, USPS, DHL, etc.
- status: ordered, picked_up, in_transit, out_for_delivery, delivered
- item: Brief description of what's being shipped
- expected_date: Expected delivery date (if available)
Response format:
{{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"}}
For shipping:
{{"category": "shipping", "confidence": 0.9, "reason": "...",
"shipping": {{"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}}}}"""
class L1Triage:
"""Level 1 triage using cheap LLM."""
def __init__(self, config: L1Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=30.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def classify(self, message: Message) -> TriageResult:
"""Classify a message using the L1 model."""
prompt = L1_PROMPT.format(
from_addr=message.from_,
subject=message.subject or "(no subject)",
preview=message.body_preview[:500] if message.body_preview else "(no preview)",
)
try:
response = await self._call_llm(prompt)
return self._parse_response(response)
except Exception as e:
logger.error(f"L1 classification error: {e}")
return TriageResult(
category=EmailCategory.UNCERTAIN,
confidence=0.0,
reason=f"Classification failed: {e}",
)
async def _call_llm(self, prompt: str) -> str:
"""Call the Fireworks API."""
client = await self._get_client()
url = "https://api.fireworks.ai/inference/v1/chat/completions"
headers = {
"Authorization": f"Bearer {self.config.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.config.model,
"messages": [
{"role": "user", "content": prompt}
],
"max_tokens": 500,
"temperature": 0.1, # Low temp for consistent classification
}
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
def _parse_response(self, response: str) -> TriageResult:
"""Parse the LLM response into a TriageResult."""
# Try to extract JSON from response
try:
# Find JSON in response (may have extra text)
start = response.find("{")
end = response.rfind("}") + 1
if start >= 0 and end > start:
json_str = response[start:end]
data = json.loads(json_str)
else:
raise ValueError("No JSON found in response")
# Parse category
category_str = data.get("category", "uncertain").lower()
try:
category = EmailCategory(category_str)
except ValueError:
category = EmailCategory.UNCERTAIN
# Parse confidence
confidence = float(data.get("confidence", 0.5))
confidence = max(0.0, min(1.0, confidence))
# Parse reason
reason = data.get("reason", "")
# Parse shipping info if present
shipping = None
if shipping_data := data.get("shipping"):
status_str = shipping_data.get("status", "").lower()
try:
status = ShippingStatus(status_str) if status_str else None
except ValueError:
status = None
shipping = ShippingInfo(
carrier=shipping_data.get("carrier"),
status=status,
item=shipping_data.get("item"),
expected_date=shipping_data.get("expected_date"),
tracking_number=shipping_data.get("tracking_number"),
)
return TriageResult(
category=category,
confidence=confidence,
reason=reason,
shipping=shipping,
)
except Exception as e:
logger.warning(f"Failed to parse L1 response: {e}\nResponse: {response}")
return TriageResult(
category=EmailCategory.UNCERTAIN,
confidence=0.3,
reason=f"Parse error: {e}",
)

View File

@ -1,116 +0,0 @@
"""L2 Escalation - Send to James (Opus) via Gateway."""
import logging
from typing import Optional
import httpx
from ..config import L2Config
from ..models import Message, TriageResult
logger = logging.getLogger(__name__)
class L2Escalation:
"""Level 2 escalation to James via Clawdbot Gateway."""
def __init__(self, config: L2Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=60.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def escalate(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
) -> bool:
"""Escalate a message to James for review.
Returns True if escalation was successful.
"""
# Format the escalation message
escalation_text = self._format_escalation(message, triage_result, account_id)
try:
# Send to gateway
client = await self._get_client()
# Gateway internal message API
# This hooks into an existing session or creates one
url = f"{self.config.gateway_url}/api/message"
payload = {
"text": escalation_text,
"source": "mail-agent",
"metadata": {
"type": "email_escalation",
"account": account_id,
"uid": message.uid,
"folder": message.folder,
"l1_category": triage_result.category.value,
"l1_confidence": triage_result.confidence,
},
}
response = await client.post(url, json=payload)
if response.status_code in (200, 201, 202):
logger.info(f"L2 escalation sent for message {message.uid}")
return True
else:
logger.warning(
f"L2 escalation failed: {response.status_code} {response.text}"
)
return False
except Exception as e:
logger.error(f"L2 escalation error: {e}")
return False
def _format_escalation(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
) -> str:
"""Format the escalation message for James."""
lines = [
"📧 **Email Review Request**",
"",
f"**From:** {message.from_}",
f"**Subject:** {message.subject or '(no subject)'}",
f"**Account:** {account_id}",
f"**Folder:** {message.folder}",
"",
f"**L1 Triage:** {triage_result.category.value} ({triage_result.confidence:.0%})",
f"**Reason:** {triage_result.reason}",
"",
"**Preview:**",
"```",
(message.body_preview or "(no preview)")[:500],
"```",
"",
"**Actions:**",
"- Reply: Draft a response",
"- Archive: Move to archive",
"- Delete: Delete the message",
"- Escalate: Send to Johan",
]
if triage_result.shipping:
ship = triage_result.shipping
lines.insert(8, "")
lines.insert(9, f"**Shipping:** {ship.carrier} - {ship.status}")
if ship.item:
lines.insert(10, f"**Item:** {ship.item}")
return "\n".join(lines)

View File

@ -1,132 +0,0 @@
"""L3 Escalation - Send to Johan via Gateway/Signal."""
import logging
from typing import Optional
import httpx
from ..config import L3Config
from ..models import Message, TriageResult
logger = logging.getLogger(__name__)
class L3Escalation:
"""Level 3 escalation to Johan via Signal."""
def __init__(self, config: L3Config):
self.config = config
self._client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=30.0)
return self._client
async def close(self) -> None:
if self._client:
await self._client.aclose()
self._client = None
async def escalate(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
james_notes: Optional[str] = None,
) -> bool:
"""Escalate a message to Johan via Signal.
Returns True if escalation was successful.
"""
# Format the escalation message
escalation_text = self._format_escalation(
message, triage_result, account_id, james_notes
)
try:
client = await self._get_client()
# Use gateway message API to send via Signal
# Gateway routes to the configured Signal channel
url = f"{self.config.gateway_url}/api/message/send"
payload = {
"channel": "signal",
"message": escalation_text,
# Target: main user (Johan) - gateway knows this
}
response = await client.post(url, json=payload)
if response.status_code in (200, 201, 202):
logger.info(f"L3 escalation sent for message {message.uid}")
return True
else:
logger.warning(
f"L3 escalation failed: {response.status_code} {response.text}"
)
return False
except Exception as e:
logger.error(f"L3 escalation error: {e}")
return False
def _format_escalation(
self,
message: Message,
triage_result: TriageResult,
account_id: str,
james_notes: Optional[str],
) -> str:
"""Format the escalation message for Johan."""
lines = [
"🚨 **Email Needs Your Attention**",
"",
f"**From:** {message.from_}",
f"**Subject:** {message.subject or '(no subject)'}",
"",
]
# Add James's notes if present
if james_notes:
lines.extend([
"**James says:**",
james_notes,
"",
])
# Add preview
preview = (message.body_preview or "")[:300]
if preview:
lines.extend([
"**Preview:**",
preview,
"",
])
# Add context
lines.extend([
f"_Category: {triage_result.category.value} | "
f"Account: {account_id} | "
f"UID: {message.uid}_",
])
return "\n".join(lines)
async def send_notification(self, text: str) -> bool:
"""Send a generic notification to Johan."""
try:
client = await self._get_client()
url = f"{self.config.gateway_url}/api/message/send"
payload = {
"channel": "signal",
"message": text,
}
response = await client.post(url, json=payload)
return response.status_code in (200, 201, 202)
except Exception as e:
logger.error(f"Notification error: {e}")
return False

View File

@ -1,237 +0,0 @@
"""Triage pipeline orchestration."""
import fnmatch
import logging
from typing import Callable, Optional
import httpx
from ..config import TriageConfig, ShippingConfig
from ..models import EmailCategory, Message, Shipment, ShippingStatus, TriageResult
from .l1 import L1Triage
from .l2 import L2Escalation
from .l3 import L3Escalation
logger = logging.getLogger(__name__)
class TriagePipeline:
"""Orchestrates the multi-tier triage process."""
def __init__(
self,
config: TriageConfig,
on_archive: Optional[Callable[[str, int, str], None]] = None,
on_delete: Optional[Callable[[str, int, str], None]] = None,
):
self.config = config
self.on_archive = on_archive
self.on_delete = on_delete
self.l1 = L1Triage(config.l1)
self.l2 = L2Escalation(config.l2)
self.l3 = L3Escalation(config.l3)
self._shipping_client: Optional[httpx.AsyncClient] = None
async def close(self) -> None:
await self.l1.close()
await self.l2.close()
await self.l3.close()
if self._shipping_client:
await self._shipping_client.aclose()
async def process(self, account_id: str, message: Message) -> TriageResult:
"""Process a message through the triage pipeline."""
# Check rules first
action = self._check_rules(message)
if action:
return await self._execute_rule_action(account_id, message, action)
# L1 classification
result = await self.l1.classify(message)
logger.info(
f"L1: {message.uid} -> {result.category.value} "
f"({result.confidence:.0%}): {result.reason}"
)
# Decide action based on category and confidence
await self._handle_result(account_id, message, result)
return result
def _check_rules(self, message: Message) -> Optional[str]:
"""Check if any rules apply to this message."""
from_addr = message.from_.lower()
# Always escalate from specific domains
for pattern in self.config.rules.always_escalate_from:
if self._matches_pattern(from_addr, pattern):
return "escalate"
# Auto-archive from specific senders
for pattern in self.config.rules.auto_archive_from:
if self._matches_pattern(from_addr, pattern):
return "archive"
# Auto-delete from specific senders
for pattern in self.config.rules.auto_delete_from:
if self._matches_pattern(from_addr, pattern):
return "delete"
return None
def _matches_pattern(self, address: str, pattern: str) -> bool:
"""Check if an email address matches a pattern."""
pattern = pattern.lower()
# Extract just the email part if it has a name
if "<" in address and ">" in address:
start = address.find("<") + 1
end = address.find(">")
address = address[start:end]
return fnmatch.fnmatch(address, pattern)
async def _execute_rule_action(
self,
account_id: str,
message: Message,
action: str,
) -> TriageResult:
"""Execute a rule-based action."""
result = TriageResult(
category=EmailCategory.NOTIFICATION,
confidence=1.0,
reason=f"Rule: auto-{action}",
)
if action == "archive":
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Rule: archived {message.uid}")
elif action == "delete":
if self.on_delete:
self.on_delete(account_id, message.uid, message.folder)
logger.info(f"Rule: deleted {message.uid}")
elif action == "escalate":
result.category = EmailCategory.IMPORTANT
result.reason = "Rule: always escalate from this sender"
await self.l2.escalate(message, result, account_id)
return result
async def _handle_result(
self,
account_id: str,
message: Message,
result: TriageResult,
) -> None:
"""Handle the L1 classification result."""
category = result.category
confidence = result.confidence
# High confidence actions
if confidence >= 0.8:
if category == EmailCategory.SPAM:
if self.on_delete:
self.on_delete(account_id, message.uid, message.folder)
logger.info(f"Deleted spam: {message.uid}")
return
elif category == EmailCategory.NEWSLETTER:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived newsletter: {message.uid}")
return
elif category == EmailCategory.RECEIPT:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived receipt: {message.uid}")
return
elif category == EmailCategory.SHIPPING:
await self._handle_shipping(account_id, message, result)
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Processed shipping: {message.uid}")
return
elif category == EmailCategory.NOTIFICATION:
if self.on_archive:
self.on_archive(account_id, message.uid, message.folder)
logger.info(f"Archived notification: {message.uid}")
return
# Escalate to L2 for:
# - Personal/important messages
# - Low confidence classifications
# - Uncertain category
if category in (EmailCategory.PERSONAL, EmailCategory.IMPORTANT):
await self.l2.escalate(message, result, account_id)
logger.info(f"Escalated to L2: {message.uid} ({category.value})")
elif category == EmailCategory.UNCERTAIN or confidence < 0.8:
await self.l2.escalate(message, result, account_id)
logger.info(f"Escalated to L2 (uncertain): {message.uid}")
# Flag important messages
if category == EmailCategory.IMPORTANT:
# TODO: Flag the message in IMAP
pass
async def _handle_shipping(
self,
account_id: str,
message: Message,
result: TriageResult,
) -> None:
"""Handle a shipping notification."""
if not result.shipping:
return
ship = result.shipping
# Post to dashboard
await self._post_shipping_to_dashboard(ship)
# Track shipment state (TODO: implement state file)
logger.info(
f"Shipping: {ship.carrier} - {ship.status} - {ship.item}"
)
async def _post_shipping_to_dashboard(self, ship) -> None:
"""Post shipping update to James Dashboard."""
if not self._shipping_client:
self._shipping_client = httpx.AsyncClient(timeout=10.0)
try:
url = f"{self.config.shipping.dashboard_url}/api/news"
# Format status message
status_text = {
ShippingStatus.ORDERED: "Order confirmed",
ShippingStatus.PICKED_UP: f"Picked up by {ship.carrier}",
ShippingStatus.IN_TRANSIT: "In transit",
ShippingStatus.OUT_FOR_DELIVERY: "Out for delivery",
ShippingStatus.DELIVERED: "Delivered ✓",
}.get(ship.status, str(ship.status))
if ship.expected_date:
status_text += f". Expected {ship.expected_date}"
payload = {
"title": f"📦 {ship.item or 'Package'}",
"body": status_text,
"type": "info" if ship.status != ShippingStatus.DELIVERED else "success",
"source": "shipping",
}
response = await self._shipping_client.post(url, json=payload)
if response.status_code not in (200, 201):
logger.warning(f"Dashboard post failed: {response.status_code}")
except Exception as e:
logger.error(f"Dashboard post error: {e}")