feat(PROJ-21/23): Pro-Tenant Xapian-Index + Tenant-LDAP Backend
PROJ-21 Phase 4:
- internal/index/tenant_manager.go: TenantIndexManager mit lazy-loading Pool
- internal/index/tenant_worker.go: TenantIndexWorker leitet Submit an richtigen Index
- Jeder Mandant bekommt eigenes Xapian-Verzeichnis (tenant-<id>/)
- handleSearch nutzt direkt Tenant-Index statt nachgelagertem Post-Filter
- runBackfill re-indexiert pro Mandant beim Start
PROJ-23 / PROJ-16 Phase B:
- internal/ldapconfig/tenant_store.go: TenantStore mit AES-256-GCM für tenant_ldap
- internal/api/ldap_tenants.go: 8 neue Handler (GET/PUT/DELETE/test für
/api/tenant/ldap und /api/admin/tenants/{id}/ldap)
- internal/auth/auth.go: Login-Fallback prüft tenant_ldap nach globalem LDAP
(Domain-Extraktion → tenant_ldap config → UpsertLDAPUser mit tenant_id)
- internal/api/server.go: SetTenantLDAP(), neue Routen registriert
- internal/tenantstore/store.go: GetByDomain() Interface für auth-Package
- cmd/archivmail/main.go: TenantLDAPStore + TenantIndexManager verdrahtet
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+96
-14
@@ -104,7 +104,7 @@ func main() {
|
||||
}
|
||||
defer mailStore.Close()
|
||||
|
||||
// Index
|
||||
// Index — per-tenant index manager (PROJ-21 Phase 4)
|
||||
indexBackend := cfg.Index.Backend
|
||||
if indexBackend == "" {
|
||||
indexBackend = "xapian"
|
||||
@@ -113,21 +113,24 @@ func main() {
|
||||
if batchSize <= 0 {
|
||||
batchSize = 100
|
||||
}
|
||||
idx, err := index.New(cfg.Index.Path, batchSize, indexBackend)
|
||||
idxMgr, err := index.NewTenantIndexManager(cfg.Index.Path, batchSize, indexBackend)
|
||||
if err != nil {
|
||||
logger.Error("index init failed", "err", err)
|
||||
logger.Error("index manager init failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer idx.Close()
|
||||
defer idxMgr.Close()
|
||||
|
||||
// Async index worker
|
||||
// Global index reference for backward compatibility (IMAP importer, etc.)
|
||||
idx := idxMgr.Global()
|
||||
|
||||
// Async index worker — tenant-aware (routes docs to correct per-tenant index)
|
||||
asyncQueueSize := cfg.Index.AsyncQueueSize
|
||||
if asyncQueueSize <= 0 {
|
||||
asyncQueueSize = 1000
|
||||
}
|
||||
worker := index.NewWorker(idx, asyncQueueSize, logger)
|
||||
worker.Start()
|
||||
defer worker.Stop()
|
||||
tenantWorker := index.NewTenantWorker(idxMgr, asyncQueueSize, logger)
|
||||
tenantWorker.Start()
|
||||
defer tenantWorker.Stop()
|
||||
|
||||
// User store
|
||||
users, err := userstore.New(cfg.Database.DSN())
|
||||
@@ -186,6 +189,7 @@ func main() {
|
||||
}
|
||||
defer tenantSt.Close()
|
||||
srv.SetTenants(tenantSt)
|
||||
srv.SetIndexManager(idxMgr)
|
||||
|
||||
// Start SMTP daemon with index worker integration
|
||||
if cfg.SMTP.Bind == "" {
|
||||
@@ -193,7 +197,9 @@ func main() {
|
||||
}
|
||||
smtpDaemon := smtpd.New(cfg.SMTP, mailStore, logger)
|
||||
smtpDaemon.SetIndexCallback(func(raw []byte, id string) {
|
||||
submitToWorker(worker, mailStore, raw, id, logger)
|
||||
// Look up the tenant_id for this email from DB metadata.
|
||||
tenantID, _ := mailStore.GetTenantForMail(context.Background(), id)
|
||||
submitToWorker(tenantWorker, mailStore, raw, id, tenantID, logger)
|
||||
})
|
||||
// Wire tenant routing into SMTP daemon
|
||||
if cfg.SMTP.TenantRouting == "domain" {
|
||||
@@ -220,6 +226,16 @@ func main() {
|
||||
// Wire LDAP config store into API server
|
||||
srv.SetLDAP(ldapSt)
|
||||
|
||||
// PROJ-23: Per-tenant LDAP config store
|
||||
tenantLdapSt, err := ldapcfg.NewTenantStore(cfg.Database.DSN(), aesKey)
|
||||
if err != nil {
|
||||
logger.Error("tenant ldap store init failed", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer tenantLdapSt.Close()
|
||||
srv.SetTenantLDAP(tenantLdapSt)
|
||||
authMgr.SetTenantLDAP(tenantLdapSt, tenantSt)
|
||||
|
||||
// Wire SMTP daemon into API server for status endpoint
|
||||
srv.SetSMTPDaemon(smtpDaemon)
|
||||
|
||||
@@ -247,7 +263,7 @@ func main() {
|
||||
srv.SetPop3(pop3St, pop3Imp)
|
||||
|
||||
// Backfill in background: migrate existing files into DB metadata + re-index
|
||||
go runBackfill(context.Background(), mailStore, idx, worker, logger)
|
||||
go runBackfill(context.Background(), mailStore, idx, tenantWorker, logger)
|
||||
|
||||
// Background integrity verification — runs every 5 minutes
|
||||
go runIntegrityCheck(context.Background(), mailStore, logger)
|
||||
@@ -272,7 +288,8 @@ func main() {
|
||||
}
|
||||
|
||||
// submitToWorker parses a raw email and submits it to the async index worker.
|
||||
func submitToWorker(worker *index.IndexWorker, store *storage.Store, raw []byte, id string, logger *slog.Logger) {
|
||||
// tenantID may be nil for global context.
|
||||
func submitToWorker(worker *index.TenantIndexWorker, store *storage.Store, raw []byte, id string, tenantID *int64, logger *slog.Logger) {
|
||||
pm, err := mailparser.Parse(raw)
|
||||
if err != nil {
|
||||
logger.Warn("index: parse failed, skipping indexing", "id", id, "err", err)
|
||||
@@ -296,6 +313,7 @@ func submitToWorker(worker *index.IndexWorker, store *storage.Store, raw []byte,
|
||||
HasAttachment: len(pm.Attachments) > 0,
|
||||
Date: pm.Date,
|
||||
Size: int64(len(raw)),
|
||||
TenantID: tenantID,
|
||||
}
|
||||
|
||||
worker.Submit(doc)
|
||||
@@ -307,8 +325,9 @@ func submitToWorker(worker *index.IndexWorker, store *storage.Store, raw []byte,
|
||||
}
|
||||
|
||||
// runBackfill walks the store, inserts missing DB metadata, and indexes
|
||||
// emails that have not yet been indexed.
|
||||
func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, worker *index.IndexWorker, logger *slog.Logger) {
|
||||
// emails that have not yet been indexed. Per-tenant indexing is handled by
|
||||
// looking up each email's tenant_id from the DB.
|
||||
func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, worker *index.TenantIndexWorker, logger *slog.Logger) {
|
||||
logger.Info("backfill: starting")
|
||||
|
||||
count := 0
|
||||
@@ -345,7 +364,8 @@ func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, w
|
||||
|
||||
if !alreadyIndexed {
|
||||
needIndex++
|
||||
submitToWorker(worker, store, raw, id, logger)
|
||||
tenantID, _ := store.GetTenantForMail(ctx, id)
|
||||
submitToWorker(worker, store, raw, id, tenantID, logger)
|
||||
}
|
||||
|
||||
if count%100 == 0 {
|
||||
@@ -363,6 +383,68 @@ func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, w
|
||||
logger.Info("backfill: complete", "total", count, "submitted_for_index", needIndex, "errors", errCount)
|
||||
}
|
||||
|
||||
// reindexTenant re-indexes all emails belonging to a specific tenant.
|
||||
// Used during migration when switching from global index to per-tenant indexes.
|
||||
func reindexTenant(ctx context.Context, store *storage.Store, mgr *index.TenantIndexManager, tenantID int64, logger *slog.Logger) error {
|
||||
tid := tenantID
|
||||
ids, err := store.GetAllIDsByTenant(ctx, &tid)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reindex tenant %d: get IDs: %w", tenantID, err)
|
||||
}
|
||||
|
||||
logger.Info("reindex tenant: starting", "tenant_id", tenantID, "count", len(ids))
|
||||
|
||||
idx := mgr.ForTenant(&tid)
|
||||
indexed := 0
|
||||
errCount := 0
|
||||
|
||||
for _, id := range ids {
|
||||
raw, err := store.Load(id)
|
||||
if err != nil {
|
||||
logger.Warn("reindex tenant: load failed", "tenant_id", tenantID, "id", id, "err", err)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
|
||||
pm, parseErr := mailparser.Parse(raw)
|
||||
if parseErr != nil {
|
||||
logger.Warn("reindex tenant: parse failed", "tenant_id", tenantID, "id", id, "err", parseErr)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
|
||||
var attachNames []string
|
||||
for _, a := range pm.Attachments {
|
||||
if a.Filename != "" {
|
||||
attachNames = append(attachNames, a.Filename)
|
||||
}
|
||||
}
|
||||
|
||||
doc := index.MailDocument{
|
||||
ID: id,
|
||||
From: pm.From,
|
||||
To: strings.Join(pm.To, ", "),
|
||||
Subject: pm.Subject,
|
||||
Body: pm.TextBody,
|
||||
AttachNames: strings.Join(attachNames, " "),
|
||||
HasAttachment: len(pm.Attachments) > 0,
|
||||
Date: pm.Date,
|
||||
Size: int64(len(raw)),
|
||||
TenantID: &tid,
|
||||
}
|
||||
|
||||
if err := idx.IndexSync(doc); err != nil {
|
||||
logger.Warn("reindex tenant: index failed", "tenant_id", tenantID, "id", id, "err", err)
|
||||
errCount++
|
||||
continue
|
||||
}
|
||||
indexed++
|
||||
}
|
||||
|
||||
logger.Info("reindex tenant: complete", "tenant_id", tenantID, "indexed", indexed, "errors", errCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// runIntegrityCheck verifies all stored emails every 5 minutes by re-computing
|
||||
// their SHA-256 and comparing it to the stored file ID.
|
||||
func runIntegrityCheck(ctx context.Context, store *storage.Store, logger *slog.Logger) {
|
||||
|
||||
Reference in New Issue
Block a user