package ocr

import (
	"context"
	"errors"
	"log/slog"
	"strings"
	"sync"

	"archivmail/internal/index"
	"archivmail/internal/storage"
	"archivmail/pkg/mailparser"
)

// Job is a single unit of OCR work: the text of every attachment of the
// stored mail MailID is extracted and pushed into the index of TenantID.
type Job struct {
	MailID   string
	TenantID *int64
}

// Worker consumes OCR jobs from a buffered channel with a fixed number of
// background goroutines.
//
// Intended call order: NewWorker → Start(ctx) → Submit(...) (any number of
// times) → Stop(). Submit never blocks; when the queue is full the job is
// dropped.
type Worker struct {
	store   *storage.Store
	idxMgr  index.TenantIndexer
	logger  *slog.Logger
	queue   chan Job      // pending jobs, capacity fixed at construction
	done    chan struct{} // closed by Stop to tell the goroutines to finish
	wg      sync.WaitGroup
	workers int      // number of goroutines launched by Start
	langs   []string // language codes handed to ExtractText
}

// Options tunes a Worker. Any field left at its zero value falls back to the
// default noted next to it.
type Options struct {
	QueueSize int      // default 1000
	Workers   int      // default 2
	Langs     []string // default ["deu", "eng"]
	Logger    *slog.Logger
}

// NewWorker builds a Worker that loads mails from store, runs OCR over their
// supported attachments and hands the combined text to the per-tenant index
// obtained from idxMgr. Neither store nor idxMgr is checked here; both must
// be non-nil.
func NewWorker(store *storage.Store, idxMgr index.TenantIndexer, opts Options) *Worker {
	// Resolve every option to its effective value before building the Worker.
	queueSize := opts.QueueSize
	if queueSize <= 0 {
		queueSize = 1000
	}

	workerCount := opts.Workers
	if workerCount <= 0 {
		workerCount = 2
	}

	langs := opts.Langs
	if len(langs) == 0 {
		langs = []string{"deu", "eng"}
	}

	logger := opts.Logger
	if logger == nil {
		logger = slog.Default()
	}

	w := &Worker{
		store:   store,
		idxMgr:  idxMgr,
		logger:  logger,
		workers: workerCount,
		langs:   langs,
	}
	w.queue = make(chan Job, queueSize)
	w.done = make(chan struct{})
	return w
}

// Submit enqueues a job without blocking. When the queue is full the job is
// dropped and a warning is logged, so the caller (mail intake) never waits.
func (w *Worker) Submit(mailID string, tenantID *int64) { if mailID == "" { return } select { case w.queue <- Job{MailID: mailID, TenantID: tenantID}: default: w.logger.Warn("ocr worker: queue full, dropping job", "mail_id", mailID) } } // QueueLen returns the current number of pending jobs. func (w *Worker) QueueLen() int { return len(w.queue) } // Start launches w.workers goroutines that consume the queue until Stop is // called or ctx is cancelled. func (w *Worker) Start(ctx context.Context) { if !IsAvailable() { w.logger.Warn("ocr worker: tesseract/pdftotext not on PATH — OCR disabled at runtime") } for i := 0; i < w.workers; i++ { w.wg.Add(1) go w.run(ctx, i) } w.logger.Info("ocr worker: started", "workers", w.workers, "queue", cap(w.queue)) } // Stop drains the remaining queue and waits for all goroutines to exit. func (w *Worker) Stop() { close(w.done) w.wg.Wait() w.logger.Info("ocr worker: stopped") } func (w *Worker) run(ctx context.Context, id int) { defer w.wg.Done() for { select { case job, ok := <-w.queue: if !ok { return } w.process(ctx, job) case <-w.done: // Drain remaining jobs so nothing in-flight is lost. 
for { select { case job, ok := <-w.queue: if !ok { return } w.process(ctx, job) default: return } } case <-ctx.Done(): return } } } func (w *Worker) process(ctx context.Context, job Job) { logger := w.logger.With("mail_id", job.MailID, "tenant_id", job.TenantID) if w.store.OCREnabled(ctx, job.TenantID) == false { _ = w.store.SetOCRResult(ctx, job.MailID, "disabled", 0) return } raw, err := w.store.Load(job.MailID) if err != nil { logger.Warn("ocr worker: load failed", "err", err) _ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0) return } pm, err := mailparser.Parse(raw) if err != nil { logger.Warn("ocr worker: parse failed", "err", err) _ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0) return } if len(pm.Attachments) == 0 { _ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0) return } var combined strings.Builder processed := 0 for _, a := range pm.Attachments { text, err := ExtractText(ctx, a.Data, a.ContentType, a.Filename, w.langs) if err != nil { if errors.Is(err, ErrUnsupported) || errors.Is(err, ErrEncrypted) || errors.Is(err, ErrTooLarge) || errors.Is(err, ErrUnavailable) { logger.Debug("ocr worker: attachment skipped", "filename", a.Filename, "reason", err) continue } logger.Warn("ocr worker: extract failed", "filename", a.Filename, "err", err) continue } if t := strings.TrimSpace(text); t != "" { combined.WriteString(t) combined.WriteString("\n\n") processed++ } } if processed == 0 { _ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0) return } idx := w.idxMgr.ForTenant(job.TenantID) updater, ok := idx.(index.AttachmentTextUpdater) if !ok { logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped") _ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0) return } if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil { logger.Warn("ocr worker: index update failed", "err", err) _ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0) return } chars := int64(combined.Len()) _ = 
w.store.SetOCRResult(ctx, job.MailID, "done", chars) logger.Info("ocr worker: indexed", "attachments", processed, "chars", chars) }