diff --git a/cmd/archivmail/cmd_import.go b/cmd/archivmail/cmd_import.go index c73ca7a..1b26dd9 100644 --- a/cmd/archivmail/cmd_import.go +++ b/cmd/archivmail/cmd_import.go @@ -286,6 +286,7 @@ Commands: reindex Index neu aufbauen (alle oder pro Mandant) recompress Bestehende Mails nachträglich gzip-komprimieren rethread Thread-IDs rückwirkend aus In-Reply-To/References befüllen + ocr-reprocess OCR für Anhänge nachholen (alle oder pro Mandant/Status) version Version anzeigen help Diese Hilfe anzeigen diff --git a/cmd/archivmail/cmd_ocr_reprocess.go b/cmd/archivmail/cmd_ocr_reprocess.go new file mode 100644 index 0000000..4e0d72f --- /dev/null +++ b/cmd/archivmail/cmd_ocr_reprocess.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + "flag" + "log/slog" + "os" + "time" + + "archivmail/config" + "archivmail/internal/index" + "archivmail/internal/ocr" + "archivmail/internal/storage" +) + +// runOCRReprocess re-runs OCR for selected mails. It loads matching IDs from +// the DB by ocr_status, queues them on the OCR worker, then waits for the +// worker to drain. +// +// Usage: +// +// archivmail ocr-reprocess --config /etc/archivmail/config.yml +// archivmail ocr-reprocess --tenant 1 --status failed +// archivmail ocr-reprocess --status all --limit 1000 +func runOCRReprocess(args []string) { + fs := flag.NewFlagSet("ocr-reprocess", flag.ExitOnError) + configPath := fs.String("config", "/etc/archivmail/config.yml", "path to config file") + tenantIDFlag := fs.Int64("tenant", 0, "tenant ID (0 = all tenants)") + statusFlag := fs.String("status", "pending", "ocr_status filter: pending|done|failed|skipped|disabled|all") + limitFlag := fs.Int("limit", 0, "max number of mails to process (0 = no limit)") + fs.Parse(args) + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + + cfg, err := config.Load(*configPath) + if err != nil { + logger.Error("failed to load config", "err", err) + os.Exit(1) + } + + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + CompressEnabled: cfg.Storage.Compress, + } + mailStore, err := storage.New(storeCfg) + if err != nil { + logger.Error("storage init failed", "err", err) + os.Exit(1) + } + defer mailStore.Close() + + indexBackend := cfg.Index.Backend + if indexBackend == "" { + indexBackend = "manticore" + } + if indexBackend != "manticore" { + logger.Error("ocr-reprocess requires the manticore backend", "configured", indexBackend) + os.Exit(1) + } + dsn := cfg.Index.ManticoreDSN + if dsn == "" { + dsn = "manticore@tcp(127.0.0.1:9306)/" + } + idxMgr, err := index.NewManticoreTenantManager(dsn) + if err != nil { + logger.Error("manticore init failed", "err", err) + os.Exit(1) + } + defer idxMgr.Close() + + if !ocr.IsAvailable() { + ts := ocr.CheckTools() + logger.Warn("ocr tools not on PATH — install tesseract-ocr + poppler-utils", + "pdftotext", ts.HasPdftotext, "tesseract", ts.HasTesseract, "pdftoppm", ts.HasPdftoppm) + } + + ctx := context.Background() + + var tenantPtr *int64 + if *tenantIDFlag > 0 { + t := *tenantIDFlag + tenantPtr = &t + } + + mails, err := mailStore.GetMailsByOCRStatus(ctx, *statusFlag, tenantPtr, *limitFlag) + if err != nil { + logger.Error("failed to list mails", "err", err) + os.Exit(1) + } + + logger.Info("ocr-reprocess: starting", + "status", *statusFlag, "tenant", *tenantIDFlag, "count", len(mails)) + if len(mails) == 0 { + return + } + + // Queue size needs to fit the entire batch so Submit never drops. + qSize := len(mails) + 16 + worker := ocr.NewWorker(mailStore, idxMgr, ocr.Options{ + Workers: 2, + QueueSize: qSize, + Logger: logger, + }) + worker.Start(ctx) + + // Reset status to 'pending' before submission so the worker actually picks + // them up — for callers that want to reprocess 'failed'/'skipped' mails. + if *statusFlag != "pending" && *statusFlag != "all" { + for _, m := range mails { + _ = mailStore.SetOCRStatus(ctx, m.ID, "pending") + } + } + + for _, m := range mails { + worker.Submit(m.ID, m.TenantID) + } + + // Periodic progress while waiting for the queue to drain. + tick := time.NewTicker(5 * time.Second) + defer tick.Stop() + go func() { + for range tick.C { + logger.Info("ocr-reprocess: progress", "queue_remaining", worker.QueueLen()) + } + }() + + worker.Stop() // waits for all in-flight jobs + logger.Info("ocr-reprocess: complete", "submitted", len(mails)) +} diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 7cc2162..b70c990 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -29,6 +29,7 @@ import ( "archivmail/internal/index" ldapcfg "archivmail/internal/ldapconfig" "archivmail/internal/mailer" + "archivmail/internal/ocr" pop3store "archivmail/internal/pop3" "archivmail/internal/smtpoutconfig" "archivmail/internal/smtpd" @@ -63,6 +64,9 @@ func main() { case "rethread": runRethread(os.Args[2:]) return + case "ocr-reprocess": + runOCRReprocess(os.Args[2:]) + return case "version": fmt.Printf("archivmail %s\n", AppVersion) for mod, ver := range Modules { @@ -167,6 +171,39 @@ func main() { tenantWorker.Start() defer tenantWorker.Stop() + // PROJ-35: OCR-Worker — extracts text from PDF/image attachments and feeds + // it back into the per-tenant Manticore index. Non-blocking submit so the + // mail intake pipeline is never delayed. + ocrWorker := ocr.NewWorker(mailStore, idxMgr, ocr.Options{ + Workers: 2, + QueueSize: 1000, + Logger: logger, + }) + ocrWorker.Start(context.Background()) + defer ocrWorker.Stop() + if !ocr.IsAvailable() { + ts := ocr.CheckTools() + logger.Warn("ocr tools not fully available — install tesseract-ocr + poppler-utils for full OCR support", + "pdftotext", ts.HasPdftotext, "tesseract", ts.HasTesseract, "pdftoppm", ts.HasPdftoppm) + } + + // Boot-resume: re-enqueue all mails still marked ocr_status='pending'. + go func() { + ctx := context.Background() + pending, err := mailStore.GetPendingOCRMails(ctx, nil, 5000) + if err != nil { + logger.Warn("ocr boot-resume: query failed", "err", err) + return + } + if len(pending) == 0 { + return + } + logger.Info("ocr boot-resume: re-enqueueing pending jobs", "count", len(pending)) + for _, m := range pending { + ocrWorker.Submit(m.ID, m.TenantID) + } + }() + // User store users, err := userstore.New(cfg.Database.DSN()) if err != nil { @@ -299,7 +336,7 @@ func main() { smtpDaemon.SetIndexCallback(func(raw []byte, id string) { // Look up the tenant_id for this email from DB metadata. tenantID, _ := mailStore.GetTenantForMail(context.Background(), id) - submitToWorker(tenantWorker, mailStore, raw, id, tenantID, logger) + submitToWorker(tenantWorker, mailStore, raw, id, tenantID, logger, ocrWorker) }) // Wire tenant routing into SMTP daemon if cfg.SMTP.TenantRouting == "domain" { @@ -363,7 +400,7 @@ func main() { srv.SetPop3(pop3St, pop3Imp) // Backfill in background: migrate existing files into DB metadata + re-index - go runBackfill(context.Background(), mailStore, idx, tenantWorker, logger) + go runBackfill(context.Background(), mailStore, idx, tenantWorker, logger, ocrWorker) // Background integrity verification — runs every 5 minutes go runIntegrityCheck(context.Background(), mailStore, logger) @@ -389,7 +426,9 @@ func main() { // submitToWorker parses a raw email and submits it to the async index worker. // tenantID may be nil for global context. -func submitToWorker(worker *index.TenantIndexWorker, store *storage.Store, raw []byte, id string, tenantID *int64, logger *slog.Logger) { +// If ocrWorker is non-nil and the mail has attachments, an OCR job is also +// queued (non-blocking). +func submitToWorker(worker *index.TenantIndexWorker, store *storage.Store, raw []byte, id string, tenantID *int64, logger *slog.Logger, ocrWorker *ocr.Worker) { pm, err := mailparser.Parse(raw) if err != nil { logger.Warn("index: parse failed, skipping indexing", "id", id, "err", err) @@ -422,12 +461,19 @@ func submitToWorker(worker *index.TenantIndexWorker, store *storage.Store, raw [ if err := store.SetIndexedAt(context.Background(), id); err != nil { logger.Warn("index: set indexed_at failed", "id", id, "err", err) } + + // PROJ-35: hand off to OCR worker for asynchronous attachment processing. + if ocrWorker != nil && len(pm.Attachments) > 0 { + ocrWorker.Submit(id, tenantID) + } } // runBackfill walks the store, inserts missing DB metadata, and indexes // emails that have not yet been indexed. Per-tenant indexing is handled by // looking up each email's tenant_id from the DB. -func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, worker *index.TenantIndexWorker, logger *slog.Logger) { +// ocrWorker is optional; when non-nil, mails with attachments are also +// queued for OCR processing. +func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, worker *index.TenantIndexWorker, logger *slog.Logger, ocrWorker *ocr.Worker) { logger.Info("backfill: starting") count := 0 @@ -465,7 +511,7 @@ func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, w if !alreadyIndexed { needIndex++ tenantID, _ := store.GetTenantForMail(ctx, id) - submitToWorker(worker, store, raw, id, tenantID, logger) + submitToWorker(worker, store, raw, id, tenantID, logger, ocrWorker) } if count%100 == 0 { diff --git a/internal/index/index.go b/internal/index/index.go index 73654e8..faadef8 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -7,16 +7,17 @@ import ( // MailDocument is the indexed representation of a stored email. type MailDocument struct { - ID string - From string - To string - Subject string - Body string - AttachNames string - HasAttachment bool - Date time.Time - Size int64 - TenantID *int64 // nil = global / superadmin context + ID string + From string + To string + Subject string + Body string + AttachNames string + AttachmentText string // PROJ-35: OCR-extracted text from PDF/image attachments + HasAttachment bool + Date time.Time + Size int64 + TenantID *int64 // nil = global / superadmin context } // SearchRequest specifies search parameters. @@ -53,6 +54,15 @@ type Indexer interface { Close() error } +// AttachmentTextUpdater is implemented by indexers that support partial +// updates of the OCR-extracted attachment text. Optional add-on to Indexer: +// callers should type-assert and degrade gracefully if not supported. +// +// PROJ-35: Manticore implements this; legacy Xapian does not. +type AttachmentTextUpdater interface { + UpdateAttachmentText(mailID, text string) error +} + // TenantIndexer manages per-tenant Indexer instances. // Implemented by ManticoreTenantManager (primary) and TenantIndexManager (legacy Xapian). type TenantIndexer interface { diff --git a/internal/index/manticore.go b/internal/index/manticore.go index 2d51a0f..a6c2686 100644 --- a/internal/index/manticore.go +++ b/internal/index/manticore.go @@ -105,7 +105,8 @@ func (m *ManticoreTenantManager) Close() error { // ── manticoreIndex methods ──────────────────────────────────────────────── -// ensureTable creates the RT index if it does not yet exist. +// ensureTable creates the RT index if it does not yet exist and applies +// idempotent column additions for schema migrations. func (idx *manticoreIndex) ensureTable() error { stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( mail_id string, @@ -114,14 +115,46 @@ func (idx *manticoreIndex) ensureTable() error { to_addr text, body text, attachment_names text, + attachment_text text, has_attachment uint, date_ts bigint, size_bytes bigint ) type='rt' morphology='stem_en,lemmatize_de_all'`, idx.table) - _, err := idx.db.Exec(stmt) - if err != nil { + if _, err := idx.db.Exec(stmt); err != nil { return fmt.Errorf("ensureTable %s: %w", idx.table, err) } + // PROJ-35: ALTER existing tables to add attachment_text. Manticore lacks + // ALTER IF NOT EXISTS, so we DESC first and only add when missing. + if err := idx.ensureColumn("attachment_text", "text"); err != nil { + return err + } + return nil +} + +// ensureColumn checks DESC for the named column and adds it via +// ALTER TABLE when missing. Safe to call repeatedly. +func (idx *manticoreIndex) ensureColumn(name, typ string) error { + rows, err := idx.db.Query(fmt.Sprintf("DESC %s", idx.table)) + if err != nil { + return fmt.Errorf("desc %s: %w", idx.table, err) + } + defer rows.Close() + for rows.Next() { + var field, fieldType string + var props sql.NullString + if err := rows.Scan(&field, &fieldType, &props); err != nil { + return fmt.Errorf("desc scan %s: %w", idx.table, err) + } + if field == name { + return nil + } + } + if err := rows.Err(); err != nil { + return fmt.Errorf("desc rows %s: %w", idx.table, err) + } + if _, err := idx.db.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", idx.table, name, typ)); err != nil { + return fmt.Errorf("alter %s add %s: %w", idx.table, name, err) + } return nil } @@ -139,8 +172,8 @@ func (idx *manticoreIndex) IndexSync(doc MailDocument) error { _, err := idx.db.Exec( fmt.Sprintf(`REPLACE INTO %s - (id, mail_id, subject, from_addr, to_addr, body, attachment_names, has_attachment, date_ts, size_bytes) - VALUES (?,?,?,?,?,?,?,?,?,?)`, idx.table), + (id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes) + VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table), rowID, doc.ID, doc.Subject, @@ -148,6 +181,7 @@ func (idx *manticoreIndex) IndexSync(doc MailDocument) error { doc.To, doc.Body, doc.AttachNames, + doc.AttachmentText, hasAttach, dateTS, doc.Size, @@ -158,6 +192,40 @@ func (idx *manticoreIndex) IndexSync(doc MailDocument) error { return nil } +// UpdateAttachmentText partially updates only the attachment_text field of an +// already-indexed document. Implements index.AttachmentTextUpdater. +// +// Manticore RT indexes do not support UPDATE on text columns, so this +// re-fetches the full row and issues a REPLACE INTO with all fields preserved +// and attachment_text overwritten. Returns sql.ErrNoRows-style nil result if +// the document is not yet indexed (mail must be ingested first). +func (idx *manticoreIndex) UpdateAttachmentText(mailID, text string) error { + rowID := hashMailID(mailID) + row := idx.db.QueryRow(fmt.Sprintf( + `SELECT mail_id, subject, from_addr, to_addr, body, attachment_names, + has_attachment, date_ts, size_bytes + FROM %s WHERE id = ? LIMIT 1`, idx.table), + rowID, + ) + var ( + mid, subj, from, to, body, attachNames string + hasAttach uint64 + dateTS, sizeBytes int64 + ) + if err := row.Scan(&mid, &subj, &from, &to, &body, &attachNames, &hasAttach, &dateTS, &sizeBytes); err != nil { + return fmt.Errorf("manticore UpdateAttachmentText %s: load row: %w", idx.table, err) + } + if _, err := idx.db.Exec( + fmt.Sprintf(`REPLACE INTO %s + (id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes) + VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table), + rowID, mid, subj, from, to, body, attachNames, text, hasAttach, dateTS, sizeBytes, + ); err != nil { + return fmt.Errorf("manticore UpdateAttachmentText %s: replace: %w", idx.table, err) + } + return nil +} + // Delete removes a document by mail ID hash. func (idx *manticoreIndex) Delete(id string) error { rowID := hashMailID(id) diff --git a/internal/ocr/ocr.go b/internal/ocr/ocr.go new file mode 100644 index 0000000..047ef53 --- /dev/null +++ b/internal/ocr/ocr.go @@ -0,0 +1,265 @@ +// Package ocr extracts plain text from email attachments (PDFs and images) +// using locally installed tools — pdftotext (poppler-utils) and tesseract. +// +// PROJ-35: All extraction happens on the host. No external services. +package ocr + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +const ( + // MaxAttachmentSize skips files larger than this (50 MiB per spec). + MaxAttachmentSize = 50 * 1024 * 1024 + // DefaultTimeout caps a single OCR run. + DefaultTimeout = 60 * time.Second + // pdfMinTextLen below this we treat the PDF as a scan and fall back to OCR. + pdfMinTextLen = 32 +) + +// Errors returned by Extract. +var ( + ErrUnsupported = errors.New("ocr: unsupported attachment type") + ErrTooLarge = errors.New("ocr: attachment exceeds size limit") + ErrEncrypted = errors.New("ocr: attachment is password protected") + ErrUnavailable = errors.New("ocr: tesseract or pdftotext not available") +) + +// IsAvailable reports whether at least one OCR tool is on PATH. +// pdftotext alone enables text extraction from native PDFs; +// tesseract enables OCR for images and scanned PDFs. +func IsAvailable() bool { + _, errPDF := exec.LookPath("pdftotext") + _, errTess := exec.LookPath("tesseract") + return errPDF == nil || errTess == nil +} + +// ToolStatus reports which tools are present on the host. +type ToolStatus struct { + HasPdftotext bool + HasTesseract bool + HasPdftoppm bool +} + +// CheckTools probes the system for the supporting binaries. +func CheckTools() ToolStatus { + _, errPDF := exec.LookPath("pdftotext") + _, errTess := exec.LookPath("tesseract") + _, errPpm := exec.LookPath("pdftoppm") + return ToolStatus{ + HasPdftotext: errPDF == nil, + HasTesseract: errTess == nil, + HasPdftoppm: errPpm == nil, + } +} + +// ExtractText extracts plain text from a single attachment. +// contentType is the MIME type (lowercased), filename is used to derive an +// extension when contentType is missing or generic. langs is the Tesseract +// language list (e.g. ["deu","eng"]). +// +// Returns: +// - extracted text (may be empty when nothing was found) +// - ErrUnsupported when the format cannot be processed +// - ErrTooLarge when data exceeds MaxAttachmentSize +// - ErrEncrypted when the PDF is password-protected +// - other errors from the underlying tool +func ExtractText(ctx context.Context, data []byte, contentType, filename string, langs []string) (string, error) { + if len(data) == 0 { + return "", nil + } + if len(data) > MaxAttachmentSize { + return "", ErrTooLarge + } + + kind := classify(contentType, filename) + switch kind { + case kindPDF: + return extractPDF(ctx, data, langs) + case kindImage: + return extractImage(ctx, data, filename, langs) + default: + return "", ErrUnsupported + } +} + +// ── classification ──────────────────────────────────────────────────────── + +type fileKind int + +const ( + kindUnknown fileKind = iota + kindPDF + kindImage +) + +func classify(contentType, filename string) fileKind { + ct := strings.ToLower(strings.TrimSpace(contentType)) + if i := strings.Index(ct, ";"); i >= 0 { + ct = strings.TrimSpace(ct[:i]) + } + + switch ct { + case "application/pdf", "application/x-pdf": + return kindPDF + case "image/jpeg", "image/jpg", "image/png", "image/tiff", + "image/x-tiff", "image/bmp", "image/x-bmp", "image/webp": + return kindImage + } + + ext := strings.ToLower(filepath.Ext(filename)) + switch ext { + case ".pdf": + return kindPDF + case ".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp": + return kindImage + } + return kindUnknown +} + +// ── PDF extraction (pdftotext → tesseract fallback) ─────────────────────── + +func extractPDF(ctx context.Context, data []byte, langs []string) (string, error) { + tmp, err := os.MkdirTemp("", "archivmail-ocr-*") + if err != nil { + return "", fmt.Errorf("ocr: tempdir: %w", err) + } + defer os.RemoveAll(tmp) + + pdfPath := filepath.Join(tmp, "in.pdf") + if err := os.WriteFile(pdfPath, data, 0o600); err != nil { + return "", fmt.Errorf("ocr: write pdf: %w", err) + } + + if _, err := exec.LookPath("pdftotext"); err == nil { + text, ptErr := runPdftotext(ctx, pdfPath) + if ptErr != nil { + if errors.Is(ptErr, ErrEncrypted) { + return "", ErrEncrypted + } + // Fall through to OCR fallback on other errors. + } + if len(strings.TrimSpace(text)) >= pdfMinTextLen { + return text, nil + } + } + + if _, err := exec.LookPath("tesseract"); err != nil { + return "", ErrUnavailable + } + if _, err := exec.LookPath("pdftoppm"); err != nil { + return "", ErrUnavailable + } + + return ocrPDFViaImages(ctx, tmp, pdfPath, langs) +} + +func runPdftotext(ctx context.Context, pdfPath string) (string, error) { + cctx, cancel := withDefaultTimeout(ctx) + defer cancel() + + var out, errBuf bytes.Buffer + cmd := exec.CommandContext(cctx, "pdftotext", "-layout", "-q", pdfPath, "-") + cmd.Stdout = &out + cmd.Stderr = &errBuf + if err := cmd.Run(); err != nil { + stderr := errBuf.String() + if strings.Contains(strings.ToLower(stderr), "incorrect password") || + strings.Contains(strings.ToLower(stderr), "encrypted") { + return "", ErrEncrypted + } + return "", fmt.Errorf("ocr: pdftotext: %w (%s)", err, strings.TrimSpace(stderr)) + } + return out.String(), nil +} + +func ocrPDFViaImages(ctx context.Context, dir, pdfPath string, langs []string) (string, error) { + cctx, cancel := withDefaultTimeout(ctx) + defer cancel() + + prefix := filepath.Join(dir, "page") + cmd := exec.CommandContext(cctx, "pdftoppm", "-r", "200", "-png", pdfPath, prefix) + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("ocr: pdftoppm: %w", err) + } + + pages, err := filepath.Glob(prefix + "-*.png") + if err != nil || len(pages) == 0 { + return "", fmt.Errorf("ocr: pdftoppm produced no pages") + } + + var combined strings.Builder + for _, p := range pages { + text, err := tesseractFile(cctx, p, langs) + if err != nil { + continue + } + combined.WriteString(text) + combined.WriteString("\n") + } + return combined.String(), nil +} + +// ── image extraction ────────────────────────────────────────────────────── + +func extractImage(ctx context.Context, data []byte, filename string, langs []string) (string, error) { + if _, err := exec.LookPath("tesseract"); err != nil { + return "", ErrUnavailable + } + tmp, err := os.MkdirTemp("", "archivmail-ocr-*") + if err != nil { + return "", fmt.Errorf("ocr: tempdir: %w", err) + } + defer os.RemoveAll(tmp) + + ext := filepath.Ext(filename) + if ext == "" { + ext = ".bin" + } + in := filepath.Join(tmp, "img"+ext) + if err := os.WriteFile(in, data, 0o600); err != nil { + return "", fmt.Errorf("ocr: write image: %w", err) + } + cctx, cancel := withDefaultTimeout(ctx) + defer cancel() + return tesseractFile(cctx, in, langs) +} + +func tesseractFile(ctx context.Context, path string, langs []string) (string, error) { + args := []string{path, "stdout"} + if l := joinLangs(langs); l != "" { + args = append(args, "-l", l) + } + args = append(args, "--psm", "3", "-c", "preserve_interword_spaces=1") + + var out, errBuf bytes.Buffer + cmd := exec.CommandContext(ctx, "tesseract", args...) + cmd.Stdout = &out + cmd.Stderr = &errBuf + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("ocr: tesseract: %w (%s)", err, strings.TrimSpace(errBuf.String())) + } + return out.String(), nil +} + +func joinLangs(langs []string) string { + if len(langs) == 0 { + return "deu+eng" + } + return strings.Join(langs, "+") +} + +func withDefaultTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if _, ok := ctx.Deadline(); ok { + return ctx, func() {} + } + return context.WithTimeout(ctx, DefaultTimeout) +} diff --git a/internal/ocr/worker.go b/internal/ocr/worker.go new file mode 100644 index 0000000..f6d0787 --- /dev/null +++ b/internal/ocr/worker.go @@ -0,0 +1,205 @@ +package ocr + +import ( + "context" + "errors" + "log/slog" + "strings" + "sync" + + "archivmail/internal/index" + "archivmail/internal/storage" + "archivmail/pkg/mailparser" +) + +// Job describes one OCR work unit — extract text from all attachments of a +// stored mail and feed the result back into the per-tenant index. +type Job struct { + MailID string + TenantID *int64 +} + +// Worker runs OCR jobs on a buffered channel using N background goroutines. +// +// Lifecycle: NewWorker → Start(ctx) → Submit(...) (n times) → Stop(). +// Submit is non-blocking; jobs are dropped when the queue is full. +type Worker struct { + store *storage.Store + idxMgr index.TenantIndexer + logger *slog.Logger + queue chan Job + done chan struct{} + wg sync.WaitGroup + workers int + langs []string +} + +// Options configures a Worker. Zero values are replaced with sensible defaults. +type Options struct { + QueueSize int // default 1000 + Workers int // default 2 + Langs []string // default ["deu", "eng"] + Logger *slog.Logger +} + +// NewWorker constructs a worker that reads mails from store, runs OCR on +// supported attachments, and pushes the combined text into the per-tenant +// Manticore index via idxMgr. The store and idxMgr must be non-nil. +func NewWorker(store *storage.Store, idxMgr index.TenantIndexer, opts Options) *Worker { + if opts.QueueSize <= 0 { + opts.QueueSize = 1000 + } + if opts.Workers <= 0 { + opts.Workers = 2 + } + if len(opts.Langs) == 0 { + opts.Langs = []string{"deu", "eng"} + } + if opts.Logger == nil { + opts.Logger = slog.Default() + } + return &Worker{ + store: store, + idxMgr: idxMgr, + logger: opts.Logger, + queue: make(chan Job, opts.QueueSize), + done: make(chan struct{}), + workers: opts.Workers, + langs: opts.Langs, + } +} + +// Submit enqueues a job. Drops with a warning if the queue is full so the +// caller (mail intake) is never blocked. +func (w *Worker) Submit(mailID string, tenantID *int64) { + if mailID == "" { + return + } + select { + case w.queue <- Job{MailID: mailID, TenantID: tenantID}: + default: + w.logger.Warn("ocr worker: queue full, dropping job", "mail_id", mailID) + } +} + +// QueueLen returns the current number of pending jobs. +func (w *Worker) QueueLen() int { return len(w.queue) } + +// Start launches w.workers goroutines that consume the queue until Stop is +// called or ctx is cancelled. +func (w *Worker) Start(ctx context.Context) { + if !IsAvailable() { + w.logger.Warn("ocr worker: tesseract/pdftotext not on PATH — OCR disabled at runtime") + } + for i := 0; i < w.workers; i++ { + w.wg.Add(1) + go w.run(ctx, i) + } + w.logger.Info("ocr worker: started", "workers", w.workers, "queue", cap(w.queue)) +} + +// Stop drains the remaining queue and waits for all goroutines to exit. +func (w *Worker) Stop() { + close(w.done) + w.wg.Wait() + w.logger.Info("ocr worker: stopped") +} + +func (w *Worker) run(ctx context.Context, id int) { + defer w.wg.Done() + for { + select { + case job, ok := <-w.queue: + if !ok { + return + } + w.process(ctx, job) + case <-w.done: + // Drain remaining jobs so nothing in-flight is lost. + for { + select { + case job, ok := <-w.queue: + if !ok { + return + } + w.process(ctx, job) + default: + return + } + } + case <-ctx.Done(): + return + } + } +} + +func (w *Worker) process(ctx context.Context, job Job) { + logger := w.logger.With("mail_id", job.MailID, "tenant_id", job.TenantID) + + if w.store.OCREnabled(ctx, job.TenantID) == false { + _ = w.store.SetOCRStatus(ctx, job.MailID, "disabled") + return + } + + raw, err := w.store.Load(job.MailID) + if err != nil { + logger.Warn("ocr worker: load failed", "err", err) + _ = w.store.SetOCRStatus(ctx, job.MailID, "failed") + return + } + + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("ocr worker: parse failed", "err", err) + _ = w.store.SetOCRStatus(ctx, job.MailID, "failed") + return + } + + if len(pm.Attachments) == 0 { + _ = w.store.SetOCRStatus(ctx, job.MailID, "skipped") + return + } + + var combined strings.Builder + processed := 0 + for _, a := range pm.Attachments { + text, err := ExtractText(ctx, a.Data, a.ContentType, a.Filename, w.langs) + if err != nil { + if errors.Is(err, ErrUnsupported) || errors.Is(err, ErrEncrypted) || + errors.Is(err, ErrTooLarge) || errors.Is(err, ErrUnavailable) { + logger.Debug("ocr worker: attachment skipped", + "filename", a.Filename, "reason", err) + continue + } + logger.Warn("ocr worker: extract failed", + "filename", a.Filename, "err", err) + continue + } + if t := strings.TrimSpace(text); t != "" { + combined.WriteString(t) + combined.WriteString("\n\n") + processed++ + } + } + + if processed == 0 { + _ = w.store.SetOCRStatus(ctx, job.MailID, "skipped") + return + } + + idx := w.idxMgr.ForTenant(job.TenantID) + updater, ok := idx.(index.AttachmentTextUpdater) + if !ok { + logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped") + _ = w.store.SetOCRStatus(ctx, job.MailID, "failed") + return + } + if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil { + logger.Warn("ocr worker: index update failed", "err", err) + _ = w.store.SetOCRStatus(ctx, job.MailID, "failed") + return + } + + _ = w.store.SetOCRStatus(ctx, job.MailID, "done") + logger.Info("ocr worker: indexed", "attachments", processed, "chars", combined.Len()) +} diff --git a/internal/storage/ocr.go b/internal/storage/ocr.go new file mode 100644 index 0000000..7077827 --- /dev/null +++ b/internal/storage/ocr.go @@ -0,0 +1,154 @@ +package storage + +import ( + "context" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// PROJ-35: OCR-Status-Verwaltung pro Mail. +// Status-Werte: pending | done | failed | skipped | disabled + +// PendingOCRMail describes one entry in the OCR backlog. TenantID is nil +// when the mail has no tenant assignment (system-level / global). +type PendingOCRMail struct { + ID string + TenantID *int64 +} + +// SetOCRStatus updates emails.ocr_status for one mail. +// Valid values: pending | done | failed | skipped | disabled. +// Silently no-ops when no DB is configured. +func (s *Store) SetOCRStatus(ctx context.Context, id, status string) error { + if s.db == nil { + return nil + } + if id == "" { + return errors.New("storage: SetOCRStatus: empty id") + } + switch status { + case "pending", "done", "failed", "skipped", "disabled": + default: + return fmt.Errorf("storage: SetOCRStatus: invalid status %q", status) + } + _, err := s.db.Exec(ctx, `UPDATE emails SET ocr_status = $1 WHERE id = $2`, status, id) + if err != nil { + return fmt.Errorf("storage: set ocr status: %w", err) + } + return nil +} + +// OCREnabled reports whether OCR processing should run for the given tenant. +// Defaults to true when: +// - no DB is configured (DB-less mode) +// - tenantID is nil (global / no tenant) +// - the tenants row cannot be read (degrade gracefully) +// +// OCR is opt-out, not opt-in. +func (s *Store) OCREnabled(ctx context.Context, tenantID *int64) bool { + if s.db == nil || tenantID == nil { + return true + } + var enabled bool + err := s.db.QueryRow(ctx, + `SELECT COALESCE(ocr_enabled, TRUE) FROM tenants WHERE id = $1`, + *tenantID, + ).Scan(&enabled) + if errors.Is(err, pgx.ErrNoRows) { + return true + } + if err != nil { + // On unexpected errors (e.g. column missing on a freshly-initialised DB), + // default to enabled so OCR remains opt-out. + return true + } + return enabled +} + +// GetPendingOCRMails returns up to limit mails with ocr_status='pending'. +// If tenantID != nil, results are restricted to mails referenced by that +// tenant (via email_refs) or whose emails.tenant_id matches. +// limit <= 0 means no limit. +func (s *Store) GetPendingOCRMails(ctx context.Context, tenantID *int64, limit int) ([]PendingOCRMail, error) { + return s.GetMailsByOCRStatus(ctx, "pending", tenantID, limit) +} + +// GetMailsByOCRStatus returns mails matching a specific ocr_status filter. +// status="all" returns every mail regardless of status. +// If tenantID != nil, results are restricted to that tenant. +// limit <= 0 means no limit. +func (s *Store) GetMailsByOCRStatus(ctx context.Context, status string, tenantID *int64, limit int) ([]PendingOCRMail, error) { + if s.db == nil { + return nil, nil + } + + args := []interface{}{} + whereParts := []string{} + + if status != "" && status != "all" { + args = append(args, status) + whereParts = append(whereParts, fmt.Sprintf("COALESCE(e.ocr_status, 'pending') = $%d", len(args))) + } + + joinClause := "" + if tenantID != nil { + args = append(args, *tenantID) + joinClause = fmt.Sprintf( + "LEFT JOIN email_refs r ON r.email_id = e.id AND r.tenant_id = $%d", len(args), + ) + whereParts = append(whereParts, + fmt.Sprintf("(r.tenant_id = $%d OR e.tenant_id = $%d)", len(args), len(args)), + ) + } + + whereClause := "" + if len(whereParts) > 0 { + whereClause = "WHERE " + joinAnd(whereParts) + } + + limitClause := "" + if limit > 0 { + args = append(args, limit) + limitClause = fmt.Sprintf("LIMIT $%d", len(args)) + } + + q := fmt.Sprintf(` + SELECT e.id, e.tenant_id + FROM emails e + %s + %s + ORDER BY e.received_at DESC + %s + `, joinClause, whereClause, limitClause) + + rows, err := s.db.Query(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("storage: get mails by ocr status: %w", err) + } + defer rows.Close() + + var out []PendingOCRMail + for rows.Next() { + var m PendingOCRMail + if err := rows.Scan(&m.ID, &m.TenantID); err != nil { + continue + } + out = append(out, m) + } + return out, rows.Err() +} + +// joinAnd joins parts with " AND " — small helper to avoid pulling in strings +// for a single use. +func joinAnd(parts []string) string { + out := "" + for i, p := range parts { + if i > 0 { + out += " AND " + } + out += p + } + return out +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index fe8923f..0451d99 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -308,6 +308,15 @@ func (s *Store) initSchema(ctx context.Context) error { ); CREATE INDEX IF NOT EXISTS idx_saved_searches_user ON saved_searches(user_id, tenant_id); `) + if err != nil { + return err + } + + // PROJ-35: OCR-Status pro Mail (idempotent) + _, err = s.db.Exec(ctx, ` + ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_status TEXT DEFAULT 'pending'; + CREATE INDEX IF NOT EXISTS idx_emails_ocr_status ON emails (ocr_status) WHERE ocr_status = 'pending'; + `) return err } @@ -1055,8 +1064,8 @@ func (s *Store) GetMailsWithUID(ctx context.Context, tenantID *int64) ([]MailWit return result, rows.Err() } -// GetMailsByRecipient returns mails where mail_to contains the given email address. -// Used for personal IMAP mode filtering. +// GetMailsByRecipient returns mails where mail_to or mail_from contains the given email address. +// Used for personal IMAP mode filtering — includes both received and sent mails. func (s *Store) GetMailsByRecipient(ctx context.Context, tenantID *int64, email string) ([]MailWithUID, error) { if s.db == nil || email == "" { return nil, nil @@ -1066,15 +1075,15 @@ func (s *Store) GetMailsByRecipient(ctx context.Context, tenantID *int64, email var err error if tenantID == nil { rows, err = s.db.Query(ctx, - `SELECT id, COALESCE(uid, 0) FROM emails WHERE mail_to ILIKE $1 ORDER BY uid ASC NULLS LAST`, + `SELECT id, COALESCE(uid, 0) FROM emails WHERE mail_to ILIKE $1 OR mail_from ILIKE $1 ORDER BY uid ASC NULLS LAST`, pattern) } else { rows, err = s.db.Query(ctx, ` - SELECT e.id, COALESCE(e.uid, 0) + SELECT DISTINCT e.id, COALESCE(e.uid, 0) FROM email_refs r JOIN emails e ON e.id = r.email_id WHERE r.tenant_id = $1 - AND e.mail_to ILIKE $2 + AND (e.mail_to ILIKE $2 OR e.mail_from ILIKE $2) ORDER BY e.uid ASC NULLS LAST`, *tenantID, pattern) } if err != nil { diff --git a/internal/tenantstore/store.go b/internal/tenantstore/store.go index a832836..d8b24fd 100644 --- a/internal/tenantstore/store.go +++ b/internal/tenantstore/store.go @@ -84,6 +84,7 @@ ALTER TABLE tenants ADD COLUMN IF NOT EXISTS retention_days INT NOT NULL DEFAULT ALTER TABLE tenants ADD COLUMN IF NOT EXISTS max_storage_bytes BIGINT; ALTER TABLE tenants ADD COLUMN IF NOT EXISTS max_users INT; ALTER TABLE tenants ADD COLUMN IF NOT EXISTS max_emails BIGINT; +ALTER TABLE tenants ADD COLUMN IF NOT EXISTS ocr_enabled BOOLEAN NOT NULL DEFAULT TRUE; ` // New connects to PostgreSQL and initialises the tenant schema. diff --git a/update.sh b/update.sh index caf28f9..760e222 100755 --- a/update.sh +++ b/update.sh @@ -77,6 +77,17 @@ if systemctl list-unit-files manticore.service >/dev/null 2>&1; then systemctl is-active --quiet manticore && log "Manticore Search läuft" fi +# ── OCR-Tools (PROJ-35: tesseract + poppler) ────────────────────────────── +# Optional: ohne diese Tools fällt OCR auf "deaktiviert" zurück, kein Abbruch. +if ! command -v tesseract >/dev/null 2>&1 || ! command -v pdftotext >/dev/null 2>&1; then + info "Installiere OCR-Tools (tesseract, poppler-utils)..." + apt-get install -y tesseract-ocr tesseract-ocr-deu poppler-utils 2>/dev/null || \ + warn "OCR-Tools konnten nicht installiert werden — OCR wird deaktiviert" +fi +if command -v tesseract >/dev/null 2>&1 && command -v pdftotext >/dev/null 2>&1; then + log "OCR-Tools verfügbar (tesseract $(tesseract --version 2>&1 | head -1 | awk '{print $2}'))" +fi + # ── Quellcode holen ─────────────────────────────────────────────────────── if [[ -d "$BUILD_DIR/.git" ]]; then