a93a843506
- internal/index/manticore.go: ManticoreTenantManager + manticoreIndex (RT-Indizes, CGO-frei) - internal/index/index.go: TenantIndexer Interface (Xapian + Manticore) - internal/index/tenant_worker.go: mgr-Typ auf TenantIndexer Interface - internal/api/server.go: idxMgr auf TenantIndexer Interface - config/config.go: IndexConfig.ManticoreDSN Feld - cmd/archivmail/cmd_reindex.go: reindex Subkommando - cmd/archivmail/main.go: Manticore-Branch + reindex Case - go.mod: github.com/go-sql-driver/mysql v1.8.1 - update.sh: Manticore auto-install, CGO_ENABLED=0, config.yml migration, auto-reindex fix(IMAP): TCP-Deadline-Wrapper für steckengebliebene Imports fix(auth): Email-Claim in JWT für User-Isolation fix(search): User-Isolation via sess.Email (fail-safe) fix(ui): Admin-Login Auth-Cache, Logout-Redirect, IMAP-Polling-Resilienz Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
91 lines
2.1 KiB
Go
91 lines
2.1 KiB
Go
package index
|
|
|
|
import (
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// TenantIndexWorker processes MailDocument indexing requests asynchronously,
|
|
// routing each document to the correct per-tenant index via TenantIndexer.
|
|
type TenantIndexWorker struct {
|
|
mgr TenantIndexer
|
|
queue chan MailDocument
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// NewTenantWorker creates a new TenantIndexWorker with the given queue capacity.
|
|
func NewTenantWorker(mgr TenantIndexer, queueSize int, logger *slog.Logger) *TenantIndexWorker {
|
|
if queueSize <= 0 {
|
|
queueSize = 1000
|
|
}
|
|
return &TenantIndexWorker{
|
|
mgr: mgr,
|
|
queue: make(chan MailDocument, queueSize),
|
|
done: make(chan struct{}),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Submit enqueues a document for background indexing. If the queue is full,
|
|
// the document is dropped and a warning is logged.
|
|
func (w *TenantIndexWorker) Submit(doc MailDocument) {
|
|
select {
|
|
case w.queue <- doc:
|
|
// queued
|
|
default:
|
|
w.logger.Warn("tenant index worker: queue full, dropping document", "id", doc.ID)
|
|
}
|
|
}
|
|
|
|
// Start launches the background goroutine that processes the queue.
|
|
func (w *TenantIndexWorker) Start() {
|
|
w.wg.Add(1)
|
|
go func() {
|
|
defer w.wg.Done()
|
|
w.logger.Info("tenant index worker: started", "queue_size", cap(w.queue))
|
|
for {
|
|
select {
|
|
case doc, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.indexDoc(doc)
|
|
case <-w.done:
|
|
// Drain remaining items in the queue before exiting.
|
|
for {
|
|
select {
|
|
case doc, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.indexDoc(doc)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop signals the worker to drain remaining items and stop.
|
|
func (w *TenantIndexWorker) Stop() {
|
|
close(w.done)
|
|
w.wg.Wait()
|
|
w.logger.Info("tenant index worker: stopped")
|
|
}
|
|
|
|
// QueueLen returns the current number of items waiting in the queue.
|
|
func (w *TenantIndexWorker) QueueLen() int {
|
|
return len(w.queue)
|
|
}
|
|
|
|
func (w *TenantIndexWorker) indexDoc(doc MailDocument) {
|
|
idx := w.mgr.ForTenant(doc.TenantID)
|
|
if err := idx.IndexSync(doc); err != nil {
|
|
w.logger.Error("tenant index worker: index failed", "id", doc.ID, "tenant_id", doc.TenantID, "err", err)
|
|
}
|
|
}
|