6d835aefac
Mit systemd ProtectSystem=strict ist /tmp fuer den Service read-only. ocr.SetTempDir(storage_path/ocr-tmp) nutzt einen RW-Pfad innerhalb der ohnehin freigegebenen ReadWritePaths.
134 lines
3.7 KiB
Go
134 lines
3.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"log/slog"
|
|
"os"
|
|
"time"
|
|
|
|
"archivmail/config"
|
|
"archivmail/internal/index"
|
|
"archivmail/internal/ocr"
|
|
"archivmail/internal/storage"
|
|
)
|
|
|
|
// runOCRReprocess re-runs OCR for selected mails. It loads matching IDs from
|
|
// the DB by ocr_status, queues them on the OCR worker, then waits for the
|
|
// worker to drain.
|
|
//
|
|
// Usage:
|
|
//
|
|
// archivmail ocr-reprocess --config /etc/archivmail/config.yml
|
|
// archivmail ocr-reprocess --tenant 1 --status failed
|
|
// archivmail ocr-reprocess --status all --limit 1000
|
|
func runOCRReprocess(args []string) {
|
|
fs := flag.NewFlagSet("ocr-reprocess", flag.ExitOnError)
|
|
configPath := fs.String("config", "/etc/archivmail/config.yml", "path to config file")
|
|
tenantIDFlag := fs.Int64("tenant", 0, "tenant ID (0 = all tenants)")
|
|
statusFlag := fs.String("status", "pending", "ocr_status filter: pending|done|failed|skipped|disabled|all")
|
|
limitFlag := fs.Int("limit", 0, "max number of mails to process (0 = no limit)")
|
|
fs.Parse(args)
|
|
|
|
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
|
|
|
|
cfg, err := config.Load(*configPath)
|
|
if err != nil {
|
|
logger.Error("failed to load config", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
storeCfg := storage.Config{
|
|
Dir: cfg.Storage.StorePath,
|
|
Keyfile: cfg.Storage.Keyfile,
|
|
DSN: cfg.Database.DSN(),
|
|
CompressEnabled: cfg.Storage.Compress,
|
|
}
|
|
mailStore, err := storage.New(storeCfg)
|
|
if err != nil {
|
|
logger.Error("storage init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
defer mailStore.Close()
|
|
|
|
indexBackend := cfg.Index.Backend
|
|
if indexBackend == "" {
|
|
indexBackend = "manticore"
|
|
}
|
|
if indexBackend != "manticore" {
|
|
logger.Error("ocr-reprocess requires the manticore backend", "configured", indexBackend)
|
|
os.Exit(1)
|
|
}
|
|
dsn := cfg.Index.ManticoreDSN
|
|
if dsn == "" {
|
|
dsn = "manticore@tcp(127.0.0.1:9306)/"
|
|
}
|
|
idxMgr, err := index.NewManticoreTenantManager(dsn)
|
|
if err != nil {
|
|
logger.Error("manticore init failed", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
defer idxMgr.Close()
|
|
|
|
if !ocr.IsAvailable() {
|
|
ts := ocr.CheckTools()
|
|
logger.Warn("ocr tools not on PATH — install tesseract-ocr + poppler-utils",
|
|
"pdftotext", ts.HasPdftotext, "tesseract", ts.HasTesseract, "pdftoppm", ts.HasPdftoppm)
|
|
}
|
|
// Keep scratch space inside the storage dir (matches the daemon's setup).
|
|
ocr.SetTempDir(cfg.Storage.StorePath + "/ocr-tmp")
|
|
|
|
ctx := context.Background()
|
|
|
|
var tenantPtr *int64
|
|
if *tenantIDFlag > 0 {
|
|
t := *tenantIDFlag
|
|
tenantPtr = &t
|
|
}
|
|
|
|
mails, err := mailStore.GetMailsByOCRStatus(ctx, *statusFlag, tenantPtr, *limitFlag)
|
|
if err != nil {
|
|
logger.Error("failed to list mails", "err", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
logger.Info("ocr-reprocess: starting",
|
|
"status", *statusFlag, "tenant", *tenantIDFlag, "count", len(mails))
|
|
if len(mails) == 0 {
|
|
return
|
|
}
|
|
|
|
// Queue size needs to fit the entire batch so Submit never drops.
|
|
qSize := len(mails) + 16
|
|
worker := ocr.NewWorker(mailStore, idxMgr, ocr.Options{
|
|
Workers: 2,
|
|
QueueSize: qSize,
|
|
Logger: logger,
|
|
})
|
|
worker.Start(ctx)
|
|
|
|
// Reset status to 'pending' before submission so the worker actually picks
|
|
// them up — for callers that want to reprocess 'failed'/'skipped' mails.
|
|
if *statusFlag != "pending" && *statusFlag != "all" {
|
|
for _, m := range mails {
|
|
_ = mailStore.SetOCRStatus(ctx, m.ID, "pending")
|
|
}
|
|
}
|
|
|
|
for _, m := range mails {
|
|
worker.Submit(m.ID, m.TenantID)
|
|
}
|
|
|
|
// Periodic progress while waiting for the queue to drain.
|
|
tick := time.NewTicker(5 * time.Second)
|
|
defer tick.Stop()
|
|
go func() {
|
|
for range tick.C {
|
|
logger.Info("ocr-reprocess: progress", "queue_remaining", worker.QueueLen())
|
|
}
|
|
}()
|
|
|
|
worker.Stop() // waits for all in-flight jobs
|
|
logger.Info("ocr-reprocess: complete", "submitted", len(mails))
|
|
}
|