docman/internal/processor/processor.go

701 lines
17 KiB
Go

package processor
import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"sort"
	"strings"
	"time"
	"unicode"
	"unicode/utf8"

	"docman/internal/db"

	"github.com/fsnotify/fsnotify"
	"github.com/google/uuid"
	openai "github.com/sashabaranov/go-openai"
)
// Processor turns incoming files into stored, classified document records.
// It extracts their text, classifies it through the AI client, writes the
// file and a markdown record to disk, and inserts the result into the
// database.
type Processor struct {
// db is the database that receives processed documents.
db *db.DB
// inboxDir is the directory scanned and watched for incoming files.
inboxDir string
// storeDir is where a copy of every processed file is written.
storeDir string
// recordsDir is the root of the per-category markdown records.
recordsDir string
// aiClient performs the chat-completion and embedding requests.
aiClient *openai.Client
// aiModel is the chat model used for classification.
aiModel string
// embedModel is the embedding model; when empty no embedding is generated.
embedModel string
}
// Config carries the settings New uses to build a Processor.
type Config struct {
// InboxDir is the directory scanned and watched for incoming files.
InboxDir string
// StoreDir is the directory that receives a copy of every processed file.
StoreDir string
// RecordsDir is the root directory of the generated markdown records.
RecordsDir string
// AIEndpoint is used as the base URL of the AI client.
AIEndpoint string // Fireworks API endpoint
// AIKey is the API key handed to the AI client.
AIKey string
// AIModel is the chat model used for classification.
AIModel string // e.g., "accounts/fireworks/models/qwen2-vl-72b-instruct"
// EmbedModel is the embedding model; leave empty to skip embeddings.
EmbedModel string
}
// Classification is the structured description of a document, decoded from
// the JSON object returned by the AI model.
type Classification struct {
// Category is the top-level category; it also names the records subdirectory.
Category string `json:"category"`
// Subcategory optionally refines Category.
Subcategory string `json:"subcategory,omitempty"`
// Title is a descriptive title for the document.
Title string `json:"title"`
// Date is the document date as text; the prompt asks for YYYY-MM-DD.
Date string `json:"date,omitempty"`
// Vendor is the company or person name, if any.
Vendor string `json:"vendor,omitempty"`
// Amount is the monetary amount; nil when the model returned none.
Amount *float64 `json:"amount,omitempty"`
// Currency is the currency code that belongs to Amount.
Currency string `json:"currency,omitempty"`
// TaxDeductible reports whether the model flagged a deductible expense.
TaxDeductible bool `json:"tax_deductible"`
// Summary is a short prose summary of the document.
Summary string `json:"summary"`
// KeyFields holds any further extracted name/value pairs.
KeyFields map[string]string `json:"key_fields,omitempty"`
}
// New builds a Processor from cfg that stores its results in database.
// The AI client authenticates with cfg.AIKey and sends its requests to
// cfg.AIEndpoint.
func New(cfg Config, database *db.DB) *Processor {
	clientCfg := openai.DefaultConfig(cfg.AIKey)
	clientCfg.BaseURL = cfg.AIEndpoint

	proc := new(Processor)
	proc.db = database
	proc.inboxDir = cfg.InboxDir
	proc.storeDir = cfg.StoreDir
	proc.recordsDir = cfg.RecordsDir
	proc.aiClient = openai.NewClientWithConfig(clientCfg)
	proc.aiModel = cfg.AIModel
	proc.embedModel = cfg.EmbedModel
	return proc
}
// Watch creates the inbox, store and records directories if needed, processes
// every file already present in the inbox, and then processes files as they
// are created there.
//
// It blocks until ctx is cancelled, in which case it returns ctx.Err(), or
// until the watcher closes its channels, in which case it returns nil.
// Failures on individual files are logged and do not stop the loop.
func (p *Processor) Watch(ctx context.Context) error {
	for _, dir := range []string{p.inboxDir, p.storeDir, p.recordsDir} {
		if err := os.MkdirAll(dir, 0755); err != nil {
			return fmt.Errorf("create directory %s: %w", dir, err)
		}
	}

	// Register the watcher before the initial scan so that files dropped into
	// the inbox while the scan is running are not missed.
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("create inbox watcher: %w", err)
	}
	defer watcher.Close()
	if err := watcher.Add(p.inboxDir); err != nil {
		return fmt.Errorf("watch inbox %s: %w", p.inboxDir, err)
	}

	// Process existing files first. A read error is not fatal: whatever
	// entries were returned are still handled and watching continues.
	entries, err := os.ReadDir(p.inboxDir)
	if err != nil {
		log.Printf("Error reading inbox %s: %v", p.inboxDir, err)
	}
	for _, entry := range entries {
		if err := ctx.Err(); err != nil {
			return err
		}
		if entry.IsDir() || strings.HasPrefix(entry.Name(), ".") {
			continue
		}
		path := filepath.Join(p.inboxDir, entry.Name())
		if err := p.ProcessFile(ctx, path); err != nil {
			log.Printf("Error processing %s: %v", path, err)
		}
	}

	log.Printf("Watching inbox: %s", p.inboxDir)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event, ok := <-watcher.Events:
			if !ok {
				return nil
			}
			if event.Op&fsnotify.Create != fsnotify.Create {
				continue
			}
			// Wait a moment for the file to be fully written, but stay
			// responsive to cancellation while waiting.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(500 * time.Millisecond):
			}
			// The initial scan may already have consumed this file, and
			// directories are skipped just like in the scan above.
			info, err := os.Stat(event.Name)
			if err != nil || info.IsDir() {
				continue
			}
			if err := p.ProcessFile(ctx, event.Name); err != nil {
				log.Printf("Error processing %s: %v", event.Name, err)
			}
		case err, ok := <-watcher.Errors:
			if !ok {
				return nil
			}
			log.Printf("Watcher error: %v", err)
		}
	}
}
// ProcessFile ingests the inbox file at path.
//
// Hidden files and files that are not .pdf, .jpg, .jpeg or .png are skipped
// and nil is returned. For every other file it extracts the text, classifies
// it, copies the file into the store directory, writes a markdown record,
// generates an embedding, inserts the document into the database and finally
// removes the file from the inbox.
//
// OCR, classification, markdown and embedding failures are logged and
// processing continues with whatever data is available. Failing to read the
// file, to store it, or to insert it into the database returns an error and
// leaves the inbox file in place.
func (p *Processor) ProcessFile(ctx context.Context, path string) error {
	base := filepath.Base(path)
	if strings.HasPrefix(base, ".") {
		return nil
	}
	ext := strings.ToLower(filepath.Ext(path))
	switch ext {
	case ".pdf", ".jpg", ".jpeg", ".png":
	default:
		log.Printf("Skipping non-document file: %s", path)
		return nil
	}
	log.Printf("Processing: %s", path)

	data, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("read %s: %w", path, err)
	}
	sum := sha256.Sum256(data)
	checksum := hex.EncodeToString(sum[:])

	// A failed OCR is not fatal: the document is stored without text.
	ocrText, pageCount, err := p.extractText(path, ext)
	if err != nil {
		log.Printf("OCR failed for %s: %v", path, err)
		ocrText = ""
	}

	// A failed classification falls back to an uncategorized record.
	classification, err := p.classify(ctx, ocrText, base)
	if err != nil {
		log.Printf("Classification failed for %s: %v", path, err)
		classification = &Classification{
			Category: "uncategorized",
			Title:    base,
		}
	}

	// The stored file is named after the first 16 hex digits of its checksum.
	storageName := checksum[:16] + ext
	storagePath := filepath.Join(p.storeDir, storageName)
	if err := os.WriteFile(storagePath, data, 0644); err != nil {
		return fmt.Errorf("store %s: %w", storagePath, err)
	}

	var docDate *time.Time
	if classification.Date != "" {
		if t, err := parseDate(classification.Date); err == nil {
			docDate = &t
		}
	}

	// One timestamp for all three fields so a fresh record is self-consistent.
	now := time.Now()
	doc := &db.Document{
		ID:            uuid.New().String(),
		Filename:      storageName,
		OriginalName:  base,
		Category:      classification.Category,
		Subcategory:   classification.Subcategory,
		Title:         classification.Title,
		Date:          docDate,
		Vendor:        classification.Vendor,
		Amount:        classification.Amount,
		Currency:      classification.Currency,
		TaxDeductible: classification.TaxDeductible,
		OCRText:       ocrText,
		StoragePath:   storagePath,
		PageCount:     pageCount,
		FileSize:      int64(len(data)),
		MimeType:      getMimeType(ext),
		Checksum:      checksum,
		ProcessedAt:   now,
		CreatedAt:     now,
		UpdatedAt:     now,
	}

	mdPath, err := p.writeMarkdown(doc, classification)
	if err != nil {
		log.Printf("Failed to write markdown: %v", err)
	} else {
		doc.MarkdownPath = mdPath
	}

	embedding, err := p.generateEmbedding(ctx, ocrText)
	if err != nil {
		log.Printf("Embedding failed for %s: %v", path, err)
	} else {
		doc.Embedding = embedding
	}

	if meta, err := json.Marshal(classification.KeyFields); err == nil {
		doc.Metadata = meta
	}

	if err := p.db.InsertDocument(doc); err != nil {
		return fmt.Errorf("insert document for %s: %w", path, err)
	}

	// The document is safely stored at this point, so a leftover inbox file
	// is only worth a log line.
	if err := os.Remove(path); err != nil {
		log.Printf("Failed to remove inbox file: %v", err)
	}
	log.Printf("Processed: %s -> %s (%s)", base, classification.Title, classification.Category)
	return nil
}
// extractText returns the text and page count of the file at path. Files
// whose ext is ".pdf" go through the PDF pipeline; everything else is treated
// as a single image.
func (p *Processor) extractText(path, ext string) (string, int, error) {
	switch ext {
	case ".pdf":
		return p.extractPDFText(path)
	default:
		return p.extractImageText(path)
	}
}
// pdfInfoPagesRe matches the page-count line in the output of pdfinfo.
var pdfInfoPagesRe = regexp.MustCompile(`Pages:\s+(\d+)`)

// minEmbeddedTextLen is the number of bytes pdftotext must exceed before its
// output is trusted; shorter output sends the PDF through OCR instead.
const minEmbeddedTextLen = 100

// extractPDFText returns the text and page count of the PDF at path.
//
// It first asks pdftotext for the embedded text layer and, when that yields
// enough text, reads the page count from pdfinfo (assuming one page if that
// fails). Otherwise the PDF is rendered to PNG pages with pdftoppm and each
// page is run through extractImageText; a page that fails OCR is logged and
// contributes an empty section, so the page count still reflects every
// rendered page.
func (p *Processor) extractPDFText(path string) (string, int, error) {
	output, err := exec.Command("pdftotext", "-layout", path, "-").Output()
	if err == nil && len(output) > minEmbeddedTextLen {
		pageCount := 1
		info, err := exec.Command("pdfinfo", path).Output()
		if err != nil {
			log.Printf("pdfinfo failed for %s, assuming 1 page: %v", path, err)
		} else if match := pdfInfoPagesRe.FindSubmatch(info); len(match) > 1 {
			if _, err := fmt.Sscanf(string(match[1]), "%d", &pageCount); err != nil {
				pageCount = 1
			}
		}
		return string(output), pageCount, nil
	}

	// No usable text layer: render the pages and OCR them.
	tmpDir, err := os.MkdirTemp("", "docman-ocr-")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(tmpDir)

	render := exec.Command("pdftoppm", "-png", "-r", "300", path, filepath.Join(tmpDir, "page"))
	if err := render.Run(); err != nil {
		return "", 0, fmt.Errorf("pdftoppm failed: %w", err)
	}

	pagePaths, err := filepath.Glob(filepath.Join(tmpDir, "page-*.png"))
	if err != nil {
		return "", 0, fmt.Errorf("list rendered pages: %w", err)
	}

	var textBuf bytes.Buffer
	for _, pagePath := range pagePaths {
		text, _, err := p.extractImageText(pagePath)
		if err != nil {
			log.Printf("OCR failed for %s of %s: %v", filepath.Base(pagePath), path, err)
		}
		textBuf.WriteString(text)
		textBuf.WriteString("\n\n--- Page Break ---\n\n")
	}
	return textBuf.String(), len(pagePaths), nil
}
// extractImageText runs tesseract (English and Dutch) on the image at path
// and returns the recognised text. The page count is always 1, also when
// tesseract fails.
func (p *Processor) extractImageText(path string) (string, int, error) {
	recognised, err := exec.Command("tesseract", path, "stdout", "-l", "eng+nld").Output()
	if err == nil {
		return string(recognised), 1, nil
	}
	return "", 1, err
}
// classify asks the chat model to describe a document and decodes its answer.
//
// The prompt contains filename and the first 4000 bytes of text. The JSON
// object found in the first choice of the response is unmarshalled into a
// Classification. An error is returned when the request fails, when the
// response has no choices, or when the answer is not valid JSON.
func (p *Processor) classify(ctx context.Context, text, filename string) (*Classification, error) {
	const promptTemplate = `Analyze this scanned document and extract structured information.
Document filename: %s
OCR Text:
%s
Classify and extract the following JSON structure:
{
"category": "taxes|expenses|bills|medical|contacts|legal|insurance|banking|receipts|correspondence|uncategorized",
"subcategory": "more specific category if applicable",
"title": "descriptive title for this document",
"date": "YYYY-MM-DD if found",
"vendor": "company/person name if applicable",
"amount": numeric amount if this is a financial document,
"currency": "USD" or other currency code,
"tax_deductible": true/false if this is a deductible expense,
"summary": "one paragraph summary of the document",
"key_fields": {"field_name": "value"} for any other important extracted data
}
Categories:
- taxes: W-2, 1099, tax returns, deductions
- expenses: receipts, invoices for purchases
- bills: utility bills, service bills
- medical: medical records, prescriptions, EOBs
- contacts: business cards, contact info
- legal: contracts, agreements, legal documents
- insurance: policies, claims
- banking: statements, checks
- receipts: general purchase receipts
- correspondence: letters, emails
Return ONLY valid JSON.`

	request := openai.ChatCompletionRequest{
		Model:       p.aiModel,
		Temperature: 0.1,
		Messages: []openai.ChatCompletionMessage{{
			Role:    openai.ChatMessageRoleUser,
			Content: fmt.Sprintf(promptTemplate, filename, truncate(text, 4000)),
		}},
	}

	reply, err := p.aiClient.CreateChatCompletion(ctx, request)
	if err != nil {
		return nil, err
	}
	if len(reply.Choices) == 0 {
		return nil, fmt.Errorf("no response from AI")
	}

	// The model may wrap the object in extra text; keep only the JSON part.
	rawJSON := extractJSON(reply.Choices[0].Message.Content)

	result := new(Classification)
	if err := json.Unmarshal([]byte(rawJSON), result); err != nil {
		return nil, fmt.Errorf("parse classification: %w", err)
	}
	return result, nil
}
// generateEmbedding requests an embedding for the first 8000 bytes of text
// and returns it JSON-encoded. When no embedding model is configured it
// returns nil, nil without contacting the API.
func (p *Processor) generateEmbedding(ctx context.Context, text string) ([]byte, error) {
	if p.embedModel == "" {
		return nil, nil
	}

	request := openai.EmbeddingRequest{
		Input: []string{truncate(text, 8000)},
		Model: openai.EmbeddingModel(p.embedModel),
	}
	reply, err := p.aiClient.CreateEmbeddings(ctx, request)
	switch {
	case err != nil:
		return nil, err
	case len(reply.Data) == 0:
		return nil, fmt.Errorf("no embedding returned")
	}

	// The vector is stored as a JSON array of numbers.
	return json.Marshal(reply.Data[0].Embedding)
}
// writeMarkdown writes the human-readable markdown record for doc and
// returns the path of the file it created.
//
// The record is placed in a subdirectory of the records directory named
// after the document's category and is called "<date>_<title>.md", where
// the date is "undated" when doc.Date is nil. If a record with that name
// already exists the document ID is appended so the existing record is not
// overwritten. Key fields are listed in sorted order so the output is
// deterministic.
func (p *Processor) writeMarkdown(doc *db.Document, class *Classification) (string, error) {
	// The category originates from the AI response; reduce it to a single
	// safe path element so it cannot point outside the records directory.
	category := sanitizeFilename(doc.Category)
	if category == "" {
		category = "uncategorized"
	}
	catDir := filepath.Join(p.recordsDir, category)
	if err := os.MkdirAll(catDir, 0755); err != nil {
		return "", fmt.Errorf("create records directory %s: %w", catDir, err)
	}

	dateStr := "undated"
	if doc.Date != nil {
		dateStr = doc.Date.Format("2006-01-02")
	}
	safeName := sanitizeFilename(doc.Title)
	if safeName == "" {
		safeName = "untitled"
	}
	mdPath := filepath.Join(catDir, fmt.Sprintf("%s_%s.md", dateStr, safeName))
	if _, err := os.Stat(mdPath); err == nil {
		// Another document already owns this name; keep both records.
		mdPath = filepath.Join(catDir, fmt.Sprintf("%s_%s_%s.md", dateStr, safeName, sanitizeFilename(doc.ID)))
	}

	var buf bytes.Buffer
	fmt.Fprintf(&buf, "# %s\n\n", doc.Title)
	buf.WriteString("## Metadata\n\n")
	fmt.Fprintf(&buf, "- **ID:** %s\n", doc.ID)
	fmt.Fprintf(&buf, "- **Category:** %s", doc.Category)
	if doc.Subcategory != "" {
		fmt.Fprintf(&buf, " / %s", doc.Subcategory)
	}
	buf.WriteString("\n")
	if doc.Date != nil {
		fmt.Fprintf(&buf, "- **Date:** %s\n", doc.Date.Format("2006-01-02"))
	}
	if doc.Vendor != "" {
		fmt.Fprintf(&buf, "- **Vendor:** %s\n", doc.Vendor)
	}
	if doc.Amount != nil {
		fmt.Fprintf(&buf, "- **Amount:** %s %.2f\n", doc.Currency, *doc.Amount)
	}
	if doc.TaxDeductible {
		buf.WriteString("- **Tax Deductible:** Yes\n")
	}
	fmt.Fprintf(&buf, "- **Original File:** %s\n", doc.OriginalName)
	fmt.Fprintf(&buf, "- **PDF:** [View](%s)\n", doc.StoragePath)
	fmt.Fprintf(&buf, "- **Processed:** %s\n", doc.ProcessedAt.Format(time.RFC3339))

	if class.Summary != "" {
		buf.WriteString("\n## Summary\n\n")
		buf.WriteString(class.Summary)
		buf.WriteString("\n")
	}

	if len(class.KeyFields) > 0 {
		// Map iteration order is random; sort the keys for stable output.
		keys := make([]string, 0, len(class.KeyFields))
		for k := range class.KeyFields {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		buf.WriteString("\n## Key Fields\n\n")
		for _, k := range keys {
			fmt.Fprintf(&buf, "- **%s:** %s\n", k, class.KeyFields[k])
		}
	}

	buf.WriteString("\n## Full Text (OCR)\n\n```\n")
	buf.WriteString(doc.OCRText)
	buf.WriteString("\n```\n")

	if err := os.WriteFile(mdPath, buf.Bytes(), 0644); err != nil {
		return "", fmt.Errorf("write markdown record %s: %w", mdPath, err)
	}
	return mdPath, nil
}
// Helper functions
// truncate returns s shortened to at most max bytes. The cut never falls in
// the middle of a UTF-8 encoded character: when byte max is a continuation
// byte the cut moves back to the start of that character, so the result may
// be up to three bytes shorter than max. A max of zero or less yields "".
func truncate(s string, max int) string {
	if max <= 0 {
		return ""
	}
	if len(s) <= max {
		return s
	}
	cut := max
	// A character is at most utf8.UTFMax bytes long, so never step back further.
	for cut > 0 && cut > max-utf8.UTFMax && !utf8.RuneStart(s[cut]) {
		cut--
	}
	if !utf8.RuneStart(s[cut]) {
		// Not valid UTF-8 around the cut; fall back to the plain byte limit.
		cut = max
	}
	return s[:cut]
}
// extractJSON returns the part of s that runs from the first '{' to the last
// '}', inclusive. When s holds no such pair in that order, s is returned
// unchanged.
func extractJSON(s string) string {
	open := strings.IndexByte(s, '{')
	if open < 0 {
		return s
	}
	closing := strings.LastIndexByte(s, '}')
	if closing <= open {
		return s
	}
	return s[open : closing+1]
}
// parseDate parses s using the first matching layout out of a fixed list of
// ISO, US-style slash and English month-name layouts. It returns an error
// when none of the layouts matches.
func parseDate(s string) (time.Time, error) {
	layouts := [...]string{
		"2006-01-02",
		"01/02/2006",
		"1/2/2006",
		"January 2, 2006",
		"Jan 2, 2006",
		"2006/01/02",
	}
	for i := 0; i < len(layouts); i++ {
		parsed, err := time.Parse(layouts[i], s)
		if err != nil {
			continue
		}
		return parsed, nil
	}
	return time.Time{}, fmt.Errorf("cannot parse date: %s", s)
}
// getMimeType maps a lower-case file extension, including the leading dot,
// to its MIME type. Unknown extensions map to "application/octet-stream".
func getMimeType(ext string) string {
	if ext == ".pdf" {
		return "application/pdf"
	}
	if ext == ".jpg" || ext == ".jpeg" {
		return "image/jpeg"
	}
	if ext == ".png" {
		return "image/png"
	}
	return "application/octet-stream"
}
// unsafeFilenameChars matches every run of characters other than lower-case
// ASCII letters and digits.
var unsafeFilenameChars = regexp.MustCompile(`[^a-z0-9]+`)

// sanitizeFilename turns s into a slug usable as a file name: it is
// lower-cased, every run of characters other than a-z and 0-9 becomes a
// single hyphen, and the result is limited to 50 bytes. The slug never starts
// or ends with a hyphen and is empty when s has no ASCII letter or digit.
func sanitizeFilename(s string) string {
	const maxLen = 50

	slug := unsafeFilenameChars.ReplaceAllString(strings.ToLower(s), "-")
	slug = strings.Trim(slug, "-")
	if len(slug) > maxLen {
		// The slug is pure ASCII here, so cutting by bytes is safe; the cut
		// may land right after a hyphen, which is trimmed again.
		slug = strings.TrimRight(slug[:maxLen], "-")
	}
	return slug
}
// ProcessSingle processes an uploaded file held in memory and returns the
// stored document (for API uploads).
//
// data is written to a temporary file, named with the extension of filename,
// so the text-extraction tools can read it; the temporary file is removed
// before returning. The document is then classified, copied into the store
// directory, described in a markdown record, embedded, and inserted into the
// database.
//
// OCR, classification, markdown and embedding failures are logged and
// processing continues with whatever data is available. Failing to create or
// write the temporary file, to store the file, or to insert the document
// returns an error.
func (p *Processor) ProcessSingle(ctx context.Context, data []byte, filename string) (*db.Document, error) {
	ext := strings.ToLower(filepath.Ext(filename))

	tmpFile, err := os.CreateTemp("", "docman-upload-*"+ext)
	if err != nil {
		return nil, fmt.Errorf("create temp file: %w", err)
	}
	tmpPath := tmpFile.Name()
	defer os.Remove(tmpPath)

	if _, err := tmpFile.Write(data); err != nil {
		// Close here as well so the descriptor is not leaked on this path.
		tmpFile.Close()
		return nil, fmt.Errorf("write temp file: %w", err)
	}
	// A failed close can mean the data never reached the disk, in which
	// case the extraction tools would read a truncated file.
	if err := tmpFile.Close(); err != nil {
		return nil, fmt.Errorf("close temp file: %w", err)
	}

	sum := sha256.Sum256(data)
	checksum := hex.EncodeToString(sum[:])

	// A failed OCR is not fatal: the document is stored without text.
	ocrText, pageCount, err := p.extractText(tmpPath, ext)
	if err != nil {
		log.Printf("OCR failed for %s: %v", filename, err)
		ocrText = ""
	}

	// A failed classification falls back to an uncategorized record.
	classification, err := p.classify(ctx, ocrText, filename)
	if err != nil {
		log.Printf("Classification failed for %s: %v", filename, err)
		classification = &Classification{
			Category: "uncategorized",
			Title:    filename,
		}
	}

	// The stored file is named after the first 16 hex digits of its checksum.
	storageName := checksum[:16] + ext
	storagePath := filepath.Join(p.storeDir, storageName)
	if err := os.WriteFile(storagePath, data, 0644); err != nil {
		return nil, fmt.Errorf("store %s: %w", storagePath, err)
	}

	var docDate *time.Time
	if classification.Date != "" {
		if t, err := parseDate(classification.Date); err == nil {
			docDate = &t
		}
	}

	// One timestamp for all three fields so a fresh record is self-consistent.
	now := time.Now()
	doc := &db.Document{
		ID:            uuid.New().String(),
		Filename:      storageName,
		OriginalName:  filename,
		Category:      classification.Category,
		Subcategory:   classification.Subcategory,
		Title:         classification.Title,
		Date:          docDate,
		Vendor:        classification.Vendor,
		Amount:        classification.Amount,
		Currency:      classification.Currency,
		TaxDeductible: classification.TaxDeductible,
		OCRText:       ocrText,
		StoragePath:   storagePath,
		PageCount:     pageCount,
		FileSize:      int64(len(data)),
		MimeType:      getMimeType(ext),
		Checksum:      checksum,
		ProcessedAt:   now,
		CreatedAt:     now,
		UpdatedAt:     now,
	}

	mdPath, err := p.writeMarkdown(doc, classification)
	if err != nil {
		log.Printf("Failed to write markdown: %v", err)
	} else {
		doc.MarkdownPath = mdPath
	}

	embedding, err := p.generateEmbedding(ctx, ocrText)
	if err != nil {
		log.Printf("Embedding failed for %s: %v", filename, err)
	} else {
		doc.Embedding = embedding
	}

	if meta, err := json.Marshal(classification.KeyFields); err == nil {
		doc.Metadata = meta
	}

	if err := p.db.InsertDocument(doc); err != nil {
		return nil, fmt.Errorf("insert document for %s: %w", filename, err)
	}
	return doc, nil
}
// SearchMarkdown searches markdown files directly (fallback when embeddings
// are unavailable).
//
// query is split on whitespace into terms that are matched case-insensitively
// as substrings of every ".md" file below recordsDir. A file's score is the
// fraction of terms it contains; files matching no term are left out. Each
// result carries the title taken from the file's first line (without a
// leading "# "), the file path, and a snippet around the first term that
// matched. Results are ordered by descending score, ties keeping the order in
// which the files were visited, and cut to at most limit entries; a negative
// limit returns every match. Unreadable files and directories are skipped.
func SearchMarkdown(recordsDir, query string, limit int) ([]*db.SearchResult, error) {
	terms := strings.Fields(strings.ToLower(query))
	if len(terms) == 0 {
		// Nothing to match against, so nothing can score above zero.
		return nil, nil
	}

	var results []*db.SearchResult
	err := filepath.Walk(recordsDir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() || !strings.HasSuffix(path, ".md") {
			return nil
		}
		data, err := os.ReadFile(path)
		if err != nil {
			return nil
		}
		text := string(data)
		lowered := strings.ToLower(text)

		matched := 0
		firstHit := ""
		for _, term := range terms {
			if !strings.Contains(lowered, term) {
				continue
			}
			if matched == 0 {
				firstHit = term
			}
			matched++
		}
		if matched == 0 {
			return nil
		}

		// The title is the first line of the record.
		title := text
		if nl := strings.IndexByte(text, '\n'); nl >= 0 {
			title = text[:nl]
		}
		title = strings.TrimPrefix(title, "# ")

		results = append(results, &db.SearchResult{
			Document: db.Document{
				Title:        title,
				MarkdownPath: path,
			},
			Score:   float64(matched) / float64(len(terms)),
			Snippet: findSnippet(text, firstHit, 100),
		})
		return nil
	})
	if err != nil {
		return nil, err
	}

	sort.SliceStable(results, func(i, j int) bool {
		return results[i].Score > results[j].Score
	})

	if limit >= 0 && len(results) > limit {
		results = results[:limit]
	}
	return results, nil
}
// findSnippet returns an excerpt of text around the first case-insensitive
// occurrence of term, with up to radius bytes of context on either side.
// "..." is added on each side where text was cut off. When term does not
// occur, the first 2*radius bytes of text are returned, followed by "..." if
// text is longer than that. Cuts never split a UTF-8 encoded character, and
// a negative radius is treated as zero.
func findSnippet(text, term string, radius int) string {
	if radius < 0 {
		radius = 0
	}

	lowerTerm := strings.ToLower(term)
	lowerIdx := strings.Index(strings.ToLower(text), lowerTerm)
	if lowerIdx < 0 {
		if len(text) <= radius*2 {
			return text
		}
		cut := radius * 2
		for cut > 0 && !utf8.RuneStart(text[cut]) {
			cut--
		}
		return text[:cut] + "..."
	}

	// Lower-casing can change the byte length of a character, so offsets
	// found in the lower-cased copy must be translated back before they are
	// used to slice the original text.
	matchStart := lowerToOrigOffset(text, lowerIdx)
	matchEnd := lowerToOrigOffset(text, lowerIdx+len(lowerTerm))

	start := matchStart - radius
	if start < 0 {
		start = 0
	}
	for start < matchStart && !utf8.RuneStart(text[start]) {
		start++
	}
	end := matchEnd + radius
	if end > len(text) {
		end = len(text)
	}
	for end > matchEnd && end < len(text) && !utf8.RuneStart(text[end]) {
		end--
	}

	snippet := text[start:end]
	if start > 0 {
		snippet = "..." + snippet
	}
	if end < len(text) {
		snippet += "..."
	}
	return snippet
}

// lowerToOrigOffset converts a byte offset into strings.ToLower(s) into the
// byte offset of the same character in s. Offsets at or past the end of the
// lower-cased string map to len(s).
func lowerToOrigOffset(s string, lowerOff int) int {
	lowered := 0
	for i, r := range s {
		if lowered >= lowerOff {
			return i
		}
		lowered += utf8.RuneLen(unicode.ToLower(r))
	}
	return len(s)
}
// GetRecordsDir returns the directory under which the markdown records are
// written.
func (p *Processor) GetRecordsDir() string {
return p.recordsDir
}
// GetStoreDir returns the directory into which processed files are copied.
func (p *Processor) GetStoreDir() string {
return p.storeDir
}