Files
archivmail/internal/index/manticore.go
T
sysops 0bda21033e feat(PROJ-35): OCR & Anhang-Volltext-Indexierung
Asynchrone OCR fuer PDF- und Bild-Anhaenge via tesseract + poppler-utils.
Extrahierter Text wird in Manticore (attachment_text) gespeichert und ist
ueber die normale Volltextsuche auffindbar.

- internal/ocr: ExtractText + Worker (queue + drain)
- internal/storage/ocr.go: SetOCRStatus, OCREnabled, GetMailsByOCRStatus
- emails.ocr_status (pending|done|failed|skipped|disabled)
- tenants.ocr_enabled (Default TRUE, opt-out)
- Manticore: attachment_text-Feld + UpdateAttachmentText
- Boot-resume: pending Jobs nach Restart automatisch in die Queue
- CLI: archivmail ocr-reprocess --tenant N --status pending|failed|all
- update.sh: tesseract-ocr + poppler-utils optional installieren
2026-05-08 22:11:17 +02:00

409 lines
12 KiB
Go

package index
import (
	"database/sql"
	"fmt"
	"hash/fnv"
	"log"
	"regexp"
	"strings"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)
// validTableName guards against SQL injection via table name interpolation.
// Table names are spliced into SQL text with fmt.Sprintf, so only names that
// match this pattern are allowed: exactly "emails_global" or
// "emails_tenant_<digits>". manticoreTableName panics on any other name.
var validTableName = regexp.MustCompile(`^emails_(global|tenant_\d+)$`)
// manticoreIndex implements Indexer against a single Manticore RT table.
// It does not own db: its Close method is a no-op and the handle is closed
// through ManticoreTenantManager.Close.
type manticoreIndex struct {
// db is the database handle shared with the manager that created this index.
db *sql.DB
// table is the RT table name that is interpolated into every statement
// issued by this index.
table string
}
// ManticoreTenantManager implements TenantIndexer using Manticore Search
// via the MySQL protocol. No CGO required — pure Go via database/sql.
type ManticoreTenantManager struct {
// db is the single handle shared by the global index and all tenant indexes.
db *sql.DB
// mu guards pool.
mu sync.RWMutex
// pool caches the per-tenant indexes created by ForTenant, keyed by tenant ID.
pool map[int64]*manticoreIndex
// global is the index over the emails_global table; it is set once in
// NewManticoreTenantManager.
global *manticoreIndex
}
// NewManticoreTenantManager connects to Manticore through the "mysql" driver
// using dsn, verifies the connection with a ping and makes sure the
// emails_global RT table exists before handing back a ready manager.
//
// The handle is limited to 16 open and 4 idle connections, each recycled after
// five minutes. If the ping or the table setup fails, the handle is closed
// again and a wrapped error is returned together with a nil manager.
func NewManticoreTenantManager(dsn string) (*ManticoreTenantManager, error) {
	conn, err := sql.Open("mysql", dsn)
	if err != nil {
		return nil, fmt.Errorf("manticore: open: %w", err)
	}

	conn.SetMaxOpenConns(16)
	conn.SetMaxIdleConns(4)
	conn.SetConnMaxLifetime(5 * time.Minute)

	if pingErr := conn.Ping(); pingErr != nil {
		conn.Close()
		return nil, fmt.Errorf("manticore: ping: %w", pingErr)
	}

	// The global table must exist before the manager is usable.
	global := &manticoreIndex{db: conn, table: "emails_global"}
	if tableErr := global.ensureTable(); tableErr != nil {
		conn.Close()
		return nil, fmt.Errorf("manticore: ensure global table: %w", tableErr)
	}

	return &ManticoreTenantManager{
		db:     conn,
		pool:   make(map[int64]*manticoreIndex),
		global: global,
	}, nil
}
// ForTenant returns the Indexer for the given tenant, creating the tenant's RT
// table on first use and caching the index for later calls.
//
// A nil or zero tenantID yields the global index. If the tenant table cannot
// be created or migrated, the global index is returned as a fallback. Because
// this method has no error result, that failure is logged here; previously it
// was dropped without a trace. Nothing is cached in the failure case, so the
// next call for the same tenant tries again.
func (m *ManticoreTenantManager) ForTenant(tenantID *int64) Indexer {
	if tenantID == nil || *tenantID == 0 {
		return m.global
	}
	tid := *tenantID

	// Fast path: the index was already created by an earlier call.
	m.mu.RLock()
	idx, ok := m.pool[tid]
	m.mu.RUnlock()
	if ok {
		return idx
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	// Re-check under the write lock: another goroutine may have stored the
	// index between the RUnlock above and this Lock.
	if idx, ok = m.pool[tid]; ok {
		return idx
	}

	idx = &manticoreIndex{db: m.db, table: manticoreTableName(&tid)}
	if err := idx.ensureTable(); err != nil {
		// The caller cannot see this error, so record it before falling back.
		log.Printf("manticore: ensure table for tenant %d failed, using global index: %v", tid, err)
		return m.global
	}
	m.pool[tid] = idx
	return idx
}
// Global returns the global (non-tenant) Indexer. This is the same index that
// ForTenant hands out for a nil or zero tenant ID.
func (m *ManticoreTenantManager) Global() Indexer {
return m.global
}
// Close closes the database handle and returns the error from that call.
// The global index and every tenant index created by this manager share
// this one handle.
func (m *ManticoreTenantManager) Close() error {
return m.db.Close()
}
// ── manticoreIndex methods ────────────────────────────────────────────────

// ensureTable brings the RT table behind idx up to the current schema. It
// first issues CREATE TABLE IF NOT EXISTS with the full column list and then
// runs the column migrations for tables created by an older version.
// It returns a wrapped error if either step fails.
func (idx *manticoreIndex) ensureTable() error {
	const createTmpl = `CREATE TABLE IF NOT EXISTS %s (
mail_id string,
subject text,
from_addr text,
to_addr text,
body text,
attachment_names text,
attachment_text text,
has_attachment uint,
date_ts bigint,
size_bytes bigint
) type='rt' morphology='stem_en,lemmatize_de_all'`

	if _, err := idx.db.Exec(fmt.Sprintf(createTmpl, idx.table)); err != nil {
		return fmt.Errorf("ensureTable %s: %w", idx.table, err)
	}

	// PROJ-35: tables that predate attachment_text need the column added.
	// Manticore lacks ALTER IF NOT EXISTS, so ensureColumn inspects DESC
	// first and only alters when the column is missing.
	return idx.ensureColumn("attachment_text", "text")
}
// ensureColumn adds column name of type typ to the table unless DESC <table>
// already lists a field with that name. It is safe to call repeatedly.
//
// name and typ are interpolated into the ALTER statement unescaped.
func (idx *manticoreIndex) ensureColumn(name, typ string) error {
	rows, err := idx.db.Query(fmt.Sprintf("DESC %s", idx.table))
	if err != nil {
		return fmt.Errorf("desc %s: %w", idx.table, err)
	}
	defer rows.Close()

	// DESC rows are scanned as three columns: field name, type, properties.
	var (
		colName, colType string
		colProps         sql.NullString
	)
	for rows.Next() {
		if scanErr := rows.Scan(&colName, &colType, &colProps); scanErr != nil {
			return fmt.Errorf("desc scan %s: %w", idx.table, scanErr)
		}
		if colName == name {
			// Column already present, nothing to migrate.
			return nil
		}
	}
	if iterErr := rows.Err(); iterErr != nil {
		return fmt.Errorf("desc rows %s: %w", idx.table, iterErr)
	}

	alterStmt := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", idx.table, name, typ)
	if _, execErr := idx.db.Exec(alterStmt); execErr != nil {
		return fmt.Errorf("alter %s add %s: %w", idx.table, name, execErr)
	}
	return nil
}
// IndexSync writes doc into the RT table with REPLACE INTO, so an existing
// row with the same row ID is overwritten. The row ID is hashMailID(doc.ID).
// It returns a wrapped error if the statement fails.
func (idx *manticoreIndex) IndexSync(doc MailDocument) error {
	// has_attachment is stored as 0/1.
	var attachFlag uint64
	if doc.HasAttachment {
		attachFlag = 1
	}

	// A zero Date is indexed as date_ts 0.
	unixDate := int64(0)
	if !doc.Date.IsZero() {
		unixDate = doc.Date.Unix()
	}

	stmt := fmt.Sprintf(`REPLACE INTO %s
(id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes)
VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table)

	// Values in the same order as the column list above.
	values := []interface{}{
		hashMailID(doc.ID),
		doc.ID,
		doc.Subject,
		doc.From,
		doc.To,
		doc.Body,
		doc.AttachNames,
		doc.AttachmentText,
		attachFlag,
		unixDate,
		doc.Size,
	}
	if _, err := idx.db.Exec(stmt, values...); err != nil {
		return fmt.Errorf("manticore IndexSync %s: %w", idx.table, err)
	}
	return nil
}
// UpdateAttachmentText replaces only the attachment_text of an already-indexed
// document. Implements index.AttachmentTextUpdater.
//
// Manticore RT indexes do not support UPDATE on text columns, so the current
// row is read back and written again via REPLACE INTO, with every other field
// preserved and attachment_text set to text.
//
// If the document is not indexed yet (the mail must be ingested first), the
// SELECT finds no row and an error wrapping the Scan error is returned, which
// database/sql documents as sql.ErrNoRows for an empty result. Nothing is
// written in that case.
//
// NOTE(review): the read and the REPLACE are two separate statements, so a
// write to the same document in between would be overwritten.
func (idx *manticoreIndex) UpdateAttachmentText(mailID, text string) error {
	docID := hashMailID(mailID)

	selectStmt := fmt.Sprintf(
		`SELECT mail_id, subject, from_addr, to_addr, body, attachment_names,
has_attachment, date_ts, size_bytes
FROM %s WHERE id = ? LIMIT 1`, idx.table)

	// Snapshot of the stored row; everything except attachment_text is
	// carried over unchanged.
	var cur struct {
		mailID, subject, from, to, body, attachNames string
		hasAttach                                    uint64
		dateTS, size                                 int64
	}
	err := idx.db.QueryRow(selectStmt, docID).Scan(
		&cur.mailID, &cur.subject, &cur.from, &cur.to, &cur.body, &cur.attachNames,
		&cur.hasAttach, &cur.dateTS, &cur.size,
	)
	if err != nil {
		return fmt.Errorf("manticore UpdateAttachmentText %s: load row: %w", idx.table, err)
	}

	replaceStmt := fmt.Sprintf(`REPLACE INTO %s
(id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes)
VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table)
	_, err = idx.db.Exec(replaceStmt,
		docID, cur.mailID, cur.subject, cur.from, cur.to, cur.body, cur.attachNames,
		text, cur.hasAttach, cur.dateTS, cur.size,
	)
	if err != nil {
		return fmt.Errorf("manticore UpdateAttachmentText %s: replace: %w", idx.table, err)
	}
	return nil
}
// Delete removes the document whose row ID is hashMailID(id) from the table.
// It returns a wrapped error if the DELETE statement fails.
func (idx *manticoreIndex) Delete(id string) error {
	stmt := fmt.Sprintf("DELETE FROM %s WHERE id = ?", idx.table)
	if _, err := idx.db.Exec(stmt, hashMailID(id)); err != nil {
		return fmt.Errorf("manticore Delete %s: %w", idx.table, err)
	}
	return nil
}
// Search runs a full-text and attribute-filtered query against the RT table
// and returns one page of hits plus the total match count.
//
// The MATCH() expression is assembled from req.Query (all fields), req.From
// (from_addr), req.To (to_addr) and req.OwnEmail (from_addr or to_addr); each
// value goes through escapeManticoreMatch first. DateFrom, DateTo and
// HasAttachment become attribute filters. All user values are bound as
// parameters; only the table name and fixed SQL fragments are interpolated.
//
// Two statements are issued: a COUNT(*) for the total, then the page query.
// PageSize defaults to 20 and Page to 1 when they are not positive. Sorting
// is by date descending unless req.Sort is "date_asc", or "relevance" in
// combination with a full-text part. Score is WEIGHT() when there is a
// full-text part and 0 otherwise.
//
// NOTE(review): the page query runs with max_matches=10000 while the count
// uses 1000000, so very deep pages presumably cannot be served even though
// Total reports them — confirm against Manticore's behaviour.
func (idx *manticoreIndex) Search(req SearchRequest) (*SearchResult, error) {
	// ── full-text expression ──
	var ftParts []string
	if req.Query != "" {
		ftParts = append(ftParts, escapeManticoreMatch(req.Query))
	}
	if req.From != "" {
		ftParts = append(ftParts, fmt.Sprintf("@from_addr %s", escapeManticoreMatch(req.From)))
	}
	if req.To != "" {
		ftParts = append(ftParts, fmt.Sprintf("@to_addr %s", escapeManticoreMatch(req.To)))
	}
	if req.OwnEmail != "" {
		ftParts = append(ftParts, fmt.Sprintf("@(from_addr,to_addr) %s", escapeManticoreMatch(req.OwnEmail)))
	}
	fullText := len(ftParts) > 0

	// ── WHERE clause and its bound parameters ──
	var (
		conds  []string
		params []interface{}
	)
	if fullText {
		conds = append(conds, "MATCH(?)")
		params = append(params, strings.Join(ftParts, " "))
	}
	if req.DateFrom != nil {
		conds = append(conds, "date_ts >= ?")
		params = append(params, req.DateFrom.Unix())
	}
	if req.DateTo != nil {
		conds = append(conds, "date_ts <= ?")
		params = append(params, req.DateTo.Unix())
	}
	if req.HasAttachment != nil {
		if *req.HasAttachment {
			conds = append(conds, "has_attachment = 1")
		} else {
			conds = append(conds, "has_attachment = 0")
		}
	}
	var where string
	if len(conds) > 0 {
		where = "WHERE " + strings.Join(conds, " AND ")
	}

	// ── total number of matches ──
	countStmt := fmt.Sprintf(
		"SELECT COUNT(*) FROM %s %s OPTION max_matches=1000000",
		idx.table, where,
	)
	var total int
	if err := idx.db.QueryRow(countStmt, params...).Scan(&total); err != nil {
		return nil, fmt.Errorf("manticore Search count %s: %w", idx.table, err)
	}

	// ── paging ──
	limit := req.PageSize
	if limit <= 0 {
		limit = 20
	}
	pageNo := req.Page
	if pageNo <= 0 {
		pageNo = 1
	}
	skip := (pageNo - 1) * limit

	// ── score column and ordering ──
	scoreCol, ordering := "0 as score", "date_ts DESC"
	if fullText {
		scoreCol = "WEIGHT() as score"
	}
	switch {
	case fullText && req.Sort == "relevance":
		// Relevance ordering only makes sense with a full-text part.
		ordering = "WEIGHT() DESC, date_ts DESC"
	case req.Sort == "date_asc":
		ordering = "date_ts ASC"
	}

	// ── page query ──
	pageStmt := fmt.Sprintf(
		"SELECT mail_id, %s FROM %s %s ORDER BY %s LIMIT ? OFFSET ? OPTION max_matches=10000",
		scoreCol, idx.table, where, ordering,
	)
	pageParams := make([]interface{}, 0, len(params)+2)
	pageParams = append(pageParams, params...)
	pageParams = append(pageParams, limit, skip)

	rows, err := idx.db.Query(pageStmt, pageParams...)
	if err != nil {
		return nil, fmt.Errorf("manticore Search select %s: %w", idx.table, err)
	}
	defer rows.Close()

	var hits []Hit
	for rows.Next() {
		var (
			id     string
			weight float64
		)
		if scanErr := rows.Scan(&id, &weight); scanErr != nil {
			return nil, fmt.Errorf("manticore Search scan: %w", scanErr)
		}
		hits = append(hits, Hit{ID: id, Score: weight})
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("manticore Search rows: %w", iterErr)
	}
	return &SearchResult{Total: total, Hits: hits}, nil
}
// Close is a no-op for individual indexes and always returns nil — the shared
// DB connection is managed by ManticoreTenantManager.
func (idx *manticoreIndex) Close() error {
return nil
}
// ── helpers ────────────────────────────────────────────────────────────────

// hashMailID maps a mail's string ID to the uint64 used as the row ID in the
// RT tables. The value is the 64-bit FNV-1a hash of the ID's bytes, so the
// same ID always yields the same row ID. Being a hash, it is not guaranteed
// to be unique across different IDs.
func hashMailID(id string) uint64 {
	hasher := fnv.New64a()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(id))
	return hasher.Sum64()
}
// manticoreTableName maps a tenant ID to its RT table name:
// nil / 0 → emails_global, otherwise emails_tenant_<id>.
//
// The result is checked against validTableName before it is returned. A name
// that does not match causes a panic, because that indicates a programming
// error rather than a runtime condition. Note that a negative ID formats as
// "emails_tenant_-N", which fails the check and therefore panics.
func manticoreTableName(tenantID *int64) string {
	name := "emails_global"
	if tenantID != nil && *tenantID != 0 {
		name = fmt.Sprintf("emails_tenant_%d", *tenantID)
	}
	if validTableName.MatchString(name) {
		return name
	}
	panic(fmt.Sprintf("manticore: invalid table name: %q", name))
}
// escapeManticoreMatch backslash-escapes every character of s that has a
// special meaning in a Manticore MATCH() expression, so that user input is
// searched as literal text and cannot inject query operators.
//
// The escaped set is Manticore's documented list of query-string characters
// ( ! " $ ' ( ) - / < @ \ ^ | ~ ) plus '='. Earlier versions of this function
// left '-' and '\'' unescaped, so input such as "-term" still reached MATCH()
// as a NOT operator. All other characters are copied through unchanged.
func escapeManticoreMatch(s string) string {
	const specials = `\()|!@~"/^$=<-'`

	var b strings.Builder
	b.Grow(len(s))
	for _, c := range s {
		if strings.ContainsRune(specials, c) {
			b.WriteByte('\\')
		}
		b.WriteRune(c)
	}
	return b.String()
}