Files
archivmail/internal/ocr/worker.go
T
sysops 0bda21033e feat(PROJ-35): OCR & Anhang-Volltext-Indexierung
Asynchrone OCR fuer PDF- und Bild-Anhaenge via tesseract + poppler-utils.
Extrahierter Text wird in Manticore (attachment_text) gespeichert und ist
ueber die normale Volltextsuche auffindbar.

- internal/ocr: ExtractText + Worker (queue + drain)
- internal/storage/ocr.go: SetOCRStatus, OCREnabled, GetMailsByOCRStatus
- emails.ocr_status (pending|done|failed|skipped|disabled)
- tenants.ocr_enabled (Default TRUE, opt-out)
- Manticore: attachment_text-Feld + UpdateAttachmentText
- Boot-resume: pending Jobs nach Restart automatisch in die Queue
- CLI: archivmail ocr-reprocess --tenant N --status pending|failed|all
- update.sh: tesseract-ocr + poppler-utils optional installieren
2026-05-08 22:11:17 +02:00

206 lines
5.2 KiB
Go

package ocr
import (
"context"
"errors"
"log/slog"
"strings"
"sync"
"archivmail/internal/index"
"archivmail/internal/storage"
"archivmail/pkg/mailparser"
)
// Job is a single unit of OCR work. It names one stored mail (MailID) whose
// attachments are to be run through OCR, and the tenant (TenantID) whose
// index receives the extracted text.
type Job struct {
	// MailID identifies the stored mail to process.
	MailID string
	// TenantID selects the tenant; it may be nil and is passed unchanged to
	// the store and the index manager.
	// NOTE(review): nil presumably means the default tenant — confirm.
	TenantID *int64
}
// Worker processes OCR jobs from a bounded, buffered queue using a fixed
// number of background goroutines.
//
// Typical lifecycle: NewWorker, then Start(ctx), then any number of Submit
// calls, and finally Stop. Submit never blocks the caller: when the queue is
// already full, the job is discarded.
type Worker struct {
	// Collaborators.
	store  *storage.Store
	idxMgr index.TenantIndexer
	logger *slog.Logger

	// Job channel, shutdown signal and goroutine bookkeeping.
	queue chan Job
	done  chan struct{}
	wg    sync.WaitGroup

	// Configuration fixed at construction time.
	workers int      // number of goroutines launched by Start
	langs   []string // language codes handed to ExtractText (e.g. "deu", "eng")
}
// Options holds the tunables for NewWorker. Fields left at their zero value
// are replaced by a default; for QueueSize and Workers, any non-positive
// number is replaced as well.
type Options struct {
	// QueueSize is the capacity of the job queue. Default: 1000.
	QueueSize int
	// Workers is the number of concurrent OCR goroutines. Default: 2.
	Workers int
	// Langs lists the OCR languages. Default: []string{"deu", "eng"}.
	Langs []string
	// Logger receives the worker's log output. Default: slog.Default().
	Logger *slog.Logger
}
// NewWorker builds a Worker. The Worker loads mails from store, runs OCR on
// their supported attachments and writes the combined text into the
// per-tenant Manticore index obtained from idxMgr. store and idxMgr must be
// non-nil; they are not checked here.
//
// Zero or non-positive fields in opts are replaced by defaults: a queue of
// 1000, 2 workers, the languages "deu" and "eng", and slog.Default(). The
// caller's opts.Langs slice is copied. Later modifications by the caller
// therefore cannot race with the worker goroutines that read it.
func NewWorker(store *storage.Store, idxMgr index.TenantIndexer, opts Options) *Worker {
	if opts.QueueSize <= 0 {
		opts.QueueSize = 1000
	}
	if opts.Workers <= 0 {
		opts.Workers = 2
	}
	langs := []string{"deu", "eng"}
	if len(opts.Langs) > 0 {
		// Defensive copy: the slice is read concurrently by every worker
		// goroutine for the whole lifetime of the Worker.
		langs = append([]string(nil), opts.Langs...)
	}
	logger := opts.Logger
	if logger == nil {
		logger = slog.Default()
	}
	return &Worker{
		store:   store,
		idxMgr:  idxMgr,
		logger:  logger,
		queue:   make(chan Job, opts.QueueSize),
		done:    make(chan struct{}),
		workers: opts.Workers,
		langs:   langs,
	}
}
// Submit queues the mail mailID of tenant tenantID for OCR. It never blocks
// the caller (the mail intake path). If the queue has no free slot, the job
// is discarded and a warning is logged. An empty mailID is ignored.
func (w *Worker) Submit(mailID string, tenantID *int64) {
	if mailID == "" {
		return
	}
	job := Job{MailID: mailID, TenantID: tenantID}
	select {
	case w.queue <- job:
		// Enqueued; a worker goroutine will pick it up.
	default:
		w.logger.Warn("ocr worker: queue full, dropping job", "mail_id", mailID)
	}
}
// QueueLen reports how many submitted jobs are buffered in the queue and
// have not yet been received by a worker goroutine.
func (w *Worker) QueueLen() int {
	pending := len(w.queue)
	return pending
}
// Start spawns w.workers goroutines. Each one consumes the job queue until
// Stop is called or ctx is cancelled. If the OCR tools cannot be found, only
// a warning is logged; the goroutines are started regardless.
func (w *Worker) Start(ctx context.Context) {
	toolsFound := IsAvailable()
	if !toolsFound {
		w.logger.Warn("ocr worker: tesseract/pdftotext not on PATH — OCR disabled at runtime")
	}
	for n := 0; n < w.workers; n++ {
		w.wg.Add(1)
		go w.run(ctx, n)
	}
	w.logger.Info("ocr worker: started", "workers", w.workers, "queue", cap(w.queue))
}
// Stop signals the worker goroutines to finish and waits for all of them to
// exit. A goroutine that sees the signal first processes every job it can
// still receive without blocking, so queued jobs are not lost. The exception
// is when the ctx passed to Start was already cancelled; then the goroutines
// return immediately. Jobs submitted after Stop stay in the queue and are
// never processed.
//
// Calling Stop again after it has returned is safe: done is only closed
// once, because closing a closed channel panics. Concurrent calls to Stop
// are not supported.
func (w *Worker) Stop() {
	select {
	case <-w.done:
		// Already stopped.
	default:
		close(w.done)
	}
	w.wg.Wait()
	w.logger.Info("ocr worker: stopped")
}
// run is the body of one worker goroutine. It processes jobs as they arrive
// and exits in three cases:
//   - ctx is cancelled;
//   - the queue channel is closed, if that ever happens;
//   - Stop closes w.done. In this case it first processes every job it can
//     still receive without blocking.
//
// id is the goroutine's index assigned by Start and is currently unused.
func (w *Worker) run(ctx context.Context, id int) {
	defer w.wg.Done()

	// drainQueue handles whatever is already buffered, then returns as soon
	// as a receive would block.
	drainQueue := func() {
		for {
			select {
			case job, ok := <-w.queue:
				if !ok {
					return
				}
				w.process(ctx, job)
			default:
				return
			}
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.done:
			drainQueue()
			return
		case job, ok := <-w.queue:
			if !ok {
				return
			}
			w.process(ctx, job)
		}
	}
}
// process runs OCR for a single job. It records the outcome in the mail's
// OCR status via store.SetOCRStatus:
//
//   - "disabled": OCR is turned off for the job's tenant.
//   - "failed": the mail could not be loaded or parsed, or no text was
//     obtained and at least one attachment failed with an unexpected
//     extractor error, or the indexer lacks AttachmentTextUpdater, or the
//     index update failed.
//   - "skipped": the mail has no attachments, or none yielded text
//     (unsupported, encrypted, too large, OCR tools unavailable, or empty
//     output).
//   - "done": the extracted text was written to the tenant's index.
//
// If ctx is cancelled before an outcome is known, the status is left
// untouched rather than recorded as a misleading "disabled"/"skipped"/
// "failed". NOTE(review): presumably the status is still "pending" then and
// the job is resumed on the next boot — confirm.
func (w *Worker) process(ctx context.Context, job Job) {
	// Log the tenant ID by value. A *int64 would be printed as a pointer
	// address by slog's text handler. nil stays nil.
	var tenant any
	if job.TenantID != nil {
		tenant = *job.TenantID
	}
	logger := w.logger.With("mail_id", job.MailID, "tenant_id", tenant)

	// setStatus persists an outcome. A failure is logged instead of being
	// silently discarded, but it does not stop the worker.
	setStatus := func(status string) {
		if err := w.store.SetOCRStatus(ctx, job.MailID, status); err != nil {
			logger.Warn("ocr worker: set ocr status failed", "status", status, "err", err)
		}
	}

	// With a cancelled context, store and OCR calls are likely to fail; do
	// not let that be misread as "OCR disabled" or a failed mail.
	if ctx.Err() != nil {
		return
	}
	if !w.store.OCREnabled(ctx, job.TenantID) {
		setStatus("disabled")
		return
	}

	raw, err := w.store.Load(job.MailID)
	if err != nil {
		logger.Warn("ocr worker: load failed", "err", err)
		setStatus("failed")
		return
	}
	pm, err := mailparser.Parse(raw)
	if err != nil {
		logger.Warn("ocr worker: parse failed", "err", err)
		setStatus("failed")
		return
	}
	if len(pm.Attachments) == 0 {
		setStatus("skipped")
		return
	}

	var combined strings.Builder
	processed := 0 // attachments that yielded non-empty text
	extractErrs := 0 // attachments that failed with an unexpected error
	for _, a := range pm.Attachments {
		text, err := ExtractText(ctx, a.Data, a.ContentType, a.Filename, w.langs)
		if err != nil {
			if errors.Is(err, ErrUnsupported) || errors.Is(err, ErrEncrypted) ||
				errors.Is(err, ErrTooLarge) || errors.Is(err, ErrUnavailable) {
				// Known reasons to skip an attachment; not counted as errors.
				logger.Debug("ocr worker: attachment skipped",
					"filename", a.Filename, "reason", err)
				continue
			}
			extractErrs++
			logger.Warn("ocr worker: extract failed",
				"filename", a.Filename, "err", err)
			continue
		}
		if t := strings.TrimSpace(text); t != "" {
			combined.WriteString(t)
			combined.WriteString("\n\n")
			processed++
		}
	}

	// Cancelled while extracting (e.g. shutdown): the partial result is not
	// trustworthy, so record nothing.
	if ctx.Err() != nil {
		logger.Info("ocr worker: cancelled, ocr status left unchanged")
		return
	}

	if processed == 0 {
		if extractErrs > 0 {
			// Unexpected extractor errors: mark "failed" so the mail can be
			// reprocessed instead of being permanently recorded as "skipped".
			setStatus("failed")
			return
		}
		setStatus("skipped")
		return
	}

	idx := w.idxMgr.ForTenant(job.TenantID)
	updater, ok := idx.(index.AttachmentTextUpdater)
	if !ok {
		logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped")
		setStatus("failed")
		return
	}
	if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil {
		logger.Warn("ocr worker: index update failed", "err", err)
		setStatus("failed")
		return
	}
	setStatus("done")
	logger.Info("ocr worker: indexed", "attachments", processed, "chars", combined.Len())
}