feat(PROJ-30): Xapian → Manticore Search Migration

- internal/index/manticore.go: ManticoreTenantManager + manticoreIndex (RT-Indizes, CGO-frei)
- internal/index/index.go: TenantIndexer Interface (Xapian + Manticore)
- internal/index/tenant_worker.go: mgr-Typ auf TenantIndexer Interface
- internal/api/server.go: idxMgr auf TenantIndexer Interface
- config/config.go: IndexConfig.ManticoreDSN Feld
- cmd/archivmail/cmd_reindex.go: reindex Subkommando
- cmd/archivmail/main.go: Manticore-Branch + reindex Case
- go.mod: github.com/go-sql-driver/mysql v1.8.1
- update.sh: Manticore auto-install, CGO_ENABLED=0, config.yml migration, auto-reindex

fix(IMAP): TCP-Deadline-Wrapper für steckengebliebene Imports
fix(auth): Email-Claim in JWT für User-Isolation
fix(search): User-Isolation via sess.Email (fail-safe)
fix(ui): Admin-Login Auth-Cache, Logout-Redirect, IMAP-Polling-Resilienz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sysops
2026-04-03 21:19:36 +02:00
parent e90d588e30
commit a93a843506
19 changed files with 742 additions and 65 deletions
+29 -5
View File
@@ -171,6 +171,18 @@ func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
labelMap, _ = s.labels.GetLabelsForEmails(r.Context(), emailIDs)
}
// SEC: For user role, restrict results to mails the user is involved in
// (From, To, or CC). Email comes from the JWT session — no DB lookup needed.
// If email is missing for a user-role session, block all results (fail-safe).
var userEmailFilter string
if sess.Role == userstore.RoleUser {
userEmailFilter = strings.ToLower(sess.Email)
if userEmailFilter == "" {
writeJSON(w, http.StatusOK, map[string]interface{}{"total": 0, "hits": []interface{}{}})
return
}
}
enriched := make([]enrichedHit, 0, len(result.Hits))
for _, h := range result.Hits {
eh := enrichedHit{ID: h.ID, Score: h.Score}
@@ -186,6 +198,14 @@ func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
eh.Date = pm.Date.UTC().Format(time.RFC3339)
}
eh.HasAttachments = len(pm.Attachments) > 0
// User isolation: skip mails the user is not involved in.
if userEmailFilter != "" && !mailBelongsToUser(pm, userEmailFilter) {
continue
}
} else if userEmailFilter != "" {
// If mail can't be parsed, deny access to user role.
continue
}
}
if labelMap != nil {
@@ -233,8 +253,7 @@ func (s *Server) handleGetMail(w http.ResponseWriter, r *http.Request) {
// user and auditor: only own mails; domain_auditor: all tenant mails (no filter)
if sess.Role == userstore.RoleUser || sess.Role == userstore.RoleAuditor {
u, err := s.users.GetByUsername(sess.Username)
if err != nil || !mailBelongsToUser(pm, u.Email) {
if sess.Email == "" || !mailBelongsToUser(pm, sess.Email) {
writeError(w, http.StatusForbidden, "access denied")
return
}
@@ -399,16 +418,21 @@ func (s *Server) handleGetRaw(w http.ResponseWriter, r *http.Request) {
w.Write(raw)
}
// mailBelongsToUser checks if the user's email appears in To or CC.
// mailBelongsToUser checks if the user's email appears in From, To, or CC.
// Users can access mails they sent as well as mails they received.
// From may contain a display name ("Name <addr>"), so Contains is used.
func mailBelongsToUser(pm *mailparser.ParsedMail, userEmail string) bool {
email := strings.ToLower(userEmail)
if strings.Contains(strings.ToLower(pm.From), email) {
return true
}
for _, to := range pm.To {
if strings.ToLower(to) == email {
if strings.Contains(strings.ToLower(to), email) {
return true
}
}
for _, cc := range pm.CC {
if strings.ToLower(cc) == email {
if strings.Contains(strings.ToLower(cc), email) {
return true
}
}
+2 -2
View File
@@ -80,7 +80,7 @@ type Server struct {
ldapStore *ldapcfg.Store
tenantStore *tenantstore.Store
tenantLdapStore *ldapcfg.TenantStore
idxMgr *index.TenantIndexManager
idxMgr index.TenantIndexer
appVersion string
moduleVersions map[string]string
globalRetentionDays int // from storage config (PROJ-34)
@@ -109,7 +109,7 @@ func (s *Server) SetPop3(store *pop3store.Store, importer *pop3store.Importer) {
}
// SetIndexManager wires the per-tenant index manager into the API server (PROJ-21 Phase 4).
func (s *Server) SetIndexManager(mgr *index.TenantIndexManager) {
func (s *Server) SetIndexManager(mgr index.TenantIndexer) {
s.idxMgr = mgr
}
+4
View File
@@ -22,6 +22,7 @@ import (
type Session struct {
UserID int64
Username string
Email string
Role string
JTI string // unique JWT ID
TenantID *int64
@@ -193,6 +194,7 @@ func (m *Manager) issueToken(user *userstore.User) (string, *userstore.User, err
claims := jwt.MapClaims{
"sub": user.Username,
"email": user.Email,
"role": user.Role,
"uid": user.ID,
"jti": jti,
@@ -338,6 +340,7 @@ func (m *Manager) ValidateToken(tokenStr string) (*Session, error) {
}
username, _ := claims["sub"].(string)
email, _ := claims["email"].(string)
role, _ := claims["role"].(string)
var userID int64
@@ -364,6 +367,7 @@ func (m *Manager) ValidateToken(tokenStr string) (*Session, error) {
return &Session{
UserID: userID,
Username: username,
Email: email,
Role: role,
JTI: jti,
TenantID: tenantID,
+62 -16
View File
@@ -3,17 +3,35 @@ package imap
import (
"crypto/tls"
"fmt"
"net"
"strings"
"time"
imapv2 "github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// FolderInfo describes a single IMAP folder with exclusion metadata.
type FolderInfo struct {
Name string `json:"name"`
Excluded bool `json:"excluded"`
Reason string `json:"reason,omitempty"`
const (
dialTimeout = 30 * time.Second
fetchTimeout = 5 * time.Minute // per-batch read/write deadline
)
// Conn wraps an IMAP client with the underlying net.Conn so callers
// can set per-operation deadlines to prevent indefinite blocking.
type Conn struct {
*imapclient.Client
raw net.Conn
}
// SetFetchDeadline sets a 5-minute read/write deadline on the connection.
// Call this before each fetch batch to prevent stalled imports.
func (c *Conn) SetFetchDeadline() {
_ = c.raw.SetDeadline(time.Now().Add(fetchTimeout))
}
// ClearDeadline removes any active deadline from the underlying connection.
func (c *Conn) ClearDeadline() {
_ = c.raw.SetDeadline(time.Time{})
}
// junkTrashNames lists well-known junk/trash folder names for fallback detection.
@@ -22,33 +40,61 @@ var junkTrashNames = []string{
"deleted messages", "papierkorb", "gelöschte elemente",
}
// FolderInfo describes a single IMAP folder with exclusion metadata.
type FolderInfo struct {
Name string `json:"name"`
Excluded bool `json:"excluded"`
Reason string `json:"reason,omitempty"`
}
// Connect establishes an IMAP client connection using the specified TLS mode.
func Connect(host string, port int, tlsMode string) (*imapclient.Client, error) {
// Returns a Conn that exposes the underlying net.Conn for deadline management.
func Connect(host string, port int, tlsMode string) (*Conn, error) {
addr := fmt.Sprintf("%s:%d", host, port)
switch tlsMode {
case "ssl":
c, err := imapclient.DialTLS(addr, &imapclient.Options{
TLSConfig: &tls.Config{ServerName: host},
})
dialer := &tls.Dialer{
NetDialer: &net.Dialer{Timeout: dialTimeout},
Config: &tls.Config{ServerName: host},
}
raw, err := dialer.Dial("tcp", addr)
if err != nil {
return nil, fmt.Errorf("imap connect ssl: %w", err)
}
return c, nil
c, err := imapclient.New(raw, nil)
if err != nil {
raw.Close()
return nil, fmt.Errorf("imap client ssl: %w", err)
}
return &Conn{Client: c, raw: raw}, nil
case "starttls":
c, err := imapclient.DialStartTLS(addr, &imapclient.Options{
TLSConfig: &tls.Config{ServerName: host},
})
raw, err := net.DialTimeout("tcp", addr, dialTimeout)
if err != nil {
return nil, fmt.Errorf("imap connect starttls: %w", err)
}
return c, nil
c, err := imapclient.New(raw, &imapclient.Options{
TLSConfig: &tls.Config{ServerName: host},
})
if err != nil {
raw.Close()
return nil, fmt.Errorf("imap client starttls: %w", err)
}
return &Conn{Client: c, raw: raw}, nil
case "none":
c, err := imapclient.DialInsecure(addr, nil)
raw, err := net.DialTimeout("tcp", addr, dialTimeout)
if err != nil {
return nil, fmt.Errorf("imap connect plain: %w", err)
}
return c, nil
c, err := imapclient.New(raw, nil)
if err != nil {
raw.Close()
return nil, fmt.Errorf("imap client plain: %w", err)
}
return &Conn{Client: c, raw: raw}, nil
default:
return nil, fmt.Errorf("imap: unknown tls mode %q", tlsMode)
}
+11 -10
View File
@@ -23,7 +23,6 @@ type Importer struct {
mailStore *storage.Store
idx index.Indexer
logger *slog.Logger
TenantID *int64 // optional tenant assignment for stored mails
}
// NewImporter creates a new Importer wired to the storage and index backends.
@@ -88,7 +87,7 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string
}
// List all folders
folders, err := ListFolders(c)
folders, err := ListFolders(c.Client)
if err != nil {
return 0, fmt.Errorf("list folders: %w", err)
}
@@ -159,11 +158,13 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string
}
batch := uids[i:end]
count, err := imp.fetchBatch(ctx, c, batch, log)
// Set per-batch deadline to prevent indefinite blocking on stalled connections.
c.SetFetchDeadline()
count, err := imp.fetchBatch(ctx, c.Client, batch, acc.TenantID, log)
c.ClearDeadline()
if err != nil {
log.Error("batch fetch error", "folder", folder, "offset", i, "err", err)
// Continue with the next batch rather than aborting entirely
continue
log.Error("batch fetch error — aborting import", "folder", folder, "offset", i, "err", err)
return imported, fmt.Errorf("fetch batch %d in %q: %w", i, folder, err)
}
imported += count
@@ -177,7 +178,7 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string
}
// fetchBatch fetches and stores a batch of messages by UID.
func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, log *slog.Logger) (int, error) {
func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, tenantID *int64, log *slog.Logger) (int, error) {
if len(uids) == 0 {
return 0, nil
}
@@ -212,7 +213,7 @@ func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids
continue
}
if err := imp.storeAndIndex(raw, log); err != nil {
if err := imp.storeAndIndex(raw, tenantID, log); err != nil {
log.Warn("failed to store/index message", "err", err)
continue
}
@@ -229,10 +230,10 @@ func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids
}
// storeAndIndex saves a raw email to storage and indexes it.
func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error {
func (imp *Importer) storeAndIndex(raw []byte, tenantID *int64, log *slog.Logger) error {
ctx := context.Background()
// Save to file storage (deduplicates by SHA256 automatically)
id, err := imp.mailStore.Save(ctx, raw, time.Now(), imp.TenantID)
id, err := imp.mailStore.Save(ctx, raw, time.Now(), tenantID)
if err != nil {
return fmt.Errorf("save: %w", err)
}
+7 -4
View File
@@ -207,7 +207,7 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e
return 0, 0, fmt.Errorf("imap scheduler: login: %w", err)
}
folders, err := ListFolders(c)
folders, err := ListFolders(c.Client)
if err != nil {
return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err)
}
@@ -247,7 +247,7 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e
// syncFolder syncs new messages from a single IMAP folder.
func (s *Scheduler) syncFolder(
ctx context.Context,
c *imapclient.Client,
c *Conn,
acc *Account,
folder string,
log *slog.Logger,
@@ -298,7 +298,9 @@ func (s *Scheduler) syncFolder(
}
batch := uids[i:end]
count, batchMaxUID, err := s.fetchSyncBatch(c, batch, log)
c.SetFetchDeadline()
count, batchMaxUID, err := s.fetchSyncBatch(c.Client, batch, acc.TenantID, log)
c.ClearDeadline()
if err != nil {
log.Warn("imap scheduler: batch error, continuing",
"folder", folder, "offset", i, "err", err)
@@ -320,6 +322,7 @@ func (s *Scheduler) syncFolder(
func (s *Scheduler) fetchSyncBatch(
c *imapclient.Client,
uids []imapv2.UID,
tenantID *int64,
log *slog.Logger,
) (int, uint32, error) {
if len(uids) == 0 {
@@ -367,7 +370,7 @@ func (s *Scheduler) fetchSyncBatch(
}
if len(raw) > 0 {
if err := s.importer.storeAndIndex(raw, log); err != nil {
if err := s.importer.storeAndIndex(raw, tenantID, log); err != nil {
log.Warn("imap scheduler: store/index failed", "err", err)
} else {
imported++
+7 -3
View File
@@ -40,6 +40,9 @@ type Account struct {
SyncRunning bool `json:"sync_running"`
SyncStatus string `json:"sync_status"`
SyncErrorMsg string `json:"sync_error_msg"`
// Tenant assignment — mails imported from this account are tagged with this tenant.
TenantID *int64 `json:"tenant_id,omitempty"`
}
// Store manages IMAP account persistence in PostgreSQL.
@@ -71,7 +74,7 @@ CREATE TABLE IF NOT EXISTS imap_accounts (
CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner);
`
// migrationSQL adds the PROJ-8 sync columns if they do not yet exist.
// migrationSQL adds columns that may not exist in older installations.
const migrationSQL = `
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ;
@@ -80,6 +83,7 @@ ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFA
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT '';
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT '';
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS tenant_id INTEGER REFERENCES tenants(id);
`
// New creates a new Store, connects to PostgreSQL, and runs the migration.
@@ -138,7 +142,7 @@ const selectColumns = ` id, owner, name, host, port, tls, username, excluded_fol
status, error_msg, last_import_at, last_import_count,
progress_current, progress_total, created_at,
sync_interval_min, last_sync_at, last_sync_count, last_uid,
sync_running, sync_status, sync_error_msg `
sync_running, sync_status, sync_error_msg, tenant_id `
// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error.
type scanner interface {
@@ -152,7 +156,7 @@ func scanRow(row scanner) (Account, error) {
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
&a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID,
&a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg,
&a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, &a.TenantID,
)
return a, err
}
+8
View File
@@ -54,6 +54,14 @@ type Indexer interface {
Close() error
}
// TenantIndexer manages per-tenant Indexer instances.
// Implemented by TenantIndexManager (Xapian) and ManticoreTenantManager.
type TenantIndexer interface {
ForTenant(tenantID *int64) Indexer
Global() Indexer
Close() error
}
// New creates an Indexer for the specified backend.
func New(dir string, batchSize int, backend string) (Indexer, error) {
switch backend {
+327
View File
@@ -0,0 +1,327 @@
package index
import (
"database/sql"
"fmt"
"hash/fnv"
"strings"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
// manticoreIndex implements Indexer against a single Manticore RT table.
type manticoreIndex struct {
db *sql.DB
table string
}
// ManticoreTenantManager implements TenantIndexer using Manticore Search
// via the MySQL protocol. No CGO required — pure Go via database/sql.
type ManticoreTenantManager struct {
db *sql.DB
mu sync.RWMutex
pool map[int64]*manticoreIndex
global *manticoreIndex
}
// NewManticoreTenantManager opens a Manticore connection, ensures the global
// RT table exists, and returns a ready manager.
func NewManticoreTenantManager(dsn string) (*ManticoreTenantManager, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, fmt.Errorf("manticore: open: %w", err)
}
db.SetMaxOpenConns(16)
db.SetMaxIdleConns(4)
db.SetConnMaxLifetime(5 * time.Minute)
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("manticore: ping: %w", err)
}
m := &ManticoreTenantManager{
db: db,
pool: make(map[int64]*manticoreIndex),
}
globalIdx := &manticoreIndex{db: db, table: "emails_global"}
if err := globalIdx.ensureTable(); err != nil {
db.Close()
return nil, fmt.Errorf("manticore: ensure global table: %w", err)
}
m.global = globalIdx
return m, nil
}
// ForTenant returns the Indexer for the given tenant, creating the RT table on first use.
// A nil or zero tenantID falls back to the global index.
func (m *ManticoreTenantManager) ForTenant(tenantID *int64) Indexer {
if tenantID == nil || *tenantID == 0 {
return m.global
}
tid := *tenantID
m.mu.RLock()
idx, ok := m.pool[tid]
m.mu.RUnlock()
if ok {
return idx
}
m.mu.Lock()
defer m.mu.Unlock()
// Double-check after acquiring write lock.
if idx, ok = m.pool[tid]; ok {
return idx
}
idx = &manticoreIndex{db: m.db, table: manticoreTableName(&tid)}
if err := idx.ensureTable(); err != nil {
// Return global as safe fallback; error is logged via caller.
return m.global
}
m.pool[tid] = idx
return idx
}
// Global returns the global (non-tenant) Indexer.
func (m *ManticoreTenantManager) Global() Indexer {
return m.global
}
// Close closes the shared database connection.
func (m *ManticoreTenantManager) Close() error {
return m.db.Close()
}
// ── manticoreIndex methods ────────────────────────────────────────────────
// ensureTable creates the RT index if it does not yet exist.
func (idx *manticoreIndex) ensureTable() error {
stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (
mail_id string,
subject text,
from_addr text,
to_addr text,
body text,
attachment_names text,
has_attachment uint,
date_ts bigint,
size_bytes bigint
) type='rt' morphology='stem_en,stem_de'`, idx.table)
_, err := idx.db.Exec(stmt)
if err != nil {
return fmt.Errorf("ensureTable %s: %w", idx.table, err)
}
return nil
}
// IndexSync inserts or replaces a document in the RT index.
func (idx *manticoreIndex) IndexSync(doc MailDocument) error {
rowID := hashMailID(doc.ID)
hasAttach := uint64(0)
if doc.HasAttachment {
hasAttach = 1
}
var dateTS int64
if !doc.Date.IsZero() {
dateTS = doc.Date.Unix()
}
_, err := idx.db.Exec(
fmt.Sprintf(`REPLACE INTO %s
(id, mail_id, subject, from_addr, to_addr, body, attachment_names, has_attachment, date_ts, size_bytes)
VALUES (?,?,?,?,?,?,?,?,?,?)`, idx.table),
rowID,
doc.ID,
doc.Subject,
doc.From,
doc.To,
doc.Body,
doc.AttachNames,
hasAttach,
dateTS,
doc.Size,
)
if err != nil {
return fmt.Errorf("manticore IndexSync %s: %w", idx.table, err)
}
return nil
}
// Delete removes a document by mail ID hash.
func (idx *manticoreIndex) Delete(id string) error {
rowID := hashMailID(id)
_, err := idx.db.Exec(
fmt.Sprintf("DELETE FROM %s WHERE id = ?", idx.table),
rowID,
)
if err != nil {
return fmt.Errorf("manticore Delete %s: %w", idx.table, err)
}
return nil
}
// Search executes a full-text + filter query against the RT index.
func (idx *manticoreIndex) Search(req SearchRequest) (*SearchResult, error) {
var matchParts []string
if req.Query != "" {
matchParts = append(matchParts, escapeManticoreMatch(req.Query))
}
if req.From != "" {
matchParts = append(matchParts, fmt.Sprintf("@from_addr %s", escapeManticoreMatch(req.From)))
}
if req.To != "" {
matchParts = append(matchParts, fmt.Sprintf("@to_addr %s", escapeManticoreMatch(req.To)))
}
if req.OwnEmail != "" {
matchParts = append(matchParts, fmt.Sprintf("@(from_addr,to_addr) %s", escapeManticoreMatch(req.OwnEmail)))
}
hasMatch := len(matchParts) > 0
var whereParts []string
var args []interface{}
if hasMatch {
whereParts = append(whereParts, "MATCH(?)")
args = append(args, strings.Join(matchParts, " "))
}
if req.DateFrom != nil {
whereParts = append(whereParts, "date_ts >= ?")
args = append(args, req.DateFrom.Unix())
}
if req.DateTo != nil {
whereParts = append(whereParts, "date_ts <= ?")
args = append(args, req.DateTo.Unix())
}
if req.HasAttachment != nil {
if *req.HasAttachment {
whereParts = append(whereParts, "has_attachment = 1")
} else {
whereParts = append(whereParts, "has_attachment = 0")
}
}
whereClause := ""
if len(whereParts) > 0 {
whereClause = "WHERE " + strings.Join(whereParts, " AND ")
}
// COUNT query for total.
countArgs := make([]interface{}, len(args))
copy(countArgs, args)
countSQL := fmt.Sprintf(
"SELECT COUNT(*) FROM %s %s OPTION max_matches=1000000",
idx.table, whereClause,
)
var total int
if err := idx.db.QueryRow(countSQL, countArgs...).Scan(&total); err != nil {
return nil, fmt.Errorf("manticore Search count %s: %w", idx.table, err)
}
pageSize := req.PageSize
if pageSize <= 0 {
pageSize = 20
}
page := req.Page
if page <= 0 {
page = 1
}
offset := (page - 1) * pageSize
// Score expression and ORDER BY.
scoreExpr := "0 as score"
orderBy := "date_ts DESC"
if hasMatch {
scoreExpr = "WEIGHT() as score"
switch req.Sort {
case "relevance":
orderBy = "WEIGHT() DESC, date_ts DESC"
case "date_asc":
orderBy = "date_ts ASC"
default:
orderBy = "date_ts DESC"
}
} else {
switch req.Sort {
case "date_asc":
orderBy = "date_ts ASC"
default:
orderBy = "date_ts DESC"
}
}
selectSQL := fmt.Sprintf(
"SELECT mail_id, %s FROM %s %s ORDER BY %s LIMIT ? OFFSET ? OPTION max_matches=10000",
scoreExpr, idx.table, whereClause, orderBy,
)
selectArgs := make([]interface{}, len(args))
copy(selectArgs, args)
selectArgs = append(selectArgs, pageSize, offset)
rows, err := idx.db.Query(selectSQL, selectArgs...)
if err != nil {
return nil, fmt.Errorf("manticore Search select %s: %w", idx.table, err)
}
defer rows.Close()
var hits []Hit
for rows.Next() {
var mailID string
var score float64
if err := rows.Scan(&mailID, &score); err != nil {
return nil, fmt.Errorf("manticore Search scan: %w", err)
}
hits = append(hits, Hit{ID: mailID, Score: score})
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("manticore Search rows: %w", err)
}
return &SearchResult{Total: total, Hits: hits}, nil
}
// Close is a no-op for individual indexes — the shared DB connection is managed
// by ManticoreTenantManager.
func (idx *manticoreIndex) Close() error {
return nil
}
// ── helpers ────────────────────────────────────────────────────────────────
// hashMailID returns a stable uint64 row ID derived from the mail's SHA-256 string ID.
func hashMailID(id string) uint64 {
h := fnv.New64a()
h.Write([]byte(id))
return h.Sum64()
}
// manticoreTableName returns the RT table name for a given tenant.
// nil / 0 → emails_global, otherwise emails_tenant_<id>.
func manticoreTableName(tenantID *int64) string {
if tenantID == nil || *tenantID == 0 {
return "emails_global"
}
return fmt.Sprintf("emails_tenant_%d", *tenantID)
}
// escapeManticoreMatch escapes characters that have special meaning in
// Manticore MATCH() expressions to prevent query injection.
func escapeManticoreMatch(s string) string {
specials := `\()|!@~"/^$=<`
var b strings.Builder
b.Grow(len(s))
for _, c := range s {
if strings.ContainsRune(specials, c) {
b.WriteRune('\\')
}
b.WriteRune(c)
}
return b.String()
}
+3 -3
View File
@@ -6,9 +6,9 @@ import (
)
// TenantIndexWorker processes MailDocument indexing requests asynchronously,
// routing each document to the correct per-tenant Xapian index via TenantIndexManager.
// routing each document to the correct per-tenant index via TenantIndexer.
type TenantIndexWorker struct {
mgr *TenantIndexManager
mgr TenantIndexer
queue chan MailDocument
done chan struct{}
wg sync.WaitGroup
@@ -16,7 +16,7 @@ type TenantIndexWorker struct {
}
// NewTenantWorker creates a new TenantIndexWorker with the given queue capacity.
func NewTenantWorker(mgr *TenantIndexManager, queueSize int, logger *slog.Logger) *TenantIndexWorker {
func NewTenantWorker(mgr TenantIndexer, queueSize int, logger *slog.Logger) *TenantIndexWorker {
if queueSize <= 0 {
queueSize = 1000
}