commit 9dac36681cb44062fd1c03646b7a4c95faf00f52 Author: Johan Jongsma Date: Sun Feb 1 12:22:54 2026 +0000 Document management system - processor and search diff --git a/README.md b/README.md new file mode 100644 index 0000000..da6a221 --- /dev/null +++ b/README.md @@ -0,0 +1,105 @@ +# Document Management System + +Automated document processing pipeline for scanning, OCR, classification, and indexing. + +## Architecture + +``` +~/documents/ +├── inbox/ # Drop documents here (SMB share for scanner) +├── store/ # Original files stored by hash +├── records/ # Markdown records by category +│ ├── bills/ +│ ├── taxes/ +│ ├── medical/ +│ ├── expenses/ +│ └── ... +├── index/ # Search index +│ └── master.json +└── exports/ # CSV exports + └── expenses.csv +``` + +## How It Works + +1. **Drop a document** in `~/documents/inbox/` (via SMB, phone scan, or manually) +2. **Daemon processes it** (runs every 60 seconds): + - Extracts text (pdftotext or tesseract OCR) + - Classifies document type and category + - Extracts key fields (date, vendor, amount) + - Stores original file by content hash + - Creates markdown record + - Updates searchable index + - Exports expenses to CSV +3. 
**Search** your documents anytime + +## Commands + +```bash +# Process inbox manually +python3 ~/dev/doc-processor/processor.py + +# Process single file +python3 ~/dev/doc-processor/processor.py --file /path/to/doc.pdf + +# Watch mode (manual, daemon does this automatically) +python3 ~/dev/doc-processor/processor.py --watch --interval 30 + +# Search documents +python3 ~/dev/doc-processor/search.py "duke energy" +python3 ~/dev/doc-processor/search.py -c bills # By category +python3 ~/dev/doc-processor/search.py -t receipt # By type +python3 ~/dev/doc-processor/search.py --stats # Statistics +python3 ~/dev/doc-processor/search.py -l # List all +python3 ~/dev/doc-processor/search.py -s # Show full record +``` + +## Daemon + +```bash +# Status +systemctl --user status doc-processor + +# Restart +systemctl --user restart doc-processor + +# Logs +journalctl --user -u doc-processor -f +``` + +## Scanner Setup + +1. Get a scanner with SMB support (Brother ADS-1700W, Fujitsu ScanSnap, etc.) +2. Configure scanner to save to SMB share: `\\192.168.1.16\documents\inbox\` +3. Feed paper, press scan +4. Documents auto-process within 60 seconds + +## Categories + +| Category | Documents | +|----------|-----------| +| taxes | W-2, 1099, tax returns, IRS forms | +| bills | Utility bills, invoices | +| medical | Medical records, prescriptions | +| insurance | Policies, claims | +| legal | Contracts, agreements | +| financial | Bank statements, investments | +| expenses | Receipts, purchases | +| vehicles | Registration, maintenance | +| home | Mortgage, HOA, property | +| personal | General documents | +| contacts | Business cards | +| uncategorized | Unclassified | + +## SMB Share Setup + +Already configured on james server: +``` +[documents] + path = /home/johan/documents + browsable = yes + writable = yes + valid users = scanner, johan +``` + +Scanner user can write to inbox, processed files go to other directories. 
#!/usr/bin/env python3
"""
Document Processor for ~/documents/inbox/

Watches for new documents, extracts text (pdftotext, falling back to
tesseract OCR), classifies them, and files them: the original is stored
by content hash, a markdown record is written under records/<category>/,
the JSON search index is updated, and bills/expenses are appended to a
CSV export.
"""

import csv
import hashlib
import json
import os
import re
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

# Paths
DOCUMENTS_ROOT = Path.home() / "documents"
INBOX = DOCUMENTS_ROOT / "inbox"
STORE = DOCUMENTS_ROOT / "store"
RECORDS = DOCUMENTS_ROOT / "records"
INDEX = DOCUMENTS_ROOT / "index"
EXPORTS = DOCUMENTS_ROOT / "exports"

# Categories — one records/ subdirectory per entry.
CATEGORIES = [
    "taxes", "bills", "medical", "insurance", "legal",
    "financial", "expenses", "vehicles", "home",
    "personal", "contacts", "uncategorized"
]

# Ensure the directory tree exists. INBOX is included so watch mode does
# not crash on a fresh machine before the SMB share has been populated.
for d in [INBOX, STORE, INDEX, EXPORTS]:
    d.mkdir(parents=True, exist_ok=True)
for cat in CATEGORIES:
    (RECORDS / cat).mkdir(parents=True, exist_ok=True)


def file_hash(filepath: Path) -> str:
    """Return the first 16 hex chars of the SHA256 of *filepath*'s contents.

    The short hash is used as the document's stable ID and store filename.
    """
    h = hashlib.sha256()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            h.update(chunk)
    return h.hexdigest()[:16]  # Short hash for filename


def extract_text_pdf(filepath: Path) -> str:
    """Extract the text layer from a PDF with pdftotext.

    Falls back to OCR when pdftotext fails or yields too little text
    (i.e. the PDF is a scan with no embedded text layer).
    """
    try:
        result = subprocess.run(
            ['pdftotext', '-layout', str(filepath), '-'],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            text = result.stdout.strip()
            if len(text) > 50:  # got a meaningful text layer
                return text
    except Exception as e:
        print(f"pdftotext failed: {e}")

    # Scanned/image-only PDF (or pdftotext failure): fall back to OCR.
    return ocr_document(filepath)


def ocr_document(filepath: Path) -> str:
    """OCR a document with tesseract; PDFs are rasterized first.

    Returns the extracted text, or "" on failure.
    """
    try:
        if filepath.suffix.lower() == '.pdf':
            # Rasterize pages into a private temp directory. A fixed
            # /tmp/doc_page prefix would let stale pages from a crashed
            # or concurrent run leak into this document's text.
            with tempfile.TemporaryDirectory(prefix='doc_ocr_') as tmpdir:
                subprocess.run(
                    ['pdftoppm', '-png', '-r', '300', str(filepath),
                     str(Path(tmpdir) / 'doc_page')],
                    capture_output=True, timeout=60
                )
                # OCR each rendered page in order.
                text_parts = []
                for img in sorted(Path(tmpdir).glob('doc_page-*.png')):
                    result = subprocess.run(
                        ['tesseract', str(img), 'stdout'],
                        capture_output=True, text=True, timeout=60
                    )
                    text_parts.append(result.stdout)
                return '\n'.join(text_parts).strip()
        else:
            # Direct image OCR.
            result = subprocess.run(
                ['tesseract', str(filepath), 'stdout'],
                capture_output=True, text=True, timeout=60
            )
            return result.stdout.strip()
    except Exception as e:
        print(f"OCR failed: {e}")
        return ""


def extract_text(filepath: Path) -> str:
    """Dispatch text extraction by file extension; "" for unknown types."""
    suffix = filepath.suffix.lower()
    if suffix == '.pdf':
        return extract_text_pdf(filepath)
    elif suffix in ['.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp']:
        return ocr_document(filepath)
    elif suffix in ['.txt', '.md']:
        # Tolerate odd encodings rather than aborting the pipeline.
        return filepath.read_text(errors='replace')
    else:
        return ""


def classify_document(text: str, filename: str) -> Dict[str, Any]:
    """Classify a document from its extracted text.

    *filename* is currently unused (reserved for filename-based rules).
    Returns {category, doc_type, date, vendor, amount, summary}; date,
    vendor and amount are None when not found.
    """
    text_lower = text.lower()
    result = {
        "category": "uncategorized",
        "doc_type": "unknown",
        "date": None,
        "vendor": None,
        "amount": None,
        "summary": None,
    }

    # Date extraction (first match wins, various US formats).
    date_patterns = [
        r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
        r'(\d{4}[/-]\d{1,2}[/-]\d{1,2})',
        r'((?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]* \d{1,2},? \d{4})',
    ]
    for pattern in date_patterns:
        match = re.search(pattern, text_lower)
        if match:
            result["date"] = match.group(1)
            break

    # Amount extraction (first dollar amount in the document).
    amount_match = re.search(r'\$[\d,]+\.?\d*', text)
    if amount_match:
        result["amount"] = amount_match.group(0)

    # Keyword classification rules — order matters: earlier, more
    # specific categories (taxes, bills) win over generic ones.
    if any(x in text_lower for x in ['w-2', 'w2', '1099', 'tax return', 'irs', '1040', 'schedule c', 'form 1098']):
        result["category"] = "taxes"
        result["doc_type"] = "tax_form"
    elif any(x in text_lower for x in ['invoice', 'bill', 'amount due', 'payment due', 'account number', 'autopay']):
        result["category"] = "bills"
        result["doc_type"] = "bill"
        # Try to extract a known vendor name.
        vendors = ['duke energy', 'fpl', 'florida power', 'spectrum', 'at&t', 'verizon', 't-mobile', 'comcast', 'xfinity']
        for v in vendors:
            if v in text_lower:
                result["vendor"] = v.title()
                break
    elif any(x in text_lower for x in ['patient', 'diagnosis', 'prescription', 'medical', 'physician', 'hospital', 'clinic', 'dr.', 'md']):
        result["category"] = "medical"
        result["doc_type"] = "medical_record"
    elif any(x in text_lower for x in ['policy', 'coverage', 'premium', 'deductible', 'insurance', 'claim']):
        result["category"] = "insurance"
        result["doc_type"] = "insurance_doc"
    elif any(x in text_lower for x in ['agreement', 'contract', 'terms', 'hereby', 'whereas', 'attorney', 'legal']):
        result["category"] = "legal"
        result["doc_type"] = "legal_doc"
    elif any(x in text_lower for x in ['bank', 'statement', 'account', 'balance', 'deposit', 'withdrawal', 'investment', 'portfolio']):
        result["category"] = "financial"
        result["doc_type"] = "financial_statement"
    elif any(x in text_lower for x in ['receipt', 'purchase', 'order', 'subtotal', 'total', 'qty', 'item']):
        result["category"] = "expenses"
        result["doc_type"] = "receipt"
    elif any(x in text_lower for x in ['vin', 'vehicle', 'registration', 'dmv', 'license plate', 'odometer']):
        result["category"] = "vehicles"
        result["doc_type"] = "vehicle_doc"
    elif any(x in text_lower for x in ['mortgage', 'deed', 'property', 'hoa', 'homeowner']):
        result["category"] = "home"
        result["doc_type"] = "property_doc"

    # Generate summary (first 200 chars, whitespace-collapsed).
    clean_text = ' '.join(text.split())[:200]
    result["summary"] = clean_text

    return result


def store_document(filepath: Path, hash_id: str) -> Path:
    """Copy the original into the store as <hash><ext>; idempotent."""
    suffix = filepath.suffix.lower()
    store_path = STORE / f"{hash_id}{suffix}"
    if not store_path.exists():
        shutil.copy2(filepath, store_path)
    return store_path


def create_record(filepath: Path, hash_id: str, text: str, classification: Dict) -> Path:
    """Write the markdown record into records/<category>/ and return its path."""
    cat = classification["category"]
    now = datetime.now()

    record_name = f"{now.strftime('%Y%m%d')}_{hash_id}.md"
    record_path = RECORDS / cat / record_name

    # `or 'N/A'`, not .get(..., 'N/A'): the keys always exist but may
    # hold None, and dict.get only substitutes for *missing* keys.
    # The store link must use the lowercased suffix to match store_document.
    stored_suffix = filepath.suffix.lower()
    content = f"""# Document Record

**ID:** {hash_id}
**Original File:** {filepath.name}
**Processed:** {now.isoformat()}
**Category:** {cat}
**Type:** {classification.get('doc_type') or 'unknown'}

## Extracted Info

| Field | Value |
|-------|-------|
| Date | {classification.get('date') or 'N/A'} |
| Vendor | {classification.get('vendor') or 'N/A'} |
| Amount | {classification.get('amount') or 'N/A'} |

## Summary

{classification.get('summary') or 'No summary available.'}

## Full Text

```
{text[:5000]}
```

## Files

- **Original:** [store/{hash_id}{stored_suffix}](../../store/{hash_id}{stored_suffix})
"""

    record_path.write_text(content)
    return record_path


def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> None:
    """Add the document to index/master.json and refresh the stats block.

    Safe against legacy/partial index files: missing "stats" sub-keys are
    created on demand instead of raising KeyError.
    """
    index_path = INDEX / "master.json"

    if index_path.exists():
        with open(index_path) as f:
            data = json.load(f)
    else:
        data = {"version": "1.0", "created": datetime.now().strftime("%Y-%m-%d"),
                "documents": [], "stats": {"total": 0, "by_type": {}, "by_year": {}}}

    doc_entry = {
        "id": hash_id,
        "filename": filepath.name,
        "category": classification["category"],
        "type": classification.get("doc_type", "unknown"),
        "date": classification.get("date"),
        "amount": classification.get("amount"),
        "processed": datetime.now().isoformat(),
    }

    # Only append (and count) documents not already indexed.
    if not any(d["id"] == hash_id for d in data["documents"]):
        data["documents"].append(doc_entry)
        stats = data.setdefault("stats", {})
        stats["total"] = len(data["documents"])

        dtype = classification.get("doc_type", "unknown")
        by_type = stats.setdefault("by_type", {})
        by_type[dtype] = by_type.get(dtype, 0) + 1

        # by_year is keyed on the processing year (ISO timestamp prefix).
        year = doc_entry["processed"][:4]
        by_year = stats.setdefault("by_year", {})
        by_year[year] = by_year.get(year, 0) + 1

    with open(index_path, 'w') as f:
        json.dump(data, f, indent=2)


def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None:
    """Append a row to exports/expenses.csv for expense/bill documents."""
    if classification["category"] not in ["expenses", "bills"]:
        return

    csv_path = EXPORTS / "expenses.csv"
    file_exists = csv_path.exists()

    with open(csv_path, 'a', newline='') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(["date", "vendor", "amount", "category", "type", "doc_id", "filename"])

        writer.writerow([
            classification.get("date", ""),
            classification.get("vendor", ""),
            classification.get("amount", ""),
            classification["category"],
            classification.get("doc_type", ""),
            hash_id,
            filepath.name,
        ])


def process_document(filepath: Path) -> bool:
    """Run one document through the full pipeline.

    Returns True when the file was handled (processed or already known)
    and removed from the inbox; False when it was skipped.
    """
    print(f"Processing: {filepath.name}")

    # Skip hidden files and non-documents.
    if filepath.name.startswith('.'):
        return False

    valid_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.txt'}
    if filepath.suffix.lower() not in valid_extensions:
        print(f"  Skipping unsupported format: {filepath.suffix}")
        return False

    # 1. Generate content hash (also the dedup key).
    hash_id = file_hash(filepath)
    print(f"  Hash: {hash_id}")

    # 2. Already processed? Then just drain it from the inbox.
    store_path = STORE / f"{hash_id}{filepath.suffix.lower()}"
    if store_path.exists():
        print(f"  Already processed, removing from inbox")
        filepath.unlink()
        return True

    # 3. Extract text (OCR if needed).
    print("  Extracting text...")
    text = extract_text(filepath)
    if not text:
        print("  Warning: No text extracted")
        text = "(No text could be extracted)"
    else:
        print(f"  Extracted {len(text)} characters")

    # 4. Classify.
    print("  Classifying...")
    classification = classify_document(text, filepath.name)
    print(f"  Category: {classification['category']}, Type: {classification.get('doc_type')}")

    # 5. Store the original by hash.
    print("  Storing document...")
    store_document(filepath, hash_id)

    # 6. Create the markdown record.
    print("  Creating record...")
    record_path = create_record(filepath, hash_id, text, classification)
    print(f"  Record: {record_path}")

    # 7. Update the search index.
    print("  Updating index...")
    update_master_index(hash_id, filepath, classification)

    # 8. Export if expense/bill.
    export_expense(hash_id, classification, filepath)

    # 9. Remove from inbox only after everything above succeeded.
    print("  Removing from inbox...")
    filepath.unlink()

    print(f"  ✓ Done: {classification['category']}/{hash_id}")
    return True


def process_inbox() -> int:
    """Process all documents in the inbox. Returns the count processed.

    A failure on one document is logged and does not stop the sweep.
    """
    count = 0
    # Sorted for a deterministic processing order.
    for filepath in sorted(INBOX.iterdir()):
        if filepath.is_file() and not filepath.name.startswith('.'):
            try:
                if process_document(filepath):
                    count += 1
            except Exception as e:
                print(f"Error processing {filepath}: {e}")
    return count


def watch_inbox(interval: int = 30) -> None:
    """Poll the inbox forever; exits cleanly on Ctrl+C."""
    print(f"Watching {INBOX} (interval: {interval}s)")
    print("Press Ctrl+C to stop")

    try:
        while True:
            count = process_inbox()
            if count:
                print(f"Processed {count} document(s)")
            time.sleep(interval)
    except KeyboardInterrupt:
        # Advertised above — leave without a traceback.
        print("\nStopped")


def main():
    import argparse
    parser = argparse.ArgumentParser(description="Document processor")
    parser.add_argument("--watch", action="store_true", help="Watch inbox continuously")
    parser.add_argument("--interval", type=int, default=30, help="Watch interval in seconds")
    parser.add_argument("--file", type=Path, help="Process single file")
    args = parser.parse_args()

    if args.file:
        if args.file.exists():
            process_document(args.file)
        else:
            print(f"File not found: {args.file}")
            sys.exit(1)
    elif args.watch:
        watch_inbox(args.interval)
    else:
        count = process_inbox()
        print(f"Processed {count} document(s)")


if __name__ == "__main__":
    main()
+""" + +import os +import sys +import json +import argparse +from pathlib import Path +from datetime import datetime + +DOCUMENTS_ROOT = Path.home() / "documents" +INDEX = DOCUMENTS_ROOT / "index" +RECORDS = DOCUMENTS_ROOT / "records" + + +def load_index() -> dict: + """Load the master index.""" + index_path = INDEX / "master.json" + if index_path.exists(): + with open(index_path) as f: + return json.load(f) + return {"documents": []} + + +def search_documents(query: str, category: str = None, doc_type: str = None) -> list: + """Search documents by query, optionally filtered by category/type.""" + data = load_index() + results = [] + + query_lower = query.lower() if query else "" + + for doc in data["documents"]: + # Apply filters + if category and doc.get("category") != category: + continue + if doc_type and doc.get("type") != doc_type: + continue + + # If no query, return all matching filters + if not query: + results.append(doc) + continue + + # Search in indexed fields + searchable = f"{doc.get('filename', '')} {doc.get('category', '')} {doc.get('type', '')} {doc.get('date', '')} {doc.get('amount', '')}".lower() + if query_lower in searchable: + results.append(doc) + continue + + # Search in full text record + record_path = find_record(doc["id"], doc["category"]) + if record_path and record_path.exists(): + content = record_path.read_text().lower() + if query_lower in content: + results.append(doc) + + return results + + +def find_record(doc_id: str, category: str) -> Path: + """Find the record file for a document.""" + cat_dir = RECORDS / category + if cat_dir.exists(): + for f in cat_dir.iterdir(): + if doc_id in f.name: + return f + return None + + +def show_document(doc_id: str) -> None: + """Show full details of a document.""" + data = load_index() + + for doc in data["documents"]: + if doc["id"] == doc_id or doc_id in doc.get("filename", ""): + print(f"\n{'='*60}") + print(f"Document: {doc['filename']}") + print(f"ID: {doc['id']}") + print(f"Category: 
{doc['category']}") + print(f"Type: {doc.get('type', 'unknown')}") + print(f"Date: {doc.get('date', 'N/A')}") + print(f"Amount: {doc.get('amount', 'N/A')}") + print(f"Processed: {doc.get('processed', 'N/A')}") + print(f"{'='*60}") + + # Show record content + record_path = find_record(doc["id"], doc["category"]) + if record_path: + print(f"\nRecord: {record_path}") + print("-"*60) + print(record_path.read_text()) + return + + print(f"Document not found: {doc_id}") + + +def list_stats() -> None: + """Show document statistics.""" + data = load_index() + + print("\n📊 Document Statistics") + print("="*40) + print(f"Total documents: {data['stats']['total']}") + + print("\nBy type:") + for dtype, count in sorted(data["stats"].get("by_type", {}).items()): + print(f" {dtype}: {count}") + + print("\nBy category:") + by_cat = {} + for doc in data["documents"]: + cat = doc.get("category", "unknown") + by_cat[cat] = by_cat.get(cat, 0) + 1 + for cat, count in sorted(by_cat.items()): + print(f" {cat}: {count}") + + +def main(): + parser = argparse.ArgumentParser(description="Search documents") + parser.add_argument("query", nargs="?", help="Search query") + parser.add_argument("-c", "--category", help="Filter by category") + parser.add_argument("-t", "--type", help="Filter by document type") + parser.add_argument("-s", "--show", help="Show full document by ID") + parser.add_argument("--stats", action="store_true", help="Show statistics") + parser.add_argument("-l", "--list", action="store_true", help="List all documents") + args = parser.parse_args() + + if args.stats: + list_stats() + return + + if args.show: + show_document(args.show) + return + + if args.list or args.query or args.category or args.type: + results = search_documents(args.query, args.category, args.type) + + if not results: + print("No documents found") + return + + print(f"\nFound {len(results)} document(s):\n") + for doc in results: + date = doc.get("date", "")[:10] if doc.get("date") else "" + amount = 
doc.get("amount", "") + print(f" [{doc['id'][:8]}] {doc['category']:12} {doc.get('type', ''):15} {date:12} {amount:10} {doc['filename']}") + else: + parser.print_help() + + +if __name__ == "__main__": + main()