16013e8b66
Strukturbug auf 132 gefunden: Tenant-User (Rolle user) sahen ihren OCR-Text nicht, obwohl ocr_chars>0 in PostgreSQL stand. Ursache: - OCR-Worker hat in den per Job.TenantID gewaehlten Index geschrieben. Beim Reprocess via CLI kam TenantID aus dem Submitter-Kontext und konnte vom in emails.tenant_id gespeicherten Wert abweichen. - /ocr-text-Endpoint hat fuer die Index-Auswahl session.TenantID benutzt. Bei Admin/Auditor (nil Session-Tenant) wurde immer global gelesen, auch wenn die Mail einem Tenant gehoert. Fix: Beide Stellen lesen jetzt die TenantID **immer** aus storage.GetTenantForMail(emails.tenant_id) und routen den Manticore-Index entsprechend. ACL-Check im Endpoint bleibt unveraendert auf session.TenantID == mail.tenant_id — die Tenant-Isolation wird nicht aufgeweicht. Edge cases: - Mail mit tenant_id NULL: GetTenantForMail liefert nil -> globaler Index (vorher und nachher gleich). - DB-Fehler beim Lookup: faellt auf nil zurueck -> globaler Index, liefert leeren Text fuer Tenant-Mails -> 404. Safe (keine Querleckage zwischen Tenants).
220 lines
5.9 KiB
Go
220 lines
5.9 KiB
Go
package ocr
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"strings"
|
|
"sync"
|
|
|
|
"archivmail/internal/index"
|
|
"archivmail/internal/storage"
|
|
"archivmail/pkg/mailparser"
|
|
)
|
|
|
|
// Job is a single unit of OCR work: the attachments of one stored mail are
// text-extracted and the combined result is written to a per-tenant index.
type Job struct {
	// MailID identifies the stored mail whose attachments are processed.
	MailID string

	// TenantID is the tenant the submitter associated with the mail.
	// It may be nil.
	TenantID *int64
}
|
|
|
|
// Worker runs OCR jobs on a buffered channel using N background goroutines.
|
|
//
|
|
// Lifecycle: NewWorker → Start(ctx) → Submit(...) (n times) → Stop().
|
|
// Submit is non-blocking; jobs are dropped when the queue is full.
|
|
type Worker struct {
|
|
store *storage.Store
|
|
idxMgr index.TenantIndexer
|
|
logger *slog.Logger
|
|
queue chan Job
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
workers int
|
|
langs []string
|
|
}
|
|
|
|
// Options holds the tunables for a Worker. Any field left at its zero value
// is replaced with a default by NewWorker.
type Options struct {
	// QueueSize is the capacity of the job queue. Default: 1000.
	QueueSize int

	// Workers is the number of background goroutines. Default: 2.
	Workers int

	// Langs lists the OCR languages. Default: ["deu", "eng"].
	Langs []string

	// Logger receives the worker's log output. Default: slog.Default().
	Logger *slog.Logger
}
|
|
|
|
// NewWorker constructs a worker that reads mails from store, runs OCR on
|
|
// supported attachments, and pushes the combined text into the per-tenant
|
|
// Manticore index via idxMgr. The store and idxMgr must be non-nil.
|
|
func NewWorker(store *storage.Store, idxMgr index.TenantIndexer, opts Options) *Worker {
|
|
if opts.QueueSize <= 0 {
|
|
opts.QueueSize = 1000
|
|
}
|
|
if opts.Workers <= 0 {
|
|
opts.Workers = 2
|
|
}
|
|
if len(opts.Langs) == 0 {
|
|
opts.Langs = []string{"deu", "eng"}
|
|
}
|
|
if opts.Logger == nil {
|
|
opts.Logger = slog.Default()
|
|
}
|
|
return &Worker{
|
|
store: store,
|
|
idxMgr: idxMgr,
|
|
logger: opts.Logger,
|
|
queue: make(chan Job, opts.QueueSize),
|
|
done: make(chan struct{}),
|
|
workers: opts.Workers,
|
|
langs: opts.Langs,
|
|
}
|
|
}
|
|
|
|
// Submit enqueues a job. Drops with a warning if the queue is full so the
|
|
// caller (mail intake) is never blocked.
|
|
func (w *Worker) Submit(mailID string, tenantID *int64) {
|
|
if mailID == "" {
|
|
return
|
|
}
|
|
select {
|
|
case w.queue <- Job{MailID: mailID, TenantID: tenantID}:
|
|
default:
|
|
w.logger.Warn("ocr worker: queue full, dropping job", "mail_id", mailID)
|
|
}
|
|
}
|
|
|
|
// QueueLen returns the current number of pending jobs.
|
|
func (w *Worker) QueueLen() int { return len(w.queue) }
|
|
|
|
// Start launches w.workers goroutines that consume the queue until Stop is
|
|
// called or ctx is cancelled.
|
|
func (w *Worker) Start(ctx context.Context) {
|
|
if !IsAvailable() {
|
|
w.logger.Warn("ocr worker: tesseract/pdftotext not on PATH — OCR disabled at runtime")
|
|
}
|
|
for i := 0; i < w.workers; i++ {
|
|
w.wg.Add(1)
|
|
go w.run(ctx, i)
|
|
}
|
|
w.logger.Info("ocr worker: started", "workers", w.workers, "queue", cap(w.queue))
|
|
}
|
|
|
|
// Stop drains the remaining queue and waits for all goroutines to exit.
|
|
func (w *Worker) Stop() {
|
|
close(w.done)
|
|
w.wg.Wait()
|
|
w.logger.Info("ocr worker: stopped")
|
|
}
|
|
|
|
func (w *Worker) run(ctx context.Context, id int) {
|
|
defer w.wg.Done()
|
|
for {
|
|
select {
|
|
case job, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.process(ctx, job)
|
|
case <-w.done:
|
|
// Drain remaining jobs so nothing in-flight is lost.
|
|
for {
|
|
select {
|
|
case job, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.process(ctx, job)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *Worker) process(ctx context.Context, job Job) {
|
|
// PROJ-44: The canonical source of truth for a mail's tenant assignment
|
|
// is emails.tenant_id in PostgreSQL — never the submitter's context.
|
|
// Re-imports via IMAP/POP3 scheduler may submit the same mail with a
|
|
// different (e.g. nil) tenant pointer than the one stored on the mail,
|
|
// which would otherwise route the OCR result into the wrong per-tenant
|
|
// Manticore index (emails_global instead of emails_tenant_N).
|
|
// We resolve the authoritative tenant once and use it everywhere below.
|
|
tenantID := job.TenantID
|
|
if w.store != nil {
|
|
if t, err := w.store.GetTenantForMail(ctx, job.MailID); err == nil && t != nil {
|
|
tenantID = t
|
|
}
|
|
}
|
|
logger := w.logger.With("mail_id", job.MailID, "tenant_id", tenantID)
|
|
|
|
if w.store.OCREnabled(ctx, tenantID) == false {
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "disabled", 0)
|
|
return
|
|
}
|
|
|
|
raw, err := w.store.Load(job.MailID)
|
|
if err != nil {
|
|
logger.Warn("ocr worker: load failed", "err", err)
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
|
return
|
|
}
|
|
|
|
pm, err := mailparser.Parse(raw)
|
|
if err != nil {
|
|
logger.Warn("ocr worker: parse failed", "err", err)
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
|
return
|
|
}
|
|
|
|
if len(pm.Attachments) == 0 {
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0)
|
|
return
|
|
}
|
|
|
|
var combined strings.Builder
|
|
processed := 0
|
|
for _, a := range pm.Attachments {
|
|
text, err := ExtractText(ctx, a.Data, a.ContentType, a.Filename, w.langs)
|
|
if err != nil {
|
|
if errors.Is(err, ErrUnsupported) || errors.Is(err, ErrEncrypted) ||
|
|
errors.Is(err, ErrTooLarge) || errors.Is(err, ErrUnavailable) {
|
|
logger.Debug("ocr worker: attachment skipped",
|
|
"filename", a.Filename, "reason", err)
|
|
continue
|
|
}
|
|
logger.Warn("ocr worker: extract failed",
|
|
"filename", a.Filename, "err", err)
|
|
continue
|
|
}
|
|
if t := strings.TrimSpace(text); t != "" {
|
|
combined.WriteString(t)
|
|
combined.WriteString("\n\n")
|
|
processed++
|
|
}
|
|
}
|
|
|
|
if processed == 0 {
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0)
|
|
return
|
|
}
|
|
|
|
idx := w.idxMgr.ForTenant(tenantID)
|
|
updater, ok := idx.(index.AttachmentTextUpdater)
|
|
if !ok {
|
|
logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped")
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
|
return
|
|
}
|
|
if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil {
|
|
logger.Warn("ocr worker: index update failed", "err", err)
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
|
return
|
|
}
|
|
|
|
chars := int64(combined.Len())
|
|
_ = w.store.SetOCRResult(ctx, job.MailID, "done", chars)
|
|
logger.Info("ocr worker: indexed", "attachments", processed, "chars", chars)
|
|
}
|