#!/usr/bin/env python3
|
|
"""
|
|
Document Processor for ~/documents/inbox/
|
|
Uses AI vision (K2.5 via Fireworks) for document analysis. Stores embeddings in SQLite.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import hashlib
|
|
import shutil
|
|
import sqlite3
|
|
import csv
|
|
import base64
|
|
import struct
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List
|
|
import time
|
|
import argparse
|
|
|
|
# Try to import openai (used for Fireworks API), fail gracefully
|
|
try:
|
|
from openai import OpenAI
|
|
except ImportError:
|
|
print("ERROR: openai package not installed")
|
|
print("Run: cd ~/dev/doc-processor && source venv/bin/activate && pip install openai")
|
|
sys.exit(1)
|
|
|
|
# Paths -- all pipeline data lives under ~/documents
DOCUMENTS_ROOT = Path.home() / "documents"
INBOX = DOCUMENTS_ROOT / "inbox"        # drop zone for unprocessed documents
STORE = DOCUMENTS_ROOT / "store"        # content-addressed copies of originals
RECORDS = DOCUMENTS_ROOT / "records"    # per-category markdown records
INDEX = DOCUMENTS_ROOT / "index"        # master.json + SQLite database
EXPORTS = DOCUMENTS_ROOT / "exports"    # derived artifacts (e.g. expenses.csv)
EMBEDDINGS_DB = INDEX / "embeddings.db"

# Categories -- the AI classifier is constrained to this list; anything else
# is coerced to "uncategorized" (see analyze_document_with_ai).
CATEGORIES = [
    "taxes", "bills", "medical", "insurance", "legal",
    "financial", "expenses", "vehicles", "home",
    "personal", "contacts", "uncategorized"
]

# Ensure directories exist (import-time side effect: the rest of the
# pipeline assumes these paths are present).
# NOTE(review): INBOX itself is not created here -- presumably it already
# exists; confirm, or add it to the list below.
for d in [STORE, INDEX, EXPORTS]:
    d.mkdir(parents=True, exist_ok=True)
for cat in CATEGORIES:
    (RECORDS / cat).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def get_fireworks_client() -> OpenAI:
    """Build a Fireworks client (OpenAI-compatible endpoint).

    The API key is taken from the FIREWORKS_API_KEY environment variable,
    falling back to the dotenv-style file ~/dev/doc-processor/.env.

    Raises:
        RuntimeError: if no API key can be found in either location.
    """
    key = os.environ.get("FIREWORKS_API_KEY")

    if not key:
        # Fall back to the project's .env file (KEY=value lines).
        env_file = Path.home() / "dev/doc-processor/.env"
        if env_file.exists():
            for raw_line in env_file.read_text().splitlines():
                if raw_line.startswith("FIREWORKS_API_KEY="):
                    key = raw_line.split("=", 1)[1].strip().strip('"\'')
                    break

    if not key:
        raise RuntimeError(
            "FIREWORKS_API_KEY not set. Either:\n"
            " 1. Set FIREWORKS_API_KEY environment variable\n"
            " 2. Create ~/dev/doc-processor/.env with FIREWORKS_API_KEY=..."
        )

    return OpenAI(api_key=key, base_url="https://api.fireworks.ai/inference/v1")
|
|
|
|
|
|
def init_embeddings_db():
    """Create the `embeddings` and `documents` tables if they do not exist.

    Idempotent; safe to call on every startup. The connection is closed in a
    finally block so a failed DDL statement cannot leak the DB handle
    (the original closed it only on the success path).
    """
    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                doc_id TEXT PRIMARY KEY,
                embedding BLOB,
                text_hash TEXT,
                created_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                doc_id TEXT PRIMARY KEY,
                filename TEXT,
                category TEXT,
                doc_type TEXT,
                date TEXT,
                vendor TEXT,
                amount TEXT,
                summary TEXT,
                full_text TEXT,
                processed_at TEXT
            )
        """)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def file_hash(filepath: Path) -> str:
    """Return the first 16 hex digits of the SHA256 of the file's bytes.

    Read in 8 KiB chunks so large documents never need to fit in memory.
    """
    digest = hashlib.sha256()
    with open(filepath, 'rb') as fh:
        while chunk := fh.read(8192):
            digest.update(chunk)
    return digest.hexdigest()[:16]
|
|
|
|
|
|
def encode_image_base64(filepath: Path) -> tuple[str, str]:
    """Encode image/PDF to base64 for the API. Returns (base64_data, media_type).

    PDFs are rasterized (first page only, 150 dpi) with poppler's `pdftoppm`
    and sent as PNG; other files are read and encoded as-is.

    Raises:
        RuntimeError: if PDF conversion fails or pdftoppm is not installed.
    """
    suffix = filepath.suffix.lower()

    if suffix == '.pdf':
        # Convert first page to PNG using pdftoppm ('-' writes to stdout).
        import subprocess
        try:
            result = subprocess.run(
                ['pdftoppm', '-png', '-f', '1', '-l', '1', '-r', '150', str(filepath), '-'],
                capture_output=True, timeout=30
            )
        except FileNotFoundError as e:
            # Surface a clear message instead of a raw FileNotFoundError
            # when poppler-utils is not installed.
            raise RuntimeError("pdftoppm not found; install poppler-utils") from e
        if result.returncode == 0:
            return base64.standard_b64encode(result.stdout).decode('utf-8'), 'image/png'
        else:
            raise RuntimeError(f"Failed to convert PDF: {result.stderr.decode()}")

    # Image files. Fix: .tiff/.tif/.bmp are accepted by process_document but
    # were previously mislabeled as image/png; give them correct MIME types.
    # NOTE(review): confirm the vision endpoint accepts tiff/bmp payloads.
    media_types = {
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.tiff': 'image/tiff',
        '.tif': 'image/tiff',
        '.bmp': 'image/bmp',
    }
    media_type = media_types.get(suffix, 'image/png')

    with open(filepath, 'rb') as f:
        return base64.standard_b64encode(f.read()).decode('utf-8'), media_type
|
|
|
|
|
|
def analyze_document_with_ai(filepath: Path, client: OpenAI) -> Dict[str, Any]:
    """
    Use K2.5 via Fireworks to analyze document.

    Performs transcription and classification in a single vision-model call.

    Returns: {category, doc_type, date, vendor, amount, summary, full_text}
    On any failure (encoding, API error, unparseable response) a minimal
    fallback dict with category "uncategorized" is returned, so one bad
    document never aborts the whole inbox run.
    """
    print(f" Analyzing with K2.5...")

    # Encode the file (PDF first page or raw image bytes) for the API.
    try:
        image_data, media_type = encode_image_base64(filepath)
    except Exception as e:
        print(f" Failed to encode document: {e}")
        return {
            "category": "uncategorized",
            "doc_type": "unknown",
            "full_text": f"(Failed to process: {e})",
            "summary": "Document could not be processed"
        }

    # Single prompt asks for transcription, classification, key fields and
    # a summary, with a strict JSON-only response contract.
    prompt = """Analyze this document image and extract:

1. **Full Text**: Transcribe ALL visible text from the document, preserving structure where possible.

2. **Classification**: Categorize into exactly ONE of:
- taxes (W-2, 1099, tax returns, IRS forms)
- bills (utilities, subscriptions, invoices)
- medical (health records, prescriptions, lab results)
- insurance (policies, claims, coverage docs)
- legal (contracts, agreements, legal notices)
- financial (bank statements, investment docs)
- expenses (receipts, purchase confirmations)
- vehicles (registration, maintenance, DMV)
- home (mortgage, HOA, property docs)
- personal (ID copies, certificates, misc)
- contacts (business cards, contact info)
- uncategorized (if none fit)

3. **Document Type**: Specific type (e.g., "utility_bill", "receipt", "tax_form_w2", "insurance_policy")

4. **Key Fields**:
- date: Document date (YYYY-MM-DD format if possible)
- vendor: Company/organization name
- amount: Dollar amount if present (e.g., "$123.45")

5. **Summary**: 1-2 sentence description of what this document is.

Respond in JSON format ONLY (no markdown, no explanation):
{
"category": "...",
"doc_type": "...",
"date": "...",
"vendor": "...",
"amount": "...",
"summary": "...",
"full_text": "..."
}"""

    try:
        # K2.5 via Fireworks using OpenAI-compatible API; the image is sent
        # as a data: URL content part alongside the text prompt.
        response = client.chat.completions.create(
            model="accounts/fireworks/models/k2-5-kimi-vision",
            max_tokens=8192,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{media_type};base64,{image_data}"
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ],
                }
            ],
        )

        # Parse JSON from response
        text = response.choices[0].message.content

        # Try to extract JSON from response (handle markdown code blocks):
        # the model sometimes wraps output in ```json ... ``` fences despite
        # the JSON-only instruction.
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0]
        elif "```" in text:
            text = text.split("```")[1].split("```")[0]

        result = json.loads(text.strip())

        # Coerce unknown categories so downstream folder lookups stay valid.
        if result.get("category") not in CATEGORIES:
            result["category"] = "uncategorized"

        return result

    except json.JSONDecodeError as e:
        # `text` is always bound here: json.loads runs only after the
        # response content was read above.
        print(f" Failed to parse AI response as JSON: {e}")
        print(f" Raw response: {text[:500]}")
        return {
            "category": "uncategorized",
            "doc_type": "unknown",
            "full_text": text,
            "summary": "AI response could not be parsed"
        }
    except Exception as e:
        # Broad catch is deliberate: any API/network error degrades to an
        # "uncategorized" record rather than aborting the inbox run.
        print(f" AI analysis failed: {e}")
        return {
            "category": "uncategorized",
            "doc_type": "unknown",
            "full_text": f"(AI analysis failed: {e})",
            "summary": "Document analysis failed"
        }
|
|
|
|
|
|
def generate_embedding(text: str, client: OpenAI) -> Optional[List[float]]:
    """
    Generate text embedding.

    Currently a stub that always returns None, so callers fall back to
    SQLite full-text search. Candidate providers for a real implementation:
      1. OpenAI text-embedding-3-small (cheap, good quality)
      2. Voyage AI (good for documents)
      3. Local sentence-transformers
    """
    # TODO: Implement with preferred embedding provider
    return None
|
|
|
|
|
|
def store_embedding(doc_id: str, embedding: Optional[List[float]], text: str):
    """Store an embedding vector in the SQLite database.

    Args:
        doc_id: Content-hash identifier of the document.
        embedding: Vector to persist; None is a no-op (embeddings are
            currently optional, see generate_embedding).
        text: Source text; only its hash is stored, for staleness checks.

    The connection is closed in a finally block so a failed INSERT cannot
    leak the DB handle (the original closed it only on the success path).
    """
    if embedding is None:
        return

    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        # Pack floats as a little binary blob; unpack with the same format.
        embedding_blob = struct.pack(f'{len(embedding)}f', *embedding)
        text_hash = hashlib.sha256(text.encode()).hexdigest()[:16]

        conn.execute("""
            INSERT OR REPLACE INTO embeddings (doc_id, embedding, text_hash, created_at)
            VALUES (?, ?, ?, ?)
        """, (doc_id, embedding_blob, text_hash, datetime.now().isoformat()))

        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def store_document_metadata(doc_id: str, filename: str, classification: Dict, full_text: str):
    """Store document metadata in SQLite for full-text search.

    Args:
        doc_id: Content-hash identifier of the document.
        filename: Original filename, kept for display/search.
        classification: AI analysis result; missing keys get placeholders.
        full_text: Extracted text; may be None when extraction failed.

    Fixes: `full_text` is guarded against None before slicing (the model can
    return an explicit null), and the connection is closed in a finally block
    so a failed INSERT cannot leak the DB handle.
    """
    conn = sqlite3.connect(EMBEDDINGS_DB)
    try:
        conn.execute("""
            INSERT OR REPLACE INTO documents
            (doc_id, filename, category, doc_type, date, vendor, amount, summary, full_text, processed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            doc_id,
            filename,
            classification.get("category", "uncategorized"),
            classification.get("doc_type", "unknown"),
            classification.get("date"),
            classification.get("vendor"),
            classification.get("amount"),
            classification.get("summary"),
            (full_text or "")[:50000],  # None-safe; limit text size
            datetime.now().isoformat()
        ))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def store_document(filepath: Path, hash_id: str) -> Path:
    """Copy the document into the store under its hash-based name.

    Returns the destination path. Copying is skipped if a file with that
    name already exists (content-addressed, so it must be identical).
    """
    destination = STORE / (hash_id + filepath.suffix.lower())
    if not destination.exists():
        shutil.copy2(filepath, destination)
    return destination
|
|
|
|
|
|
def create_record(filepath: Path, hash_id: str, classification: Dict) -> Path:
    """Create a markdown record in the appropriate category folder.

    Args:
        filepath: Original inbox file (used for its name/suffix).
        hash_id: Content-hash identifier of the document.
        classification: AI analysis result.

    Returns:
        Path of the record written under records/<category>/.

    Fixes: the model may return explicit nulls for date/vendor/amount/etc.,
    in which case `.get(key, default)` still yields None and the record
    would literally say "None". Falsy values now fall back to placeholders.
    """
    cat = classification.get("category", "uncategorized")
    now = datetime.now()

    record_name = f"{now.strftime('%Y%m%d')}_{hash_id}.md"
    record_path = RECORDS / cat / record_name

    full_text = classification.get("full_text") or ""
    doc_type = classification.get('doc_type') or 'unknown'
    date = classification.get('date') or 'N/A'
    vendor = classification.get('vendor') or 'N/A'
    amount = classification.get('amount') or 'N/A'
    summary = classification.get('summary') or 'No summary available.'

    content = f"""# Document Record

**ID:** {hash_id}
**Original File:** {filepath.name}
**Processed:** {now.isoformat()}
**Category:** {cat}
**Type:** {doc_type}

## Extracted Info

| Field | Value |
|-------|-------|
| Date | {date} |
| Vendor | {vendor} |
| Amount | {amount} |

## Summary

{summary}

## Full Text

```
{full_text[:10000]}
```

## Files

- **Original:** [store/{hash_id}{filepath.suffix}](../../store/{hash_id}{filepath.suffix})
"""

    record_path.write_text(content)
    return record_path
|
|
|
|
|
|
def update_master_index(hash_id: str, filepath: Path, classification: Dict) -> None:
    """Update the master.json index with one document entry.

    Loads (or initializes) index/master.json, appends the entry, and bumps
    the aggregate stats. Re-running on an already-indexed document is a
    no-op: the stats updates live inside the duplicate check so a
    reprocessed document cannot inflate the by_type/by_category counters.
    """
    index_path = INDEX / "master.json"

    if index_path.exists():
        with open(index_path) as f:
            data = json.load(f)
    else:
        data = {
            "version": "2.0",
            "created": datetime.now().strftime("%Y-%m-%d"),
            "documents": [],
            "stats": {"total": 0, "by_type": {}, "by_category": {}}
        }

    doc_entry = {
        "id": hash_id,
        "filename": filepath.name,
        "category": classification.get("category", "uncategorized"),
        "type": classification.get("doc_type", "unknown"),
        "date": classification.get("date"),
        "vendor": classification.get("vendor"),
        "amount": classification.get("amount"),
        "summary": classification.get("summary"),
        "processed": datetime.now().isoformat(),
    }

    # Check for duplicate; skip entirely so stats are not double-counted.
    if not any(d["id"] == hash_id for d in data["documents"]):
        data["documents"].append(doc_entry)
        data["stats"]["total"] = len(data["documents"])

        # Update type/category stats
        dtype = classification.get("doc_type", "unknown")
        cat = classification.get("category", "uncategorized")
        data["stats"]["by_type"][dtype] = data["stats"]["by_type"].get(dtype, 0) + 1
        data["stats"]["by_category"][cat] = data["stats"]["by_category"].get(cat, 0) + 1

    with open(index_path, 'w') as f:
        json.dump(data, f, indent=2)
|
|
|
|
|
|
def export_expense(hash_id: str, classification: Dict, filepath: Path) -> None:
    """Append a row to exports/expenses.csv for expense/bill documents.

    Documents in any other category are ignored. The header row is written
    only when the CSV does not exist yet.
    """
    if classification.get("category") not in ["expenses", "bills"]:
        return

    csv_path = EXPORTS / "expenses.csv"
    needs_header = not csv_path.exists()

    with open(csv_path, 'a', newline='') as handle:
        writer = csv.writer(handle)
        if needs_header:
            writer.writerow(["date", "vendor", "amount", "category", "type", "doc_id", "filename"])

        row = [
            classification.get("date", ""),
            classification.get("vendor", ""),
            classification.get("amount", ""),
            classification.get("category", ""),
            classification.get("doc_type", ""),
            hash_id,
            filepath.name,
        ]
        writer.writerow(row)
|
|
|
|
|
|
def process_document(filepath: Path, client: OpenAI) -> bool:
    """Run one document through the full pipeline.

    Steps: hash -> dedupe -> AI analysis -> store original -> markdown
    record -> JSON index -> SQLite metadata -> optional embedding ->
    expense export -> remove from inbox.

    Returns True when the file was handled (processed now, or recognized
    as an already-processed duplicate), False when skipped.
    """
    print(f"Processing: {filepath.name}")

    # Hidden files (e.g. .DS_Store) are never documents.
    if filepath.name.startswith('.'):
        return False

    supported = {'.pdf', '.png', '.jpg', '.jpeg', '.gif', '.webp', '.tiff', '.tif', '.bmp'}
    if filepath.suffix.lower() not in supported:
        print(f" Skipping unsupported format: {filepath.suffix}")
        return False

    # 1. The content hash doubles as the document ID.
    doc_id = file_hash(filepath)
    print(f" Hash: {doc_id}")

    # 2. A stored copy under this hash means the document was already done.
    stored_copy = STORE / f"{doc_id}{filepath.suffix.lower()}"
    if stored_copy.exists():
        print(" Already processed, removing from inbox")
        filepath.unlink()
        return True

    # 3. One AI pass extracts the text and classifies the document.
    analysis = analyze_document_with_ai(filepath, client)
    extracted_text = analysis.get("full_text", "")
    print(f" Category: {analysis.get('category')}, Type: {analysis.get('doc_type')}")
    print(f" Extracted {len(extracted_text)} characters")

    # 4. Keep the original under its content-addressed name.
    print(" Storing document...")
    store_document(filepath, doc_id)

    # 5. Human-readable markdown record.
    print(" Creating record...")
    record_file = create_record(filepath, doc_id, analysis)
    print(f" Record: {record_file}")

    # 6. Machine-readable master index.
    print(" Updating index...")
    update_master_index(doc_id, filepath, analysis)

    # 7. SQLite row for full-text search.
    print(" Storing in SQLite...")
    store_document_metadata(doc_id, filepath.name, analysis, extracted_text)

    # 8. Vector embedding, once a provider is implemented.
    vector = generate_embedding(extracted_text, client)
    if vector:
        store_embedding(doc_id, vector, extracted_text)

    # 9. Expenses and bills additionally flow into the CSV export.
    export_expense(doc_id, analysis, filepath)

    # 10. Only after everything is persisted is the inbox copy removed.
    print(" Removing from inbox...")
    filepath.unlink()

    print(f" ✓ Done: {analysis.get('category')}/{doc_id}")
    return True
|
|
|
|
|
|
def process_inbox(client: OpenAI) -> int:
    """Process every visible file in the inbox, in sorted order.

    Errors on one document are printed (with traceback) and do not stop
    the rest of the batch. Returns the number of documents handled.
    """
    processed = 0
    for entry in sorted(INBOX.iterdir()):
        if not entry.is_file() or entry.name.startswith('.'):
            continue
        try:
            if process_document(entry, client):
                processed += 1
        except Exception as e:
            print(f"Error processing {entry}: {e}")
            import traceback
            traceback.print_exc()
    return processed
|
|
|
|
|
|
def watch_inbox(client: OpenAI, interval: int = 60) -> None:
    """Poll the inbox forever, processing new documents each cycle.

    Args:
        client: Fireworks client passed through to the pipeline.
        interval: Seconds to sleep between polls.

    Fix: the banner promises Ctrl+C stops the loop, but KeyboardInterrupt
    previously escaped with a traceback; it is now caught for a clean exit.
    """
    print(f"Watching {INBOX} (interval: {interval}s)")
    print("Press Ctrl+C to stop")

    try:
        while True:
            count = process_inbox(client)
            if count:
                print(f"Processed {count} document(s)")
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Stopped.")
|
|
|
|
|
|
def main():
    """CLI entry point: process one file, watch the inbox, or run once."""
    parser = argparse.ArgumentParser(description="AI-powered document processor")
    parser.add_argument("--watch", action="store_true", help="Watch inbox continuously")
    parser.add_argument("--interval", type=int, default=60, help="Watch interval in seconds")
    parser.add_argument("--file", type=Path, help="Process single file")
    args = parser.parse_args()

    # Make sure the SQLite schema exists before anything is processed.
    init_embeddings_db()

    try:
        client = get_fireworks_client()
    except RuntimeError as e:
        print(f"ERROR: {e}")
        sys.exit(1)

    if args.file:
        # Single-file mode.
        if not args.file.exists():
            print(f"File not found: {args.file}")
            sys.exit(1)
        process_document(args.file, client)
    elif args.watch:
        # Daemon mode: poll until interrupted.
        watch_inbox(client, args.interval)
    else:
        # One-shot batch over the inbox.
        count = process_inbox(client)
        print(f"Processed {count} document(s)")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|