From fb3d5a46b525ee6da27b219909374532e0758a28 Mon Sep 17 00:00:00 2001 From: Johan Jongsma Date: Sun, 1 Feb 2026 17:24:05 +0000 Subject: [PATCH] Replace OCR with AI vision, SQLite for storage - Remove Tesseract/OCR dependencies - Use Claude vision API for document analysis - Single AI pass: extract text + classify + summarize - SQLite database for documents and embeddings - Embeddings storage ready (generation placeholder) - Full-text search via SQLite - Updated systemd service to use venv - Support .env file for API key --- .gitignore | 4 + README.md | 188 ++++++++++--------- processor.py | 503 +++++++++++++++++++++++++++++++++------------------ search.py | 257 ++++++++++++++++---------- 4 files changed, 598 insertions(+), 354 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4936acf --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +venv/ +.env +__pycache__/ +*.pyc diff --git a/README.md b/README.md index da6a221..e22d184 100644 --- a/README.md +++ b/README.md @@ -1,105 +1,119 @@ -# Document Management System +# Document Processor -Automated document processing pipeline for scanning, OCR, classification, and indexing. +AI-powered document management system using Claude vision for extraction and SQLite for storage/search. -## Architecture +## Features + +- **AI Vision Analysis**: Uses Claude to read documents, extract text, classify, and summarize +- **No OCR dependencies**: Just drop files in inbox, AI handles the rest +- **SQLite Storage**: Full-text search via SQLite, embeddings ready (placeholder) +- **Auto-categorization**: Taxes, bills, medical, insurance, legal, financial, etc. 
+- **Expense Tracking**: Auto-exports bills/receipts to CSV + +## Setup + +```bash +cd ~/dev/doc-processor + +# Create/activate venv +python3 -m venv venv +source venv/bin/activate + +# Install dependencies +pip install anthropic + +# Configure API key (one of these methods): +# Option 1: Environment variable +export ANTHROPIC_API_KEY=sk-ant-... + +# Option 2: .env file +echo 'ANTHROPIC_API_KEY=sk-ant-...' > .env +``` + +## Usage + +```bash +# Activate venv first +source ~/dev/doc-processor/venv/bin/activate + +# Process all documents in inbox +python processor.py + +# Watch inbox continuously +python processor.py --watch + +# Process single file +python processor.py --file /path/to/document.pdf + +# Search documents +python search.py "query" +python search.py -c medical # By category +python search.py -t receipt # By type +python search.py -s abc123 # Show full document +python search.py --stats # Statistics +python search.py -l # List all +``` + +## Directory Structure ``` ~/documents/ -├── inbox/ # Drop documents here (SMB share for scanner) -├── store/ # Original files stored by hash -├── records/ # Markdown records by category -│ ├── bills/ +├── inbox/ # Drop files here (SMB share for scanner) +├── store/ # Original files (hash-named) +├── records/ # Markdown records by category │ ├── taxes/ +│ ├── bills/ │ ├── medical/ -│ ├── expenses/ │ └── ... -├── index/ # Search index -│ └── master.json -└── exports/ # CSV exports - └── expenses.csv +├── index/ +│ ├── master.json # JSON index +│ └── embeddings.db # SQLite (documents + embeddings) +└── exports/ + └── expenses.csv # Auto-exported expenses ``` -## How It Works +## Supported Formats -1. **Drop a document** in `~/documents/inbox/` (via SMB, phone scan, or manually) -2. 
**Daemon processes it** (runs every 60 seconds): - - Extracts text (pdftotext or tesseract OCR) - - Classifies document type and category - - Extracts key fields (date, vendor, amount) - - Stores original file by content hash - - Creates markdown record - - Updates searchable index - - Exports expenses to CSV -3. **Search** your documents anytime - -## Commands - -```bash -# Process inbox manually -python3 ~/dev/doc-processor/processor.py - -# Process single file -python3 ~/dev/doc-processor/processor.py --file /path/to/doc.pdf - -# Watch mode (manual, daemon does this automatically) -python3 ~/dev/doc-processor/processor.py --watch --interval 30 - -# Search documents -python3 ~/dev/doc-processor/search.py "duke energy" -python3 ~/dev/doc-processor/search.py -c bills # By category -python3 ~/dev/doc-processor/search.py -t receipt # By type -python3 ~/dev/doc-processor/search.py --stats # Statistics -python3 ~/dev/doc-processor/search.py -l # List all -python3 ~/dev/doc-processor/search.py -s # Show full record -``` - -## Daemon - -```bash -# Status -systemctl --user status doc-processor - -# Restart -systemctl --user restart doc-processor - -# Logs -journalctl --user -u doc-processor -f -``` - -## Scanner Setup - -1. Get a scanner with SMB support (Brother ADS-1700W, Fujitsu ScanSnap, etc.) -2. Configure scanner to save to SMB share: `\\192.168.1.16\documents\inbox\` -3. Feed paper, press scan -4. 
Documents auto-process within 60 seconds +- PDF (converted to image for vision) +- Images: PNG, JPG, JPEG, GIF, WebP, TIFF, BMP ## Categories -| Category | Documents | -|----------|-----------| -| taxes | W-2, 1099, tax returns, IRS forms | -| bills | Utility bills, invoices | -| medical | Medical records, prescriptions | -| insurance | Policies, claims | -| legal | Contracts, agreements | -| financial | Bank statements, investments | -| expenses | Receipts, purchases | -| vehicles | Registration, maintenance | -| home | Mortgage, HOA, property | -| personal | General documents | -| contacts | Business cards | -| uncategorized | Unclassified | +- taxes, bills, medical, insurance, legal +- financial, expenses, vehicles, home +- personal, contacts, uncategorized -## SMB Share Setup +## Systemd Service -Already configured on james server: -``` -[documents] - path = /home/johan/documents - browsable = yes - writable = yes - valid users = scanner, johan +```bash +# Install service +systemctl --user daemon-reload +systemctl --user enable doc-processor +systemctl --user start doc-processor + +# Check status +systemctl --user status doc-processor +journalctl --user -u doc-processor -f ``` -Scanner user can write to inbox, processed files go to other directories. +## Requirements + +- Python 3.10+ +- `anthropic` Python package +- `pdftoppm` (poppler-utils) for PDF conversion +- Anthropic API key + +## API Key + +The processor looks for the API key in this order: +1. `ANTHROPIC_API_KEY` environment variable +2. `~/dev/doc-processor/.env` file + +## Embeddings + +The embedding storage is ready but the generation is a placeholder. Options: +- OpenAI text-embedding-3-small (cheap, good) +- Voyage AI (optimized for documents) +- Local sentence-transformers + +Currently uses SQLite full-text search which works well for most use cases. 
diff --git a/processor.py b/processor.py index 8308463..8ae1cba 100755 --- a/processor.py +++ b/processor.py @@ -1,22 +1,31 @@ #!/usr/bin/env python3 """ Document Processor for ~/documents/inbox/ -Watches for new documents, OCRs them, classifies, and files them. +Uses AI vision (Claude) for document analysis. Stores embeddings in SQLite. """ import os import sys import json import hashlib -import subprocess import shutil import sqlite3 import csv +import base64 +import struct from datetime import datetime from pathlib import Path -from typing import Optional, Dict, Any -import re +from typing import Optional, Dict, Any, List import time +import argparse + +# Try to import anthropic, fail gracefully with helpful message +try: + import anthropic +except ImportError: + print("ERROR: anthropic package not installed") + print("Run: cd ~/dev/doc-processor && source venv/bin/activate && pip install anthropic") + sys.exit(1) # Paths DOCUMENTS_ROOT = Path.home() / "documents" @@ -25,6 +34,7 @@ STORE = DOCUMENTS_ROOT / "store" RECORDS = DOCUMENTS_ROOT / "records" INDEX = DOCUMENTS_ROOT / "index" EXPORTS = DOCUMENTS_ROOT / "exports" +EMBEDDINGS_DB = INDEX / "embeddings.db" # Categories CATEGORIES = [ @@ -40,149 +50,272 @@ for cat in CATEGORIES: (RECORDS / cat).mkdir(parents=True, exist_ok=True) +def get_anthropic_client() -> anthropic.Anthropic: + """Get Anthropic client, checking for API key.""" + api_key = os.environ.get("ANTHROPIC_API_KEY") + if not api_key: + # Try reading from config file + config_path = Path.home() / "dev/doc-processor/.env" + if config_path.exists(): + for line in config_path.read_text().splitlines(): + if line.startswith("ANTHROPIC_API_KEY="): + api_key = line.split("=", 1)[1].strip().strip('"\'') + break + + if not api_key: + raise RuntimeError( + "ANTHROPIC_API_KEY not set. Either:\n" + " 1. Set ANTHROPIC_API_KEY environment variable\n" + " 2. Create ~/dev/doc-processor/.env with ANTHROPIC_API_KEY=sk-ant-..." 
+ ) + + return anthropic.Anthropic(api_key=api_key) + + +def init_embeddings_db(): + """Initialize SQLite database for embeddings.""" + conn = sqlite3.connect(EMBEDDINGS_DB) + conn.execute(""" + CREATE TABLE IF NOT EXISTS embeddings ( + doc_id TEXT PRIMARY KEY, + embedding BLOB, + text_hash TEXT, + created_at TEXT + ) + """) + conn.execute(""" + CREATE TABLE IF NOT EXISTS documents ( + doc_id TEXT PRIMARY KEY, + filename TEXT, + category TEXT, + doc_type TEXT, + date TEXT, + vendor TEXT, + amount TEXT, + summary TEXT, + full_text TEXT, + processed_at TEXT + ) + """) + conn.commit() + conn.close() + + def file_hash(filepath: Path) -> str: """SHA256 hash of file contents.""" h = hashlib.sha256() with open(filepath, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): h.update(chunk) - return h.hexdigest()[:16] # Short hash for filename + return h.hexdigest()[:16] -def extract_text_pdf(filepath: Path) -> str: - """Extract text from PDF using pdftotext.""" - try: - result = subprocess.run( - ['pdftotext', '-layout', str(filepath), '-'], - capture_output=True, text=True, timeout=30 - ) - text = result.stdout.strip() - if len(text) > 50: # Got meaningful text - return text - except Exception as e: - print(f"pdftotext failed: {e}") - - # Fallback to OCR - return ocr_document(filepath) - - -def ocr_document(filepath: Path) -> str: - """OCR a document using tesseract.""" - try: - # For PDFs, convert to images first - if filepath.suffix.lower() == '.pdf': - # Use pdftoppm to convert to images, then OCR - result = subprocess.run( - ['pdftoppm', '-png', '-r', '300', str(filepath), '/tmp/doc_page'], - capture_output=True, timeout=60 - ) - # OCR all pages - text_parts = [] - for img in sorted(Path('/tmp').glob('doc_page-*.png')): - result = subprocess.run( - ['tesseract', str(img), 'stdout'], - capture_output=True, text=True, timeout=60 - ) - text_parts.append(result.stdout) - img.unlink() # Clean up - return '\n'.join(text_parts).strip() - else: - # Direct image OCR - 
result = subprocess.run( - ['tesseract', str(filepath), 'stdout'], - capture_output=True, text=True, timeout=60 - ) - return result.stdout.strip() - except Exception as e: - print(f"OCR failed: {e}") - return "" - - -def extract_text(filepath: Path) -> str: - """Extract text from document based on type.""" +def encode_image_base64(filepath: Path) -> tuple[str, str]: + """Encode image/PDF to base64 for API. Returns (base64_data, media_type).""" suffix = filepath.suffix.lower() + if suffix == '.pdf': - return extract_text_pdf(filepath) - elif suffix in ['.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp']: - return ocr_document(filepath) - elif suffix in ['.txt', '.md']: - return filepath.read_text() - else: - return "" - - -def classify_document(text: str, filename: str) -> Dict[str, Any]: - """ - Classify document based on content. - Returns: {category, doc_type, date, vendor, amount, summary} - """ - text_lower = text.lower() - result = { - "category": "uncategorized", - "doc_type": "unknown", - "date": None, - "vendor": None, - "amount": None, - "summary": None, + # For PDFs, convert first page to PNG using pdftoppm + import subprocess + result = subprocess.run( + ['pdftoppm', '-png', '-f', '1', '-l', '1', '-r', '150', str(filepath), '-'], + capture_output=True, timeout=30 + ) + if result.returncode == 0: + return base64.standard_b64encode(result.stdout).decode('utf-8'), 'image/png' + else: + raise RuntimeError(f"Failed to convert PDF: {result.stderr.decode()}") + + # Image files + media_types = { + '.png': 'image/png', + '.jpg': 'image/jpeg', + '.jpeg': 'image/jpeg', + '.gif': 'image/gif', + '.webp': 'image/webp', } + media_type = media_types.get(suffix, 'image/png') - # Date extraction (various formats) - date_patterns = [ - r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})', - r'(\d{4}[/-]\d{1,2}[/-]\d{1,2})', - r'((?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]* \d{1,2},? 
\d{4})', - ] - for pattern in date_patterns: - match = re.search(pattern, text_lower) - if match: - result["date"] = match.group(1) - break + with open(filepath, 'rb') as f: + return base64.standard_b64encode(f.read()).decode('utf-8'), media_type + + +def analyze_document_with_ai(filepath: Path, client: anthropic.Anthropic) -> Dict[str, Any]: + """ + Use Claude vision to analyze document. + Returns: {category, doc_type, date, vendor, amount, summary, full_text} + """ + print(f" Analyzing with AI...") - # Amount extraction - amount_match = re.search(r'\$[\d,]+\.?\d*', text) - if amount_match: - result["amount"] = amount_match.group(0) + try: + image_data, media_type = encode_image_base64(filepath) + except Exception as e: + print(f" Failed to encode document: {e}") + return { + "category": "uncategorized", + "doc_type": "unknown", + "full_text": f"(Failed to process: {e})", + "summary": "Document could not be processed" + } - # Classification rules - if any(x in text_lower for x in ['w-2', 'w2', '1099', 'tax return', 'irs', '1040', 'schedule c', 'form 1098']): - result["category"] = "taxes" - result["doc_type"] = "tax_form" - elif any(x in text_lower for x in ['invoice', 'bill', 'amount due', 'payment due', 'account number', 'autopay']): - result["category"] = "bills" - result["doc_type"] = "bill" - # Try to extract vendor - vendors = ['duke energy', 'fpl', 'florida power', 'spectrum', 'at&t', 'verizon', 't-mobile', 'comcast', 'xfinity'] - for v in vendors: - if v in text_lower: - result["vendor"] = v.title() - break - elif any(x in text_lower for x in ['patient', 'diagnosis', 'prescription', 'medical', 'physician', 'hospital', 'clinic', 'dr.', 'md']): - result["category"] = "medical" - result["doc_type"] = "medical_record" - elif any(x in text_lower for x in ['policy', 'coverage', 'premium', 'deductible', 'insurance', 'claim']): - result["category"] = "insurance" - result["doc_type"] = "insurance_doc" - elif any(x in text_lower for x in ['agreement', 'contract', 
'terms', 'hereby', 'whereas', 'attorney', 'legal']): - result["category"] = "legal" - result["doc_type"] = "legal_doc" - elif any(x in text_lower for x in ['bank', 'statement', 'account', 'balance', 'deposit', 'withdrawal', 'investment', 'portfolio']): - result["category"] = "financial" - result["doc_type"] = "financial_statement" - elif any(x in text_lower for x in ['receipt', 'purchase', 'order', 'subtotal', 'total', 'qty', 'item']): - result["category"] = "expenses" - result["doc_type"] = "receipt" - elif any(x in text_lower for x in ['vin', 'vehicle', 'registration', 'dmv', 'license plate', 'odometer']): - result["category"] = "vehicles" - result["doc_type"] = "vehicle_doc" - elif any(x in text_lower for x in ['mortgage', 'deed', 'property', 'hoa', 'homeowner']): - result["category"] = "home" - result["doc_type"] = "property_doc" + prompt = """Analyze this document image and extract: + +1. **Full Text**: Transcribe ALL visible text from the document, preserving structure where possible. + +2. **Classification**: Categorize into exactly ONE of: + - taxes (W-2, 1099, tax returns, IRS forms) + - bills (utilities, subscriptions, invoices) + - medical (health records, prescriptions, lab results) + - insurance (policies, claims, coverage docs) + - legal (contracts, agreements, legal notices) + - financial (bank statements, investment docs) + - expenses (receipts, purchase confirmations) + - vehicles (registration, maintenance, DMV) + - home (mortgage, HOA, property docs) + - personal (ID copies, certificates, misc) + - contacts (business cards, contact info) + - uncategorized (if none fit) + +3. **Document Type**: Specific type (e.g., "utility_bill", "receipt", "tax_form_w2", "insurance_policy") + +4. **Key Fields**: + - date: Document date (YYYY-MM-DD format if possible) + - vendor: Company/organization name + - amount: Dollar amount if present (e.g., "$123.45") + +5. **Summary**: 1-2 sentence description of what this document is. 
+ +Respond in JSON format: +{ + "category": "...", + "doc_type": "...", + "date": "...", + "vendor": "...", + "amount": "...", + "summary": "...", + "full_text": "..." +}""" + + try: + response = client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=4096, + messages=[ + { + "role": "user", + "content": [ + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": image_data, + }, + }, + { + "type": "text", + "text": prompt + } + ], + } + ], + ) + + # Parse JSON from response + text = response.content[0].text + + # Try to extract JSON from response (handle markdown code blocks) + if "```json" in text: + text = text.split("```json")[1].split("```")[0] + elif "```" in text: + text = text.split("```")[1].split("```")[0] + + result = json.loads(text.strip()) + + # Validate category + if result.get("category") not in CATEGORIES: + result["category"] = "uncategorized" + + return result + + except json.JSONDecodeError as e: + print(f" Failed to parse AI response as JSON: {e}") + print(f" Raw response: {text[:500]}") + return { + "category": "uncategorized", + "doc_type": "unknown", + "full_text": text, + "summary": "AI response could not be parsed" + } + except Exception as e: + print(f" AI analysis failed: {e}") + return { + "category": "uncategorized", + "doc_type": "unknown", + "full_text": f"(AI analysis failed: {e})", + "summary": "Document analysis failed" + } + + +def generate_embedding(text: str, client: anthropic.Anthropic) -> Optional[List[float]]: + """ + Generate text embedding using Anthropic's embedding endpoint. + Note: As of 2024, Anthropic doesn't have a public embedding API. + This is a placeholder - implement with OpenAI, Voyage, or local model. - # Generate summary (first 200 chars, cleaned) - clean_text = ' '.join(text.split())[:200] - result["summary"] = clean_text + For now, returns None and we'll use full-text search in SQLite. 
+ """ + # TODO: Implement with preferred embedding provider + # Options: + # 1. OpenAI text-embedding-3-small (cheap, good quality) + # 2. Voyage AI (good for documents) + # 3. Local sentence-transformers + return None + + +def store_embedding(doc_id: str, embedding: Optional[List[float]], text: str): + """Store embedding in SQLite database.""" + if embedding is None: + return - return result + conn = sqlite3.connect(EMBEDDINGS_DB) + + # Pack floats as binary blob + embedding_blob = struct.pack(f'{len(embedding)}f', *embedding) + text_hash = hashlib.sha256(text.encode()).hexdigest()[:16] + + conn.execute(""" + INSERT OR REPLACE INTO embeddings (doc_id, embedding, text_hash, created_at) + VALUES (?, ?, ?, ?) + """, (doc_id, embedding_blob, text_hash, datetime.now().isoformat())) + + conn.commit() + conn.close() + + +def store_document_metadata(doc_id: str, filename: str, classification: Dict, full_text: str): + """Store document metadata in SQLite for full-text search.""" + conn = sqlite3.connect(EMBEDDINGS_DB) + + conn.execute(""" + INSERT OR REPLACE INTO documents + (doc_id, filename, category, doc_type, date, vendor, amount, summary, full_text, processed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, ( + doc_id, + filename, + classification.get("category", "uncategorized"), + classification.get("doc_type", "unknown"), + classification.get("date"), + classification.get("vendor"), + classification.get("amount"), + classification.get("summary"), + full_text[:50000], # Limit text size + datetime.now().isoformat() + )) + + conn.commit() + conn.close() def store_document(filepath: Path, hash_id: str) -> Path: @@ -194,14 +327,16 @@ def store_document(filepath: Path, hash_id: str) -> Path: return store_path -def create_record(filepath: Path, hash_id: str, text: str, classification: Dict) -> Path: +def create_record(filepath: Path, hash_id: str, classification: Dict) -> Path: """Create markdown record in appropriate category folder.""" - cat = classification["category"] + cat = classification.get("category", "uncategorized") now = datetime.now() record_name = f"{now.strftime('%Y%m%d')}_{hash_id}.md" record_path = RECORDS / cat / record_name + full_text = classification.get("full_text", "") + content = f"""# Document Record **ID:** {hash_id} @@ -225,12 +360,12 @@ def create_record(filepath: Path, hash_id: str, text: str, classification: Dict) ## Full Text ``` -{text[:5000]} +{full_text[:10000]} ``` ## Files -- **PDF:** [store/{hash_id}{filepath.suffix}](../../store/{hash_id}{filepath.suffix}) +- **Original:** [store/{hash_id}{filepath.suffix}](../../store/{hash_id}{filepath.suffix}) """ record_path.write_text(content) @@ -245,15 +380,22 @@ def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> N with open(index_path) as f: data = json.load(f) else: - data = {"version": "1.0", "created": datetime.now().strftime("%Y-%m-%d"), "documents": [], "stats": {"total": 0, "by_type": {}, "by_year": {}}} + data = { + "version": "2.0", + "created": datetime.now().strftime("%Y-%m-%d"), + "documents": [], + "stats": {"total": 0, "by_type": {}, "by_category": {}} + } doc_entry = { "id": hash_id, "filename": filepath.name, - "category": 
classification["category"], + "category": classification.get("category", "uncategorized"), "type": classification.get("doc_type", "unknown"), "date": classification.get("date"), + "vendor": classification.get("vendor"), "amount": classification.get("amount"), + "summary": classification.get("summary"), "processed": datetime.now().isoformat(), } @@ -262,9 +404,11 @@ def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> N data["documents"].append(doc_entry) data["stats"]["total"] = len(data["documents"]) - # Update type stats + # Update type/category stats dtype = classification.get("doc_type", "unknown") + cat = classification.get("category", "uncategorized") data["stats"]["by_type"][dtype] = data["stats"]["by_type"].get(dtype, 0) + 1 + data["stats"]["by_category"][cat] = data["stats"]["by_category"].get(cat, 0) + 1 with open(index_path, 'w') as f: json.dump(data, f, indent=2) @@ -272,7 +416,7 @@ def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> N def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None: """Append to expenses.csv if it's an expense/receipt.""" - if classification["category"] not in ["expenses", "bills"]: + if classification.get("category") not in ["expenses", "bills"]: return csv_path = EXPORTS / "expenses.csv" @@ -287,22 +431,22 @@ def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None: classification.get("date", ""), classification.get("vendor", ""), classification.get("amount", ""), - classification["category"], + classification.get("category", ""), classification.get("doc_type", ""), hash_id, filepath.name, ]) -def process_document(filepath: Path) -> bool: +def process_document(filepath: Path, client: anthropic.Anthropic) -> bool: """Process a single document through the full pipeline.""" print(f"Processing: {filepath.name}") - # Skip hidden files and non-documents + # Skip hidden files if filepath.name.startswith('.'): return False - 
valid_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.txt'} + valid_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.gif', '.webp', '.tiff', '.tif', '.bmp'} if filepath.suffix.lower() not in valid_extensions: print(f" Skipping unsupported format: {filepath.suffix}") return False @@ -318,87 +462,98 @@ def process_document(filepath: Path) -> bool: filepath.unlink() return True - # 3. Extract text (OCR if needed) - print(" Extracting text...") - text = extract_text(filepath) - if not text: - print(" Warning: No text extracted") - text = "(No text could be extracted)" - else: - print(f" Extracted {len(text)} characters") + # 3. Analyze with AI (extracts text + classifies in one pass) + classification = analyze_document_with_ai(filepath, client) + full_text = classification.get("full_text", "") + print(f" Category: {classification.get('category')}, Type: {classification.get('doc_type')}") + print(f" Extracted {len(full_text)} characters") - # 4. Classify - print(" Classifying...") - classification = classify_document(text, filepath.name) - print(f" Category: {classification['category']}, Type: {classification.get('doc_type')}") - - # 5. Store PDF + # 4. Store original document print(" Storing document...") store_document(filepath, hash_id) - # 6. Create record + # 5. Create markdown record print(" Creating record...") - record_path = create_record(filepath, hash_id, text, classification) + record_path = create_record(filepath, hash_id, classification) print(f" Record: {record_path}") - # 7. Update index + # 6. Update JSON index print(" Updating index...") update_master_index(hash_id, filepath, classification) - # 8. Export if expense + # 7. Store in SQLite (for search) + print(" Storing in SQLite...") + store_document_metadata(hash_id, filepath.name, classification, full_text) + + # 8. 
Generate and store embedding (if implemented) + embedding = generate_embedding(full_text, client) + if embedding: + store_embedding(hash_id, embedding, full_text) + + # 9. Export if expense export_expense(hash_id, classification, filepath) - # 9. Remove from inbox + # 10. Remove from inbox print(" Removing from inbox...") filepath.unlink() - print(f" ✓ Done: {classification['category']}/{hash_id}") + print(f" ✓ Done: {classification.get('category')}/{hash_id}") return True -def process_inbox() -> int: +def process_inbox(client: anthropic.Anthropic) -> int: """Process all documents in inbox. Returns count processed.""" count = 0 - for filepath in INBOX.iterdir(): + for filepath in sorted(INBOX.iterdir()): if filepath.is_file() and not filepath.name.startswith('.'): try: - if process_document(filepath): + if process_document(filepath, client): count += 1 except Exception as e: print(f"Error processing {filepath}: {e}") + import traceback + traceback.print_exc() return count -def watch_inbox(interval: int = 30) -> None: +def watch_inbox(client: anthropic.Anthropic, interval: int = 60) -> None: """Watch inbox continuously.""" print(f"Watching {INBOX} (interval: {interval}s)") print("Press Ctrl+C to stop") while True: - count = process_inbox() + count = process_inbox(client) if count: print(f"Processed {count} document(s)") time.sleep(interval) def main(): - import argparse - parser = argparse.ArgumentParser(description="Document processor") + parser = argparse.ArgumentParser(description="AI-powered document processor") parser.add_argument("--watch", action="store_true", help="Watch inbox continuously") - parser.add_argument("--interval", type=int, default=30, help="Watch interval in seconds") + parser.add_argument("--interval", type=int, default=60, help="Watch interval in seconds") parser.add_argument("--file", type=Path, help="Process single file") args = parser.parse_args() + # Initialize + init_embeddings_db() + + try: + client = get_anthropic_client() + except 
RuntimeError as e: + print(f"ERROR: {e}") + sys.exit(1) + if args.file: if args.file.exists(): - process_document(args.file) + process_document(args.file, client) else: print(f"File not found: {args.file}") sys.exit(1) elif args.watch: - watch_inbox(args.interval) + watch_inbox(client, args.interval) else: - count = process_inbox() + count = process_inbox(client) print(f"Processed {count} document(s)") diff --git a/search.py b/search.py index 37e07b1..8bfda83 100755 --- a/search.py +++ b/search.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 """ Search documents in the document management system. +Uses SQLite full-text search on document content. """ import os import sys import json +import sqlite3 import argparse from pathlib import Path from datetime import datetime @@ -13,108 +15,184 @@ from datetime import datetime DOCUMENTS_ROOT = Path.home() / "documents" INDEX = DOCUMENTS_ROOT / "index" RECORDS = DOCUMENTS_ROOT / "records" +EMBEDDINGS_DB = INDEX / "embeddings.db" -def load_index() -> dict: - """Load the master index.""" - index_path = INDEX / "master.json" - if index_path.exists(): - with open(index_path) as f: - return json.load(f) - return {"documents": []} +def get_db() -> sqlite3.Connection: + """Get database connection.""" + if not EMBEDDINGS_DB.exists(): + print(f"Database not found: {EMBEDDINGS_DB}") + print("Run the processor first to create the database.") + sys.exit(1) + return sqlite3.connect(EMBEDDINGS_DB) -def search_documents(query: str, category: str = None, doc_type: str = None) -> list: - """Search documents by query, optionally filtered by category/type.""" - data = load_index() - results = [] +def search_documents(query: str, category: str = None, doc_type: str = None, limit: int = 20) -> list: + """ + Search documents by query using SQLite full-text search. + Returns list of matching documents. 
+ """ + conn = get_db() + conn.row_factory = sqlite3.Row - query_lower = query.lower() if query else "" + # Build query + conditions = [] + params = [] - for doc in data["documents"]: - # Apply filters - if category and doc.get("category") != category: - continue - if doc_type and doc.get("type") != doc_type: - continue - - # If no query, return all matching filters - if not query: - results.append(doc) - continue - - # Search in indexed fields - searchable = f"{doc.get('filename', '')} {doc.get('category', '')} {doc.get('type', '')} {doc.get('date', '')} {doc.get('amount', '')}".lower() - if query_lower in searchable: - results.append(doc) - continue - - # Search in full text record - record_path = find_record(doc["id"], doc["category"]) - if record_path and record_path.exists(): - content = record_path.read_text().lower() - if query_lower in content: - results.append(doc) + if query: + # Search in full_text, summary, vendor, filename + conditions.append("""( + full_text LIKE ? OR + summary LIKE ? OR + vendor LIKE ? OR + filename LIKE ? + )""") + like_query = f"%{query}%" + params.extend([like_query, like_query, like_query, like_query]) + + if category: + conditions.append("category = ?") + params.append(category) + + if doc_type: + conditions.append("doc_type = ?") + params.append(doc_type) + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + sql = f""" + SELECT doc_id, filename, category, doc_type, date, vendor, amount, summary, processed_at + FROM documents + WHERE {where_clause} + ORDER BY processed_at DESC + LIMIT ? 
+ """ + params.append(limit) + + cursor = conn.execute(sql, params) + results = [dict(row) for row in cursor.fetchall()] + conn.close() return results -def find_record(doc_id: str, category: str) -> Path: - """Find the record file for a document.""" - cat_dir = RECORDS / category - if cat_dir.exists(): - for f in cat_dir.iterdir(): - if doc_id in f.name: - return f - return None +def get_document(doc_id: str) -> dict: + """Get full document details by ID.""" + conn = get_db() + conn.row_factory = sqlite3.Row + + cursor = conn.execute(""" + SELECT * FROM documents WHERE doc_id = ? OR doc_id LIKE ? + """, (doc_id, f"{doc_id}%")) + + row = cursor.fetchone() + conn.close() + + return dict(row) if row else None + + +def list_categories() -> dict: + """List all categories with document counts.""" + conn = get_db() + cursor = conn.execute(""" + SELECT category, COUNT(*) as count + FROM documents + GROUP BY category + ORDER BY count DESC + """) + results = {row[0]: row[1] for row in cursor.fetchall()} + conn.close() + return results + + +def list_types() -> dict: + """List all document types with counts.""" + conn = get_db() + cursor = conn.execute(""" + SELECT doc_type, COUNT(*) as count + FROM documents + GROUP BY doc_type + ORDER BY count DESC + """) + results = {row[0]: row[1] for row in cursor.fetchall()} + conn.close() + return results + + +def show_stats() -> None: + """Show document statistics.""" + conn = get_db() + + # Total count + total = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0] + + print("\n📊 Document Statistics") + print("=" * 40) + print(f"Total documents: {total}") + + # By category + print("\nBy category:") + for cat, count in list_categories().items(): + print(f" {cat}: {count}") + + # By type + print("\nBy type:") + for dtype, count in list_types().items(): + print(f" {dtype}: {count}") + + conn.close() def show_document(doc_id: str) -> None: """Show full details of a document.""" - data = load_index() + doc = get_document(doc_id) - 
for doc in data["documents"]: - if doc["id"] == doc_id or doc_id in doc.get("filename", ""): - print(f"\n{'='*60}") - print(f"Document: {doc['filename']}") - print(f"ID: {doc['id']}") - print(f"Category: {doc['category']}") - print(f"Type: {doc.get('type', 'unknown')}") - print(f"Date: {doc.get('date', 'N/A')}") - print(f"Amount: {doc.get('amount', 'N/A')}") - print(f"Processed: {doc.get('processed', 'N/A')}") - print(f"{'='*60}") - - # Show record content - record_path = find_record(doc["id"], doc["category"]) - if record_path: - print(f"\nRecord: {record_path}") - print("-"*60) - print(record_path.read_text()) - return + if not doc: + print(f"Document not found: {doc_id}") + return - print(f"Document not found: {doc_id}") + print(f"\n{'=' * 60}") + print(f"Document: {doc['filename']}") + print(f"ID: {doc['doc_id']}") + print(f"Category: {doc['category']}") + print(f"Type: {doc['doc_type'] or 'unknown'}") + print(f"Date: {doc['date'] or 'N/A'}") + print(f"Vendor: {doc['vendor'] or 'N/A'}") + print(f"Amount: {doc['amount'] or 'N/A'}") + print(f"Processed: {doc['processed_at']}") + print(f"{'=' * 60}") + + if doc['summary']: + print(f"\nSummary:\n{doc['summary']}") + + if doc['full_text']: + print(f"\n--- Full Text (first 2000 chars) ---\n") + print(doc['full_text'][:2000]) + if len(doc['full_text']) > 2000: + print(f"\n... 
[{len(doc['full_text']) - 2000} more characters]")


-def list_stats() -> None:
-    """Show document statistics."""
-    data = load_index()
+def format_results(results: list) -> None:
+    """Format and print search results."""
+    if not results:
+        print("No documents found")
+        return

-    print("\n📊 Document Statistics")
-    print("="*40)
-    print(f"Total documents: {data['stats']['total']}")
+    print(f"\nFound {len(results)} document(s):\n")

-    print("\nBy type:")
-    for dtype, count in sorted(data["stats"].get("by_type", {}).items()):
-        print(f"  {dtype}: {count}")
+    # Header
+    print(f"{'ID':<10} {'Category':<12} {'Type':<18} {'Date':<12} {'Amount':<10} {'Filename'}")
+    print("-" * 90)

-    print("\nBy category:")
-    by_cat = {}
-    for doc in data["documents"]:
-        cat = doc.get("category", "unknown")
-        by_cat[cat] = by_cat.get(cat, 0) + 1
-    for cat, count in sorted(by_cat.items()):
-        print(f"  {cat}: {count}")
+    for doc in results:
+        doc_id = doc['doc_id'][:8]
+        cat = (doc['category'] or '')[:12]
+        dtype = (doc['doc_type'] or 'unknown')[:18]
+        date = (doc['date'] or '')[:12]
+        amount = (doc['amount'] or '')[:10]
+        filename = doc['filename'][:30]
+
+        print(f"{doc_id:<10} {cat:<12} {dtype:<18} {date:<12} {amount:<10} {filename}")


 def main():
@@ -125,10 +203,12 @@ def main():
     parser.add_argument("-s", "--show", help="Show full document by ID")
     parser.add_argument("--stats", action="store_true", help="Show statistics")
     parser.add_argument("-l", "--list", action="store_true", help="List all documents")
+    parser.add_argument("-n", "--limit", type=int, default=20, help="Max results (default: 20)")
+    parser.add_argument("--full-text", action="store_true", help="Show full text in results")

     args = parser.parse_args()

     if args.stats:
-        list_stats()
+        show_stats()
         return

     if args.show:
@@ -136,17 +216,8 @@ def main():
         return

     if args.list or args.query or args.category or args.type:
-        results = search_documents(args.query, args.category, args.type)
-
-        if not results:
-            print("No documents found")
-            return
-
-    print(f"\nFound {len(results)} document(s):\n")
-    for doc in results:
-        date = doc.get("date", "")[:10] if doc.get("date") else ""
-        amount = doc.get("amount", "")
-        print(f"  [{doc['id'][:8]}] {doc['category']:12} {doc.get('type', ''):15} {date:12} {amount:10} {doc['filename']}")
+        results = search_documents(args.query, args.category, args.type, args.limit)
+        format_results(results)
     else:
         parser.print_help()