#!/usr/bin/env python3
"""
Document Processor for ~/documents/inbox/

Watches for new documents, OCRs them, classifies, and files them.
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
import subprocess
|
|
import shutil
|
|
import sqlite3
|
|
import csv
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
import re
|
|
import time
|
|
|
|
# Paths
# All state lives under ~/documents; resolved at import time, so the module
# always targets the invoking user's home directory.
DOCUMENTS_ROOT = Path.home() / "documents"
INBOX = DOCUMENTS_ROOT / "inbox"      # incoming, unprocessed documents
STORE = DOCUMENTS_ROOT / "store"      # hash-named copies of the originals
RECORDS = DOCUMENTS_ROOT / "records"  # per-category markdown records
INDEX = DOCUMENTS_ROOT / "index"      # master.json index
EXPORTS = DOCUMENTS_ROOT / "exports"  # CSV exports (expenses.csv)

# Categories
# One folder per category is created under RECORDS below; classify_document()
# only ever returns values from this list ("uncategorized" is the fallback).
CATEGORIES = [
    "taxes", "bills", "medical", "insurance", "legal",
    "financial", "expenses", "vehicles", "home",
    "personal", "contacts", "uncategorized"
]

# Ensure directories exist
# NOTE: import-time side effect -- the tree is created as soon as the module
# is imported. INBOX itself is deliberately(?) not created here; presumably
# it is expected to already exist -- TODO confirm.
for d in [STORE, INDEX, EXPORTS]:
    d.mkdir(parents=True, exist_ok=True)
for cat in CATEGORIES:
    (RECORDS / cat).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def file_hash(filepath: Path) -> str:
    """Return a short (16 hex chars) SHA-256 digest of the file's contents."""
    digest = hashlib.sha256()
    with open(filepath, 'rb') as fh:
        while True:
            chunk = fh.read(8192)
            if not chunk:
                break
            digest.update(chunk)
    # Truncated digest keeps store filenames short; collision risk is
    # negligible for a personal archive.
    return digest.hexdigest()[:16]
|
|
|
|
|
|
def extract_text_pdf(filepath: Path) -> str:
    """Extract text from a PDF via pdftotext, falling back to OCR.

    A result of ~50 chars or fewer is treated as a scanned/image-only PDF
    and routed through tesseract instead.
    """
    try:
        proc = subprocess.run(
            ['pdftotext', '-layout', str(filepath), '-'],
            capture_output=True, text=True, timeout=30
        )
        extracted = proc.stdout.strip()
        if len(extracted) > 50:  # Got meaningful text
            return extracted
    except Exception as exc:
        print(f"pdftotext failed: {exc}")

    # Fallback to OCR
    return ocr_document(filepath)
|
|
|
|
|
|
def ocr_document(filepath: Path) -> str:
    """OCR a document using tesseract.

    PDFs are rasterized first with pdftoppm (300 dpi PNG pages), then each
    page is OCR'd in order and the page texts joined. Returns "" on any
    failure (missing tooling, timeout, etc.) -- callers treat "" as
    "no text extracted".
    """
    import tempfile  # local import: only this function needs it

    try:
        # For PDFs, convert to images first
        if filepath.suffix.lower() == '.pdf':
            # Fix: the original rendered to a fixed /tmp/doc_page prefix and
            # globbed /tmp, so stale pages from a previous or concurrent run
            # could be mixed into the result, and pages leaked on error.
            # A private TemporaryDirectory fixes both.
            with tempfile.TemporaryDirectory(prefix="docproc_") as tmpdir:
                subprocess.run(
                    ['pdftoppm', '-png', '-r', '300', str(filepath),
                     str(Path(tmpdir) / 'doc_page')],
                    capture_output=True, timeout=60
                )
                # OCR all pages in page order (pdftoppm names them -1, -2, ...)
                text_parts = []
                for img in sorted(Path(tmpdir).glob('doc_page-*.png')):
                    result = subprocess.run(
                        ['tesseract', str(img), 'stdout'],
                        capture_output=True, text=True, timeout=60
                    )
                    text_parts.append(result.stdout)
                return '\n'.join(text_parts).strip()
        else:
            # Direct image OCR
            result = subprocess.run(
                ['tesseract', str(filepath), 'stdout'],
                capture_output=True, text=True, timeout=60
            )
            return result.stdout.strip()
    except Exception as e:
        # Best-effort by design: log and report "no text".
        print(f"OCR failed: {e}")
        return ""
|
|
|
|
|
|
def extract_text(filepath: Path) -> str:
    """Extract text from a document, dispatching on file extension.

    PDFs go through pdftotext (with OCR fallback), images through OCR,
    and plain-text/markdown files are read directly. Unknown types
    yield "".
    """
    suffix = filepath.suffix.lower()
    if suffix == '.pdf':
        return extract_text_pdf(filepath)
    elif suffix in ['.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp']:
        return ocr_document(filepath)
    elif suffix in ['.txt', '.md']:
        # Fix: read_text() without an encoding uses the platform default and
        # raises UnicodeDecodeError on mis-encoded files, aborting the whole
        # pipeline run. Be explicit and degrade gracefully instead.
        return filepath.read_text(encoding='utf-8', errors='replace')
    else:
        return ""
|
|
|
|
|
|
def classify_document(text: str, filename: str) -> Dict[str, Any]:
    """
    Classify document based on content.

    Returns: {category, doc_type, date, vendor, amount, summary}

    The first matching rule wins, so the order of the rules table encodes
    priority (e.g. tax-form keywords beat the generic 'bill' keywords).
    `filename` is part of the public interface but currently unused.
    """
    lowered = text.lower()
    result: Dict[str, Any] = {
        "category": "uncategorized",
        "doc_type": "unknown",
        "date": None,
        "vendor": None,
        "amount": None,
        "summary": None,
    }

    # Date extraction: numeric m/d/y, ISO-ish y/m/d, or "Month DD, YYYY".
    date_patterns = (
        r'(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})',
        r'(\d{4}[/-]\d{1,2}[/-]\d{1,2})',
        r'((?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[a-z]* \d{1,2},? \d{4})',
    )
    for pattern in date_patterns:
        found = re.search(pattern, lowered)
        if found:
            result["date"] = found.group(1)
            break

    # Amount extraction: first dollar figure in the raw text.
    money = re.search(r'\$[\d,]+\.?\d*', text)
    if money:
        result["amount"] = money.group(0)

    # Classification rules, highest priority first; first keyword hit wins.
    rules = [
        ("taxes", "tax_form",
         ['w-2', 'w2', '1099', 'tax return', 'irs', '1040', 'schedule c', 'form 1098']),
        ("bills", "bill",
         ['invoice', 'bill', 'amount due', 'payment due', 'account number', 'autopay']),
        ("medical", "medical_record",
         ['patient', 'diagnosis', 'prescription', 'medical', 'physician', 'hospital', 'clinic', 'dr.', 'md']),
        ("insurance", "insurance_doc",
         ['policy', 'coverage', 'premium', 'deductible', 'insurance', 'claim']),
        ("legal", "legal_doc",
         ['agreement', 'contract', 'terms', 'hereby', 'whereas', 'attorney', 'legal']),
        ("financial", "financial_statement",
         ['bank', 'statement', 'account', 'balance', 'deposit', 'withdrawal', 'investment', 'portfolio']),
        ("expenses", "receipt",
         ['receipt', 'purchase', 'order', 'subtotal', 'total', 'qty', 'item']),
        ("vehicles", "vehicle_doc",
         ['vin', 'vehicle', 'registration', 'dmv', 'license plate', 'odometer']),
        ("home", "property_doc",
         ['mortgage', 'deed', 'property', 'hoa', 'homeowner']),
    ]
    for category, doc_type, keywords in rules:
        if any(kw in lowered for kw in keywords):
            result["category"] = category
            result["doc_type"] = doc_type
            break

    # Vendor detection applies only to bills.
    if result["category"] == "bills":
        known_vendors = ['duke energy', 'fpl', 'florida power', 'spectrum',
                         'at&t', 'verizon', 't-mobile', 'comcast', 'xfinity']
        for vendor in known_vendors:
            if vendor in lowered:
                result["vendor"] = vendor.title()
                break

    # Summary: first 200 chars with whitespace collapsed.
    result["summary"] = ' '.join(text.split())[:200]

    return result
|
|
|
|
|
|
def store_document(filepath: Path, hash_id: str) -> Path:
    """Copy the original into the store under its hash-based name.

    Idempotent: if a file with the same hash and (lowercased) suffix is
    already stored, it is left untouched. Returns the store path either way.
    """
    destination = STORE / f"{hash_id}{filepath.suffix.lower()}"
    if not destination.exists():
        shutil.copy2(filepath, destination)
    return destination
|
|
|
|
|
|
def create_record(filepath: Path, hash_id: str, text: str, classification: Dict) -> Path:
    """Create a markdown record in the appropriate category folder.

    The record embeds the extracted metadata, a summary, up to 5000 chars of
    full text, and a relative link to the stored original. Returns the path
    of the record written.
    """
    cat = classification["category"]
    now = datetime.now()

    record_name = f"{now.strftime('%Y%m%d')}_{hash_id}.md"
    record_path = RECORDS / cat / record_name

    # Fix: store_document() names the stored copy with suffix.lower(); the
    # link must match or records for ".PDF"/".JPG" files point nowhere.
    stored_name = f"{hash_id}{filepath.suffix.lower()}"

    # Fix: classify_document() always sets these keys (possibly to None or
    # ""), so dict.get(key, default) never fell back to its default and the
    # records literally rendered the string "None". `or` applies the
    # intended placeholder.
    content = f"""# Document Record

**ID:** {hash_id}
**Original File:** {filepath.name}
**Processed:** {now.isoformat()}
**Category:** {cat}
**Type:** {classification.get('doc_type') or 'unknown'}

## Extracted Info

| Field | Value |
|-------|-------|
| Date | {classification.get('date') or 'N/A'} |
| Vendor | {classification.get('vendor') or 'N/A'} |
| Amount | {classification.get('amount') or 'N/A'} |

## Summary

{classification.get('summary') or 'No summary available.'}

## Full Text

```
{text[:5000]}
```

## Files

- **PDF:** [store/{stored_name}](../../store/{stored_name})
"""

    record_path.write_text(content)
    return record_path
|
|
|
|
|
|
def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> None:
    """Update the master.json index with this document's metadata.

    Loads (or bootstraps) the index, appends the entry if its hash_id is not
    already present, and rewrites the whole file. Duplicate hash_ids are
    ignored entirely so re-processing a document never inflates the stats.
    """
    index_path = INDEX / "master.json"

    if index_path.exists():
        with open(index_path) as f:
            data = json.load(f)
    else:
        data = {
            "version": "1.0",
            "created": datetime.now().strftime("%Y-%m-%d"),
            "documents": [],
            # NOTE(review): by_year is declared but never populated anywhere
            # visible here -- confirm whether any consumer expects it.
            "stats": {"total": 0, "by_type": {}, "by_year": {}},
        }

    doc_entry = {
        "id": hash_id,
        "filename": filepath.name,
        "category": classification["category"],
        "type": classification.get("doc_type", "unknown"),
        "date": classification.get("date"),
        "amount": classification.get("amount"),
        "processed": datetime.now().isoformat(),
    }

    # Check for duplicate: stats must only be touched for genuinely new
    # documents, otherwise by_type drifts out of sync with the documents list.
    if not any(d["id"] == hash_id for d in data["documents"]):
        data["documents"].append(doc_entry)
        data["stats"]["total"] = len(data["documents"])

        # Update type stats
        dtype = classification.get("doc_type", "unknown")
        data["stats"]["by_type"][dtype] = data["stats"]["by_type"].get(dtype, 0) + 1

    with open(index_path, 'w') as f:
        json.dump(data, f, indent=2)
|
|
|
|
|
|
def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None:
    """Append a row to exports/expenses.csv for bills and expense receipts.

    Documents in any other category are ignored. The header row is written
    only when the CSV file does not exist yet.
    """
    if classification["category"] not in ("expenses", "bills"):
        return

    csv_path = EXPORTS / "expenses.csv"
    needs_header = not csv_path.exists()

    row = [
        classification.get("date", ""),
        classification.get("vendor", ""),
        classification.get("amount", ""),
        classification["category"],
        classification.get("doc_type", ""),
        hash_id,
        filepath.name,
    ]

    with open(csv_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        if needs_header:
            writer.writerow(["date", "vendor", "amount", "category", "type", "doc_id", "filename"])
        writer.writerow(row)
|
|
|
|
|
|
def process_document(filepath: Path) -> bool:
    """Process a single document through the full pipeline.

    Steps: hash -> duplicate check -> text extraction -> classification ->
    store copy -> markdown record -> index update -> expense export ->
    remove from inbox. Returns True when the file was handled (including
    the already-processed case), False when it was skipped.
    """
    print(f"Processing: {filepath.name}")

    # Skip hidden files and non-documents
    if filepath.name.startswith('.'):
        return False

    # Consistency fix: extract_text() supports '.md' but it was missing
    # here, so markdown drops were rejected as "unsupported".
    valid_extensions = {'.pdf', '.png', '.jpg', '.jpeg', '.tiff', '.tif', '.bmp', '.txt', '.md'}
    if filepath.suffix.lower() not in valid_extensions:
        print(f"  Skipping unsupported format: {filepath.suffix}")
        return False

    # 1. Generate hash (the content hash doubles as the stable document ID)
    hash_id = file_hash(filepath)
    print(f"  Hash: {hash_id}")

    # 2. Check if already processed -- the inbox copy is then redundant
    store_path = STORE / f"{hash_id}{filepath.suffix.lower()}"
    if store_path.exists():
        print(f"  Already processed, removing from inbox")
        filepath.unlink()
        return True

    # 3. Extract text (OCR if needed)
    print("  Extracting text...")
    text = extract_text(filepath)
    if not text:
        print("  Warning: No text extracted")
        text = "(No text could be extracted)"
    else:
        print(f"  Extracted {len(text)} characters")

    # 4. Classify
    print("  Classifying...")
    classification = classify_document(text, filepath.name)
    print(f"  Category: {classification['category']}, Type: {classification.get('doc_type')}")

    # 5. Store the original
    print("  Storing document...")
    store_document(filepath, hash_id)

    # 6. Create record
    print("  Creating record...")
    record_path = create_record(filepath, hash_id, text, classification)
    print(f"  Record: {record_path}")

    # 7. Update index
    print("  Updating index...")
    update_master_index(hash_id, filepath, classification)

    # 8. Export if expense
    export_expense(hash_id, classification, filepath)

    # 9. Remove from inbox -- only after all the steps above succeeded, so
    # a failure leaves the original in place for a retry.
    print("  Removing from inbox...")
    filepath.unlink()

    print(f"  ✓ Done: {classification['category']}/{hash_id}")
    return True
|
|
|
|
|
|
def process_inbox() -> int:
    """Process every visible file currently in the inbox.

    An error in one document is logged and does not stop the sweep.
    Returns the number of documents successfully processed.
    """
    processed = 0
    for entry in INBOX.iterdir():
        if not entry.is_file() or entry.name.startswith('.'):
            continue
        try:
            handled = process_document(entry)
        except Exception as exc:
            print(f"Error processing {entry}: {exc}")
        else:
            if handled:
                processed += 1
    return processed
|
|
|
|
|
|
def watch_inbox(interval: int = 30) -> None:
    """Poll the inbox forever, sweeping it every `interval` seconds.

    Fix: the banner promises "Press Ctrl+C to stop", but KeyboardInterrupt
    previously escaped and dumped a traceback; catch it and exit cleanly.
    """
    print(f"Watching {INBOX} (interval: {interval}s)")
    print("Press Ctrl+C to stop")

    try:
        while True:
            count = process_inbox()
            if count:
                print(f"Processed {count} document(s)")
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Stopped.")
|
|
|
|
|
|
def main():
    """CLI entry point: process a single file, watch the inbox, or run one sweep."""
    import argparse

    parser = argparse.ArgumentParser(description="Document processor")
    parser.add_argument("--watch", action="store_true", help="Watch inbox continuously")
    parser.add_argument("--interval", type=int, default=30, help="Watch interval in seconds")
    parser.add_argument("--file", type=Path, help="Process single file")
    args = parser.parse_args()

    if args.file:
        # Guard clause: bail out early on a missing file.
        if not args.file.exists():
            print(f"File not found: {args.file}")
            sys.exit(1)
        process_document(args.file)
    elif args.watch:
        watch_inbox(args.interval)
    else:
        count = process_inbox()
        print(f"Processed {count} document(s)")
|
|
|
|
|
|
# Script entry point; all argument handling lives in main().
if __name__ == "__main__":
    main()
|