package index

import (
	"log/slog"
	"sync"
)

// TenantIndexWorker processes MailDocument indexing requests asynchronously,
// routing each document to the correct per-tenant index via TenantIndexer.
//
// Documents are handed over through a bounded, buffered queue and consumed by
// the goroutine launched in Start. A TenantIndexWorker contains sync
// primitives and must not be copied after first use.
type TenantIndexWorker struct {
	// mgr resolves the per-tenant index a document is written to.
	mgr TenantIndexer

	// queue is the buffered hand-off between Submit and the worker goroutine.
	queue chan MailDocument

	// done is closed by Stop to ask the worker to drain the queue and exit.
	done chan struct{}

	// stopOnce ensures done is closed exactly once, so Stop can be called
	// repeatedly (or concurrently) without panicking.
	stopOnce sync.Once

	// wg tracks the goroutines launched by Start so Stop can wait for them.
	wg sync.WaitGroup

	// logger receives lifecycle, drop, and indexing-failure messages.
	logger *slog.Logger
}

// NewTenantWorker creates a new TenantIndexWorker with the given queue capacity.
//
// A queueSize of zero or less falls back to a capacity of 1000. A nil logger
// falls back to slog.Default(), so the worker never dereferences a nil logger
// when it reports dropped documents or indexing failures.
func NewTenantWorker(mgr TenantIndexer, queueSize int, logger *slog.Logger) *TenantIndexWorker {
	if queueSize <= 0 {
		queueSize = 1000
	}
	if logger == nil {
		logger = slog.Default()
	}
	return &TenantIndexWorker{
		mgr:    mgr,
		queue:  make(chan MailDocument, queueSize),
		done:   make(chan struct{}),
		logger: logger,
	}
}

// Submit enqueues a document for background indexing. It never blocks: if the
// queue is full, or the worker has already been stopped, the document is
// dropped and a warning is logged.
func (w *TenantIndexWorker) Submit(doc MailDocument) {
	// Check for shutdown on its own first. Folding this into the send select
	// below would let Go pick the send at random even though done is closed,
	// parking the document in a queue that nothing reads any more.
	select {
	case <-w.done:
		w.logger.Warn("tenant index worker: stopped, dropping document", "id", doc.ID)
		return
	default:
	}

	// Non-blocking send: callers must not stall when indexing falls behind.
	// A Submit that races with Stop can still land here after the final
	// drain; such a document stays in the queue unprocessed.
	select {
	case w.queue <- doc:
		// queued
	default:
		w.logger.Warn("tenant index worker: queue full, dropping document", "id", doc.ID)
	}
}

// Start launches the background goroutine that processes the queue. The
// goroutine runs until Stop is called (or the queue channel is closed) and is
// tracked by the worker's WaitGroup so that Stop can wait for it.
func (w *TenantIndexWorker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		w.logger.Info("tenant index worker: started", "queue_size", cap(w.queue))
		for {
			select {
			case doc, ok := <-w.queue:
				if !ok {
					return
				}
				w.indexDoc(doc)
			case <-w.done:
				// Drain remaining items in the queue before exiting. The
				// default case ends the drain as soon as the queue is
				// observed empty instead of blocking for more work.
				for {
					select {
					case doc, ok := <-w.queue:
						if !ok {
							return
						}
						w.indexDoc(doc)
					default:
						return
					}
				}
			}
		}
	}()
}

// Stop signals the worker to drain remaining items and stop, then waits for
// the background goroutine to exit. It is safe to call Stop more than once:
// only the first call closes the done channel and logs; later calls return
// once that first shutdown has completed.
func (w *TenantIndexWorker) Stop() {
	w.stopOnce.Do(func() {
		close(w.done)
		w.wg.Wait()
		w.logger.Info("tenant index worker: stopped")
	})
}

// QueueLen returns the current number of items waiting in the queue.
func (w *TenantIndexWorker) QueueLen() int { return len(w.queue) } func (w *TenantIndexWorker) indexDoc(doc MailDocument) { idx := w.mgr.ForTenant(doc.TenantID) if err := idx.IndexSync(doc); err != nil { w.logger.Error("tenant index worker: index failed", "id", doc.ID, "tenant_id", doc.TenantID, "err", err) } }