125 lines
3.0 KiB
Go
125 lines
3.0 KiB
Go
package main
|
|
|
|
import (
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
// InboxWatcher monitors a single inbox directory for newly arriving files.
// Change notifications come from fsnotify; see run for the event loop.
type InboxWatcher struct {
	// dir is the path of the directory being watched.
	dir string
}
|
|
|
|
// StartInboxWatcher launches a background goroutine that watches the inbox directory
|
|
func StartInboxWatcher() {
|
|
w := &InboxWatcher{dir: inboxDir}
|
|
go w.run()
|
|
}
|
|
|
|
func (w *InboxWatcher) run() {
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
log.Printf("❌ Inbox watcher failed to start: %v", err)
|
|
return
|
|
}
|
|
defer watcher.Close()
|
|
|
|
os.MkdirAll(w.dir, 0755)
|
|
|
|
if err := watcher.Add(w.dir); err != nil {
|
|
log.Printf("❌ Inbox watcher failed to watch %s: %v", w.dir, err)
|
|
return
|
|
}
|
|
|
|
log.Printf("👁️ Inbox watcher started: %s", w.dir)
|
|
|
|
// Debounce: wait for writes to finish before processing
|
|
pending := make(map[string]time.Time)
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case event, ok := <-watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
// Track files on create or write (scanner may write in chunks)
|
|
if event.Op&(fsnotify.Create|fsnotify.Write) != 0 {
|
|
name := filepath.Base(event.Name)
|
|
// Skip hidden files, temp files, and non-document files
|
|
if strings.HasPrefix(name, ".") || strings.HasPrefix(name, "._") {
|
|
continue
|
|
}
|
|
ext := strings.ToLower(filepath.Ext(name))
|
|
allowed := map[string]bool{
|
|
".pdf": true, ".jpg": true, ".jpeg": true, ".png": true,
|
|
".tiff": true, ".tif": true, ".bmp": true,
|
|
".doc": true, ".docx": true, ".odt": true, ".rtf": true,
|
|
".xls": true, ".xlsx": true, ".ppt": true, ".pptx": true,
|
|
".txt": true, ".csv": true, ".md": true,
|
|
}
|
|
if !allowed[ext] {
|
|
continue
|
|
}
|
|
pending[event.Name] = time.Now()
|
|
}
|
|
|
|
case err, ok := <-watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.Printf("Inbox watcher error: %v", err)
|
|
|
|
case <-ticker.C:
|
|
// Process files that haven't been written to for 2 seconds (transfer complete)
|
|
now := time.Now()
|
|
for path, lastWrite := range pending {
|
|
if now.Sub(lastWrite) < 2*time.Second {
|
|
continue
|
|
}
|
|
delete(pending, path)
|
|
|
|
// Verify file still exists and has content
|
|
info, err := os.Stat(path)
|
|
if err != nil || info.Size() == 0 {
|
|
continue
|
|
}
|
|
|
|
w.processFile(path)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *InboxWatcher) processFile(filePath string) {
|
|
fname := filepath.Base(filePath)
|
|
log.Printf("📄 Inbox: new file %s", fname)
|
|
|
|
// Check for duplicate
|
|
hash, _ := FileHash(filePath)
|
|
if existing, _ := GetDocument(hash); existing != nil && existing.Status == "ready" {
|
|
log.Printf(" Already exists (%s), skipping", hash)
|
|
os.Remove(filePath)
|
|
return
|
|
}
|
|
|
|
// Create pending document (shows in UI immediately)
|
|
InsertPendingDocument(hash, fname)
|
|
|
|
// Process async (same pipeline as web upload)
|
|
go func() {
|
|
if doc, err := ProcessDocument(filePath); err != nil {
|
|
log.Printf("Inbox process error for %s: %v", fname, err)
|
|
UpdateDocumentStatus(hash, "error")
|
|
} else {
|
|
log.Printf("📥 Processed: %s → %s/%s", fname, doc.Category, doc.ID)
|
|
}
|
|
}()
|
|
}
|