package processor import ( "bytes" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" "io" "log" "net/http" "os" "path/filepath" "strings" "sync" "time" ) type Document struct { ID string `json:"id"` Hash string `json:"hash"` Filename string `json:"filename"` Category string `json:"category"` Summary string `json:"summary"` FullText string `json:"full_text"` PDFPath string `json:"pdf_path"` RecordPath string `json:"record_path"` ProcessedAt time.Time `json:"processed_at"` IsExpense bool `json:"is_expense"` Amount string `json:"amount,omitempty"` Vendor string `json:"vendor,omitempty"` Date string `json:"date,omitempty"` } type Processor struct { mu sync.RWMutex docs map[string]*Document baseDir string apiKey string apiURL string } func New() *Processor { homeDir, _ := os.UserHomeDir() apiKey := os.Getenv("FIREWORKS_API_KEY") if apiKey == "" { apiKey = "PLACEHOLDER_API_KEY" log.Println("WARNING: FIREWORKS_API_KEY not set, using placeholder") } p := &Processor{ docs: make(map[string]*Document), baseDir: filepath.Join(homeDir, "documents"), apiKey: apiKey, apiURL: "https://api.fireworks.ai/inference/v1/chat/completions", } // Load existing index p.loadIndex() return p } func (p *Processor) loadIndex() { indexPath := filepath.Join(p.baseDir, "index", "master.json") data, err := os.ReadFile(indexPath) if err != nil { return } var docs []*Document if err := json.Unmarshal(data, &docs); err != nil { return } for _, doc := range docs { p.docs[doc.ID] = doc } log.Printf("Loaded %d documents from index", len(docs)) } func (p *Processor) saveIndex() error { p.mu.RLock() docs := make([]*Document, 0, len(p.docs)) for _, doc := range p.docs { docs = append(docs, doc) } p.mu.RUnlock() data, err := json.MarshalIndent(docs, "", " ") if err != nil { return err } indexPath := filepath.Join(p.baseDir, "index", "master.json") return os.WriteFile(indexPath, data, 0644) } func (p *Processor) ProcessFile(path string) error { log.Printf("Processing: %s", path) // Read file data, err := os.ReadFile(path) if err != nil { return fmt.Errorf("failed to read file: %w", err) } // Calculate hash hash := fmt.Sprintf("%x", sha256.Sum256(data)) shortHash := hash[:12] // Check if already processed p.mu.RLock() for _, doc := range p.docs { if doc.Hash == hash { p.mu.RUnlock() log.Printf("Already processed: %s (hash: %s)", path, shortHash) os.Remove(path) return nil } } p.mu.RUnlock() // Determine file type ext := strings.ToLower(filepath.Ext(path)) var mimeType string switch ext { case ".pdf": mimeType = "application/pdf" case ".png": mimeType = "image/png" case ".jpg", ".jpeg": mimeType = "image/jpeg" default: return fmt.Errorf("unsupported file type: %s", ext) } // Send to Kimi for OCR + classification result, err := p.analyzeWithKimi(data, mimeType, filepath.Base(path)) if err != nil { return fmt.Errorf("Kimi analysis failed: %w", err) } // Create document record doc := &Document{ ID: shortHash, Hash: hash, Filename: filepath.Base(path), Category: result.Category, Summary: result.Summary, FullText: result.FullText, ProcessedAt: time.Now(), IsExpense: result.IsExpense, Amount: result.Amount, Vendor: result.Vendor, Date: result.Date, } // Store PDF storePath := filepath.Join(p.baseDir, "store", shortHash+".pdf") if ext == ".pdf" { if err := copyFile(path, storePath); err != nil { return fmt.Errorf("failed to store PDF: %w", err) } } else { // For images, just store as-is with original extension storePath = filepath.Join(p.baseDir, "store", shortHash+ext) if err := copyFile(path, storePath); err != nil { return fmt.Errorf("failed to store file: %w", err) } } doc.PDFPath = storePath // Create record recordDir := filepath.Join(p.baseDir, "records", sanitizeCategory(doc.Category)) os.MkdirAll(recordDir, 0755) recordPath := filepath.Join(recordDir, shortHash+".md") record := fmt.Sprintf(`# %s **ID:** %s **Original File:** %s **Category:** %s **Processed:** %s **Stored PDF:** %s ## Summary %s ## Full Text %s `, doc.Filename, doc.ID, doc.Filename, doc.Category, doc.ProcessedAt.Format(time.RFC3339), doc.PDFPath, doc.Summary, doc.FullText) if err := os.WriteFile(recordPath, []byte(record), 0644); err != nil { return fmt.Errorf("failed to write record: %w", err) } doc.RecordPath = recordPath // Export expense if applicable if doc.IsExpense { if err := p.exportExpense(doc); err != nil { log.Printf("Failed to export expense: %v", err) } } // Add to index p.mu.Lock() p.docs[doc.ID] = doc p.mu.Unlock() if err := p.saveIndex(); err != nil { log.Printf("Failed to save index: %v", err) } // Remove from inbox os.Remove(path) log.Printf("Processed: %s -> %s (%s)", filepath.Base(path), doc.Category, doc.ID) return nil } type KimiResult struct { Category string Summary string FullText string IsExpense bool Amount string Vendor string Date string } func (p *Processor) analyzeWithKimi(data []byte, mimeType, filename string) (*KimiResult, error) { // Encode file as base64 b64Data := base64.StdEncoding.EncodeToString(data) // Build request for Kimi K2.5 via Fireworks prompt := `Analyze this document and extract: 1. **Category**: One of: tax, expense, bill, invoice, medical, receipt, bank, insurance, legal, correspondence, other 2. **Summary**: 2-3 sentence summary of the document 3. **Full Text**: Complete OCR text extraction 4. **Is Expense**: true/false - is this a business expense, receipt, or invoice? 5. **Amount**: If expense/invoice, the total amount (e.g., "$123.45") 6. **Vendor**: If expense/invoice, the vendor/company name 7. **Date**: Document date in YYYY-MM-DD format if visible Respond in JSON format: { "category": "...", "summary": "...", "full_text": "...", "is_expense": true/false, "amount": "...", "vendor": "...", "date": "..." }` var content []interface{} // Add image/document if strings.HasPrefix(mimeType, "image/") { content = append(content, map[string]interface{}{ "type": "image_url", "image_url": map[string]string{ "url": fmt.Sprintf("data:%s;base64,%s", mimeType, b64Data), }, }) } else { // For PDFs, encode as data URL content = append(content, map[string]interface{}{ "type": "image_url", "image_url": map[string]string{ "url": fmt.Sprintf("data:%s;base64,%s", mimeType, b64Data), }, }) } content = append(content, map[string]interface{}{ "type": "text", "text": prompt, }) reqBody := map[string]interface{}{ "model": "accounts/fireworks/models/kimi-k2-5-instruct", "messages": []map[string]interface{}{ { "role": "user", "content": content, }, }, "max_tokens": 4096, "temperature": 0.1, } jsonBody, err := json.Marshal(reqBody) if err != nil { return nil, err } req, err := http.NewRequest("POST", p.apiURL, bytes.NewReader(jsonBody)) if err != nil { return nil, err } req.Header.Set("Authorization", "Bearer "+p.apiKey) req.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 120 * time.Second} resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, string(body)) } var apiResp struct { Choices []struct { Message struct { Content string `json:"content"` } `json:"message"` } `json:"choices"` } if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil { return nil, err } if len(apiResp.Choices) == 0 { return nil, fmt.Errorf("no response from API") } // Parse JSON response content_str := apiResp.Choices[0].Message.Content // Extract JSON from response (might be wrapped in markdown) jsonStart := strings.Index(content_str, "{") jsonEnd := strings.LastIndex(content_str, "}") if jsonStart >= 0 && jsonEnd > jsonStart { content_str = content_str[jsonStart : jsonEnd+1] } var result struct { Category string `json:"category"` Summary string `json:"summary"` FullText string `json:"full_text"` IsExpense bool `json:"is_expense"` Amount string `json:"amount"` Vendor string `json:"vendor"` Date string `json:"date"` } if err := json.Unmarshal([]byte(content_str), &result); err != nil { // If JSON parsing fails, use defaults return &KimiResult{ Category: "other", Summary: "Failed to parse document", FullText: content_str, }, nil } return &KimiResult{ Category: result.Category, Summary: result.Summary, FullText: result.FullText, IsExpense: result.IsExpense, Amount: result.Amount, Vendor: result.Vendor, Date: result.Date, }, nil } func (p *Processor) exportExpense(doc *Document) error { csvPath := filepath.Join(p.baseDir, "exports", "expenses.csv") // Check if file exists, create with header if not if _, err := os.Stat(csvPath); os.IsNotExist(err) { header := "id,date,vendor,amount,category,summary,pdf_path\n" if err := os.WriteFile(csvPath, []byte(header), 0644); err != nil { return err } } // Append expense f, err := os.OpenFile(csvPath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return err } defer f.Close() // Escape CSV fields summary := strings.ReplaceAll(doc.Summary, "\"", "\"\"") line := fmt.Sprintf("%s,%s,\"%s\",%s,%s,\"%s\",%s\n", doc.ID, doc.Date, doc.Vendor, doc.Amount, doc.Category, summary, doc.PDFPath) _, err = f.WriteString(line) return err } func (p *Processor) GetDocument(id string) *Document { p.mu.RLock() defer p.mu.RUnlock() return p.docs[id] } func (p *Processor) Search(query string) []*Document { p.mu.RLock() defer p.mu.RUnlock() query = strings.ToLower(query) var results []*Document for _, doc := range p.docs { if strings.Contains(strings.ToLower(doc.Summary), query) || strings.Contains(strings.ToLower(doc.FullText), query) || strings.Contains(strings.ToLower(doc.Category), query) || strings.Contains(strings.ToLower(doc.Vendor), query) { results = append(results, doc) } } return results } func (p *Processor) ListDocuments() []*Document { p.mu.RLock() defer p.mu.RUnlock() docs := make([]*Document, 0, len(p.docs)) for _, doc := range p.docs { docs = append(docs, doc) } return docs } func sanitizeCategory(cat string) string { cat = strings.ToLower(cat) cat = strings.ReplaceAll(cat, " ", "-") return cat } func copyFile(src, dst string) error { in, err := os.Open(src) if err != nil { return err } defer in.Close() out, err := os.Create(dst) if err != nil { return err } defer out.Close() _, err = io.Copy(out, in) return err }