commit 067f47a4d22cd4b3f081cb5f1c1dcd47938e3541 Author: Johan Jongsma Date: Sun Feb 1 08:03:45 2026 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9d82971 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +node_modules/ +.env +*.log +.DS_Store + diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc2da13 --- /dev/null +++ b/README.md @@ -0,0 +1,95 @@ +# Document Processor + +Go service that watches `~/documents/inbox/` for PDFs and images, uses Kimi K2.5 (via Fireworks API) for OCR and classification, then stores and indexes them. + +## Features + +- **File watcher**: Monitors inbox for new documents +- **OCR + Classification**: Kimi K2.5 extracts text and categorizes documents +- **Storage**: PDFs stored in `~/documents/store/` +- **Records**: Markdown records in `~/documents/records/{category}/` +- **Index**: JSON index at `~/documents/index/master.json` +- **Expense export**: Auto-exports expenses to `~/documents/exports/expenses.csv` +- **HTTP API**: REST endpoints for manual ingestion and search + +## Setup + +1. Set your Fireworks API key: + ```bash + export FIREWORKS_API_KEY=your_key_here + ``` + +2. Run the service: + ```bash + ./docproc + ``` + +3. 
Or install as systemd service: + ```bash + sudo cp docproc.service /etc/systemd/system/ + # Edit /etc/systemd/system/docproc.service to add your API key + sudo systemctl daemon-reload + sudo systemctl enable --now docproc + ``` + +## API Endpoints + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/health` | GET | Health check | +| `/ingest` | POST | Upload and process a document (multipart form, field: `file`) | +| `/search?q=query` | GET | Search documents by content | +| `/docs` | GET | List all documents | +| `/doc/{id}` | GET | Get single document by ID | + +## Directory Structure + +``` +~/documents/ +├── inbox/ # Drop files here for processing +├── store/ # Processed PDFs stored by hash +├── records/ # Markdown records by category +│ ├── tax/ +│ ├── expense/ +│ ├── medical/ +│ └── ... +├── index/ +│ └── master.json # Document index +└── exports/ + └── expenses.csv # Expense export +``` + +## Categories + +Documents are classified into: +- tax +- expense +- bill +- invoice +- medical +- receipt +- bank +- insurance +- legal +- correspondence +- other + +## Usage + +Drop a PDF or image into `~/documents/inbox/` and the service will: +1. OCR and classify it +2. Store the original in `store/` +3. Create a markdown record in `records/{category}/` +4. Update the master index +5. Export to CSV if it's an expense +6. 
Delete from inbox + +Or POST to `/ingest`: +```bash +curl -X POST http://localhost:9900/ingest -F "file=@receipt.pdf" +``` + +Search documents: +```bash +curl "http://localhost:9900/search?q=amazon" +``` diff --git a/api/server.go b/api/server.go new file mode 100644 index 0000000..b0fe324 --- /dev/null +++ b/api/server.go @@ -0,0 +1,146 @@ +package api + +import ( + "encoding/json" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strings" + + "docproc/processor" +) + +type Server struct { + proc *processor.Processor +} + +func NewServer(proc *processor.Processor) *Server { + return &Server{proc: proc} +} + +func (s *Server) Start(addr string) error { + mux := http.NewServeMux() + + mux.HandleFunc("/health", s.handleHealth) + mux.HandleFunc("/ingest", s.handleIngest) + mux.HandleFunc("/search", s.handleSearch) + mux.HandleFunc("/docs", s.handleList) + mux.HandleFunc("/doc/", s.handleDoc) + + log.Printf("API server starting on %s", addr) + return http.ListenAndServe(addr, s.corsMiddleware(mux)) +} + +func (s *Server) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + next.ServeHTTP(w, r) + }) +} + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} + +func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Parse multipart form + if err := r.ParseMultipartForm(50 << 20); err != nil { // 50MB max + http.Error(w, "Failed to parse form: "+err.Error(), http.StatusBadRequest) + return + } + + file, header, err 
:= r.FormFile("file")
	if err != nil {
		http.Error(w, "No file provided: "+err.Error(), http.StatusBadRequest)
		return
	}
	defer file.Close()

	// SECURITY: header.Filename is fully client-controlled and may contain
	// path separators ("../../etc/cron.d/x"). filepath.Base strips any
	// directory components so the upload cannot escape the inbox.
	name := filepath.Base(header.Filename)
	if name == "" || name == "." || name == string(filepath.Separator) {
		http.Error(w, "Invalid filename", http.StatusBadRequest)
		return
	}

	// Save to inbox
	homeDir, _ := os.UserHomeDir()
	inboxPath := filepath.Join(homeDir, "documents", "inbox", name)

	out, err := os.Create(inboxPath)
	if err != nil {
		http.Error(w, "Failed to save file: "+err.Error(), http.StatusInternalServerError)
		return
	}
	defer out.Close()

	if _, err := io.Copy(out, file); err != nil {
		http.Error(w, "Failed to write file: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// Process synchronously so the client learns the outcome in this response.
	if err := s.proc.ProcessFile(inboxPath); err != nil {
		http.Error(w, "Failed to process file: "+err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{
		"status":  "processed",
		"message": "Document processed successfully",
	})
}

// handleSearch returns documents whose summary, full text, category, or
// vendor match the required "q" query parameter (substring match).
func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query().Get("q")
	if query == "" {
		http.Error(w, "Query parameter 'q' is required", http.StatusBadRequest)
		return
	}

	results := s.proc.Search(query)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"query":   query,
		"count":   len(results),
		"results": results,
	})
}

// handleList returns every document currently in the index.
func (s *Server) handleList(w http.ResponseWriter, r *http.Request) {
	docs := s.proc.ListDocuments()

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"count": len(docs),
		"docs":  docs,
	})
}

// handleDoc serves one document looked up by its short-hash ID.
func (s *Server) handleDoc(w http.ResponseWriter, r *http.Request) {
	// Extract ID from path: /doc/{id}
	id := strings.TrimPrefix(r.URL.Path, "/doc/")
	if id == "" {
		http.Error(w, "Document ID required", http.StatusBadRequest)
		return
	}

	doc := s.proc.GetDocument(id)
	if doc == nil {
		http.Error(w, "Document not found",
http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(doc) +} diff --git a/docproc b/docproc new file mode 100755 index 0000000..575960e Binary files /dev/null and b/docproc differ diff --git a/docproc.service b/docproc.service new file mode 100644 index 0000000..2eb457a --- /dev/null +++ b/docproc.service @@ -0,0 +1,15 @@ +[Unit] +Description=Document Processor Service +After=network.target + +[Service] +Type=simple +User=johan +WorkingDirectory=/home/johan/dev/docproc +ExecStart=/home/johan/dev/docproc/docproc +Restart=on-failure +RestartSec=5 +Environment=FIREWORKS_API_KEY= + +[Install] +WantedBy=multi-user.target diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c0b4c64 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module docproc + +go 1.22.5 + +require ( + github.com/fsnotify/fsnotify v1.9.0 // indirect + golang.org/x/sys v0.13.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c1e3272 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go new file mode 100644 index 0000000..187a1fa --- /dev/null +++ b/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "docproc/api" + "docproc/processor" + "docproc/watcher" +) + +func main() { + log.SetFlags(log.LstdFlags | log.Lshortfile) + log.Println("Starting Document Processor...") + + // Initialize processor + proc := processor.New() + + // Start file watcher + w, err := watcher.New(proc) + if err != nil { + log.Fatalf("Failed to start watcher: %v", err) + } + go w.Watch() + + // Start HTTP API + server := api.NewServer(proc) + 
go func() { + if err := server.Start(":9900"); err != nil { + log.Fatalf("Server failed: %v", err) + } + }() + + log.Println("Document Processor running on :9900") + log.Println("Watching ~/documents/inbox/") + + // Graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + log.Println("Shutting down...") + w.Stop() +} diff --git a/processor/processor.go b/processor/processor.go new file mode 100644 index 0000000..af618af --- /dev/null +++ b/processor/processor.go @@ -0,0 +1,461 @@ +package processor + +import ( + "bytes" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +type Document struct { + ID string `json:"id"` + Hash string `json:"hash"` + Filename string `json:"filename"` + Category string `json:"category"` + Summary string `json:"summary"` + FullText string `json:"full_text"` + PDFPath string `json:"pdf_path"` + RecordPath string `json:"record_path"` + ProcessedAt time.Time `json:"processed_at"` + IsExpense bool `json:"is_expense"` + Amount string `json:"amount,omitempty"` + Vendor string `json:"vendor,omitempty"` + Date string `json:"date,omitempty"` +} + +type Processor struct { + mu sync.RWMutex + docs map[string]*Document + baseDir string + apiKey string + apiURL string +} + +func New() *Processor { + homeDir, _ := os.UserHomeDir() + apiKey := os.Getenv("FIREWORKS_API_KEY") + if apiKey == "" { + apiKey = "PLACEHOLDER_API_KEY" + log.Println("WARNING: FIREWORKS_API_KEY not set, using placeholder") + } + + p := &Processor{ + docs: make(map[string]*Document), + baseDir: filepath.Join(homeDir, "documents"), + apiKey: apiKey, + apiURL: "https://api.fireworks.ai/inference/v1/chat/completions", + } + + // Load existing index + p.loadIndex() + return p +} + +func (p *Processor) loadIndex() { + indexPath := filepath.Join(p.baseDir, "index", "master.json") + data, err := os.ReadFile(indexPath) + 
if err != nil { + return + } + + var docs []*Document + if err := json.Unmarshal(data, &docs); err != nil { + return + } + + for _, doc := range docs { + p.docs[doc.ID] = doc + } + log.Printf("Loaded %d documents from index", len(docs)) +} + +func (p *Processor) saveIndex() error { + p.mu.RLock() + docs := make([]*Document, 0, len(p.docs)) + for _, doc := range p.docs { + docs = append(docs, doc) + } + p.mu.RUnlock() + + data, err := json.MarshalIndent(docs, "", " ") + if err != nil { + return err + } + + indexPath := filepath.Join(p.baseDir, "index", "master.json") + return os.WriteFile(indexPath, data, 0644) +} + +func (p *Processor) ProcessFile(path string) error { + log.Printf("Processing: %s", path) + + // Read file + data, err := os.ReadFile(path) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + // Calculate hash + hash := fmt.Sprintf("%x", sha256.Sum256(data)) + shortHash := hash[:12] + + // Check if already processed + p.mu.RLock() + for _, doc := range p.docs { + if doc.Hash == hash { + p.mu.RUnlock() + log.Printf("Already processed: %s (hash: %s)", path, shortHash) + os.Remove(path) + return nil + } + } + p.mu.RUnlock() + + // Determine file type + ext := strings.ToLower(filepath.Ext(path)) + var mimeType string + switch ext { + case ".pdf": + mimeType = "application/pdf" + case ".png": + mimeType = "image/png" + case ".jpg", ".jpeg": + mimeType = "image/jpeg" + default: + return fmt.Errorf("unsupported file type: %s", ext) + } + + // Send to Kimi for OCR + classification + result, err := p.analyzeWithKimi(data, mimeType, filepath.Base(path)) + if err != nil { + return fmt.Errorf("Kimi analysis failed: %w", err) + } + + // Create document record + doc := &Document{ + ID: shortHash, + Hash: hash, + Filename: filepath.Base(path), + Category: result.Category, + Summary: result.Summary, + FullText: result.FullText, + ProcessedAt: time.Now(), + IsExpense: result.IsExpense, + Amount: result.Amount, + Vendor: result.Vendor, + Date: 
result.Date, + } + + // Store PDF + storePath := filepath.Join(p.baseDir, "store", shortHash+".pdf") + if ext == ".pdf" { + if err := copyFile(path, storePath); err != nil { + return fmt.Errorf("failed to store PDF: %w", err) + } + } else { + // For images, just store as-is with original extension + storePath = filepath.Join(p.baseDir, "store", shortHash+ext) + if err := copyFile(path, storePath); err != nil { + return fmt.Errorf("failed to store file: %w", err) + } + } + doc.PDFPath = storePath + + // Create record + recordDir := filepath.Join(p.baseDir, "records", sanitizeCategory(doc.Category)) + os.MkdirAll(recordDir, 0755) + recordPath := filepath.Join(recordDir, shortHash+".md") + + record := fmt.Sprintf(`# %s + +**ID:** %s +**Original File:** %s +**Category:** %s +**Processed:** %s +**Stored PDF:** %s + +## Summary + +%s + +## Full Text + +%s +`, doc.Filename, doc.ID, doc.Filename, doc.Category, doc.ProcessedAt.Format(time.RFC3339), doc.PDFPath, doc.Summary, doc.FullText) + + if err := os.WriteFile(recordPath, []byte(record), 0644); err != nil { + return fmt.Errorf("failed to write record: %w", err) + } + doc.RecordPath = recordPath + + // Export expense if applicable + if doc.IsExpense { + if err := p.exportExpense(doc); err != nil { + log.Printf("Failed to export expense: %v", err) + } + } + + // Add to index + p.mu.Lock() + p.docs[doc.ID] = doc + p.mu.Unlock() + + if err := p.saveIndex(); err != nil { + log.Printf("Failed to save index: %v", err) + } + + // Remove from inbox + os.Remove(path) + + log.Printf("Processed: %s -> %s (%s)", filepath.Base(path), doc.Category, doc.ID) + return nil +} + +type KimiResult struct { + Category string + Summary string + FullText string + IsExpense bool + Amount string + Vendor string + Date string +} + +func (p *Processor) analyzeWithKimi(data []byte, mimeType, filename string) (*KimiResult, error) { + // Encode file as base64 + b64Data := base64.StdEncoding.EncodeToString(data) + + // Build request for Kimi K2.5 via 
Fireworks + prompt := `Analyze this document and extract: + +1. **Category**: One of: tax, expense, bill, invoice, medical, receipt, bank, insurance, legal, correspondence, other +2. **Summary**: 2-3 sentence summary of the document +3. **Full Text**: Complete OCR text extraction +4. **Is Expense**: true/false - is this a business expense, receipt, or invoice? +5. **Amount**: If expense/invoice, the total amount (e.g., "$123.45") +6. **Vendor**: If expense/invoice, the vendor/company name +7. **Date**: Document date in YYYY-MM-DD format if visible + +Respond in JSON format: +{ + "category": "...", + "summary": "...", + "full_text": "...", + "is_expense": true/false, + "amount": "...", + "vendor": "...", + "date": "..." +}` + + var content []interface{} + + // Add image/document + if strings.HasPrefix(mimeType, "image/") { + content = append(content, map[string]interface{}{ + "type": "image_url", + "image_url": map[string]string{ + "url": fmt.Sprintf("data:%s;base64,%s", mimeType, b64Data), + }, + }) + } else { + // For PDFs, encode as data URL + content = append(content, map[string]interface{}{ + "type": "image_url", + "image_url": map[string]string{ + "url": fmt.Sprintf("data:%s;base64,%s", mimeType, b64Data), + }, + }) + } + + content = append(content, map[string]interface{}{ + "type": "text", + "text": prompt, + }) + + reqBody := map[string]interface{}{ + "model": "accounts/fireworks/models/kimi-k2-5-instruct", + "messages": []map[string]interface{}{ + { + "role": "user", + "content": content, + }, + }, + "max_tokens": 4096, + "temperature": 0.1, + } + + jsonBody, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", p.apiURL, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + + req.Header.Set("Authorization", "Bearer "+p.apiKey) + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{Timeout: 120 * time.Second} + resp, err := client.Do(req) + if err != nil { + 
return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(body)) + } + + var apiResp struct { + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + } + + if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil { + return nil, err + } + + if len(apiResp.Choices) == 0 { + return nil, fmt.Errorf("no response from API") + } + + // Parse JSON response + content_str := apiResp.Choices[0].Message.Content + + // Extract JSON from response (might be wrapped in markdown) + jsonStart := strings.Index(content_str, "{") + jsonEnd := strings.LastIndex(content_str, "}") + if jsonStart >= 0 && jsonEnd > jsonStart { + content_str = content_str[jsonStart : jsonEnd+1] + } + + var result struct { + Category string `json:"category"` + Summary string `json:"summary"` + FullText string `json:"full_text"` + IsExpense bool `json:"is_expense"` + Amount string `json:"amount"` + Vendor string `json:"vendor"` + Date string `json:"date"` + } + + if err := json.Unmarshal([]byte(content_str), &result); err != nil { + // If JSON parsing fails, use defaults + return &KimiResult{ + Category: "other", + Summary: "Failed to parse document", + FullText: content_str, + }, nil + } + + return &KimiResult{ + Category: result.Category, + Summary: result.Summary, + FullText: result.FullText, + IsExpense: result.IsExpense, + Amount: result.Amount, + Vendor: result.Vendor, + Date: result.Date, + }, nil +} + +func (p *Processor) exportExpense(doc *Document) error { + csvPath := filepath.Join(p.baseDir, "exports", "expenses.csv") + + // Check if file exists, create with header if not + if _, err := os.Stat(csvPath); os.IsNotExist(err) { + header := "id,date,vendor,amount,category,summary,pdf_path\n" + if err := os.WriteFile(csvPath, []byte(header), 0644); err != nil { + return err + } + } + + // Append 
expense + f, err := os.OpenFile(csvPath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer f.Close() + + // Escape CSV fields + summary := strings.ReplaceAll(doc.Summary, "\"", "\"\"") + line := fmt.Sprintf("%s,%s,\"%s\",%s,%s,\"%s\",%s\n", + doc.ID, doc.Date, doc.Vendor, doc.Amount, doc.Category, summary, doc.PDFPath) + + _, err = f.WriteString(line) + return err +} + +func (p *Processor) GetDocument(id string) *Document { + p.mu.RLock() + defer p.mu.RUnlock() + return p.docs[id] +} + +func (p *Processor) Search(query string) []*Document { + p.mu.RLock() + defer p.mu.RUnlock() + + query = strings.ToLower(query) + var results []*Document + + for _, doc := range p.docs { + if strings.Contains(strings.ToLower(doc.Summary), query) || + strings.Contains(strings.ToLower(doc.FullText), query) || + strings.Contains(strings.ToLower(doc.Category), query) || + strings.Contains(strings.ToLower(doc.Vendor), query) { + results = append(results, doc) + } + } + + return results +} + +func (p *Processor) ListDocuments() []*Document { + p.mu.RLock() + defer p.mu.RUnlock() + + docs := make([]*Document, 0, len(p.docs)) + for _, doc := range p.docs { + docs = append(docs, doc) + } + return docs +} + +func sanitizeCategory(cat string) string { + cat = strings.ToLower(cat) + cat = strings.ReplaceAll(cat, " ", "-") + return cat +} + +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + return err +} diff --git a/watcher/watcher.go b/watcher/watcher.go new file mode 100644 index 0000000..1a5143f --- /dev/null +++ b/watcher/watcher.go @@ -0,0 +1,103 @@ +package watcher + +import ( + "log" + "os" + "path/filepath" + "strings" + "time" + + "docproc/processor" + + "github.com/fsnotify/fsnotify" +) + +type Watcher struct { + proc *processor.Processor + watcher *fsnotify.Watcher + inbox 
string
	stop    chan struct{}
}

// New builds a Watcher over ~/documents/inbox, creating the directory if it
// does not exist yet.
func New(proc *processor.Processor) (*Watcher, error) {
	home, _ := os.UserHomeDir()
	inboxDir := filepath.Join(home, "documents", "inbox")

	// The directory must exist before fsnotify can watch it.
	os.MkdirAll(inboxDir, 0755)

	fw, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	if err := fw.Add(inboxDir); err != nil {
		fw.Close()
		return nil, err
	}

	return &Watcher{
		proc:    proc,
		watcher: fw,
		inbox:   inboxDir,
		stop:    make(chan struct{}),
	}, nil
}

// Watch first drains any files already sitting in the inbox, then blocks
// handling filesystem events until Stop is called.
func (w *Watcher) Watch() {
	w.processExisting()

	for {
		select {
		case <-w.stop:
			return
		case ev, ok := <-w.watcher.Events:
			if !ok {
				return
			}
			if ev.Op&fsnotify.Create != 0 {
				// Brief pause so a writer that just created the file has a
				// chance to finish before we read it.
				time.Sleep(500 * time.Millisecond)
				w.processFile(ev.Name)
			}
		case err, ok := <-w.watcher.Errors:
			if !ok {
				return
			}
			log.Printf("Watcher error: %v", err)
		}
	}
}

// processExisting handles files that were already in the inbox at startup.
func (w *Watcher) processExisting() {
	entries, err := os.ReadDir(w.inbox)
	if err != nil {
		log.Printf("Failed to read inbox: %v", err)
		return
	}
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		w.processFile(filepath.Join(w.inbox, e.Name()))
	}
}

// processFile forwards supported document types to the processor; anything
// else is silently ignored.
func (w *Watcher) processFile(path string) {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".pdf", ".png", ".jpg", ".jpeg":
		// supported
	default:
		return
	}
	if err := w.proc.ProcessFile(path); err != nil {
		log.Printf("Failed to process %s: %v", path, err)
	}
}

// Stop terminates the Watch loop and releases the fsnotify handle.
func (w *Watcher) Stop() {
	close(w.stop)
	w.watcher.Close()
}