Files
archivmail/internal/index/worker.go
T
sysops 7e68c7ab02 feat(PROJ-5): AES-256-GCM Verschlüsselung, PostgreSQL Metadaten, Async Index Worker
- Storage: AES-256-GCM Verschlüsselung (keyfile, graceful fallback bei fehlendem Key)
- Storage: PostgreSQL emails-Tabelle mit Auto-Migration
- Storage: Save/Delete/Stats/FirstAndLastMail nutzen DB wenn verfügbar
- Index: Async IndexWorker (Go-Channel, Queue 1000, non-blocking Submit)
- SMTP: IndexCallback für async Indexierung nach Mail-Eingang
- main: Backfill beim Start (40 Mails migriert + indexiert)
- Bestehende Mails werden transparent entschlüsselt (Fallback auf Raw)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 20:26:50 +01:00

91 lines
2.2 KiB
Go

package index
import (
"log/slog"
"sync"
)
// IndexWorker hands MailDocuments to an Indexer from the background goroutine
// launched by Start. Producers pass documents over a buffered channel, so the
// IndexSync calls issued by that goroutine happen one after another (this
// matters for Xapian, which only allows one writer at a time).
type IndexWorker struct {
	// Collaborators supplied through NewWorker.
	idx    Indexer      // receives every dequeued document via IndexSync
	logger *slog.Logger // lifecycle, drop and failure messages

	// Internal plumbing.
	queue chan MailDocument // buffered hand-off between Submit and the goroutine
	done  chan struct{}     // closed by Stop to request shutdown
	wg    sync.WaitGroup    // tracks the goroutine launched by Start
}
// NewWorker creates an IndexWorker that writes to idx.
//
// queueSize is the capacity of the internal buffer; values <= 0 select the
// default of 1000. A nil logger is replaced by slog.Default(), because the
// worker logs unconditionally and a nil *slog.Logger would panic on first use.
// No goroutine is started here; call Start to begin processing.
func NewWorker(idx Indexer, queueSize int, logger *slog.Logger) *IndexWorker {
	const defaultQueueSize = 1000

	if queueSize <= 0 {
		queueSize = defaultQueueSize
	}
	if logger == nil {
		logger = slog.Default()
	}
	return &IndexWorker{
		idx:    idx,
		queue:  make(chan MailDocument, queueSize),
		done:   make(chan struct{}),
		logger: logger,
	}
}
// Submit enqueues doc for background indexing and never blocks the caller.
//
// The document is dropped, and a warning carrying its ID is logged, if Stop
// has already been called or if the queue is full. A Submit that runs
// concurrently with Stop may still be enqueued; whether such a document gets
// indexed depends on whether the worker has finished draining.
func (w *IndexWorker) Submit(doc MailDocument) {
	// Once done is closed the worker goroutine is exiting or already gone, so
	// a document placed in the buffer now would never be indexed and nobody
	// would notice. Refuse it loudly instead.
	select {
	case <-w.done:
		w.logger.Warn("index worker: stopped, dropping document", "id", doc.ID)
		return
	default:
	}

	select {
	case w.queue <- doc:
		// queued
	default:
		w.logger.Warn("index worker: queue full, dropping document", "id", doc.ID)
	}
}
// Start launches the background goroutine that consumes the queue.
//
// The goroutine indexes documents one at a time until either the queue is
// closed or done is closed. In the latter case it first indexes whatever is
// still buffered, without waiting for new arrivals, and then exits. Indexing
// errors are logged and do not stop the loop.
func (w *IndexWorker) Start() {
	// indexOne writes a single document and reports a failure under failMsg.
	indexOne := func(doc MailDocument, failMsg string) {
		if err := w.idx.IndexSync(doc); err != nil {
			w.logger.Error(failMsg, "id", doc.ID, "err", err)
		}
	}

	// flush indexes the documents already sitting in the buffer and returns
	// as soon as the buffer is empty or the queue turns out to be closed.
	flush := func() {
		for {
			select {
			case doc, open := <-w.queue:
				if !open {
					return
				}
				indexOne(doc, "index worker: index failed (drain)")
			default:
				return
			}
		}
	}

	run := func() {
		defer w.wg.Done()
		w.logger.Info("index worker: started", "queue_size", cap(w.queue))
		for {
			select {
			case <-w.done:
				// Shutdown requested: finish what is buffered, then leave.
				flush()
				return
			case doc, open := <-w.queue:
				if !open {
					// Queue closed and fully consumed.
					return
				}
				indexOne(doc, "index worker: index failed")
			}
		}
	}

	// Register with the WaitGroup before spawning so a waiter cannot miss it.
	w.wg.Add(1)
	go run()
}
// Stop signals the worker to drain the items already queued and exit, then
// blocks until the goroutine launched by Start has returned.
//
// Calling Stop again after an earlier call is harmless: it only waits for the
// goroutine and returns, instead of panicking with "close of closed channel".
// Stop is not designed to be invoked from several goroutines at the same time.
func (w *IndexWorker) Stop() {
	select {
	case <-w.done:
		// Shutdown was already requested by a previous Stop; closing the
		// channel a second time would panic.
		w.wg.Wait()
		return
	default:
	}

	close(w.done)
	w.wg.Wait()
	w.logger.Info("index worker: stopped")
}
// QueueLen reports how many documents are buffered in the queue at the moment
// of the call, i.e. accepted by Submit but not yet received by the worker
// goroutine.
func (w *IndexWorker) QueueLen() int {
	pending := len(w.queue)
	return pending
}