From 36637c0d70dbec6b404abc06c6b7007c6618f8d1 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 31 Jan 2026 12:18:04 +0000 Subject: [PATCH] Simplify: remove L1/L2/L3 triage, webhook to main session All intelligence now lives in OpenClaw (James). Mail Agent is just: - IMAP API (list/read/move/delete) - IMAP IDLE monitoring - Webhook POST to /hooks/mail Removed: - L1 Fireworks llama triage - L2/L3 escalation logic - Hardcoded shipping detection - Rule-based routing - All AI logic 974 lines deleted, complexity moved to the right place. --- README.md | 115 +++++++----------- config.yaml | 25 +--- src/actions/__init__.py | 4 - src/actions/unsubscribe.py | 125 ------------------- src/config.py | 50 +------- src/main.py | 110 ++++++++--------- src/triage/__init__.py | 7 -- src/triage/l1.py | 160 ------------------------- src/triage/l2.py | 116 ------------------ src/triage/l3.py | 132 --------------------- src/triage/pipeline.py | 237 ------------------------------------- 11 files changed, 106 insertions(+), 975 deletions(-) delete mode 100644 src/actions/__init__.py delete mode 100644 src/actions/unsubscribe.py delete mode 100644 src/triage/__init__.py delete mode 100644 src/triage/l1.py delete mode 100644 src/triage/l2.py delete mode 100644 src/triage/l3.py delete mode 100644 src/triage/pipeline.py diff --git a/README.md b/README.md index 50095e0..5b8cc02 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,15 @@ # Mail Agent -IMAP-based email triage with multi-tier escalation. +Simple IMAP/SMTP API with webhook notifications for OpenClaw. ## Overview -Mail Agent monitors IMAP accounts for new mail and automatically triages messages: +Mail Agent is a thin layer over IMAP that: +1. Monitors mailboxes via IMAP IDLE (push notifications) +2. Provides a REST API for reading/moving/deleting messages +3. POSTs new mail to OpenClaw webhook for processing -- **L1 (Cheap Model):** Fast classification using Fireworks llama-v3p1-8b - - Spam → delete - - Newsletter/receipt → archive - - Shipping → dashboard + archive - - Uncertain → escalate to L2 - -- **L2 (James/Opus):** Review via Clawdbot Gateway - - Full context review - - Can draft replies or escalate - -- **L3 (Johan):** Signal notification - - Important stuff only - - Human decision required +**All intelligence lives in OpenClaw.** Mail Agent is just the pipe. ## Quick Start @@ -30,10 +21,6 @@ source .venv/bin/activate # Install dependencies pip install -r requirements.txt -# Set environment variables -export FIREWORKS_API_KEY=your-api-key -export PROTON_BRIDGE_PASSWORD=BlcMCKtNDfqv0cq1LmGR9g - # Run python -m src.main ``` @@ -58,44 +45,54 @@ accounts: watch: [INBOX] archive: Archive -triage: +webhook: enabled: true - l1: - provider: fireworks - model: accounts/fireworks/models/llama-v3p1-8b-instruct - api_key: ${FIREWORKS_API_KEY} + url: http://localhost:18789/hooks/mail + token: kuma-alert-token-2026 ``` ## API Endpoints +### Health +- `GET /health` - Health check + ### Accounts - `GET /accounts` - List all configured accounts -- `GET /accounts/{id}` - Get account details - `GET /accounts/{id}/mailboxes` - List folders ### Messages -- `GET /accounts/{id}/messages?folder=INBOX&unread=true` - List messages +- `GET /accounts/{id}/messages?folder=INBOX&limit=20` - List messages - `GET /accounts/{id}/messages/{uid}?folder=INBOX` - Get full message - `PATCH /accounts/{id}/messages/{uid}` - Update flags/move -- `DELETE /accounts/{id}/messages/{uid}` - Delete message +- `DELETE /accounts/{id}/messages/{uid}?folder=INBOX` - Delete message ### Events (SSE) -- `GET /accounts/{id}/events?folder=INBOX` - Subscribe to new mail events +- `GET /events?accounts=proton` - Subscribe to new mail events + +## Webhook Payload + +When new mail arrives, POSTs to webhook: + +```json +{ + "account": "proton", + "uid": 12345, + "folder": "INBOX", + "from": "sender@example.com", + "to": ["recipient@example.com"], + "subject": "Hello", + "date": "2026-01-31T10:00:00Z", + "preview": "First 1000 chars...", + "body": "Full text up to 10KB", + "has_attachments": false, + "attachment_names": [] +} +``` ## Systemd Service ```bash -# Copy service file -cp systemd/mail-agent.service ~/.config/systemd/user/ - -# Edit to add FIREWORKS_API_KEY -systemctl --user edit mail-agent.service - -# Enable and start -systemctl --user enable mail-agent -systemctl --user start mail-agent - -# Check status +systemctl --user restart mail-agent systemctl --user status mail-agent journalctl --user -u mail-agent -f ``` @@ -107,44 +104,16 @@ New Mail (IMAP IDLE) │ ▼ ┌─────────────────────┐ -│ L1: Cheap Model │ ~$0.20/1M tokens -│ Fast classification│ +│ POST to webhook │ +│ (raw email data) │ └─────────────────────┘ │ - ▼ (uncertain/important) + ▼ ┌─────────────────────┐ -│ L2: James (Opus) │ via Gateway -│ Context review │ +│ OpenClaw (James) │ ← All decisions here +│ Archive/Delete/ │ +│ Reply/Escalate │ └─────────────────────┘ - │ - ▼ (needs human) -┌─────────────────────┐ -│ L3: Johan │ Signal notification -└─────────────────────┘ -``` - -## Shipping Dashboard - -Shipping notifications are automatically posted to the James Dashboard: - -``` -POST http://100.123.216.65:9200/api/news -{ - "title": "📦 E3-1275 Server", - "body": "Picked up by UPS. Expected Feb 3rd.", - "type": "info", - "source": "shipping" -} -``` - -## Development - -```bash -# Run with auto-reload -uvicorn src.main:app --reload --host 127.0.0.1 --port 8025 - -# Test IMAP connection -python -c "from src.imap import ImapClient; from src.config import get_config; c=get_config(); client=ImapClient('proton', c.accounts['proton']); client.connect(); print(client.list_mailboxes())" ``` ## License diff --git a/config.yaml b/config.yaml index 9228074..f785286 100644 --- a/config.yaml +++ b/config.yaml @@ -14,26 +14,7 @@ accounts: archive: Archive spam: Spam -triage: +webhook: enabled: true - l1: - provider: fireworks - model: accounts/fireworks/models/llama-v3p1-8b-instruct - api_key: ${FIREWORKS_API_KEY} - l2: - gateway_url: http://localhost:18080 - l3: - gateway_url: http://localhost:18080 - - shipping: - dashboard_url: http://100.123.216.65:9200 - auto_cleanup_days: 1 - - rules: - always_escalate_from: - - "*@inou.com" - - "*@kaseya.com" - auto_archive_from: - - "*@github.com" - - "noreply@*" - auto_delete_from: [] + url: http://localhost:18789/hooks/mail + token: kuma-alert-token-2026 diff --git a/src/actions/__init__.py b/src/actions/__init__.py deleted file mode 100644 index c6ec4b7..0000000 --- a/src/actions/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Action handlers.""" -from .unsubscribe import find_unsubscribe_link, execute_unsubscribe - -__all__ = ["find_unsubscribe_link", "execute_unsubscribe"] diff --git a/src/actions/unsubscribe.py b/src/actions/unsubscribe.py deleted file mode 100644 index 09982ae..0000000 --- a/src/actions/unsubscribe.py +++ /dev/null @@ -1,125 +0,0 @@ -"""Unsubscribe action handler.""" -import logging -import re -from typing import Optional -from urllib.parse import urlparse - -import httpx - -from ..models import Message - -logger = logging.getLogger(__name__) - - -def find_unsubscribe_link(message: Message) -> Optional[str]: - """Find unsubscribe link in an email message. - - Checks: - 1. List-Unsubscribe header (TODO: needs raw headers) - 2. HTML body for common unsubscribe patterns - 3. Text body for unsubscribe URLs - """ - # Search patterns - patterns = [ - r'href=["\']?(https?://[^"\'>\s]*unsubscribe[^"\'>\s]*)["\']?', - r'href=["\']?(https?://[^"\'>\s]*optout[^"\'>\s]*)["\']?', - r'href=["\']?(https?://[^"\'>\s]*opt-out[^"\'>\s]*)["\']?', - r'href=["\']?(https?://[^"\'>\s]*remove[^"\'>\s]*)["\']?', - r'(https?://[^\s<>"]*unsubscribe[^\s<>"]*)', - r'(https?://[^\s<>"]*optout[^\s<>"]*)', - ] - - # Search in HTML body first - if message.body_html: - for pattern in patterns: - matches = re.findall(pattern, message.body_html, re.IGNORECASE) - if matches: - url = matches[0] - if _is_valid_unsubscribe_url(url): - return url - - # Search in text body - if message.body_text: - for pattern in patterns: - matches = re.findall(pattern, message.body_text, re.IGNORECASE) - if matches: - url = matches[0] - if _is_valid_unsubscribe_url(url): - return url - - return None - - -def _is_valid_unsubscribe_url(url: str) -> bool: - """Validate that a URL looks like a legitimate unsubscribe link.""" - try: - parsed = urlparse(url) - - # Must be HTTP(S) - if parsed.scheme not in ("http", "https"): - return False - - # Must have a host - if not parsed.netloc: - return False - - # Reject obvious non-unsubscribe URLs - suspicious = ["login", "password", "account", "download"] - for term in suspicious: - if term in url.lower() and "unsubscribe" not in url.lower(): - return False - - return True - except Exception: - return False - - -async def execute_unsubscribe(url: str) -> tuple[bool, str]: - """Execute an unsubscribe action by visiting the URL. - - Returns (success, message). - """ - try: - async with httpx.AsyncClient( - timeout=30.0, - follow_redirects=True, - headers={ - "User-Agent": "Mozilla/5.0 (compatible; MailAgent/1.0)", - }, - ) as client: - response = await client.get(url) - - # Check for success indicators - if response.status_code == 200: - content = response.text.lower() - - # Look for success messages - success_indicators = [ - "unsubscribed", - "removed", - "successfully", - "you have been", - "no longer", - ] - - for indicator in success_indicators: - if indicator in content: - logger.info(f"Unsubscribe successful: {url}") - return True, "Successfully unsubscribed" - - # If we got 200 but no clear success message, assume it worked - # (many unsubscribe pages just say "done" or redirect) - logger.info(f"Unsubscribe completed (no confirmation): {url}") - return True, "Unsubscribe request sent" - - else: - logger.warning(f"Unsubscribe failed: {response.status_code} for {url}") - return False, f"HTTP {response.status_code}" - - except httpx.TimeoutException: - logger.error(f"Unsubscribe timeout: {url}") - return False, "Request timed out" - - except Exception as e: - logger.error(f"Unsubscribe error: {e}") - return False, str(e) diff --git a/src/config.py b/src/config.py index af528db..c35ebc0 100644 --- a/src/config.py +++ b/src/config.py @@ -28,44 +28,17 @@ class AccountConfig(BaseModel): folders: ImapFolders = Field(default_factory=ImapFolders) -class L1Config(BaseModel): - provider: str = "fireworks" - model: str = "accounts/fireworks/models/llama-v3p1-8b-instruct" - api_key: str = "" - - -class L2Config(BaseModel): - gateway_url: str = "http://localhost:18080" - - -class L3Config(BaseModel): - gateway_url: str = "http://localhost:18080" - - -class ShippingConfig(BaseModel): - dashboard_url: str = "http://100.123.216.65:9200" - auto_cleanup_days: int = 1 - - -class TriageRules(BaseModel): - always_escalate_from: list[str] = Field(default_factory=list) - auto_archive_from: list[str] = Field(default_factory=list) - auto_delete_from: list[str] = Field(default_factory=list) - - -class TriageConfig(BaseModel): +class WebhookConfig(BaseModel): + """Simple webhook config - POST new mail to OpenClaw.""" enabled: bool = True - l1: L1Config = Field(default_factory=L1Config) - l2: L2Config = Field(default_factory=L2Config) - l3: L3Config = Field(default_factory=L3Config) - shipping: ShippingConfig = Field(default_factory=ShippingConfig) - rules: TriageRules = Field(default_factory=TriageRules) + url: str = "http://localhost:18789/hooks/mail" + token: str = "kuma-alert-token-2026" class Config(BaseModel): server: ServerConfig = Field(default_factory=ServerConfig) accounts: dict[str, AccountConfig] = Field(default_factory=dict) - triage: TriageConfig = Field(default_factory=TriageConfig) + webhook: WebhookConfig = Field(default_factory=WebhookConfig) def expand_env_vars(text: str) -> str: @@ -74,20 +47,12 @@ def expand_env_vars(text: str) -> str: var_name = match.group(1) or match.group(2) return os.environ.get(var_name, match.group(0)) - # Match ${VAR} or $VAR (not inside quotes for simplicity) pattern = r'\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)' return re.sub(pattern, replacer, text) def load_config(path: Optional[str] = None) -> Config: - """Load configuration from YAML file. - - Searches in order: - 1. Explicit path argument - 2. MAIL_AGENT_CONFIG env var - 3. ./config.yaml - 4. ~/.config/mail-agent/config.yaml - """ + """Load configuration from YAML file.""" search_paths = [] if path: @@ -108,10 +73,8 @@ def load_config(path: Optional[str] = None) -> Config: break if config_path is None: - # Return default config return Config() - # Load and expand env vars raw = config_path.read_text() expanded = expand_env_vars(raw) data = yaml.safe_load(expanded) @@ -119,7 +82,6 @@ def load_config(path: Optional[str] = None) -> Config: return Config.model_validate(data) -# Global config instance _config: Optional[Config] = None diff --git a/src/main.py b/src/main.py index e8f776f..bb58e7c 100644 --- a/src/main.py +++ b/src/main.py @@ -1,18 +1,17 @@ -"""Mail Agent - IMAP-based email triage with multi-tier escalation.""" +"""Mail Agent - Simple IMAP/SMTP API with webhook notifications.""" import asyncio import logging -import sys from contextlib import asynccontextmanager from typing import Optional +import httpx from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from .config import load_config, get_config, Config +from .config import load_config, get_config from .api import accounts_router, messages_router, events_router from .imap import ImapClient from .imap.idle import IdleManager -from .triage import TriagePipeline from .models import NewMailEvent # Configure logging @@ -24,8 +23,8 @@ logger = logging.getLogger(__name__) # Global state idle_manager: Optional[IdleManager] = None -triage_pipeline: Optional[TriagePipeline] = None imap_clients: dict[str, ImapClient] = {} +http_client: Optional[httpx.AsyncClient] = None def get_imap_client(account_id: str) -> ImapClient: @@ -38,38 +37,15 @@ def get_imap_client(account_id: str) -> ImapClient: return imap_clients[account_id] -def on_archive(account_id: str, uid: int, folder: str) -> None: - """Archive a message.""" - try: - config = get_config() - archive_folder = config.accounts[account_id].folders.archive - - client = get_imap_client(account_id) - client.connect() - client.move_message(uid, folder, archive_folder) - logger.info(f"Archived message {uid} to {archive_folder}") - except Exception as e: - logger.error(f"Archive failed: {e}") - - -def on_delete(account_id: str, uid: int, folder: str) -> None: - """Delete a message.""" - try: - client = get_imap_client(account_id) - client.connect() - client.delete_message(uid, folder) - logger.info(f"Deleted message {uid}") - except Exception as e: - logger.error(f"Delete failed: {e}") - - async def on_new_mail(account_id: str, event: NewMailEvent) -> None: - """Handle new mail event from IDLE.""" - global triage_pipeline + """Handle new mail - fetch full message and POST to webhook.""" + global http_client + config = get_config() logger.info(f"New mail: {event.from_} - {event.subject}") - if triage_pipeline is None: + if not config.webhook.enabled: + logger.debug("Webhook disabled, skipping") return try: @@ -78,41 +54,62 @@ async def on_new_mail(account_id: str, event: NewMailEvent) -> None: client.connect() message = client.fetch_message(event.uid, event.folder, body=True) - if message: - # Run through triage pipeline - await triage_pipeline.process(account_id, message) + if not message: + logger.warning(f"Could not fetch message {event.uid}") + return + + # POST to webhook + if http_client is None: + http_client = httpx.AsyncClient(timeout=30.0) + + payload = { + "account": account_id, + "uid": message.uid, + "folder": message.folder, + "from": message.from_, + "to": message.to, + "subject": message.subject or "(no subject)", + "date": message.date.isoformat() if message.date else None, + "preview": (message.body_preview or "")[:1000], + "body": message.body_text[:10000] if message.body_text else None, + "has_attachments": len(message.attachments) > 0, + "attachment_names": [a.filename for a in message.attachments] if message.attachments else [], + } + + headers = {"X-Hook-Token": config.webhook.token} + + response = await http_client.post( + config.webhook.url, + json=payload, + headers=headers, + ) + + if response.status_code in (200, 201, 202): + logger.info(f"Webhook sent: {message.from_} - {message.subject}") + else: + logger.warning(f"Webhook failed: {response.status_code}") except Exception as e: - logger.error(f"Triage error: {e}") + logger.error(f"Webhook error: {e}") def on_new_mail_sync(account_id: str, event: NewMailEvent) -> None: """Sync wrapper for async on_new_mail.""" try: - loop = asyncio.get_running_loop() + asyncio.get_running_loop() asyncio.create_task(on_new_mail(account_id, event)) except RuntimeError: - # No running loop, run synchronously asyncio.run(on_new_mail(account_id, event)) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler.""" - global idle_manager, triage_pipeline + global idle_manager, http_client logger.info("Starting Mail Agent...") config = get_config() - # Initialize triage pipeline if enabled - if config.triage.enabled: - triage_pipeline = TriagePipeline( - config.triage, - on_archive=on_archive, - on_delete=on_delete, - ) - logger.info("Triage pipeline initialized") - # Initialize IDLE manager idle_manager = IdleManager() idle_manager.add_callback(on_new_mail_sync) @@ -129,6 +126,9 @@ async def lifespan(app: FastAPI): except Exception as e: logger.error(f"Failed to watch {account_id}: {e}") + if config.webhook.enabled: + logger.info(f"Webhook enabled: {config.webhook.url}") + yield # Cleanup @@ -137,8 +137,8 @@ async def lifespan(app: FastAPI): if idle_manager: await idle_manager.stop_all() - if triage_pipeline: - await triage_pipeline.close() + if http_client: + await http_client.aclose() for client in imap_clients.values(): try: @@ -152,8 +152,8 @@ async def lifespan(app: FastAPI): # Create FastAPI app app = FastAPI( title="Mail Agent", - description="IMAP-based email triage with multi-tier escalation", - version="0.1.0", + description="Simple IMAP/SMTP API with webhook notifications", + version="0.2.0", lifespan=lifespan, ) @@ -177,7 +177,7 @@ async def root(): """Root endpoint.""" return { "name": "Mail Agent", - "version": "0.1.0", + "version": "0.2.0", "status": "running", } @@ -189,7 +189,7 @@ async def health(): return { "status": "healthy", "accounts": list(config.accounts.keys()), - "triage_enabled": config.triage.enabled, + "webhook_enabled": config.webhook.enabled, } diff --git a/src/triage/__init__.py b/src/triage/__init__.py deleted file mode 100644 index 6bd306f..0000000 --- a/src/triage/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -"""Email triage module.""" -from .l1 import L1Triage -from .l2 import L2Escalation -from .l3 import L3Escalation -from .pipeline import TriagePipeline - -__all__ = ["L1Triage", "L2Escalation", "L3Escalation", "TriagePipeline"] diff --git a/src/triage/l1.py b/src/triage/l1.py deleted file mode 100644 index ffcb335..0000000 --- a/src/triage/l1.py +++ /dev/null @@ -1,160 +0,0 @@ -"""L1 Triage - Cheap model classification.""" -import json -import logging -from typing import Optional - -import httpx - -from ..config import L1Config -from ..models import EmailCategory, Message, ShippingInfo, ShippingStatus, TriageResult - -logger = logging.getLogger(__name__) - -L1_PROMPT = """Classify this email. Respond with JSON only. - -From: {from_addr} -Subject: {subject} -Preview: {preview} - -Categories: -- spam: Obvious spam, phishing, scams -- newsletter: Marketing, newsletters, promotions -- receipt: Order confirmations, invoices (not shipping) -- shipping: Shipping/delivery updates (picked up, in transit, delivered) -- notification: Automated notifications (GitHub, services) -- personal: From a real person, needs attention -- important: Urgent, financial, legal, medical -- uncertain: Not sure, needs human review - -For shipping emails, also extract: -- carrier: UPS, FedEx, USPS, DHL, etc. -- status: ordered, picked_up, in_transit, out_for_delivery, delivered -- item: Brief description of what's being shipped -- expected_date: Expected delivery date (if available) - -Response format: -{{"category": "...", "confidence": 0.0-1.0, "reason": "brief reason"}} - -For shipping: -{{"category": "shipping", "confidence": 0.9, "reason": "...", - "shipping": {{"carrier": "UPS", "status": "picked_up", "item": "E3-1275 Server", "expected_date": "2026-02-03"}}}}""" - - -class L1Triage: - """Level 1 triage using cheap LLM.""" - - def __init__(self, config: L1Config): - self.config = config - self._client: Optional[httpx.AsyncClient] = None - - async def _get_client(self) -> httpx.AsyncClient: - if self._client is None: - self._client = httpx.AsyncClient(timeout=30.0) - return self._client - - async def close(self) -> None: - if self._client: - await self._client.aclose() - self._client = None - - async def classify(self, message: Message) -> TriageResult: - """Classify a message using the L1 model.""" - prompt = L1_PROMPT.format( - from_addr=message.from_, - subject=message.subject or "(no subject)", - preview=message.body_preview[:500] if message.body_preview else "(no preview)", - ) - - try: - response = await self._call_llm(prompt) - return self._parse_response(response) - except Exception as e: - logger.error(f"L1 classification error: {e}") - return TriageResult( - category=EmailCategory.UNCERTAIN, - confidence=0.0, - reason=f"Classification failed: {e}", - ) - - async def _call_llm(self, prompt: str) -> str: - """Call the Fireworks API.""" - client = await self._get_client() - - url = "https://api.fireworks.ai/inference/v1/chat/completions" - headers = { - "Authorization": f"Bearer {self.config.api_key}", - "Content-Type": "application/json", - } - payload = { - "model": self.config.model, - "messages": [ - {"role": "user", "content": prompt} - ], - "max_tokens": 500, - "temperature": 0.1, # Low temp for consistent classification - } - - response = await client.post(url, headers=headers, json=payload) - response.raise_for_status() - - data = response.json() - return data["choices"][0]["message"]["content"] - - def _parse_response(self, response: str) -> TriageResult: - """Parse the LLM response into a TriageResult.""" - # Try to extract JSON from response - try: - # Find JSON in response (may have extra text) - start = response.find("{") - end = response.rfind("}") + 1 - if start >= 0 and end > start: - json_str = response[start:end] - data = json.loads(json_str) - else: - raise ValueError("No JSON found in response") - - # Parse category - category_str = data.get("category", "uncertain").lower() - try: - category = EmailCategory(category_str) - except ValueError: - category = EmailCategory.UNCERTAIN - - # Parse confidence - confidence = float(data.get("confidence", 0.5)) - confidence = max(0.0, min(1.0, confidence)) - - # Parse reason - reason = data.get("reason", "") - - # Parse shipping info if present - shipping = None - if shipping_data := data.get("shipping"): - status_str = shipping_data.get("status", "").lower() - try: - status = ShippingStatus(status_str) if status_str else None - except ValueError: - status = None - - shipping = ShippingInfo( - carrier=shipping_data.get("carrier"), - status=status, - item=shipping_data.get("item"), - expected_date=shipping_data.get("expected_date"), - tracking_number=shipping_data.get("tracking_number"), - ) - - return TriageResult( - category=category, - confidence=confidence, - reason=reason, - shipping=shipping, - ) - - except Exception as e: - logger.warning(f"Failed to parse L1 response: {e}\nResponse: {response}") - return TriageResult( - category=EmailCategory.UNCERTAIN, - confidence=0.3, - reason=f"Parse error: {e}", - ) diff --git a/src/triage/l2.py b/src/triage/l2.py deleted file mode 100644 index 035d442..0000000 --- a/src/triage/l2.py +++ /dev/null @@ -1,116 +0,0 @@ -"""L2 Escalation - Send to James (Opus) via Gateway.""" -import logging -from typing import Optional - -import httpx - -from ..config import L2Config -from ..models import Message, TriageResult - -logger = logging.getLogger(__name__) - - -class L2Escalation: - """Level 2 escalation to James via Clawdbot Gateway.""" - - def __init__(self, config: L2Config): - self.config = config - self._client: Optional[httpx.AsyncClient] = None - - async def _get_client(self) -> httpx.AsyncClient: - if self._client is None: - self._client = httpx.AsyncClient(timeout=60.0) - return self._client - - async def close(self) -> None: - if self._client: - await self._client.aclose() - self._client = None - - async def escalate( - self, - message: Message, - triage_result: TriageResult, - account_id: str, - ) -> bool: - """Escalate a message to James for review. - - Returns True if escalation was successful. - """ - # Format the escalation message - escalation_text = self._format_escalation(message, triage_result, account_id) - - try: - # Send to gateway - client = await self._get_client() - - # Gateway internal message API - # This hooks into an existing session or creates one - url = f"{self.config.gateway_url}/api/message" - - payload = { - "text": escalation_text, - "source": "mail-agent", - "metadata": { - "type": "email_escalation", - "account": account_id, - "uid": message.uid, - "folder": message.folder, - "l1_category": triage_result.category.value, - "l1_confidence": triage_result.confidence, - }, - } - - response = await client.post(url, json=payload) - - if response.status_code in (200, 201, 202): - logger.info(f"L2 escalation sent for message {message.uid}") - return True - else: - logger.warning( - f"L2 escalation failed: {response.status_code} {response.text}" - ) - return False - - except Exception as e: - logger.error(f"L2 escalation error: {e}") - return False - - def _format_escalation( - self, - message: Message, - triage_result: TriageResult, - account_id: str, - ) -> str: - """Format the escalation message for James.""" - lines = [ - "📧 **Email Review Request**", - "", - f"**From:** {message.from_}", - f"**Subject:** {message.subject or '(no subject)'}", - f"**Account:** {account_id}", - f"**Folder:** {message.folder}", - "", - f"**L1 Triage:** {triage_result.category.value} ({triage_result.confidence:.0%})", - f"**Reason:** {triage_result.reason}", - "", - "**Preview:**", - "```", - (message.body_preview or "(no preview)")[:500], - "```", - "", - "**Actions:**", - "- Reply: Draft a response", - "- Archive: Move to archive", - "- Delete: Delete the message", - "- Escalate: Send to Johan", - ] - - if triage_result.shipping: - ship = triage_result.shipping - lines.insert(8, "") - lines.insert(9, f"**Shipping:** {ship.carrier} - {ship.status}") - if ship.item: - lines.insert(10, f"**Item:** {ship.item}") - - return "\n".join(lines) diff --git a/src/triage/l3.py b/src/triage/l3.py deleted file mode 100644 index 19c3292..0000000 --- a/src/triage/l3.py +++ /dev/null @@ -1,132 +0,0 @@ -"""L3 Escalation - Send to Johan via Gateway/Signal.""" -import logging -from typing import Optional - -import httpx - -from ..config import L3Config -from ..models import Message, TriageResult - -logger = logging.getLogger(__name__) - - -class L3Escalation: - """Level 3 escalation to Johan via Signal.""" - - def __init__(self, config: L3Config): - self.config = config - self._client: Optional[httpx.AsyncClient] = None - - async def _get_client(self) -> httpx.AsyncClient: - if self._client is None: - self._client = httpx.AsyncClient(timeout=30.0) - return self._client - - async def close(self) -> None: - if self._client: - await self._client.aclose() - self._client = None - - async def escalate( - self, - message: Message, - triage_result: TriageResult, - account_id: str, - james_notes: Optional[str] = None, - ) -> bool: - """Escalate a message to Johan via Signal. - - Returns True if escalation was successful. - """ - # Format the escalation message - escalation_text = self._format_escalation( - message, triage_result, account_id, james_notes - ) - - try: - client = await self._get_client() - - # Use gateway message API to send via Signal - # Gateway routes to the configured Signal channel - url = f"{self.config.gateway_url}/api/message/send" - - payload = { - "channel": "signal", - "message": escalation_text, - # Target: main user (Johan) - gateway knows this - } - - response = await client.post(url, json=payload) - - if response.status_code in (200, 201, 202): - logger.info(f"L3 escalation sent for message {message.uid}") - return True - else: - logger.warning( - f"L3 escalation failed: {response.status_code} {response.text}" - ) - return False - - except Exception as e: - logger.error(f"L3 escalation error: {e}") - return False - - def _format_escalation( - self, - message: Message, - triage_result: TriageResult, - account_id: str, - james_notes: Optional[str], - ) -> str: - """Format the escalation message for Johan.""" - lines = [ - "🚨 **Email Needs Your Attention**", - "", - f"**From:** {message.from_}", - f"**Subject:** {message.subject or '(no subject)'}", - "", - ] - - # Add James's notes if present - if james_notes: - lines.extend([ - "**James says:**", - james_notes, - "", - ]) - - # Add preview - preview = (message.body_preview or "")[:300] - if preview: - lines.extend([ - "**Preview:**", - preview, - "", - ]) - - # Add context - lines.extend([ - f"_Category: {triage_result.category.value} | " - f"Account: {account_id} | " - f"UID: {message.uid}_", - ]) - - return "\n".join(lines) - - async def send_notification(self, text: str) -> bool: - """Send a generic notification to Johan.""" - try: - client = await self._get_client() - url = f"{self.config.gateway_url}/api/message/send" - - payload = { - "channel": "signal", - "message": text, - } - - response = await client.post(url, json=payload) - return response.status_code in (200, 201, 202) - - except Exception as e: - logger.error(f"Notification error: {e}") - return False diff --git a/src/triage/pipeline.py b/src/triage/pipeline.py deleted file mode 100644 index cc7537e..0000000 --- a/src/triage/pipeline.py +++ /dev/null @@ -1,237 +0,0 @@ -"""Triage pipeline orchestration.""" -import fnmatch -import logging -from typing import Callable, Optional - -import httpx - -from ..config import TriageConfig, ShippingConfig -from ..models import EmailCategory, Message, Shipment, ShippingStatus, TriageResult -from .l1 import L1Triage -from .l2 import L2Escalation -from .l3 import L3Escalation - -logger = logging.getLogger(__name__) - - -class TriagePipeline: - """Orchestrates the multi-tier triage process.""" - - def __init__( - self, - config: TriageConfig, - on_archive: Optional[Callable[[str, int, str], None]] = None, - on_delete: Optional[Callable[[str, int, str], None]] = None, - ): - self.config = config - self.on_archive = on_archive - self.on_delete = on_delete - - self.l1 = L1Triage(config.l1) - self.l2 = L2Escalation(config.l2) - self.l3 = L3Escalation(config.l3) - - self._shipping_client: Optional[httpx.AsyncClient] = None - - async def close(self) -> None: - await self.l1.close() - await self.l2.close() - await self.l3.close() - if self._shipping_client: - await self._shipping_client.aclose() - - async def process(self, account_id: str, message: Message) -> TriageResult: - """Process a message through the triage pipeline.""" - # Check rules first - action = self._check_rules(message) - if action: - return await self._execute_rule_action(account_id, message, action) - - # L1 classification - result = await self.l1.classify(message) - logger.info( - f"L1: {message.uid} -> {result.category.value} " - f"({result.confidence:.0%}): {result.reason}" - ) - - # Decide action based on category and confidence - await self._handle_result(account_id, message, result) - - return result - - def _check_rules(self, message: Message) -> Optional[str]: - """Check if any rules apply to this message.""" - from_addr = message.from_.lower() - - # Always escalate from specific domains - for pattern in self.config.rules.always_escalate_from: - if self._matches_pattern(from_addr, pattern): - return "escalate" - - # Auto-archive from specific senders - for pattern in self.config.rules.auto_archive_from: - if self._matches_pattern(from_addr, pattern): - return "archive" - - # Auto-delete from specific senders - for pattern in self.config.rules.auto_delete_from: - if self._matches_pattern(from_addr, pattern): - return "delete" - - return None - - def _matches_pattern(self, address: str, pattern: str) -> bool: - """Check if an email address matches a pattern.""" - pattern = pattern.lower() - - # Extract just the email part if it has a name - if "<" in address and ">" in address: - start = address.find("<") + 1 - end = address.find(">") - address = address[start:end] - - return fnmatch.fnmatch(address, pattern) - - async def _execute_rule_action( - self, - account_id: str, - message: Message, - action: str, - ) -> TriageResult: - """Execute a rule-based action.""" - result = TriageResult( - category=EmailCategory.NOTIFICATION, - confidence=1.0, - reason=f"Rule: auto-{action}", - ) - - if action == "archive": - if self.on_archive: - self.on_archive(account_id, message.uid, message.folder) - logger.info(f"Rule: archived {message.uid}") - - elif action == "delete": - if self.on_delete: - self.on_delete(account_id, message.uid, message.folder) - logger.info(f"Rule: deleted {message.uid}") - - elif action == "escalate": - result.category = EmailCategory.IMPORTANT - result.reason = "Rule: always escalate from this sender" - await self.l2.escalate(message, result, account_id) - - return result - - async def _handle_result( - self, - account_id: str, - message: Message, - result: TriageResult, - ) -> None: - """Handle the L1 classification result.""" - category = result.category - confidence = result.confidence - - # High confidence actions - if confidence >= 0.8: - if category == EmailCategory.SPAM: - if self.on_delete: - self.on_delete(account_id, message.uid, message.folder) - logger.info(f"Deleted spam: {message.uid}") - return - - elif category == EmailCategory.NEWSLETTER: - if self.on_archive: - self.on_archive(account_id, message.uid, message.folder) - logger.info(f"Archived newsletter: {message.uid}") - return - - elif category == EmailCategory.RECEIPT: - if self.on_archive: - self.on_archive(account_id, message.uid, message.folder) - logger.info(f"Archived receipt: {message.uid}") - return - - elif category == EmailCategory.SHIPPING: - await self._handle_shipping(account_id, message, result) - if self.on_archive: - self.on_archive(account_id, message.uid, message.folder) - logger.info(f"Processed shipping: {message.uid}") - return - - elif category == EmailCategory.NOTIFICATION: - if self.on_archive: - self.on_archive(account_id, message.uid, message.folder) - logger.info(f"Archived notification: {message.uid}") - return - - # Escalate to L2 for: - # - Personal/important messages - # - Low confidence classifications - # - Uncertain category - if category in (EmailCategory.PERSONAL, EmailCategory.IMPORTANT): - await self.l2.escalate(message, result, account_id) - logger.info(f"Escalated to L2: {message.uid} ({category.value})") - - elif category == EmailCategory.UNCERTAIN or confidence < 0.8: - await self.l2.escalate(message, result, account_id) - logger.info(f"Escalated to L2 (uncertain): {message.uid}") - - # Flag important messages - if category == EmailCategory.IMPORTANT: - # TODO: Flag the message in IMAP - pass - - async def _handle_shipping( - self, - account_id: str, - message: Message, - result: TriageResult, - ) -> None: - """Handle a shipping notification.""" - if not result.shipping: - return - - ship = result.shipping - - # Post to dashboard - await self._post_shipping_to_dashboard(ship) - - # Track shipment state (TODO: implement state file) - logger.info( - f"Shipping: {ship.carrier} - {ship.status} - {ship.item}" - ) - - async def _post_shipping_to_dashboard(self, ship) -> None: - """Post shipping update to James Dashboard.""" - if not self._shipping_client: - self._shipping_client = httpx.AsyncClient(timeout=10.0) - - try: - url = f"{self.config.shipping.dashboard_url}/api/news" - - # Format status message - status_text = { - ShippingStatus.ORDERED: "Order confirmed", - ShippingStatus.PICKED_UP: f"Picked up by {ship.carrier}", - ShippingStatus.IN_TRANSIT: "In transit", - ShippingStatus.OUT_FOR_DELIVERY: "Out for delivery", - ShippingStatus.DELIVERED: "Delivered ✓", - }.get(ship.status, str(ship.status)) - - if ship.expected_date: - status_text += f". Expected {ship.expected_date}" - - payload = { - "title": f"📦 {ship.item or 'Package'}", - "body": status_text, - "type": "info" if ship.status != ShippingStatus.DELIVERED else "success", - "source": "shipping", - } - - response = await self._shipping_client.post(url, json=payload) - if response.status_code not in (200, 201): - logger.warning(f"Dashboard post failed: {response.status_code}") - - except Exception as e: - logger.error(f"Dashboard post error: {e}")