#!/usr/bin/env python3 """ Document Processor for ~/documents/inbox/ Uses AI vision (Claude) for document analysis. Stores embeddings in SQLite. """ import os import sys import json import hashlib import shutil import sqlite3 import csv import base64 import struct from datetime import datetime from pathlib import Path from typing import Optional, Dict, Any, List import time import argparse # Try to import anthropic, fail gracefully with helpful message try: import anthropic except ImportError: print("ERROR: anthropic package not installed") print("Run: cd ~/dev/doc-processor && source venv/bin/activate && pip install anthropic") sys.exit(1) # Paths DOCUMENTS_ROOT = Path.home() / "documents" INBOX = DOCUMENTS_ROOT / "inbox" STORE = DOCUMENTS_ROOT / "store" RECORDS = DOCUMENTS_ROOT / "records" INDEX = DOCUMENTS_ROOT / "index" EXPORTS = DOCUMENTS_ROOT / "exports" EMBEDDINGS_DB = INDEX / "embeddings.db" # Categories CATEGORIES = [ "taxes", "bills", "medical", "insurance", "legal", "financial", "expenses", "vehicles", "home", "personal", "contacts", "uncategorized" ] # Ensure directories exist for d in [STORE, INDEX, EXPORTS]: d.mkdir(parents=True, exist_ok=True) for cat in CATEGORIES: (RECORDS / cat).mkdir(parents=True, exist_ok=True) def get_anthropic_client() -> anthropic.Anthropic: """Get Anthropic client, checking for API key.""" api_key = os.environ.get("ANTHROPIC_API_KEY") if not api_key: # Try reading from config file config_path = Path.home() / "dev/doc-processor/.env" if config_path.exists(): for line in config_path.read_text().splitlines(): if line.startswith("ANTHROPIC_API_KEY="): api_key = line.split("=", 1)[1].strip().strip('"\'') break if not api_key: raise RuntimeError( "ANTHROPIC_API_KEY not set. Either:\n" " 1. Set ANTHROPIC_API_KEY environment variable\n" " 2. Create ~/dev/doc-processor/.env with ANTHROPIC_API_KEY=sk-ant-..." ) return anthropic.Anthropic(api_key=api_key) def init_embeddings_db(): """Initialize SQLite database for embeddings.""" conn = sqlite3.connect(EMBEDDINGS_DB) conn.execute(""" CREATE TABLE IF NOT EXISTS embeddings ( doc_id TEXT PRIMARY KEY, embedding BLOB, text_hash TEXT, created_at TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS documents ( doc_id TEXT PRIMARY KEY, filename TEXT, category TEXT, doc_type TEXT, date TEXT, vendor TEXT, amount TEXT, summary TEXT, full_text TEXT, processed_at TEXT ) """) conn.commit() conn.close() def file_hash(filepath: Path) -> str: """SHA256 hash of file contents.""" h = hashlib.sha256() with open(filepath, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): h.update(chunk) return h.hexdigest()[:16] def encode_image_base64(filepath: Path) -> tuple[str, str]: """Encode image/PDF to base64 for API. Returns (base64_data, media_type).""" suffix = filepath.suffix.lower() if suffix == '.pdf': # For PDFs, convert first page to PNG using pdftoppm import subprocess result = subprocess.run( ['pdftoppm', '-png', '-f', '1', '-l', '1', '-r', '150', str(filepath), '-'], capture_output=True, timeout=30 ) if result.returncode == 0: return base64.standard_b64encode(result.stdout).decode('utf-8'), 'image/png' else: raise RuntimeError(f"Failed to convert PDF: {result.stderr.decode()}") # Image files media_types = { '.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.gif': 'image/gif', '.webp': 'image/webp', } media_type = media_types.get(suffix, 'image/png') with open(filepath, 'rb') as f: return base64.standard_b64encode(f.read()).decode('utf-8'), media_type def analyze_document_with_ai(filepath: Path, client: anthropic.Anthropic) -> Dict[str, Any]: """ Use Claude vision to analyze document. Returns: {category, doc_type, date, vendor, amount, summary, full_text} """ print(f" Analyzing with AI...") try: image_data, media_type = encode_image_base64(filepath) except Exception as e: print(f" Failed to encode document: {e}") return { "category": "uncategorized", "doc_type": "unknown", "full_text": f"(Failed to process: {e})", "summary": "Document could not be processed" } prompt = """Analyze this document image and extract: 1. **Full Text**: Transcribe ALL visible text from the document, preserving structure where possible. 2. **Classification**: Categorize into exactly ONE of: - taxes (W-2, 1099, tax returns, IRS forms) - bills (utilities, subscriptions, invoices) - medical (health records, prescriptions, lab results) - insurance (policies, claims, coverage docs) - legal (contracts, agreements, legal notices) - financial (bank statements, investment docs) - expenses (receipts, purchase confirmations) - vehicles (registration, maintenance, DMV) - home (mortgage, HOA, property docs) - personal (ID copies, certificates, misc) - contacts (business cards, contact info) - uncategorized (if none fit) 3. **Document Type**: Specific type (e.g., "utility_bill", "receipt", "tax_form_w2", "insurance_policy") 4. **Key Fields**: - date: Document date (YYYY-MM-DD format if possible) - vendor: Company/organization name - amount: Dollar amount if present (e.g., "$123.45") 5. **Summary**: 1-2 sentence description of what this document is. Respond in JSON format: { "category": "...", "doc_type": "...", "date": "...", "vendor": "...", "amount": "...", "summary": "...", "full_text": "..." }""" try: response = client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, messages=[ { "role": "user", "content": [ { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": image_data, }, }, { "type": "text", "text": prompt } ], } ], ) # Parse JSON from response text = response.content[0].text # Try to extract JSON from response (handle markdown code blocks) if "```json" in text: text = text.split("```json")[1].split("```")[0] elif "```" in text: text = text.split("```")[1].split("```")[0] result = json.loads(text.strip()) # Validate category if result.get("category") not in CATEGORIES: result["category"] = "uncategorized" return result except json.JSONDecodeError as e: print(f" Failed to parse AI response as JSON: {e}") print(f" Raw response: {text[:500]}") return { "category": "uncategorized", "doc_type": "unknown", "full_text": text, "summary": "AI response could not be parsed" } except Exception as e: print(f" AI analysis failed: {e}") return { "category": "uncategorized", "doc_type": "unknown", "full_text": f"(AI analysis failed: {e})", "summary": "Document analysis failed" } def generate_embedding(text: str, client: anthropic.Anthropic) -> Optional[List[float]]: """ Generate text embedding using Anthropic's embedding endpoint. Note: As of 2024, Anthropic doesn't have a public embedding API. This is a placeholder - implement with OpenAI, Voyage, or local model. For now, returns None and we'll use full-text search in SQLite. """ # TODO: Implement with preferred embedding provider # Options: # 1. OpenAI text-embedding-3-small (cheap, good quality) # 2. Voyage AI (good for documents) # 3. Local sentence-transformers return None def store_embedding(doc_id: str, embedding: Optional[List[float]], text: str): """Store embedding in SQLite database.""" if embedding is None: return conn = sqlite3.connect(EMBEDDINGS_DB) # Pack floats as binary blob embedding_blob = struct.pack(f'{len(embedding)}f', *embedding) text_hash = hashlib.sha256(text.encode()).hexdigest()[:16] conn.execute(""" INSERT OR REPLACE INTO embeddings (doc_id, embedding, text_hash, created_at) VALUES (?, ?, ?, ?) """, (doc_id, embedding_blob, text_hash, datetime.now().isoformat())) conn.commit() conn.close() def store_document_metadata(doc_id: str, filename: str, classification: Dict, full_text: str): """Store document metadata in SQLite for full-text search.""" conn = sqlite3.connect(EMBEDDINGS_DB) conn.execute(""" INSERT OR REPLACE INTO documents (doc_id, filename, category, doc_type, date, vendor, amount, summary, full_text, processed_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( doc_id, filename, classification.get("category", "uncategorized"), classification.get("doc_type", "unknown"), classification.get("date"), classification.get("vendor"), classification.get("amount"), classification.get("summary"), full_text[:50000], # Limit text size datetime.now().isoformat() )) conn.commit() conn.close() def store_document(filepath: Path, hash_id: str) -> Path: """Copy document to store with hash-based name.""" suffix = filepath.suffix.lower() store_path = STORE / f"{hash_id}{suffix}" if not store_path.exists(): shutil.copy2(filepath, store_path) return store_path def create_record(filepath: Path, hash_id: str, classification: Dict) -> Path: """Create markdown record in appropriate category folder.""" cat = classification.get("category", "uncategorized") now = datetime.now() record_name = f"{now.strftime('%Y%m%d')}_{hash_id}.md" record_path = RECORDS / cat / record_name full_text = classification.get("full_text", "") content = f"""# Document Record **ID:** {hash_id} **Original File:** {filepath.name} **Processed:** {now.isoformat()} **Category:** {cat} **Type:** {classification.get('doc_type', 'unknown')} ## Extracted Info | Field | Value | |-------|-------| | Date | {classification.get('date', 'N/A')} | | Vendor | {classification.get('vendor', 'N/A')} | | Amount | {classification.get('amount', 'N/A')} | ## Summary {classification.get('summary', 'No summary available.')} ## Full Text ``` {full_text[:10000]} ``` ## Files - **Original:** [store/{hash_id}{filepath.suffix}](../../store/{hash_id}{filepath.suffix}) """ record_path.write_text(content) return record_path def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> None: """Update the master.json index.""" index_path = INDEX / "master.json" if index_path.exists(): with open(index_path) as f: data = json.load(f) else: data = { "version": "2.0", "created": datetime.now().strftime("%Y-%m-%d"), "documents": [], "stats": {"total": 0, "by_type": {}, "by_category": {}} } doc_entry = { "id": hash_id, "filename": filepath.name, "category": classification.get("category", "uncategorized"), "type": classification.get("doc_type", "unknown"), "date": classification.get("date"), "vendor": classification.get("vendor"), "amount": classification.get("amount"), "summary": classification.get("summary"), "processed": datetime.now().isoformat(), } # Check for duplicate if not any(d["id"] == hash_id for d in data["documents"]): data["documents"].append(doc_entry) data["stats"]["total"] = len(data["documents"]) # Update type/category stats dtype = classification.get("doc_type", "unknown") cat = classification.get("category", "uncategorized") data["stats"]["by_type"][dtype] = data["stats"]["by_type"].get(dtype, 0) + 1 data["stats"]["by_category"][cat] = data["stats"]["by_category"].get(cat, 0) + 1 with open(index_path, 'w') as f: json.dump(data, f, indent=2) def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None: """Append to expenses.csv if it's an expense/receipt.""" if classification.get("category") not in ["expenses", "bills"]: return csv_path = EXPORTS / "expenses.csv" file_exists = csv_path.exists() with open(csv_path, 'a', newline='') as f: writer = csv.writer(f) if not file_exists: writer.writerow(["date", "vendor", "amount", "category", "type", "doc_id", "filename"]) writer.writerow([ classification.get("date", ""), classification.get("vendor", ""), classification.get("amount", ""), classification.get("category", ""), classification.get("doc_type", ""), hash_id, filepath.name, ]) def process_document(filepath: Path, client: anthropic.Anthropic) -> bool: """Process a single document through the full pipeline.""" print(f"Processing: {filepath.name}") # Skip hidden files if filepath.name.startswith('.'): return False valid_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.gif', '.webp', '.tiff', '.tif', '.bmp'} if filepath.suffix.lower() not in valid_extensions: print(f" Skipping unsupported format: {filepath.suffix}") return False # 1. Generate hash hash_id = file_hash(filepath) print(f" Hash: {hash_id}") # 2. Check if already processed store_path = STORE / f"{hash_id}{filepath.suffix.lower()}" if store_path.exists(): print(f" Already processed, removing from inbox") filepath.unlink() return True # 3. Analyze with AI (extracts text + classifies in one pass) classification = analyze_document_with_ai(filepath, client) full_text = classification.get("full_text", "") print(f" Category: {classification.get('category')}, Type: {classification.get('doc_type')}") print(f" Extracted {len(full_text)} characters") # 4. Store original document print(" Storing document...") store_document(filepath, hash_id) # 5. Create markdown record print(" Creating record...") record_path = create_record(filepath, hash_id, classification) print(f" Record: {record_path}") # 6. Update JSON index print(" Updating index...") update_master_index(hash_id, filepath, classification) # 7. Store in SQLite (for search) print(" Storing in SQLite...") store_document_metadata(hash_id, filepath.name, classification, full_text) # 8. Generate and store embedding (if implemented) embedding = generate_embedding(full_text, client) if embedding: store_embedding(hash_id, embedding, full_text) # 9. Export if expense export_expense(hash_id, classification, filepath) # 10. Remove from inbox print(" Removing from inbox...") filepath.unlink() print(f" ✓ Done: {classification.get('category')}/{hash_id}") return True def process_inbox(client: anthropic.Anthropic) -> int: """Process all documents in inbox. Returns count processed.""" count = 0 for filepath in sorted(INBOX.iterdir()): if filepath.is_file() and not filepath.name.startswith('.'): try: if process_document(filepath, client): count += 1 except Exception as e: print(f"Error processing {filepath}: {e}") import traceback traceback.print_exc() return count def watch_inbox(client: anthropic.Anthropic, interval: int = 60) -> None: """Watch inbox continuously.""" print(f"Watching {INBOX} (interval: {interval}s)") print("Press Ctrl+C to stop") while True: count = process_inbox(client) if count: print(f"Processed {count} document(s)") time.sleep(interval) def main(): parser = argparse.ArgumentParser(description="AI-powered document processor") parser.add_argument("--watch", action="store_true", help="Watch inbox continuously") parser.add_argument("--interval", type=int, default=60, help="Watch interval in seconds") parser.add_argument("--file", type=Path, help="Process single file") args = parser.parse_args() # Initialize init_embeddings_db() try: client = get_anthropic_client() except RuntimeError as e: print(f"ERROR: {e}") sys.exit(1) if args.file: if args.file.exists(): process_document(args.file, client) else: print(f"File not found: {args.file}") sys.exit(1) elif args.watch: watch_inbox(client, args.interval) else: count = process_inbox(client) print(f"Processed {count} document(s)") if __name__ == "__main__": main()