Files
archivmail/internal/storage/ocr.go
T
sysops 5078830469 feat(PROJ-44): ocr_chars-Spalte + SetOCRResult-Helper
DB-Schema bekommt eine idempotente ocr_chars BIGINT-Spalte (Default 0).
SetOCRResult schreibt status und chars atomar; GetOCRMeta liest beide
mit COALESCE-Defaults. Der OCR-Worker ersetzt jeden SetOCRStatus-Call
durch SetOCRResult und uebergibt die extrahierte Zeichenzahl bei 'done'.
2026-05-10 22:20:46 +02:00

206 lines
5.7 KiB
Go

package storage
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
)
// PROJ-35: OCR status management per mail.
// Status values: pending | done | failed | skipped | disabled

// PendingOCRMail describes one entry in the OCR backlog, as returned by
// GetPendingOCRMails and GetMailsByOCRStatus.
type PendingOCRMail struct {
	// ID is the mail's emails.id value.
	ID string
	// TenantID is the mail's emails.tenant_id; nil when the mail has no
	// tenant assignment (system-level / global).
	TenantID *int64
}
// SetOCRStatus stores status in emails.ocr_status for the mail with the given
// id. Unlike SetOCRResult it leaves ocr_chars untouched.
//
// Accepted statuses: pending | done | failed | skipped | disabled.
// An empty id or an unknown status yields an error; an id that matches no row
// is not reported as an error. Without a configured DB the call is a no-op,
// and no validation happens in that mode.
func (s *Store) SetOCRStatus(ctx context.Context, id, status string) error {
	if s.db == nil {
		return nil // DB-less mode: nothing to persist
	}
	if id == "" {
		return errors.New("storage: SetOCRStatus: empty id")
	}

	known := false
	switch status {
	case "pending", "done", "failed", "skipped", "disabled":
		known = true
	}
	if !known {
		return fmt.Errorf("storage: SetOCRStatus: invalid status %q", status)
	}

	if _, execErr := s.db.Exec(ctx, `UPDATE emails SET ocr_status = $1 WHERE id = $2`, status, id); execErr != nil {
		return fmt.Errorf("storage: set ocr status: %w", execErr)
	}
	return nil
}
// SetOCRResult writes ocr_status and ocr_chars for one mail in a single
// UPDATE statement, so both columns change together (PROJ-44). The OCR worker
// calls it once a job has finished.
//
// status must be one of pending | done | failed | skipped | disabled.
// chars is expected to be >= 0; negative values are stored as 0. For
// failed/skipped/disabled, callers pass 0. Without a configured DB the call is
// a no-op.
func (s *Store) SetOCRResult(ctx context.Context, id, status string, chars int64) error {
	if s.db == nil {
		return nil // DB-less mode
	}
	if id == "" {
		return errors.New("storage: SetOCRResult: empty id")
	}
	switch status {
	default:
		return fmt.Errorf("storage: SetOCRResult: invalid status %q", status)
	case "pending", "done", "failed", "skipped", "disabled":
		// valid: continue below
	}

	stored := chars
	if stored < 0 {
		stored = 0 // never persist a negative character count
	}

	const stmt = `UPDATE emails SET ocr_status = $1, ocr_chars = $2 WHERE id = $3`
	if _, execErr := s.db.Exec(ctx, stmt, status, stored, id); execErr != nil {
		return fmt.Errorf("storage: set ocr result: %w", execErr)
	}
	return nil
}
// GetOCRMeta reads the OCR metadata of a single mail.
//
// status is emails.ocr_status, reported as "pending" when the column is NULL.
// chars is emails.ocr_chars, reported as 0 when NULL.
// It returns "", 0, nil when no DB is configured or no mail has the given id.
// An empty id is an error.
func (s *Store) GetOCRMeta(ctx context.Context, id string) (status string, chars int64, err error) {
	switch {
	case s.db == nil:
		return "", 0, nil
	case id == "":
		return "", 0, errors.New("storage: GetOCRMeta: empty id")
	}

	err = s.db.QueryRow(ctx,
		`SELECT COALESCE(ocr_status, 'pending'), COALESCE(ocr_chars, 0)
FROM emails WHERE id = $1`, id).Scan(&status, &chars)

	switch {
	case err == nil:
		return status, chars, nil
	case errors.Is(err, pgx.ErrNoRows):
		// Unknown mail: report "no metadata" rather than an error.
		return "", 0, nil
	default:
		return "", 0, fmt.Errorf("storage: get ocr meta: %w", err)
	}
}
// OCREnabled reports whether OCR processing should run for the given tenant.
//
// OCR is opt-out, not opt-in. The answer is true unless the tenants row
// explicitly has ocr_enabled = FALSE. In particular it is true when:
//   - no DB is configured (DB-less mode)
//   - tenantID is nil (global / no tenant)
//   - the tenant row does not exist or cannot be read (degrade gracefully)
//
// A NULL ocr_enabled also counts as enabled.
func (s *Store) OCREnabled(ctx context.Context, tenantID *int64) bool {
	if s.db == nil || tenantID == nil {
		return true
	}

	var enabled bool
	row := s.db.QueryRow(ctx,
		`SELECT COALESCE(ocr_enabled, TRUE) FROM tenants WHERE id = $1`,
		*tenantID,
	)
	if scanErr := row.Scan(&enabled); scanErr != nil {
		// Covers both an unknown tenant (no row) and unexpected failures
		// (e.g. the column missing on a freshly-initialised DB): stay opt-out.
		return true
	}
	return enabled
}
// GetPendingOCRMails returns the OCR backlog: up to limit mails whose
// ocr_status is 'pending' (a NULL status also counts as pending), newest
// first. It is shorthand for GetMailsByOCRStatus with status "pending".
//
// If tenantID != nil, results are restricted to mails referenced by that
// tenant (via email_refs) or whose emails.tenant_id matches.
// limit <= 0 means no limit.
func (s *Store) GetPendingOCRMails(ctx context.Context, tenantID *int64, limit int) ([]PendingOCRMail, error) {
	const pendingStatus = "pending"
	return s.GetMailsByOCRStatus(ctx, pendingStatus, tenantID, limit)
}
// GetMailsByOCRStatus returns mails whose ocr_status equals status, ordered by
// emails.received_at descending (newest first).
//
// A NULL ocr_status is treated as "pending". A status of "" or "all" disables
// the status filter and returns every mail. If tenantID != nil, results are
// restricted to mails whose emails.tenant_id matches or that are referenced
// by that tenant via email_refs; each mail appears at most once.
// limit <= 0 means no limit.
//
// It returns nil, nil when no DB is configured. On any error the returned
// slice is nil.
func (s *Store) GetMailsByOCRStatus(ctx context.Context, status string, tenantID *int64, limit int) ([]PendingOCRMail, error) {
	if s.db == nil {
		return nil, nil
	}

	var (
		args       []any
		whereParts []string
	)
	if status != "" && status != "all" {
		args = append(args, status)
		whereParts = append(whereParts, fmt.Sprintf("COALESCE(e.ocr_status, 'pending') = $%d", len(args)))
	}
	if tenantID != nil {
		args = append(args, *tenantID)
		// EXISTS instead of a LEFT JOIN on email_refs: a join would emit the
		// same mail once per matching email_refs row (and each duplicate
		// would also count against LIMIT) if that table ever holds more
		// than one row per (email_id, tenant_id).
		whereParts = append(whereParts, fmt.Sprintf(
			"(e.tenant_id = $%[1]d OR EXISTS (SELECT 1 FROM email_refs r WHERE r.email_id = e.id AND r.tenant_id = $%[1]d))",
			len(args),
		))
	}

	whereClause := ""
	if len(whereParts) > 0 {
		whereClause = "WHERE " + joinAnd(whereParts)
	}
	limitClause := ""
	if limit > 0 {
		args = append(args, limit)
		limitClause = fmt.Sprintf("LIMIT $%d", len(args))
	}

	// Only the clause skeleton is formatted into the SQL; every value is
	// bound as a query parameter.
	q := fmt.Sprintf(`
SELECT e.id, e.tenant_id
FROM emails e
%s
ORDER BY e.received_at DESC
%s
`, whereClause, limitClause)

	rows, err := s.db.Query(ctx, q, args...)
	if err != nil {
		return nil, fmt.Errorf("storage: get mails by ocr status: %w", err)
	}
	defer rows.Close()

	var out []PendingOCRMail
	for rows.Next() {
		var m PendingOCRMail
		if err := rows.Scan(&m.ID, &m.TenantID); err != nil {
			// A scan failure means the row layout does not match
			// PendingOCRMail. With pgx v5 it also terminates the result set,
			// so skipping the row was never possible. Fail explicitly
			// instead of returning a partial slice.
			return nil, fmt.Errorf("storage: scan ocr mail row: %w", err)
		}
		out = append(out, m)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("storage: get mails by ocr status: %w", err)
	}
	return out, nil
}
// joinAnd concatenates parts with " AND " between consecutive elements. It
// yields the same result as strings.Join(parts, " AND "). An empty or nil
// slice yields "", and a single element is returned unchanged. It exists so
// that this file does not import strings for a single call.
func joinAnd(parts []string) string {
	const sep = " AND "
	if len(parts) == 0 {
		return ""
	}
	joined := parts[0]
	for _, part := range parts[1:] {
		joined += sep + part
	}
	return joined
}