feat(PROJ-44): ocr_chars-Spalte + SetOCRResult-Helper
DB-Schema bekommt eine idempotente ocr_chars BIGINT-Spalte (Default 0). SetOCRResult schreibt status und chars atomar; GetOCRMeta liest beide mit COALESCE-Defaults. Der OCR-Worker ersetzt jeden SetOCRStatus-Call durch SetOCRResult und uebergibt die extrahierte Zeichenzahl bei 'done'.
This commit is contained in:
+10
-9
@@ -137,26 +137,26 @@ func (w *Worker) process(ctx context.Context, job Job) {
|
|||||||
logger := w.logger.With("mail_id", job.MailID, "tenant_id", job.TenantID)
|
logger := w.logger.With("mail_id", job.MailID, "tenant_id", job.TenantID)
|
||||||
|
|
||||||
if w.store.OCREnabled(ctx, job.TenantID) == false {
|
if w.store.OCREnabled(ctx, job.TenantID) == false {
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "disabled")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "disabled", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := w.store.Load(job.MailID)
|
raw, err := w.store.Load(job.MailID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("ocr worker: load failed", "err", err)
|
logger.Warn("ocr worker: load failed", "err", err)
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "failed")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pm, err := mailparser.Parse(raw)
|
pm, err := mailparser.Parse(raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("ocr worker: parse failed", "err", err)
|
logger.Warn("ocr worker: parse failed", "err", err)
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "failed")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(pm.Attachments) == 0 {
|
if len(pm.Attachments) == 0 {
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "skipped")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,7 +183,7 @@ func (w *Worker) process(ctx context.Context, job Job) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if processed == 0 {
|
if processed == 0 {
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "skipped")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "skipped", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,15 +191,16 @@ func (w *Worker) process(ctx context.Context, job Job) {
|
|||||||
updater, ok := idx.(index.AttachmentTextUpdater)
|
updater, ok := idx.(index.AttachmentTextUpdater)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped")
|
logger.Warn("ocr worker: indexer does not support AttachmentTextUpdater — text dropped")
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "failed")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil {
|
if err := updater.UpdateAttachmentText(job.MailID, combined.String()); err != nil {
|
||||||
logger.Warn("ocr worker: index update failed", "err", err)
|
logger.Warn("ocr worker: index update failed", "err", err)
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "failed")
|
_ = w.store.SetOCRResult(ctx, job.MailID, "failed", 0)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = w.store.SetOCRStatus(ctx, job.MailID, "done")
|
chars := int64(combined.Len())
|
||||||
logger.Info("ocr worker: indexed", "attachments", processed, "chars", combined.Len())
|
_ = w.store.SetOCRResult(ctx, job.MailID, "done", chars)
|
||||||
|
logger.Info("ocr worker: indexed", "attachments", processed, "chars", chars)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,6 +40,57 @@ func (s *Store) SetOCRStatus(ctx context.Context, id, status string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetOCRResult atomically writes both ocr_status and ocr_chars in one UPDATE.
|
||||||
|
// Used by the OCR worker after a job completes (PROJ-44).
|
||||||
|
// chars must be >= 0; for status='failed'/'skipped'/'disabled' callers pass 0.
|
||||||
|
// Silently no-ops when no DB is configured.
|
||||||
|
func (s *Store) SetOCRResult(ctx context.Context, id, status string, chars int64) error {
|
||||||
|
if s.db == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if id == "" {
|
||||||
|
return errors.New("storage: SetOCRResult: empty id")
|
||||||
|
}
|
||||||
|
switch status {
|
||||||
|
case "pending", "done", "failed", "skipped", "disabled":
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("storage: SetOCRResult: invalid status %q", status)
|
||||||
|
}
|
||||||
|
if chars < 0 {
|
||||||
|
chars = 0
|
||||||
|
}
|
||||||
|
_, err := s.db.Exec(ctx,
|
||||||
|
`UPDATE emails SET ocr_status = $1, ocr_chars = $2 WHERE id = $3`,
|
||||||
|
status, chars, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("storage: set ocr result: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOCRMeta returns ocr_status (defaulting to "pending" if NULL) and
|
||||||
|
// ocr_chars (defaulting to 0) for a single mail. Returns "", 0, nil when no
|
||||||
|
// DB is configured or the mail is not found.
|
||||||
|
func (s *Store) GetOCRMeta(ctx context.Context, id string) (status string, chars int64, err error) {
|
||||||
|
if s.db == nil {
|
||||||
|
return "", 0, nil
|
||||||
|
}
|
||||||
|
if id == "" {
|
||||||
|
return "", 0, errors.New("storage: GetOCRMeta: empty id")
|
||||||
|
}
|
||||||
|
row := s.db.QueryRow(ctx,
|
||||||
|
`SELECT COALESCE(ocr_status, 'pending'), COALESCE(ocr_chars, 0)
|
||||||
|
FROM emails WHERE id = $1`, id)
|
||||||
|
if scanErr := row.Scan(&status, &chars); scanErr != nil {
|
||||||
|
if errors.Is(scanErr, pgx.ErrNoRows) {
|
||||||
|
return "", 0, nil
|
||||||
|
}
|
||||||
|
return "", 0, fmt.Errorf("storage: get ocr meta: %w", scanErr)
|
||||||
|
}
|
||||||
|
return status, chars, nil
|
||||||
|
}
|
||||||
|
|
||||||
// OCREnabled reports whether OCR processing should run for the given tenant.
|
// OCREnabled reports whether OCR processing should run for the given tenant.
|
||||||
// Defaults to true when:
|
// Defaults to true when:
|
||||||
// - no DB is configured (DB-less mode)
|
// - no DB is configured (DB-less mode)
|
||||||
|
|||||||
@@ -317,6 +317,15 @@ func (s *Store) initSchema(ctx context.Context) error {
|
|||||||
ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_status TEXT DEFAULT 'pending';
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_status TEXT DEFAULT 'pending';
|
||||||
CREATE INDEX IF NOT EXISTS idx_emails_ocr_status ON emails (ocr_status) WHERE ocr_status = 'pending';
|
CREATE INDEX IF NOT EXISTS idx_emails_ocr_status ON emails (ocr_status) WHERE ocr_status = 'pending';
|
||||||
`)
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// PROJ-44: gecachte Anzahl der extrahierten OCR-Zeichen — vermeidet einen
|
||||||
|
// Manticore-Roundtrip auf der Mail-Detail-Seite. Idempotent wie ocr_status.
|
||||||
|
_, err = s.db.Exec(ctx, `
|
||||||
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_chars BIGINT DEFAULT 0;
|
||||||
|
`)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user