From a93a843506b45ee4d051bf7f7ca68b001aa2952a Mon Sep 17 00:00:00 2001 From: sysops Date: Fri, 3 Apr 2026 21:19:36 +0200 Subject: [PATCH] =?UTF-8?q?feat(PROJ-30):=20Xapian=20=E2=86=92=20Manticore?= =?UTF-8?q?=20Search=20Migration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - internal/index/manticore.go: ManticoreTenantManager + manticoreIndex (RT-Indizes, CGO-frei) - internal/index/index.go: TenantIndexer Interface (Xapian + Manticore) - internal/index/tenant_worker.go: mgr-Typ auf TenantIndexer Interface - internal/api/server.go: idxMgr auf TenantIndexer Interface - config/config.go: IndexConfig.ManticoreDSN Feld - cmd/archivmail/cmd_reindex.go: reindex Subkommando - cmd/archivmail/main.go: Manticore-Branch + reindex Case - go.mod: github.com/go-sql-driver/mysql v1.8.1 - update.sh: Manticore auto-install, CGO_ENABLED=0, config.yml migration, auto-reindex fix(IMAP): TCP-Deadline-Wrapper für steckengebliebene Imports fix(auth): Email-Claim in JWT für User-Isolation fix(search): User-Isolation via sess.Email (fail-safe) fix(ui): Admin-Login Auth-Cache, Logout-Redirect, IMAP-Polling-Resilienz Co-Authored-By: Claude Sonnet 4.6 --- cmd/archivmail/cmd_reindex.go | 144 ++++++++++++++ cmd/archivmail/main.go | 30 ++- config/config.go | 1 + go.mod | 1 + internal/api/search_handlers.go | 34 +++- internal/api/server.go | 4 +- internal/auth/auth.go | 4 + internal/imap/client.go | 78 ++++++-- internal/imap/importer.go | 21 +- internal/imap/scheduler.go | 11 +- internal/imap/store.go | 10 +- internal/index/index.go | 8 + internal/index/manticore.go | 327 ++++++++++++++++++++++++++++++++ internal/index/tenant_worker.go | 6 +- src/app/admin/login/page.tsx | 4 +- src/app/imap/page.tsx | 34 +++- src/app/page.tsx | 17 +- src/components/UserNav.tsx | 4 +- update.sh | 69 ++++++- 19 files changed, 742 insertions(+), 65 deletions(-) create mode 100644 cmd/archivmail/cmd_reindex.go create mode 100644 internal/index/manticore.go diff --git a/cmd/archivmail/cmd_reindex.go b/cmd/archivmail/cmd_reindex.go new file mode 100644 index 0000000..4ea92fb --- /dev/null +++ b/cmd/archivmail/cmd_reindex.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "flag" + "log/slog" + "os" + "strings" + + "github.com/archivmail/config" + "github.com/archivmail/internal/index" + "github.com/archivmail/internal/storage" + "github.com/archivmail/pkg/mailparser" +) + +// runReindex re-indexes all (or tenant-specific) emails into the configured index backend. +// Usage: archivmail reindex [-config /path/to/config.yml] [-tenant ] +func runReindex(args []string) { + fs := flag.NewFlagSet("reindex", flag.ExitOnError) + configPath := fs.String("config", "/etc/archivmail/config.yml", "path to config file") + tenantIDFlag := fs.Int64("tenant", 0, "tenant ID to reindex (0 = all tenants)") + fs.Parse(args) + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + + cfg, err := config.Load(*configPath) + if err != nil { + logger.Error("failed to load config", "err", err) + os.Exit(1) + } + + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + } + mailStore, err := storage.New(storeCfg) + if err != nil { + logger.Error("storage init failed", "err", err) + os.Exit(1) + } + defer mailStore.Close() + + indexBackend := cfg.Index.Backend + if indexBackend == "" { + indexBackend = "xapian" + } + batchSize := cfg.Index.BatchSize + if batchSize <= 0 { + batchSize = 100 + } + + var idxMgr index.TenantIndexer + if indexBackend == "manticore" { + dsn := cfg.Index.ManticoreDSN + if dsn == "" { + dsn = "manticore@tcp(127.0.0.1:9306)/" + } + m, err := index.NewManticoreTenantManager(dsn) + if err != nil { + logger.Error("manticore init failed", "err", err) + os.Exit(1) + } + idxMgr = m + } else { + m, err := index.NewTenantIndexManager(cfg.Index.Path, batchSize, indexBackend) + if err != nil { + logger.Error("index manager init failed", "err", err) + os.Exit(1) + } + idxMgr = m + } + defer func() { idxMgr.Close() }() + + ctx := context.Background() + + var ids []string + if *tenantIDFlag > 0 { + tid := *tenantIDFlag + ids, err = mailStore.GetAllIDsByTenant(ctx, &tid) + } else { + ids, err = mailStore.GetAllIDs(ctx) + } + if err != nil { + logger.Error("failed to list mail IDs", "err", err) + os.Exit(1) + } + + logger.Info("reindex: starting", "backend", indexBackend, "total", len(ids)) + + indexed := 0 + errors := 0 + for i, id := range ids { + raw, err := mailStore.Load(id) + if err != nil { + logger.Warn("reindex: load failed", "id", id, "err", err) + errors++ + continue + } + + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("reindex: parse failed", "id", id, "err", err) + errors++ + continue + } + + tenantID, _ := mailStore.GetTenantForMail(ctx, id) + + var attachNames []string + for _, a := range pm.Attachments { + if a.Filename != "" { + attachNames = append(attachNames, a.Filename) + } + } + + doc := index.MailDocument{ + ID: id, + From: pm.From, + To: strings.Join(pm.To, ", "), + Subject: pm.Subject, + Body: pm.TextBody, + AttachNames: strings.Join(attachNames, " "), + HasAttachment: len(pm.Attachments) > 0, + Date: pm.Date, + Size: int64(len(raw)), + TenantID: tenantID, + } + + idx := idxMgr.ForTenant(tenantID) + if err := idx.IndexSync(doc); err != nil { + logger.Warn("reindex: index failed", "id", id, "err", err) + errors++ + continue + } + indexed++ + + if (i+1)%100 == 0 { + logger.Info("reindex: progress", "processed", i+1, "indexed", indexed, "errors", errors) + } + } + + logger.Info("reindex: complete", "total", len(ids), "indexed", indexed, "errors", errors) +} diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 7875362..8b16efe 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -55,6 +55,9 @@ func main() { case "migrate-tenants": runMigrateTenants(os.Args[2:]) return + case "reindex": + runReindex(os.Args[2:]) + return case "version": fmt.Printf("archivmail %s\n", AppVersion) for mod, ver := range Modules { @@ -124,12 +127,27 @@ func main() { if batchSize <= 0 { batchSize = 100 } - idxMgr, err := index.NewTenantIndexManager(cfg.Index.Path, batchSize, indexBackend) - if err != nil { - logger.Error("index manager init failed", "err", err) - os.Exit(1) + var idxMgr index.TenantIndexer + if indexBackend == "manticore" { + dsn := cfg.Index.ManticoreDSN + if dsn == "" { + dsn = "manticore@tcp(127.0.0.1:9306)/" + } + m, err := index.NewManticoreTenantManager(dsn) + if err != nil { + logger.Error("manticore index manager init failed", "err", err) + os.Exit(1) + } + idxMgr = m + } else { + m, err := index.NewTenantIndexManager(cfg.Index.Path, batchSize, indexBackend) + if err != nil { + logger.Error("index manager init failed", "err", err) + os.Exit(1) + } + idxMgr = m } - defer idxMgr.Close() + defer func() { idxMgr.Close() }() // Global index reference for backward compatibility (IMAP importer, etc.) idx := idxMgr.Global() @@ -469,7 +487,7 @@ func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, w // reindexTenant re-indexes all emails belonging to a specific tenant. // Used during migration when switching from global index to per-tenant indexes. -func reindexTenant(ctx context.Context, store *storage.Store, mgr *index.TenantIndexManager, tenantID int64, logger *slog.Logger) error { +func reindexTenant(ctx context.Context, store *storage.Store, mgr index.TenantIndexer, tenantID int64, logger *slog.Logger) error { tid := tenantID ids, err := store.GetAllIDsByTenant(ctx, &tid) if err != nil { diff --git a/config/config.go b/config/config.go index d58ebc2..79f4374 100644 --- a/config/config.go +++ b/config/config.go @@ -106,6 +106,7 @@ type IndexConfig struct { Backend string `yaml:"backend"` BatchSize int `yaml:"batch_size"` AsyncQueueSize int `yaml:"async_queue_size"` + ManticoreDSN string `yaml:"manticore_dsn"` // DSN for Manticore backend (default: "manticore@tcp(127.0.0.1:9306)/") } // AuditConfig holds audit log settings. diff --git a/go.mod b/go.mod index 2b3cef9..cb7d95a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.24.4 require ( github.com/emersion/go-imap/v2 v2.0.0-beta.8 github.com/emersion/go-smtp v0.24.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/jackc/pgx/v5 v5.6.0 golang.org/x/crypto v0.48.0 diff --git a/internal/api/search_handlers.go b/internal/api/search_handlers.go index 9eeea8e..06d9362 100644 --- a/internal/api/search_handlers.go +++ b/internal/api/search_handlers.go @@ -171,6 +171,18 @@ func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) { labelMap, _ = s.labels.GetLabelsForEmails(r.Context(), emailIDs) } + // SEC: For user role, restrict results to mails the user is involved in + // (From, To, or CC). Email comes from the JWT session — no DB lookup needed. + // If email is missing for a user-role session, block all results (fail-safe). + var userEmailFilter string + if sess.Role == userstore.RoleUser { + userEmailFilter = strings.ToLower(sess.Email) + if userEmailFilter == "" { + writeJSON(w, http.StatusOK, map[string]interface{}{"total": 0, "hits": []interface{}{}}) + return + } + } + enriched := make([]enrichedHit, 0, len(result.Hits)) for _, h := range result.Hits { eh := enrichedHit{ID: h.ID, Score: h.Score} @@ -186,6 +198,14 @@ func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) { eh.Date = pm.Date.UTC().Format(time.RFC3339) } eh.HasAttachments = len(pm.Attachments) > 0 + + // User isolation: skip mails the user is not involved in. + if userEmailFilter != "" && !mailBelongsToUser(pm, userEmailFilter) { + continue + } + } else if userEmailFilter != "" { + // If mail can't be parsed, deny access to user role. + continue } } if labelMap != nil { @@ -233,8 +253,7 @@ func (s *Server) handleGetMail(w http.ResponseWriter, r *http.Request) { // user and auditor: only own mails; domain_auditor: all tenant mails (no filter) if sess.Role == userstore.RoleUser || sess.Role == userstore.RoleAuditor { - u, err := s.users.GetByUsername(sess.Username) - if err != nil || !mailBelongsToUser(pm, u.Email) { + if sess.Email == "" || !mailBelongsToUser(pm, sess.Email) { writeError(w, http.StatusForbidden, "access denied") return } @@ -399,16 +418,21 @@ func (s *Server) handleGetRaw(w http.ResponseWriter, r *http.Request) { w.Write(raw) } -// mailBelongsToUser checks if the user's email appears in To or CC. +// mailBelongsToUser checks if the user's email appears in From, To, or CC. +// Users can access mails they sent as well as mails they received. +// From may contain a display name ("Name "), so Contains is used. func mailBelongsToUser(pm *mailparser.ParsedMail, userEmail string) bool { email := strings.ToLower(userEmail) + if strings.Contains(strings.ToLower(pm.From), email) { + return true + } for _, to := range pm.To { - if strings.ToLower(to) == email { + if strings.Contains(strings.ToLower(to), email) { return true } } for _, cc := range pm.CC { - if strings.ToLower(cc) == email { + if strings.Contains(strings.ToLower(cc), email) { return true } } diff --git a/internal/api/server.go b/internal/api/server.go index 86c1ac4..0a59231 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -80,7 +80,7 @@ type Server struct { ldapStore *ldapcfg.Store tenantStore *tenantstore.Store tenantLdapStore *ldapcfg.TenantStore - idxMgr *index.TenantIndexManager + idxMgr index.TenantIndexer appVersion string moduleVersions map[string]string globalRetentionDays int // from storage config (PROJ-34) @@ -109,7 +109,7 @@ func (s *Server) SetPop3(store *pop3store.Store, importer *pop3store.Importer) { } // SetIndexManager wires the per-tenant index manager into the API server (PROJ-21 Phase 4). -func (s *Server) SetIndexManager(mgr *index.TenantIndexManager) { +func (s *Server) SetIndexManager(mgr index.TenantIndexer) { s.idxMgr = mgr } diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 4e8eaca..30fa856 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -22,6 +22,7 @@ import ( type Session struct { UserID int64 Username string + Email string Role string JTI string // unique JWT ID TenantID *int64 @@ -193,6 +194,7 @@ func (m *Manager) issueToken(user *userstore.User) (string, *userstore.User, err claims := jwt.MapClaims{ "sub": user.Username, + "email": user.Email, "role": user.Role, "uid": user.ID, "jti": jti, @@ -338,6 +340,7 @@ func (m *Manager) ValidateToken(tokenStr string) (*Session, error) { } username, _ := claims["sub"].(string) + email, _ := claims["email"].(string) role, _ := claims["role"].(string) var userID int64 @@ -364,6 +367,7 @@ func (m *Manager) ValidateToken(tokenStr string) (*Session, error) { return &Session{ UserID: userID, Username: username, + Email: email, Role: role, JTI: jti, TenantID: tenantID, diff --git a/internal/imap/client.go b/internal/imap/client.go index 3acbfcd..f81a19d 100644 --- a/internal/imap/client.go +++ b/internal/imap/client.go @@ -3,17 +3,35 @@ package imap import ( "crypto/tls" "fmt" + "net" "strings" + "time" imapv2 "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" ) -// FolderInfo describes a single IMAP folder with exclusion metadata. -type FolderInfo struct { - Name string `json:"name"` - Excluded bool `json:"excluded"` - Reason string `json:"reason,omitempty"` +const ( + dialTimeout = 30 * time.Second + fetchTimeout = 5 * time.Minute // per-batch read/write deadline +) + +// Conn wraps an IMAP client with the underlying net.Conn so callers +// can set per-operation deadlines to prevent indefinite blocking. +type Conn struct { + *imapclient.Client + raw net.Conn +} + +// SetFetchDeadline sets a 5-minute read/write deadline on the connection. +// Call this before each fetch batch to prevent stalled imports. +func (c *Conn) SetFetchDeadline() { + _ = c.raw.SetDeadline(time.Now().Add(fetchTimeout)) +} + +// ClearDeadline removes any active deadline from the underlying connection. +func (c *Conn) ClearDeadline() { + _ = c.raw.SetDeadline(time.Time{}) } // junkTrashNames lists well-known junk/trash folder names for fallback detection. @@ -22,33 +40,61 @@ var junkTrashNames = []string{ "deleted messages", "papierkorb", "gelöschte elemente", } +// FolderInfo describes a single IMAP folder with exclusion metadata. +type FolderInfo struct { + Name string `json:"name"` + Excluded bool `json:"excluded"` + Reason string `json:"reason,omitempty"` +} + // Connect establishes an IMAP client connection using the specified TLS mode. -func Connect(host string, port int, tlsMode string) (*imapclient.Client, error) { +// Returns a Conn that exposes the underlying net.Conn for deadline management. +func Connect(host string, port int, tlsMode string) (*Conn, error) { addr := fmt.Sprintf("%s:%d", host, port) switch tlsMode { case "ssl": - c, err := imapclient.DialTLS(addr, &imapclient.Options{ - TLSConfig: &tls.Config{ServerName: host}, - }) + dialer := &tls.Dialer{ + NetDialer: &net.Dialer{Timeout: dialTimeout}, + Config: &tls.Config{ServerName: host}, + } + raw, err := dialer.Dial("tcp", addr) if err != nil { return nil, fmt.Errorf("imap connect ssl: %w", err) } - return c, nil + c, err := imapclient.New(raw, nil) + if err != nil { + raw.Close() + return nil, fmt.Errorf("imap client ssl: %w", err) + } + return &Conn{Client: c, raw: raw}, nil + case "starttls": - c, err := imapclient.DialStartTLS(addr, &imapclient.Options{ - TLSConfig: &tls.Config{ServerName: host}, - }) + raw, err := net.DialTimeout("tcp", addr, dialTimeout) if err != nil { return nil, fmt.Errorf("imap connect starttls: %w", err) } - return c, nil + c, err := imapclient.New(raw, &imapclient.Options{ + TLSConfig: &tls.Config{ServerName: host}, + }) + if err != nil { + raw.Close() + return nil, fmt.Errorf("imap client starttls: %w", err) + } + return &Conn{Client: c, raw: raw}, nil + case "none": - c, err := imapclient.DialInsecure(addr, nil) + raw, err := net.DialTimeout("tcp", addr, dialTimeout) if err != nil { return nil, fmt.Errorf("imap connect plain: %w", err) } - return c, nil + c, err := imapclient.New(raw, nil) + if err != nil { + raw.Close() + return nil, fmt.Errorf("imap client plain: %w", err) + } + return &Conn{Client: c, raw: raw}, nil + default: return nil, fmt.Errorf("imap: unknown tls mode %q", tlsMode) } diff --git a/internal/imap/importer.go b/internal/imap/importer.go index 3cf74d3..d140c26 100644 --- a/internal/imap/importer.go +++ b/internal/imap/importer.go @@ -23,7 +23,6 @@ type Importer struct { mailStore *storage.Store idx index.Indexer logger *slog.Logger - TenantID *int64 // optional tenant assignment for stored mails } // NewImporter creates a new Importer wired to the storage and index backends. @@ -88,7 +87,7 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string } // List all folders - folders, err := ListFolders(c) + folders, err := ListFolders(c.Client) if err != nil { return 0, fmt.Errorf("list folders: %w", err) } @@ -159,11 +158,13 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string } batch := uids[i:end] - count, err := imp.fetchBatch(ctx, c, batch, log) + // Set per-batch deadline to prevent indefinite blocking on stalled connections. + c.SetFetchDeadline() + count, err := imp.fetchBatch(ctx, c.Client, batch, acc.TenantID, log) + c.ClearDeadline() if err != nil { - log.Error("batch fetch error", "folder", folder, "offset", i, "err", err) - // Continue with the next batch rather than aborting entirely - continue + log.Error("batch fetch error — aborting import", "folder", folder, "offset", i, "err", err) + return imported, fmt.Errorf("fetch batch %d in %q: %w", i, folder, err) } imported += count @@ -177,7 +178,7 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string } // fetchBatch fetches and stores a batch of messages by UID. -func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, log *slog.Logger) (int, error) { +func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, tenantID *int64, log *slog.Logger) (int, error) { if len(uids) == 0 { return 0, nil } @@ -212,7 +213,7 @@ func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids continue } - if err := imp.storeAndIndex(raw, log); err != nil { + if err := imp.storeAndIndex(raw, tenantID, log); err != nil { log.Warn("failed to store/index message", "err", err) continue } @@ -229,10 +230,10 @@ func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids } // storeAndIndex saves a raw email to storage and indexes it. -func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error { +func (imp *Importer) storeAndIndex(raw []byte, tenantID *int64, log *slog.Logger) error { ctx := context.Background() // Save to file storage (deduplicates by SHA256 automatically) - id, err := imp.mailStore.Save(ctx, raw, time.Now(), imp.TenantID) + id, err := imp.mailStore.Save(ctx, raw, time.Now(), tenantID) if err != nil { return fmt.Errorf("save: %w", err) } diff --git a/internal/imap/scheduler.go b/internal/imap/scheduler.go index 2058eb8..39782ae 100644 --- a/internal/imap/scheduler.go +++ b/internal/imap/scheduler.go @@ -207,7 +207,7 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e return 0, 0, fmt.Errorf("imap scheduler: login: %w", err) } - folders, err := ListFolders(c) + folders, err := ListFolders(c.Client) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err) } @@ -247,7 +247,7 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e // syncFolder syncs new messages from a single IMAP folder. func (s *Scheduler) syncFolder( ctx context.Context, - c *imapclient.Client, + c *Conn, acc *Account, folder string, log *slog.Logger, @@ -298,7 +298,9 @@ func (s *Scheduler) syncFolder( } batch := uids[i:end] - count, batchMaxUID, err := s.fetchSyncBatch(c, batch, log) + c.SetFetchDeadline() + count, batchMaxUID, err := s.fetchSyncBatch(c.Client, batch, acc.TenantID, log) + c.ClearDeadline() if err != nil { log.Warn("imap scheduler: batch error, continuing", "folder", folder, "offset", i, "err", err) @@ -320,6 +322,7 @@ func (s *Scheduler) syncFolder( func (s *Scheduler) fetchSyncBatch( c *imapclient.Client, uids []imapv2.UID, + tenantID *int64, log *slog.Logger, ) (int, uint32, error) { if len(uids) == 0 { @@ -367,7 +370,7 @@ func (s *Scheduler) fetchSyncBatch( } if len(raw) > 0 { - if err := s.importer.storeAndIndex(raw, log); err != nil { + if err := s.importer.storeAndIndex(raw, tenantID, log); err != nil { log.Warn("imap scheduler: store/index failed", "err", err) } else { imported++ diff --git a/internal/imap/store.go b/internal/imap/store.go index 5f169d2..1137d2e 100644 --- a/internal/imap/store.go +++ b/internal/imap/store.go @@ -40,6 +40,9 @@ type Account struct { SyncRunning bool `json:"sync_running"` SyncStatus string `json:"sync_status"` SyncErrorMsg string `json:"sync_error_msg"` + + // Tenant assignment — mails imported from this account are tagged with this tenant. + TenantID *int64 `json:"tenant_id,omitempty"` } // Store manages IMAP account persistence in PostgreSQL. @@ -71,7 +74,7 @@ CREATE TABLE IF NOT EXISTS imap_accounts ( CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner); ` -// migrationSQL adds the PROJ-8 sync columns if they do not yet exist. +// migrationSQL adds columns that may not exist in older installations. const migrationSQL = ` ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0; ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ; @@ -80,6 +83,7 @@ ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFA ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE; ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT ''; ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT ''; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS tenant_id INTEGER REFERENCES tenants(id); ` // New creates a new Store, connects to PostgreSQL, and runs the migration. @@ -138,7 +142,7 @@ const selectColumns = ` id, owner, name, host, port, tls, username, excluded_fol status, error_msg, last_import_at, last_import_count, progress_current, progress_total, created_at, sync_interval_min, last_sync_at, last_sync_count, last_uid, - sync_running, sync_status, sync_error_msg ` + sync_running, sync_status, sync_error_msg, tenant_id ` // scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error. type scanner interface { @@ -152,7 +156,7 @@ func scanRow(row scanner) (Account, error) { &a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt, &a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt, &a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID, - &a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, + &a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, &a.TenantID, ) return a, err } diff --git a/internal/index/index.go b/internal/index/index.go index a916fd9..172cb2d 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -54,6 +54,14 @@ type Indexer interface { Close() error } +// TenantIndexer manages per-tenant Indexer instances. +// Implemented by TenantIndexManager (Xapian) and ManticoreTenantManager. +type TenantIndexer interface { + ForTenant(tenantID *int64) Indexer + Global() Indexer + Close() error +} + // New creates an Indexer for the specified backend. func New(dir string, batchSize int, backend string) (Indexer, error) { switch backend { diff --git a/internal/index/manticore.go b/internal/index/manticore.go new file mode 100644 index 0000000..c9ad895 --- /dev/null +++ b/internal/index/manticore.go @@ -0,0 +1,327 @@ +package index + +import ( + "database/sql" + "fmt" + "hash/fnv" + "strings" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +// manticoreIndex implements Indexer against a single Manticore RT table. +type manticoreIndex struct { + db *sql.DB + table string +} + +// ManticoreTenantManager implements TenantIndexer using Manticore Search +// via the MySQL protocol. No CGO required — pure Go via database/sql. +type ManticoreTenantManager struct { + db *sql.DB + mu sync.RWMutex + pool map[int64]*manticoreIndex + global *manticoreIndex +} + +// NewManticoreTenantManager opens a Manticore connection, ensures the global +// RT table exists, and returns a ready manager. +func NewManticoreTenantManager(dsn string) (*ManticoreTenantManager, error) { + db, err := sql.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("manticore: open: %w", err) + } + db.SetMaxOpenConns(16) + db.SetMaxIdleConns(4) + db.SetConnMaxLifetime(5 * time.Minute) + + if err := db.Ping(); err != nil { + db.Close() + return nil, fmt.Errorf("manticore: ping: %w", err) + } + + m := &ManticoreTenantManager{ + db: db, + pool: make(map[int64]*manticoreIndex), + } + + globalIdx := &manticoreIndex{db: db, table: "emails_global"} + if err := globalIdx.ensureTable(); err != nil { + db.Close() + return nil, fmt.Errorf("manticore: ensure global table: %w", err) + } + m.global = globalIdx + + return m, nil +} + +// ForTenant returns the Indexer for the given tenant, creating the RT table on first use. +// A nil or zero tenantID falls back to the global index. +func (m *ManticoreTenantManager) ForTenant(tenantID *int64) Indexer { + if tenantID == nil || *tenantID == 0 { + return m.global + } + tid := *tenantID + + m.mu.RLock() + idx, ok := m.pool[tid] + m.mu.RUnlock() + if ok { + return idx + } + + m.mu.Lock() + defer m.mu.Unlock() + // Double-check after acquiring write lock. + if idx, ok = m.pool[tid]; ok { + return idx + } + + idx = &manticoreIndex{db: m.db, table: manticoreTableName(&tid)} + if err := idx.ensureTable(); err != nil { + // Return global as safe fallback; error is logged via caller. + return m.global + } + m.pool[tid] = idx + return idx +} + +// Global returns the global (non-tenant) Indexer. +func (m *ManticoreTenantManager) Global() Indexer { + return m.global +} + +// Close closes the shared database connection. +func (m *ManticoreTenantManager) Close() error { + return m.db.Close() +} + +// ── manticoreIndex methods ──────────────────────────────────────────────── + +// ensureTable creates the RT index if it does not yet exist. +func (idx *manticoreIndex) ensureTable() error { + stmt := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s ( + mail_id string, + subject text, + from_addr text, + to_addr text, + body text, + attachment_names text, + has_attachment uint, + date_ts bigint, + size_bytes bigint + ) type='rt' morphology='stem_en,stem_de'`, idx.table) + _, err := idx.db.Exec(stmt) + if err != nil { + return fmt.Errorf("ensureTable %s: %w", idx.table, err) + } + return nil +} + +// IndexSync inserts or replaces a document in the RT index. +func (idx *manticoreIndex) IndexSync(doc MailDocument) error { + rowID := hashMailID(doc.ID) + hasAttach := uint64(0) + if doc.HasAttachment { + hasAttach = 1 + } + var dateTS int64 + if !doc.Date.IsZero() { + dateTS = doc.Date.Unix() + } + + _, err := idx.db.Exec( + fmt.Sprintf(`REPLACE INTO %s + (id, mail_id, subject, from_addr, to_addr, body, attachment_names, has_attachment, date_ts, size_bytes) + VALUES (?,?,?,?,?,?,?,?,?,?)`, idx.table), + rowID, + doc.ID, + doc.Subject, + doc.From, + doc.To, + doc.Body, + doc.AttachNames, + hasAttach, + dateTS, + doc.Size, + ) + if err != nil { + return fmt.Errorf("manticore IndexSync %s: %w", idx.table, err) + } + return nil +} + +// Delete removes a document by mail ID hash. +func (idx *manticoreIndex) Delete(id string) error { + rowID := hashMailID(id) + _, err := idx.db.Exec( + fmt.Sprintf("DELETE FROM %s WHERE id = ?", idx.table), + rowID, + ) + if err != nil { + return fmt.Errorf("manticore Delete %s: %w", idx.table, err) + } + return nil +} + +// Search executes a full-text + filter query against the RT index. +func (idx *manticoreIndex) Search(req SearchRequest) (*SearchResult, error) { + var matchParts []string + if req.Query != "" { + matchParts = append(matchParts, escapeManticoreMatch(req.Query)) + } + if req.From != "" { + matchParts = append(matchParts, fmt.Sprintf("@from_addr %s", escapeManticoreMatch(req.From))) + } + if req.To != "" { + matchParts = append(matchParts, fmt.Sprintf("@to_addr %s", escapeManticoreMatch(req.To))) + } + if req.OwnEmail != "" { + matchParts = append(matchParts, fmt.Sprintf("@(from_addr,to_addr) %s", escapeManticoreMatch(req.OwnEmail))) + } + + hasMatch := len(matchParts) > 0 + + var whereParts []string + var args []interface{} + + if hasMatch { + whereParts = append(whereParts, "MATCH(?)") + args = append(args, strings.Join(matchParts, " ")) + } + if req.DateFrom != nil { + whereParts = append(whereParts, "date_ts >= ?") + args = append(args, req.DateFrom.Unix()) + } + if req.DateTo != nil { + whereParts = append(whereParts, "date_ts <= ?") + args = append(args, req.DateTo.Unix()) + } + if req.HasAttachment != nil { + if *req.HasAttachment { + whereParts = append(whereParts, "has_attachment = 1") + } else { + whereParts = append(whereParts, "has_attachment = 0") + } + } + + whereClause := "" + if len(whereParts) > 0 { + whereClause = "WHERE " + strings.Join(whereParts, " AND ") + } + + // COUNT query for total. + countArgs := make([]interface{}, len(args)) + copy(countArgs, args) + + countSQL := fmt.Sprintf( + "SELECT COUNT(*) FROM %s %s OPTION max_matches=1000000", + idx.table, whereClause, + ) + var total int + if err := idx.db.QueryRow(countSQL, countArgs...).Scan(&total); err != nil { + return nil, fmt.Errorf("manticore Search count %s: %w", idx.table, err) + } + + pageSize := req.PageSize + if pageSize <= 0 { + pageSize = 20 + } + page := req.Page + if page <= 0 { + page = 1 + } + offset := (page - 1) * pageSize + + // Score expression and ORDER BY. + scoreExpr := "0 as score" + orderBy := "date_ts DESC" + if hasMatch { + scoreExpr = "WEIGHT() as score" + switch req.Sort { + case "relevance": + orderBy = "WEIGHT() DESC, date_ts DESC" + case "date_asc": + orderBy = "date_ts ASC" + default: + orderBy = "date_ts DESC" + } + } else { + switch req.Sort { + case "date_asc": + orderBy = "date_ts ASC" + default: + orderBy = "date_ts DESC" + } + } + + selectSQL := fmt.Sprintf( + "SELECT mail_id, %s FROM %s %s ORDER BY %s LIMIT ? OFFSET ? OPTION max_matches=10000", + scoreExpr, idx.table, whereClause, orderBy, + ) + selectArgs := make([]interface{}, len(args)) + copy(selectArgs, args) + selectArgs = append(selectArgs, pageSize, offset) + + rows, err := idx.db.Query(selectSQL, selectArgs...) + if err != nil { + return nil, fmt.Errorf("manticore Search select %s: %w", idx.table, err) + } + defer rows.Close() + + var hits []Hit + for rows.Next() { + var mailID string + var score float64 + if err := rows.Scan(&mailID, &score); err != nil { + return nil, fmt.Errorf("manticore Search scan: %w", err) + } + hits = append(hits, Hit{ID: mailID, Score: score}) + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("manticore Search rows: %w", err) + } + + return &SearchResult{Total: total, Hits: hits}, nil +} + +// Close is a no-op for individual indexes — the shared DB connection is managed +// by ManticoreTenantManager. +func (idx *manticoreIndex) Close() error { + return nil +} + +// ── helpers ──────────────────────────────────────────────────────────────── + +// hashMailID returns a stable uint64 row ID derived from the mail's SHA-256 string ID. +func hashMailID(id string) uint64 { + h := fnv.New64a() + h.Write([]byte(id)) + return h.Sum64() +} + +// manticoreTableName returns the RT table name for a given tenant. +// nil / 0 → emails_global, otherwise emails_tenant_. +func manticoreTableName(tenantID *int64) string { + if tenantID == nil || *tenantID == 0 { + return "emails_global" + } + return fmt.Sprintf("emails_tenant_%d", *tenantID) +} + +// escapeManticoreMatch escapes characters that have special meaning in +// Manticore MATCH() expressions to prevent query injection. +func escapeManticoreMatch(s string) string { + specials := `\()|!@~"/^$=<` + var b strings.Builder + b.Grow(len(s)) + for _, c := range s { + if strings.ContainsRune(specials, c) { + b.WriteRune('\\') + } + b.WriteRune(c) + } + return b.String() +} diff --git a/internal/index/tenant_worker.go b/internal/index/tenant_worker.go index 680f994..8705d3e 100644 --- a/internal/index/tenant_worker.go +++ b/internal/index/tenant_worker.go @@ -6,9 +6,9 @@ import ( ) // TenantIndexWorker processes MailDocument indexing requests asynchronously, -// routing each document to the correct per-tenant Xapian index via TenantIndexManager. +// routing each document to the correct per-tenant index via TenantIndexer. type TenantIndexWorker struct { - mgr *TenantIndexManager + mgr TenantIndexer queue chan MailDocument done chan struct{} wg sync.WaitGroup @@ -16,7 +16,7 @@ type TenantIndexWorker struct { } // NewTenantWorker creates a new TenantIndexWorker with the given queue capacity. -func NewTenantWorker(mgr *TenantIndexManager, queueSize int, logger *slog.Logger) *TenantIndexWorker { +func NewTenantWorker(mgr TenantIndexer, queueSize int, logger *slog.Logger) *TenantIndexWorker { if queueSize <= 0 { queueSize = 1000 } diff --git a/src/app/admin/login/page.tsx b/src/app/admin/login/page.tsx index 62ca31f..fc00f8d 100644 --- a/src/app/admin/login/page.tsx +++ b/src/app/admin/login/page.tsx @@ -3,7 +3,7 @@ import { useState, useEffect } from "react"; import { useRouter } from "next/navigation"; import { login } from "@/lib/api"; -import { getCachedUser, setCachedUser } from "@/lib/auth-cache"; +import { getCachedUser, setCachedUser, clearAuthCache } from "@/lib/auth-cache"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; @@ -31,6 +31,7 @@ export default function AdminLoginPage() { setLoading(true); try { + clearAuthCache(); const res = await login(username, password); const role = res?.user?.role ?? ""; if (!ADMIN_ROLES.includes(role)) { @@ -38,6 +39,7 @@ export default function AdminLoginPage() { setError("Kein Zugriff. Dieses Login ist nur für Admins und Auditoren."); return; } + setCachedUser({ username: res.user.username, email: res.user.email, role }); if (role === "auditor") { router.push("/search"); } else { diff --git a/src/app/imap/page.tsx b/src/app/imap/page.tsx index d3bb521..fe1515a 100644 --- a/src/app/imap/page.tsx +++ b/src/app/imap/page.tsx @@ -79,8 +79,12 @@ export default function ImapPage() { // Saving state const [saving, setSaving] = useState(false); + // Import error state + const [importError, setImportError] = useState(""); + // Polling refs const pollingRefs = useRef>>(new Map()); + const pollErrorCount = useRef>(new Map()); const loadAccounts = useCallback(async () => { try { @@ -102,19 +106,28 @@ export default function ImapPage() { for (const acc of accounts) { const isActive = acc.status === "running" || acc.sync_running; if (isActive && !pollingRefs.current.has(acc.id)) { + pollErrorCount.current.set(acc.id, 0); const interval = setInterval(async () => { try { const updated = await getImapProgress(acc.id); + pollErrorCount.current.set(acc.id, 0); setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a)) ); if (updated.status !== "running" && !updated.sync_running) { clearInterval(pollingRefs.current.get(acc.id)!); pollingRefs.current.delete(acc.id); + pollErrorCount.current.delete(acc.id); } } catch { - clearInterval(pollingRefs.current.get(acc.id)!); - pollingRefs.current.delete(acc.id); + // Only stop polling after 5 consecutive failures (tolerates brief network hiccups) + const errors = (pollErrorCount.current.get(acc.id) ?? 0) + 1; + pollErrorCount.current.set(acc.id, errors); + if (errors >= 5) { + clearInterval(pollingRefs.current.get(acc.id)!); + pollingRefs.current.delete(acc.id); + pollErrorCount.current.delete(acc.id); + } } }, 2000); pollingRefs.current.set(acc.id, interval); @@ -203,11 +216,12 @@ export default function ImapPage() { } async function handleStartImport(id: number) { + setImportError(""); try { const updated = await startImapImport(id); setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a))); - } catch { - // ignore + } catch (err) { + setImportError(err instanceof Error ? err.message : "Import konnte nicht gestartet werden."); } } @@ -336,6 +350,10 @@ export default function ImapPage() { + {importError && ( +

{importError}

+ )} + {loading ? (
{[1, 2].map((i) => ( @@ -363,6 +381,14 @@ export default function ImapPage() { {statusBadge(acc.status)} + {acc.status === "running" && acc.progress_total === 0 && ( +
+ +

+ Zaehle E-Mails auf dem Server... +

+
+ )} {acc.status === "running" && acc.progress_total > 0 && (
- {error && ( + {error && error !== "ADMIN_REDIRECT" && (

{error}

)} + {error === "ADMIN_REDIRECT" && ( +

+ Admins und Auditoren bitte{" "} + + hier anmelden + + . +

+ )} diff --git a/src/components/UserNav.tsx b/src/components/UserNav.tsx index 570dbf6..94c6715 100644 --- a/src/components/UserNav.tsx +++ b/src/components/UserNav.tsx @@ -21,13 +21,15 @@ interface UserNavProps { export function UserNav({ username, role }: UserNavProps) { const router = useRouter(); + const ADMIN_ROLES = ["auditor", "admin", "domain_admin", "superadmin"]; + async function handleLogout() { try { await logout(); } catch { // ignore logout errors } - router.push("/"); + router.push(ADMIN_ROLES.includes(role) ? "/admin/login" : "/"); } return ( diff --git a/update.sh b/update.sh index 92ebde9..58eff55 100755 --- a/update.sh +++ b/update.sh @@ -50,6 +50,33 @@ command -v node >/dev/null || die "node nicht gefunden" command -v npm >/dev/null || die "npm nicht gefunden" command -v go >/dev/null || die "go nicht gefunden — apt-get install golang-go" +# ── Manticore Search prüfen / installieren ──────────────────────────────── + +if ! command -v searchd >/dev/null 2>&1 && ! systemctl is-active --quiet manticore 2>/dev/null; then + info "Manticore Search nicht gefunden — installiere..." + apt-get install -y wget gnupg2 lsb-release 2>/dev/null || true + MANTICORE_CODENAME=$(lsb_release -cs 2>/dev/null || echo "bookworm") + wget -q -O /tmp/manticore.deb \ + "https://repo.manticoresearch.com/repository/manticoresearch_${MANTICORE_CODENAME}/pool/main/m/manticoresearch/manticoresearch_6.3.6_amd64.deb" 2>/dev/null \ + || wget -q -O /tmp/manticore.deb \ + "https://github.com/manticoresoftware/manticoresearch/releases/download/6.3.6/manticoresearch_6.3.6.202408011246.4c39781ba-1+${MANTICORE_CODENAME}_amd64.deb" 2>/dev/null \ + || true + if [[ -f /tmp/manticore.deb ]]; then + dpkg -i /tmp/manticore.deb 2>/dev/null || apt-get install -f -y 2>/dev/null || true + rm -f /tmp/manticore.deb + log "Manticore Search installiert" + else + warn "Manticore Search konnte nicht automatisch installiert werden — bitte manuell installieren" + warn "Siehe: https://manticoresearch.com/install/" + fi +fi + +if systemctl list-unit-files manticore.service >/dev/null 2>&1; then + systemctl enable manticore 2>/dev/null || true + systemctl is-active --quiet manticore || systemctl start manticore 2>/dev/null || warn "Manticore-Dienst konnte nicht gestartet werden" + systemctl is-active --quiet manticore && log "Manticore Search läuft" +fi + # ── Quellcode holen ─────────────────────────────────────────────────────── if [[ -d "$BUILD_DIR/.git" ]]; then @@ -69,8 +96,8 @@ fi info "Baue Go Backend..." cd "$BUILD_DIR" -go mod download -CGO_ENABLED=1 go build -tags xapian -buildvcs=false -o "$BUILD_DIR/archivmail-new" ./cmd/archivmail/ +go mod tidy && go mod download +CGO_ENABLED=0 go build -buildvcs=false -o "$BUILD_DIR/archivmail-new" ./cmd/archivmail/ log "Go Backend gebaut" # ── Next.js Frontend bauen ──────────────────────────────────────────────── @@ -89,11 +116,26 @@ info "Stoppe Dienste..." systemctl stop archivmail-web 2>/dev/null || warn "archivmail-web nicht aktiv" systemctl stop archivmail 2>/dev/null || warn "archivmail nicht aktiv" -# Xapian-Lockfile entfernen (verhindert DatabaseLockError beim Neustart) -XAPIAN_LOCK=$(grep -A2 'index:' /etc/archivmail/config.yml 2>/dev/null | awk '/path:/{print $2}') -if [[ -n "$XAPIAN_LOCK" && -f "$XAPIAN_LOCK/flintlock" ]]; then - rm -f "$XAPIAN_LOCK/flintlock" - log "Xapian-Lockfile entfernt" +# ── Manticore als Standard-Backend in config.yml setzen ────────────────── +CONFIG_FILE="/etc/archivmail/config.yml" +if [[ -f "$CONFIG_FILE" ]]; then + # Backend auf manticore umstellen falls noch nicht gesetzt + if grep -q 'backend:' "$CONFIG_FILE"; then + if ! grep -q 'backend: manticore' "$CONFIG_FILE"; then + sed -i 's/^\([[:space:]]*\)backend:.*/\1backend: manticore/' "$CONFIG_FILE" + info "Index-Backend auf 'manticore' gesetzt" + fi + else + # backend: Zeile unter index: einfuegen + sed -i '/^index:/a\ backend: manticore' "$CONFIG_FILE" + info "Index-Backend 'manticore' hinzugefuegt" + fi + # manticore_dsn setzen falls nicht vorhanden + if ! grep -q 'manticore_dsn' "$CONFIG_FILE"; then + sed -i '/backend: manticore/a\ manticore_dsn: "manticore@tcp(127.0.0.1:9306)/"' "$CONFIG_FILE" + info "Manticore-DSN gesetzt" + fi + log "Manticore-Konfiguration aktualisiert" fi # ── Dateien einspielen ──────────────────────────────────────────────────── @@ -132,6 +174,19 @@ systemctl start archivmail systemctl start archivmail-web log "Dienste gestartet" +# ── Manticore Reindex (einmalig nach Backend-Umstieg) ───────────────────── +sleep 2 +if grep -q 'backend: manticore' /etc/archivmail/config.yml 2>/dev/null \ + && systemctl is-active --quiet archivmail 2>/dev/null \ + && systemctl is-active --quiet manticore 2>/dev/null; then + info "Baue Manticore-Suchindex auf (alle Mails)..." + if timeout 600 /opt/archivmail/bin/archivmail reindex --config /etc/archivmail/config.yml; then + log "Manticore-Index aufgebaut" + else + warn "Reindex nicht abgeschlossen — bei Bedarf manuell: archivmail reindex" + fi +fi + # ── Status prüfen ───────────────────────────────────────────────────────── sleep 2