Files
sysops a93a843506 feat(PROJ-30): Xapian → Manticore Search Migration
- internal/index/manticore.go: ManticoreTenantManager + manticoreIndex (RT-Indizes, CGO-frei)
- internal/index/index.go: TenantIndexer Interface (Xapian + Manticore)
- internal/index/tenant_worker.go: mgr-Typ auf TenantIndexer Interface
- internal/api/server.go: idxMgr auf TenantIndexer Interface
- config/config.go: IndexConfig.ManticoreDSN Feld
- cmd/archivmail/cmd_reindex.go: reindex Subkommando
- cmd/archivmail/main.go: Manticore-Branch + reindex Case
- go.mod: github.com/go-sql-driver/mysql v1.8.1
- update.sh: Manticore auto-install, CGO_ENABLED=0, config.yml migration, auto-reindex

fix(IMAP): TCP-Deadline-Wrapper für steckengebliebene Imports
fix(auth): Email-Claim in JWT für User-Isolation
fix(search): User-Isolation via sess.Email (fail-safe)
fix(ui): Admin-Login Auth-Cache, Logout-Redirect, IMAP-Polling-Resilienz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 21:19:36 +02:00

91 lines
2.1 KiB
Go

package index
import (
"log/slog"
"sync"
)
// TenantIndexWorker processes MailDocument indexing requests asynchronously,
// routing each document to the correct per-tenant index via TenantIndexer.
type TenantIndexWorker struct {
	// mgr resolves the index for a document's tenant (used by indexDoc).
	mgr TenantIndexer
	// queue buffers submitted documents until the background goroutine
	// receives them.
	queue chan MailDocument
	// done is closed by Stop to tell the background goroutine to drain the
	// queue and exit.
	done chan struct{}
	// wg tracks the goroutine launched by Start so Stop can wait for it.
	wg sync.WaitGroup
	// logger receives the worker's start/stop, drop, and failure messages.
	logger *slog.Logger
}
// NewTenantWorker creates a new TenantIndexWorker that routes documents
// through mgr and buffers up to queueSize pending documents.
//
// A queueSize of zero or less falls back to a capacity of 1000. A nil logger
// falls back to slog.Default(), so the worker's logging calls can never
// dereference a nil *slog.Logger. The returned worker does not process
// anything until Start is called.
func NewTenantWorker(mgr TenantIndexer, queueSize int, logger *slog.Logger) *TenantIndexWorker {
	if queueSize <= 0 {
		queueSize = 1000
	}
	if logger == nil {
		// Every method on the worker logs; a nil logger would panic on first use.
		logger = slog.Default()
	}
	return &TenantIndexWorker{
		mgr:    mgr,
		queue:  make(chan MailDocument, queueSize),
		done:   make(chan struct{}),
		logger: logger,
	}
}
// Submit hands doc to the background worker without ever blocking the caller.
// When the queue has no free slot the document is discarded and a warning
// carrying the document ID is logged instead.
func (w *TenantIndexWorker) Submit(doc MailDocument) {
	// Non-blocking send: succeed immediately or fall through to the warning.
	select {
	case w.queue <- doc:
		return
	default:
	}
	w.logger.Warn("tenant index worker: queue full, dropping document", "id", doc.ID)
}
// Start spawns the background goroutine that consumes the queue and indexes
// each received document. The goroutine runs until the done channel is
// closed (see Stop) or the queue channel is closed; after a stop signal it
// indexes whatever is still buffered in the queue and then exits.
func (w *TenantIndexWorker) Start() {
	// handle indexes one received item and reports false once the queue
	// channel has been closed.
	handle := func(item MailDocument, open bool) bool {
		if open {
			w.indexDoc(item)
		}
		return open
	}

	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		w.logger.Info("tenant index worker: started", "queue_size", cap(w.queue))

		// Phase 1: normal operation, until a stop is requested.
	running:
		for {
			select {
			case item, open := <-w.queue:
				if !handle(item, open) {
					return
				}
			case <-w.done:
				break running
			}
		}

		// Phase 2: drain remaining items in the queue before exiting,
		// without blocking once the queue is empty.
		for {
			select {
			case item, open := <-w.queue:
				if !handle(item, open) {
					return
				}
			default:
				return
			}
		}
	}()
}
// Stop signals the worker to drain remaining items and stop, then blocks
// until the background goroutine has exited.
//
// Stop may be called more than once: a repeated call only waits for the
// goroutine to finish instead of closing the done channel a second time,
// which would panic. Concurrent calls to Stop are not synchronized against
// each other and must be serialized by the caller.
func (w *TenantIndexWorker) Stop() {
	select {
	case <-w.done:
		// Already signalled by an earlier Stop; just make sure the
		// goroutine is gone before returning.
		w.wg.Wait()
		return
	default:
	}

	close(w.done)
	w.wg.Wait()
	w.logger.Info("tenant index worker: stopped")
}
// QueueLen reports how many documents are currently buffered in the queue
// and have not yet been received by the background goroutine.
func (w *TenantIndexWorker) QueueLen() int {
	pending := len(w.queue)
	return pending
}
// indexDoc looks up the index for doc's tenant via the manager and indexes
// the document synchronously. A failure is logged with the document and
// tenant IDs; it is not returned to the caller.
func (w *TenantIndexWorker) indexDoc(doc MailDocument) {
	err := w.mgr.ForTenant(doc.TenantID).IndexSync(doc)
	if err == nil {
		return
	}
	w.logger.Error("tenant index worker: index failed", "id", doc.ID, "tenant_id", doc.TenantID, "err", err)
}