doc-processor/search.py

227 lines
6.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Search documents in the document management system.
Uses SQLite LIKE substring matching on document text, summary, vendor and filename.
"""
import os
import sys
import json
import sqlite3
import argparse
from pathlib import Path
from datetime import datetime
# Root of the document store, located under the current user's home directory.
DOCUMENTS_ROOT = Path.home().joinpath("documents")
# Sub-directories of the store.
INDEX = DOCUMENTS_ROOT.joinpath("index")
RECORDS = DOCUMENTS_ROOT.joinpath("records")
# SQLite database file that get_db() connects to.
EMBEDDINGS_DB = INDEX.joinpath("embeddings.db")
def get_db() -> sqlite3.Connection:
    """Open a connection to the documents database.

    Returns:
        A new ``sqlite3.Connection`` to ``EMBEDDINGS_DB``. The caller is
        responsible for closing it.

    If the database file does not exist, a hint is printed and the process
    exits with status 1.
    """
    if EMBEDDINGS_DB.exists():
        return sqlite3.connect(EMBEDDINGS_DB)

    # No database yet: tell the user how to create one and stop.
    print(f"Database not found: {EMBEDDINGS_DB}")
    print("Run the processor first to create the database.")
    sys.exit(1)
def search_documents(query: str, category: str = None, doc_type: str = None, limit: int = 20) -> list:
    """
    Search documents by substring match (SQL ``LIKE``) plus optional filters.

    Args:
        query: Text to look for in ``full_text``, ``summary``, ``vendor`` and
            ``filename``. It is matched literally: ``%`` and ``_`` are not
            treated as wildcards. A falsy value disables the text filter.
        category: If given, only documents with exactly this category.
        doc_type: If given, only documents with exactly this doc_type.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts (one per document, without ``full_text``), newest
        ``processed_at`` first.
    """
    conditions = []
    params = []

    if query:
        # Escape LIKE metacharacters (and the escape character itself) so a
        # query such as "100%" or "tax_2023" matches those characters literally.
        escaped = (
            query.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        searched_columns = ("full_text", "summary", "vendor", "filename")
        text_match = " OR ".join(
            f"{column} LIKE ? ESCAPE '\\'" for column in searched_columns
        )
        conditions.append(f"({text_match})")
        params.extend([pattern] * len(searched_columns))

    if category:
        conditions.append("category = ?")
        params.append(category)

    if doc_type:
        conditions.append("doc_type = ?")
        params.append(doc_type)

    # Only fixed SQL fragments are interpolated; every user value is bound.
    where_clause = " AND ".join(conditions) if conditions else "1=1"
    sql = f"""
        SELECT doc_id, filename, category, doc_type, date, vendor, amount, summary, processed_at
        FROM documents
        WHERE {where_clause}
        ORDER BY processed_at DESC
        LIMIT ?
    """
    params.append(limit)

    conn = get_db()
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]
    finally:
        # Close even when the query fails, so the connection is not leaked.
        conn.close()
def get_document(doc_id: str) -> dict:
    """Get full document details by ID or by ID prefix.

    Args:
        doc_id: A complete document ID, or the leading characters of one.

    Returns:
        The matching ``documents`` row as a dict, or ``None`` when nothing
        matches. An exact ID match always wins over prefix matches; among
        several prefix matches the smallest ``doc_id`` is returned so the
        result is deterministic.
    """
    # Escape LIKE metacharacters so '%' or '_' inside an ID are literal.
    escaped = (
        str(doc_id)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    conn = get_db()
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.execute(
            """
            SELECT * FROM documents
            WHERE doc_id = ? OR doc_id LIKE ? ESCAPE '\\'
            ORDER BY (doc_id = ?) DESC, doc_id
            LIMIT 1
            """,
            (doc_id, f"{escaped}%", doc_id),
        )
        row = cursor.fetchone()
        return dict(row) if row else None
    finally:
        # Close even when the query fails, so the connection is not leaked.
        conn.close()
def list_categories() -> dict:
    """List all categories with document counts.

    Returns:
        A dict mapping each ``category`` value to the number of documents in
        it, ordered from most to least frequent. Documents whose category is
        NULL are counted under the key ``None``.
    """
    conn = get_db()
    try:
        cursor = conn.execute("""
            SELECT category, COUNT(*) as count
            FROM documents
            GROUP BY category
            ORDER BY count DESC
        """)
        return {category: count for category, count in cursor.fetchall()}
    finally:
        # Close even when the query fails, so the connection is not leaked.
        conn.close()
def list_types() -> dict:
    """List all document types with counts.

    Returns:
        A dict mapping each ``doc_type`` value to the number of documents of
        that type, ordered from most to least frequent. Documents whose
        doc_type is NULL are counted under the key ``None``.
    """
    conn = get_db()
    try:
        cursor = conn.execute("""
            SELECT doc_type, COUNT(*) as count
            FROM documents
            GROUP BY doc_type
            ORDER BY count DESC
        """)
        return {doc_type: count for doc_type, count in cursor.fetchall()}
    finally:
        # Close even when the query fails, so the connection is not leaked.
        conn.close()
def show_stats() -> None:
    """Print the total document count and the per-category / per-type counts."""
    # Only the total needs this connection; list_categories() and list_types()
    # open their own. Close it immediately so it is neither held open during
    # those calls nor leaked if one of them fails.
    conn = get_db()
    try:
        total = conn.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
    finally:
        conn.close()

    print("\n📊 Document Statistics")
    print("=" * 40)
    print(f"Total documents: {total}")

    print("\nBy category:")
    for cat, count in list_categories().items():
        print(f" {cat}: {count}")

    print("\nBy type:")
    for dtype, count in list_types().items():
        print(f" {dtype}: {count}")
def show_document(doc_id: str) -> None:
    """Print the metadata, summary and leading text of one document.

    Args:
        doc_id: Identifier passed through to ``get_document``.

    Prints a "not found" message and returns when no document matches.
    """
    record = get_document(doc_id)
    if not record:
        print(f"Document not found: {doc_id}")
        return

    divider = "=" * 60
    preview_chars = 2000

    # Metadata header; empty optional fields fall back to a placeholder.
    print(f"\n{divider}")
    print(f"Document: {record['filename']}")
    print(f"ID: {record['doc_id']}")
    print(f"Category: {record['category']}")
    print(f"Type: {record['doc_type'] or 'unknown'}")
    print(f"Date: {record['date'] or 'N/A'}")
    print(f"Vendor: {record['vendor'] or 'N/A'}")
    print(f"Amount: {record['amount'] or 'N/A'}")
    print(f"Processed: {record['processed_at']}")
    print(divider)

    summary = record['summary']
    if summary:
        print(f"\nSummary:\n{summary}")

    body = record['full_text']
    if body:
        print("\n--- Full Text (first 2000 chars) ---\n")
        print(body[:preview_chars])
        # Tell the reader how much text was cut off, if any.
        remaining = len(body) - preview_chars
        if remaining > 0:
            print(f"\n... [{remaining} more characters]")
def _cell(value, width: int, default: str = "") -> str:
    """Render one table cell: stringify, use *default* when empty, cut to *width*."""
    text = default if value is None or value == "" else str(value)
    return text[:width]


def format_results(results: list) -> None:
    """Format and print search results as a fixed-width table.

    Args:
        results: Dicts with the keys ``doc_id``, ``category``, ``doc_type``,
            ``date``, ``amount`` and ``filename``. Values may be ``None`` or
            non-string (SQLite columns are dynamically typed, so e.g.
            ``amount`` can come back as a number); they are converted to text
            before being truncated to the column width.

    Prints "No documents found" when *results* is empty.
    """
    if not results:
        print("No documents found")
        return

    print(f"\nFound {len(results)} document(s):\n")

    # Header
    print(f"{'ID':<10} {'Category':<12} {'Type':<18} {'Date':<12} {'Amount':<10} {'Filename'}")
    print("-" * 90)

    for doc in results:
        doc_id = _cell(doc['doc_id'], 8)
        cat = _cell(doc['category'], 12)
        dtype = _cell(doc['doc_type'], 18, default='unknown')
        date = _cell(doc['date'], 12)
        amount = _cell(doc['amount'], 10)
        filename = _cell(doc['filename'], 30)
        print(f"{doc_id:<10} {cat:<12} {dtype:<18} {date:<12} {amount:<10} {filename}")
def main():
    """Command-line entry point: parse arguments and run the requested action.

    Precedence: ``--stats``, then ``--show ID``, then a search/list (any of
    ``query``, ``--category``, ``--type``, ``--list``). With no arguments the
    help text is printed.
    """
    parser = argparse.ArgumentParser(description="Search documents")
    parser.add_argument("query", nargs="?", help="Search query")
    parser.add_argument("-c", "--category", help="Filter by category")
    parser.add_argument("-t", "--type", help="Filter by document type")
    parser.add_argument("-s", "--show", help="Show full document by ID")
    parser.add_argument("--stats", action="store_true", help="Show statistics")
    parser.add_argument("-l", "--list", action="store_true", help="List all documents")
    parser.add_argument("-n", "--limit", type=int, default=20, help="Max results (default: 20)")
    parser.add_argument("--full-text", action="store_true", help="Show full text in results")
    args = parser.parse_args()

    if args.stats:
        show_stats()
        return

    if args.show:
        show_document(args.show)
        return

    if not (args.list or args.query or args.category or args.type):
        parser.print_help()
        return

    results = search_documents(args.query, args.category, args.type, args.limit)
    format_results(results)

    if args.full_text:
        # --full-text used to be parsed but ignored. The result rows do not
        # carry full_text, so print each hit's detail view after the table.
        for doc in results:
            show_document(doc['doc_id'])


if __name__ == "__main__":
    main()