78d83d3e98
PROJ-21 Phase 4:
- internal/index/tenant_manager.go: TenantIndexManager mit lazy-loading Pool
- internal/index/tenant_worker.go: TenantIndexWorker leitet Submit an richtigen Index
- Jeder Mandant bekommt eigenes Xapian-Verzeichnis (tenant-<id>/)
- handleSearch nutzt direkt Tenant-Index statt nachgelagertem Post-Filter
- runBackfill re-indexiert pro Mandant beim Start
PROJ-23 / PROJ-16 Phase B:
- internal/ldapconfig/tenant_store.go: TenantStore mit AES-256-GCM für tenant_ldap
- internal/api/ldap_tenants.go: 8 neue Handler (GET/PUT/DELETE/test für
/api/tenant/ldap und /api/admin/tenants/{id}/ldap)
- internal/auth/auth.go: Login-Fallback prüft tenant_ldap nach globalem LDAP
(Domain-Extraktion → tenant_ldap config → UpsertLDAPUser mit tenant_id)
- internal/api/server.go: SetTenantLDAP(), neue Routen registriert
- internal/tenantstore/store.go: GetByDomain() Interface für auth-Package
- cmd/archivmail/main.go: TenantLDAPStore + TenantIndexManager verdrahtet
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
91 lines
2.2 KiB
Go
91 lines
2.2 KiB
Go
package index
|
|
|
|
import (
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// TenantIndexWorker processes MailDocument indexing requests asynchronously,
|
|
// routing each document to the correct per-tenant Xapian index via TenantIndexManager.
|
|
type TenantIndexWorker struct {
|
|
mgr *TenantIndexManager
|
|
queue chan MailDocument
|
|
done chan struct{}
|
|
wg sync.WaitGroup
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// NewTenantWorker creates a new TenantIndexWorker with the given queue capacity.
|
|
func NewTenantWorker(mgr *TenantIndexManager, queueSize int, logger *slog.Logger) *TenantIndexWorker {
|
|
if queueSize <= 0 {
|
|
queueSize = 1000
|
|
}
|
|
return &TenantIndexWorker{
|
|
mgr: mgr,
|
|
queue: make(chan MailDocument, queueSize),
|
|
done: make(chan struct{}),
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// Submit enqueues a document for background indexing. If the queue is full,
|
|
// the document is dropped and a warning is logged.
|
|
func (w *TenantIndexWorker) Submit(doc MailDocument) {
|
|
select {
|
|
case w.queue <- doc:
|
|
// queued
|
|
default:
|
|
w.logger.Warn("tenant index worker: queue full, dropping document", "id", doc.ID)
|
|
}
|
|
}
|
|
|
|
// Start launches the background goroutine that processes the queue.
|
|
func (w *TenantIndexWorker) Start() {
|
|
w.wg.Add(1)
|
|
go func() {
|
|
defer w.wg.Done()
|
|
w.logger.Info("tenant index worker: started", "queue_size", cap(w.queue))
|
|
for {
|
|
select {
|
|
case doc, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.indexDoc(doc)
|
|
case <-w.done:
|
|
// Drain remaining items in the queue before exiting.
|
|
for {
|
|
select {
|
|
case doc, ok := <-w.queue:
|
|
if !ok {
|
|
return
|
|
}
|
|
w.indexDoc(doc)
|
|
default:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop signals the worker to drain remaining items and stop.
|
|
func (w *TenantIndexWorker) Stop() {
|
|
close(w.done)
|
|
w.wg.Wait()
|
|
w.logger.Info("tenant index worker: stopped")
|
|
}
|
|
|
|
// QueueLen returns the current number of items waiting in the queue.
|
|
func (w *TenantIndexWorker) QueueLen() int {
|
|
return len(w.queue)
|
|
}
|
|
|
|
func (w *TenantIndexWorker) indexDoc(doc MailDocument) {
|
|
idx := w.mgr.ForTenant(doc.TenantID)
|
|
if err := idx.IndexSync(doc); err != nil {
|
|
w.logger.Error("tenant index worker: index failed", "id", doc.ID, "tenant_id", doc.TenantID, "err", err)
|
|
}
|
|
}
|