package index import ( "database/sql" "fmt" "hash/fnv" "regexp" "strings" "sync" "time" _ "github.com/go-sql-driver/mysql" ) // validTableName guards against SQL injection via table name interpolation. // Only emails_global and emails_tenant_ are valid Manticore RT tables. var validTableName = regexp.MustCompile(`^emails_(global|tenant_\d+)$`) // manticoreIndex implements Indexer against a single Manticore RT table. type manticoreIndex struct { db *sql.DB table string } // ManticoreTenantManager implements TenantIndexer using Manticore Search // via the MySQL protocol. No CGO required — pure Go via database/sql. type ManticoreTenantManager struct { db *sql.DB mu sync.RWMutex pool map[int64]*manticoreIndex global *manticoreIndex } // NewManticoreTenantManager opens a Manticore connection, ensures the global // RT table exists, and returns a ready manager. func NewManticoreTenantManager(dsn string) (*ManticoreTenantManager, error) { db, err := sql.Open("mysql", dsn) if err != nil { return nil, fmt.Errorf("manticore: open: %w", err) } db.SetMaxOpenConns(16) db.SetMaxIdleConns(4) db.SetConnMaxLifetime(5 * time.Minute) if err := db.Ping(); err != nil { db.Close() return nil, fmt.Errorf("manticore: ping: %w", err) } m := &ManticoreTenantManager{ db: db, pool: make(map[int64]*manticoreIndex), } globalIdx := &manticoreIndex{db: db, table: "emails_global"} if err := globalIdx.ensureTable(); err != nil { db.Close() return nil, fmt.Errorf("manticore: ensure global table: %w", err) } m.global = globalIdx return m, nil } // ForTenant returns the Indexer for the given tenant, creating the RT table on first use. // A nil or zero tenantID falls back to the global index. func (m *ManticoreTenantManager) ForTenant(tenantID *int64) Indexer { if tenantID == nil || *tenantID == 0 { return m.global } tid := *tenantID m.mu.RLock() idx, ok := m.pool[tid] m.mu.RUnlock() if ok { return idx } m.mu.Lock() defer m.mu.Unlock() // Double-check after acquiring write lock. 
if idx, ok = m.pool[tid]; ok { return idx } idx = &manticoreIndex{db: m.db, table: manticoreTableName(&tid)} if err := idx.ensureTable(); err != nil { // Return global as safe fallback; error is logged via caller. return m.global } m.pool[tid] = idx return idx } // Global returns the global (non-tenant) Indexer. func (m *ManticoreTenantManager) Global() Indexer { return m.global } // Close closes the shared database connection. func (m *ManticoreTenantManager) Close() error { return m.db.Close() } // ── manticoreIndex methods ──────────────────────────────────────────────── // ensureTable creates the RT index if it does not yet exist and applies // idempotent column additions for schema migrations. func (idx *manticoreIndex) ensureTable() error { stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( mail_id string, subject text, from_addr text, to_addr text, body text, attachment_names text, attachment_text text, has_attachment uint, date_ts bigint, size_bytes bigint ) type='rt' morphology='stem_en,lemmatize_de_all'`, idx.table) if _, err := idx.db.Exec(stmt); err != nil { return fmt.Errorf("ensureTable %s: %w", idx.table, err) } // PROJ-35: ALTER existing tables to add attachment_text. Manticore lacks // ALTER IF NOT EXISTS, so we DESC first and only add when missing. if err := idx.ensureColumn("attachment_text", "text"); err != nil { return err } return nil } // ensureColumn checks DESC for the named column and adds it via // ALTER TABLE when missing. Safe to call repeatedly. 
func (idx *manticoreIndex) ensureColumn(name, typ string) error {
	// Manticore has no ALTER ... IF NOT EXISTS, so inspect the live schema
	// before deciding whether the column needs to be added.
	desc, err := idx.db.Query(fmt.Sprintf("DESC %s", idx.table))
	if err != nil {
		return fmt.Errorf("desc %s: %w", idx.table, err)
	}
	defer desc.Close()

	for desc.Next() {
		// Assumes DESC yields exactly three columns (field, type, properties).
		var (
			column, columnType string
			properties         sql.NullString
		)
		if scanErr := desc.Scan(&column, &columnType, &properties); scanErr != nil {
			return fmt.Errorf("desc scan %s: %w", idx.table, scanErr)
		}
		if column == name {
			return nil // already present: nothing to migrate
		}
	}
	if err = desc.Err(); err != nil {
		return fmt.Errorf("desc rows %s: %w", idx.table, err)
	}

	alter := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", idx.table, name, typ)
	if _, err = idx.db.Exec(alter); err != nil {
		return fmt.Errorf("alter %s add %s: %w", idx.table, name, err)
	}
	return nil
}

// IndexSync inserts or replaces a document in the RT index. The row id is
// hashMailID(doc.ID), so indexing the same mail again overwrites its row.
func (idx *manticoreIndex) IndexSync(doc MailDocument) error {
	var attachFlag uint64
	if doc.HasAttachment {
		attachFlag = 1
	}

	// A zero Date is stored as 0 rather than converted to a Unix timestamp.
	dateTS := int64(0)
	if date := doc.Date; !date.IsZero() {
		dateTS = date.Unix()
	}

	query := fmt.Sprintf(`REPLACE INTO %s (id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes) VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table)
	if _, err := idx.db.Exec(query,
		hashMailID(doc.ID), doc.ID, doc.Subject, doc.From, doc.To, doc.Body,
		doc.AttachNames, doc.AttachmentText, attachFlag, dateTS, doc.Size,
	); err != nil {
		return fmt.Errorf("manticore IndexSync %s: %w", idx.table, err)
	}
	return nil
}

// UpdateAttachmentText overwrites only the attachment_text field of an
// already-indexed document, keeping every other stored column as it is.
// Implements index.AttachmentTextUpdater.
//
// Manticore RT indexes do not support UPDATE on text columns, so this
// re-fetches the full row and issues a REPLACE INTO with all fields preserved
// and attachment_text overwritten. The mail must be ingested first: if no row
// exists for mailID yet, the returned error wraps sql.ErrNoRows, which callers
// can detect with errors.Is.
func (idx *manticoreIndex) UpdateAttachmentText(mailID, text string) error {
	rowID := hashMailID(mailID)

	// Text columns cannot be UPDATEd in an RT index: load every other stored
	// column so the whole document can be written back with REPLACE.
	row := idx.db.QueryRow(fmt.Sprintf(
		`SELECT mail_id, subject, from_addr, to_addr, body, attachment_names, has_attachment, date_ts, size_bytes FROM %s WHERE id = ? LIMIT 1`, idx.table),
		rowID,
	)
	var (
		mid, subj, from, to, body, attachNames string
		hasAttach                              uint64
		dateTS, sizeBytes                      int64
	)
	if err := row.Scan(&mid, &subj, &from, &to, &body, &attachNames, &hasAttach, &dateTS, &sizeBytes); err != nil {
		// A missing row surfaces here as a wrapped sql.ErrNoRows.
		return fmt.Errorf("manticore UpdateAttachmentText %s: load row: %w", idx.table, err)
	}

	// Row ids are a 64-bit hash of the mail id, so a different mail can in
	// principle occupy this row. Never overwrite it with our attachment text;
	// report the target as not indexed instead.
	if mid != mailID {
		return fmt.Errorf("manticore UpdateAttachmentText %s: row %d holds mail %q, not %q: %w",
			idx.table, rowID, mid, mailID, sql.ErrNoRows)
	}

	if _, err := idx.db.Exec(
		fmt.Sprintf(`REPLACE INTO %s (id, mail_id, subject, from_addr, to_addr, body, attachment_names, attachment_text, has_attachment, date_ts, size_bytes) VALUES (?,?,?,?,?,?,?,?,?,?,?)`, idx.table),
		rowID, mid, subj, from, to, body, attachNames, text, hasAttach, dateTS, sizeBytes,
	); err != nil {
		return fmt.Errorf("manticore UpdateAttachmentText %s: replace: %w", idx.table, err)
	}
	return nil
}

// Delete removes a document by mail ID hash.
// NOTE(review): deletion is keyed only on the hashed row id; if two mail ids
// ever collided, this would remove whichever mail occupies that row.
func (idx *manticoreIndex) Delete(id string) error {
	rowID := hashMailID(id)
	_, err := idx.db.Exec(
		fmt.Sprintf("DELETE FROM %s WHERE id = ?", idx.table),
		rowID,
	)
	if err != nil {
		return fmt.Errorf("manticore Delete %s: %w", idx.table, err)
	}
	return nil
}

// Search executes a full-text + filter query against the RT index and returns
// the total number of matching documents plus one page of hits.
func (idx *manticoreIndex) Search(req SearchRequest) (*SearchResult, error) {
	// Full-text part. Every user-supplied value is escaped so it is matched
	// literally; field operators restrict sender/recipient terms.
	var matchParts []string
	if req.Query != "" {
		matchParts = append(matchParts, escapeManticoreMatch(req.Query))
	}
	if req.From != "" {
		matchParts = append(matchParts, fmt.Sprintf("@from_addr %s", escapeManticoreMatch(req.From)))
	}
	if req.To != "" {
		matchParts = append(matchParts, fmt.Sprintf("@to_addr %s", escapeManticoreMatch(req.To)))
	}
	if req.OwnEmail != "" {
		matchParts = append(matchParts, fmt.Sprintf("@(from_addr,to_addr) %s", escapeManticoreMatch(req.OwnEmail)))
	}
	hasMatch := len(matchParts) > 0

	// Filter part. Values travel as bind parameters; only fixed SQL
	// fragments are concatenated into the statement.
	var whereParts []string
	var args []interface{}
	if hasMatch {
		whereParts = append(whereParts, "MATCH(?)")
		args = append(args, strings.Join(matchParts, " "))
	}
	if req.DateFrom != nil {
		whereParts = append(whereParts, "date_ts >= ?")
		args = append(args, req.DateFrom.Unix())
	}
	if req.DateTo != nil {
		whereParts = append(whereParts, "date_ts <= ?")
		args = append(args, req.DateTo.Unix())
	}
	if req.HasAttachment != nil {
		if *req.HasAttachment {
			whereParts = append(whereParts, "has_attachment = 1")
		} else {
			whereParts = append(whereParts, "has_attachment = 0")
		}
	}
	whereClause := ""
	if len(whereParts) > 0 {
		whereClause = "WHERE " + strings.Join(whereParts, " AND ")
	}

	// Total hit count for the whole filter, independent of paging.
	countSQL := fmt.Sprintf(
		"SELECT COUNT(*) FROM %s %s OPTION max_matches=1000000",
		idx.table, whereClause,
	)
	var total int
	if err := idx.db.QueryRow(countSQL, args...).Scan(&total); err != nil {
		return nil, fmt.Errorf("manticore Search count %s: %w", idx.table, err)
	}

	pageSize := req.PageSize
	if pageSize <= 0 {
		pageSize = 20
	}
	page := req.Page
	if page <= 0 {
		page = 1
	}

	// A page that starts at or past the last hit is necessarily empty, so
	// answer it without a second query. Comparing page numbers, rather than
	// computing the offset first, keeps huge page numbers from overflowing
	// the offset arithmetic.
	if total == 0 || int(page) > (total-1)/int(pageSize)+1 {
		return &SearchResult{Total: total}, nil
	}
	offset := (page - 1) * pageSize

	// Relevance ordering needs a MATCH(); without one every row scores 0 and
	// only the date orderings apply. Unknown sort values mean newest first.
	scoreExpr := "0 as score"
	if hasMatch {
		scoreExpr = "WEIGHT() as score"
	}
	orderBy := "date_ts DESC"
	switch {
	case req.Sort == "date_asc":
		orderBy = "date_ts ASC"
	case req.Sort == "relevance" && hasMatch:
		orderBy = "WEIGHT() DESC, date_ts DESC"
	}

	// Manticore refuses an OFFSET at or beyond max_matches. A fixed
	// max_matches of 10000 therefore broke every page starting at hit 10000,
	// even when Total reported those hits. Keep a 10000-row window but start
	// it at the requested offset; offset < total here, so this cannot overflow.
	const searchWindow = 10000
	maxMatches := offset + searchWindow

	selectSQL := fmt.Sprintf(
		"SELECT mail_id, %s FROM %s %s ORDER BY %s LIMIT ? OFFSET ? OPTION max_matches=%d",
		scoreExpr, idx.table, whereClause, orderBy, maxMatches,
	)
	selectArgs := make([]interface{}, 0, len(args)+2)
	selectArgs = append(selectArgs, args...)
	selectArgs = append(selectArgs, pageSize, offset)

	rows, err := idx.db.Query(selectSQL, selectArgs...)
	if err != nil {
		return nil, fmt.Errorf("manticore Search select %s: %w", idx.table, err)
	}
	defer rows.Close()

	var hits []Hit
	for rows.Next() {
		var mailID string
		var score float64
		if err := rows.Scan(&mailID, &score); err != nil {
			return nil, fmt.Errorf("manticore Search scan: %w", err)
		}
		hits = append(hits, Hit{ID: mailID, Score: score})
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("manticore Search rows: %w", err)
	}
	return &SearchResult{Total: total, Hits: hits}, nil
}

// Close is a no-op for individual indexes — the shared DB connection is managed
// by ManticoreTenantManager.
func (idx *manticoreIndex) Close() error { return nil }

// ── helpers ────────────────────────────────────────────────────────────────

// hashMailID returns a stable signed int64 row ID derived from the mail's
// SHA-256 string ID. Manticore's `id` column is bigint (signed int64); the
// mysql driver formats parameters as decimal strings, and Manticore rejects
// values outside the signed range with "number ... is out of range".
//
// We FNV-64a-hash the string and reinterpret the resulting uint64 as int64
// (a lossless bit cast). All bit patterns map 1:1, so already-indexed
// documents stay reachable. Values whose top bit was set previously are now
// negative IDs: their bit pattern is unchanged, and only the SQL rendering
// differs, which now matches what Manticore expects.
func hashMailID(id string) int64 {
	h := fnv.New64a()
	h.Write([]byte(id)) // hash.Hash.Write never returns an error
	return int64(h.Sum64())
}

// manticoreTableName returns the RT table name for a given tenant:
// nil / 0 → emails_global, otherwise emails_tenant_<id>.
// Panics if the resulting name does not match validTableName (for example, a
// negative ID). That indicates a programming error, not a runtime condition.
func manticoreTableName(tenantID *int64) string {
	name := "emails_global"
	if tenantID != nil && *tenantID != 0 {
		name = fmt.Sprintf("emails_tenant_%d", *tenantID)
	}
	if !validTableName.MatchString(name) {
		panic(fmt.Sprintf("manticore: invalid table name: %q", name))
	}
	return name
}

// escapeManticoreMatch escapes characters that have special meaning in
// Manticore MATCH() expressions to prevent query injection. Each operator
// character is prefixed with a backslash so it is searched literally.
func escapeManticoreMatch(s string) string {
	// '-' is the NOT operator (e.g. "-spam"), so it needs escaping just like
	// '|' (OR) and the grouping/field/proximity operators.
	const specials = `\()|!@~"/^$=<-`
	var b strings.Builder
	b.Grow(len(s))
	for _, c := range s {
		if strings.ContainsRune(specials, c) {
			b.WriteByte('\\')
		}
		b.WriteRune(c)
	}
	return b.String()
}