From 479c27e5a84c987773ac0f314d1a9900037592dc Mon Sep 17 00:00:00 2001 From: sysops Date: Tue, 17 Mar 2026 21:03:40 +0100 Subject: [PATCH] feat(PROJ-21): Phase 2+3+5+8 Multi-Tenancy + PROJ-2 EML/MBOX Upload Phase 2a: userstore domain_admin/superadmin Rollen, User.TenantID, ListByTenant, UpsertLDAPUser mit tenantID Phase 2b: storage.Save() mit tenantID *int64, email_refs Tabelle, GetTenantForMail, GetAllIDsByTenant, StatsByTenant Phase 2c: JWT-Claims tenant_id/tenant_slug, Session.TenantID, Login Domain-Erkennung via E-Mail-Domain Phase 3: tenantMiddleware, Handler-Filterung (Users, Mail, Stats) Phase 5: SMTP Domain-Routing via DomainToTenantFunc Callback, config smtp.tenant_routing + default_tenant_id Phase 8: archivmail migrate-tenants Subkommando PROJ-2: Upload-Seite /admin/upload mit DropZone + Progress-Polling Co-Authored-By: Claude Sonnet 4.6 --- cmd/archivmail-import/main.go | 10 +- cmd/archivmail/cmd_import.go | 3 +- cmd/archivmail/cmd_migrate_tenants.go | 135 +++++++++++ cmd/archivmail/main.go | 37 ++- config/config.go | 16 +- features/INDEX.md | 2 +- features/PROJ-21-multi-tenancy.md | 28 ++- internal/api/server.go | 166 ++++++++++---- internal/api/upload.go | 13 +- internal/auth/auth.go | 55 +++-- internal/imap/importer.go | 4 +- internal/pop3/importer.go | 4 +- internal/smtpd/smtpd.go | 61 ++++- internal/storage/storage.go | 196 +++++++++++++--- internal/userstore/userstore.go | 80 +++++-- src/app/admin/upload/page.tsx | 314 ++++++++++++++++++++++++++ 16 files changed, 966 insertions(+), 158 deletions(-) create mode 100644 cmd/archivmail/cmd_migrate_tenants.go create mode 100644 src/app/admin/upload/page.tsx diff --git a/cmd/archivmail-import/main.go b/cmd/archivmail-import/main.go index d930a5f..87d0edd 100644 --- a/cmd/archivmail-import/main.go +++ b/cmd/archivmail-import/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "log/slog" @@ -33,11 +34,16 @@ func main() { os.Exit(1) } - mailStore, err := storage.New(cfg.Storage.StorePath) + mailStore, err := storage.New(storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + }) if err != nil { logger.Error("storage init failed", "err", err) os.Exit(1) } + defer mailStore.Close() indexBackend := cfg.Index.Backend if indexBackend == "" { @@ -101,7 +107,7 @@ func main() { continue } - id, err := mailStore.Save(raw, pm.Date) + id, err := mailStore.Save(context.Background(), raw, pm.Date, nil) if err != nil { logger.Error("save failed", "path", path, "err", err) errors++ diff --git a/cmd/archivmail/cmd_import.go b/cmd/archivmail/cmd_import.go index e3c1f30..bfce09e 100644 --- a/cmd/archivmail/cmd_import.go +++ b/cmd/archivmail/cmd_import.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "flag" "fmt" @@ -220,7 +221,7 @@ func importMessage(mailStore *storage.Store, idx index.Indexer, raw []byte, dryR return "imported" } - id, err := mailStore.Save(raw, pm.Date) + id, err := mailStore.Save(context.Background(), raw, pm.Date, nil) if err != nil { fmt.Fprintf(os.Stderr, "warning: save failed: %v\n", err) return "error" diff --git a/cmd/archivmail/cmd_migrate_tenants.go b/cmd/archivmail/cmd_migrate_tenants.go new file mode 100644 index 0000000..09a7ddc --- /dev/null +++ b/cmd/archivmail/cmd_migrate_tenants.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/archivmail/config" + "github.com/archivmail/internal/userstore" +) + +// runMigrateTenants performs a one-time migration to set up the default tenant, +// assign all existing users/emails/audit entries to it, and create a superadmin user. +// +// Usage: archivmail migrate-tenants [-config /etc/archivmail/config.yml] +func runMigrateTenants(args []string) { + fs := flag.NewFlagSet("migrate-tenants", flag.ExitOnError) + configPath := fs.String("config", "/etc/archivmail/config.yml", "path to config file") + fs.Parse(args) //nolint + + cfg, err := config.Load(*configPath) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: load config: %v\n", err) + os.Exit(1) + } + + ctx := context.Background() + pool, err := pgxpool.New(ctx, cfg.Database.DSN()) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: db connect: %v\n", err) + os.Exit(1) + } + defer pool.Close() + + // Check if migration has already run + var tenantCount int + _ = pool.QueryRow(ctx, `SELECT COUNT(*) FROM tenants`).Scan(&tenantCount) + if tenantCount > 0 { + fmt.Println("migrate-tenants: tenants table already populated — skipping migration") + os.Exit(0) + } + + // 1. Create default tenant + var defaultTenantID int64 + err = pool.QueryRow(ctx, + `INSERT INTO tenants (name, slug, active) VALUES ('default', 'default', true) RETURNING id`, + ).Scan(&defaultTenantID) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: create default tenant: %v\n", err) + os.Exit(1) + } + fmt.Printf("migrate-tenants: created default tenant with id=%d\n", defaultTenantID) + + // 2. Assign all users without tenant to the default tenant + tag, err := pool.Exec(ctx, + `UPDATE users SET tenant_id = $1 WHERE tenant_id IS NULL`, defaultTenantID) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: update users: %v\n", err) + os.Exit(1) + } + fmt.Printf("migrate-tenants: assigned %d users to default tenant\n", tag.RowsAffected()) + + // 3. Assign all emails without tenant to the default tenant + tag, err = pool.Exec(ctx, + `UPDATE emails SET tenant_id = $1 WHERE tenant_id IS NULL`, defaultTenantID) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: update emails: %v\n", err) + os.Exit(1) + } + fmt.Printf("migrate-tenants: assigned %d emails to default tenant\n", tag.RowsAffected()) + + // 4. Seed email_refs for existing emails (all assigned to default tenant) + _, err = pool.Exec(ctx, ` + INSERT INTO email_refs (email_id, tenant_id) + SELECT id, $1 FROM emails WHERE tenant_id = $1 + ON CONFLICT (email_id, tenant_id) DO NOTHING + `, defaultTenantID) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: seed email_refs: %v\n", err) + os.Exit(1) + } + + // 5. Assign audit_log entries to default tenant (best-effort — column may not exist yet) + _, _ = pool.Exec(ctx, + `UPDATE audit_log SET tenant_id = $1 WHERE tenant_id IS NULL`, defaultTenantID) + + // 6. Promote existing 'admin' users to 'domain_admin' + tag, err = pool.Exec(ctx, + `UPDATE users SET role = $1 WHERE role = 'admin'`, userstore.RoleDomainAdmin) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: promote admins: %v\n", err) + os.Exit(1) + } + fmt.Printf("migrate-tenants: promoted %d admin(s) to domain_admin\n", tag.RowsAffected()) + + // 7. Create superadmin user (tenant_id = NULL = global access) + superPw, err := randomPassword() + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: generate password: %v\n", err) + os.Exit(1) + } + + // Use userstore so the password is properly bcrypt-hashed + users, err := userstore.New(cfg.Database.DSN()) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: userstore init: %v\n", err) + os.Exit(1) + } + defer users.Close() + + _, err = users.Create(userstore.CreateUserRequest{ + Username: "superadmin", + Email: "superadmin@archivmail.local", + Password: superPw, + Role: userstore.RoleSuperAdmin, + TenantID: nil, // global — no tenant restriction + }) + if err != nil { + fmt.Fprintf(os.Stderr, "migrate-tenants: create superadmin: %v\n", err) + os.Exit(1) + } + + fmt.Println() + fmt.Println("╔══════════════════════════════════════════════════════════════════╗") + fmt.Println("║ ARCHIVMAIL — TENANT MIGRATION ABGESCHLOSSEN ║") + fmt.Println("║ Superadmin-Zugangsdaten (NUR EINMAL ANGEZEIGT): ║") + fmt.Printf( "║ superadmin : %-52s ║\n", superPw) + fmt.Println("║ Passwort sofort nach dem ersten Login ändern! ║") + fmt.Println("╚══════════════════════════════════════════════════════════════════╝") + fmt.Println() + fmt.Printf("migrate-tenants: done — default tenant id=%d\n", defaultTenantID) +} diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 79e5e39..b4ccc1e 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -41,6 +41,9 @@ func main() { case "export": runExport(os.Args[2:]) return + case "migrate-tenants": + runMigrateTenants(os.Args[2:]) + return case "version": fmt.Printf("archivmail %s\n", version) return @@ -151,6 +154,15 @@ func main() { Handler: srv, } + // Tenant store (Multi-Tenancy Phase 1+2) — must be initialised before SMTP daemon + tenantSt, err := tenantstore.New(cfg.Database.DSN()) + if err != nil { + logger.Error("tenant store init failed", "err", err) + os.Exit(1) + } + defer tenantSt.Close() + srv.SetTenants(tenantSt) + // Start SMTP daemon with index worker integration if cfg.SMTP.Bind == "" { cfg.SMTP.Bind = fmt.Sprintf(":%d", cfg.Server.SMTPPort) @@ -159,6 +171,22 @@ func main() { smtpDaemon.SetIndexCallback(func(raw []byte, id string) { submitToWorker(worker, mailStore, raw, id, logger) }) + // Wire tenant routing into SMTP daemon + if cfg.SMTP.TenantRouting == "domain" { + var defaultTenantID *int64 + if cfg.SMTP.DefaultTenantID != 0 { + id := cfg.SMTP.DefaultTenantID + defaultTenantID = &id + } + smtpDaemon.SetDomainToTenant(func(ctx context.Context, domain string) (*int64, error) { + t, err := tenantSt.GetByDomain(ctx, domain) + if err != nil || t == nil { + return nil, err + } + id := t.ID + return &id, nil + }, defaultTenantID) + } if err := smtpDaemon.Start(); err != nil { logger.Error("SMTP daemon failed to start", "err", err) os.Exit(1) @@ -184,15 +212,6 @@ func main() { defer imapSched.Stop() srv.SetImap(imapSt, imapImp, imapSched) - // Tenant store (Multi-Tenancy Phase 1) - tenantSt, err := tenantstore.New(cfg.Database.DSN()) - if err != nil { - logger.Error("tenant store init failed", "err", err) - os.Exit(1) - } - defer tenantSt.Close() - srv.SetTenants(tenantSt) - // POP3 store + importer pop3St, err := pop3store.New(cfg.Database.DSN(), cfg.API.Secret) if err != nil { diff --git a/config/config.go b/config/config.go index f512f4e..c2c022a 100644 --- a/config/config.go +++ b/config/config.go @@ -57,13 +57,15 @@ func (d DatabaseConfig) DSN() string { // SMTPConfig holds settings for the embedded SMTP server. type SMTPConfig struct { - Enabled bool `yaml:"enabled"` - Bind string `yaml:"bind"` - Domain string `yaml:"domain"` - TLSCert string `yaml:"tls_cert"` - TLSKey string `yaml:"tls_key"` - MaxSizeMB int `yaml:"max_size_mb"` - AllowedIPs []string `yaml:"allowed_ips"` + Enabled bool `yaml:"enabled"` + Bind string `yaml:"bind"` + Domain string `yaml:"domain"` + TLSCert string `yaml:"tls_cert"` + TLSKey string `yaml:"tls_key"` + MaxSizeMB int `yaml:"max_size_mb"` + AllowedIPs []string `yaml:"allowed_ips"` + TenantRouting string `yaml:"tenant_routing"` // "domain" or "default" + DefaultTenantID int64 `yaml:"default_tenant_id"` // used when routing is "default" or domain lookup fails } // IndexConfig holds full-text index settings. diff --git a/features/INDEX.md b/features/INDEX.md index c8e65d0..8490a0b 100644 --- a/features/INDEX.md +++ b/features/INDEX.md @@ -13,7 +13,7 @@ | ID | Feature | Status | Spec | Created | |----|---------|--------|------|---------| | PROJ-1 | Nutzer-Authentifizierung & Rollen (User/Admin) | Deployed | [PROJ-1](PROJ-1-authentifizierung-und-rollen.md) | 2026-03-12 | -| PROJ-2 | E-Mail-Import: EML/MBOX Upload | In Progress | [PROJ-2](PROJ-2-import-eml-mbox.md) | 2026-03-12 | +| PROJ-2 | E-Mail-Import: EML/MBOX Upload | Deployed | [PROJ-2](PROJ-2-import-eml-mbox.md) | 2026-03-12 | | PROJ-3 | E-Mail-Import: IMAP-Verbindung | Deployed | [PROJ-3](PROJ-3-import-imap.md) | 2026-03-12 | | PROJ-4 | E-Mail-Import: SMTP-Eingang via BCC (primär) | Deployed | [PROJ-4](PROJ-4-import-smtp.md) | 2026-03-12 | | PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | Deployed | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 | diff --git a/features/PROJ-21-multi-tenancy.md b/features/PROJ-21-multi-tenancy.md index 7b06170..d9bfcb5 100644 --- a/features/PROJ-21-multi-tenancy.md +++ b/features/PROJ-21-multi-tenancy.md @@ -13,7 +13,33 @@ created: 2026-03-17 - `src/lib/api.ts` — `Tenant`, `TenantDomain`, alle 7 API-Funktionen - `src/app/admin/page.tsx` — Mandanten-Tab mit Tabelle, Domain-Dialog, Löschen-Bestätigung -**Offene Phasen:** Phase 2 (userstore/storage tenant-aware), Phase 3 (Middleware), Phase 4 (Xapian-Index), Phase 5 (SMTP-Routing), Phase 8 (Migration) +## Phase 2, 3, 5, 8 implementiert (2026-03-17) + +**Phase 2a — Rollen & userstore:** +- `internal/userstore/userstore.go` — Neue Rollen `domain_admin`, `superadmin`; `User.TenantID *int64`; `Create()` mit TenantID; `ListByTenant()`; `UpsertLDAPUser()` mit tenantID-Parameter; Scan-Helpers aktualisiert + +**Phase 2b — storage.Save() + email_refs:** +- `internal/storage/storage.go` — `Save()` neue Signatur `(ctx, raw, time, tenantID *int64)`; DB-Migrationen: `emails.tenant_id`, Tabelle `email_refs`; neue Methoden `GetTenantForMail()`, `GetAllIDsByTenant()`, `StatsByTenant()`; `insertMeta`/`insertMetaMinimal` mit tenantID + +**Phase 2c — JWT tenant-aware:** +- `internal/auth/auth.go` — `Session.TenantID *int64`; JWT-Claims `tenant_id`; `ValidateToken()` extrahiert tenant_id; `HasRole()` Hierarchie: superadmin > admin > domain_admin > auditor > user + +**Phase 3 — Tenant-Middleware:** +- `internal/api/server.go` — `tenantMiddleware()`, `tenantFromCtx()`; `auth()` + `authAdmin()` Helper; `handleListUsers` tenant-gefiltert; `handleStorageStats` via `StatsByTenant()`; `handleGetMail/Attachment/Raw` mit Tenant-Isolation + +**Phase 5 — SMTP Domain-Routing:** +- `internal/smtpd/smtpd.go` — `DomainToTenantFunc`; `Daemon.domainToTenant` + `defaultTenantID`; `SetDomainToTenant()`; `resolveTenantFromRcpts()` +- `config/config.go` — `SMTPConfig.TenantRouting`, `SMTPConfig.DefaultTenantID` +- `cmd/archivmail/main.go` — Tenant-Routing-Verdrahtung; `tenantSt` vor smtpDaemon initialisiert + +**Phase 8 — Migrations-Script:** +- `cmd/archivmail/cmd_migrate_tenants.go` — Subkommando `migrate-tenants`: default-Tenant anlegen, Users/Emails/Audit assignen, email_refs seeden, admins → domain_admin, superadmin erstellen + +**Alle Save()-Aufrufer angepasst:** +- `internal/imap/importer.go`, `internal/pop3/importer.go` — `TenantID *int64` Feld + `Save(ctx, ..., tenantID)` +- `cmd/archivmail/cmd_import.go`, `cmd/archivmail-import/main.go` — `Save(ctx, ..., nil)` + +**Offene Phasen:** Phase 4 (Xapian per-tenant Index-Filter) ## Ziel diff --git a/internal/api/server.go b/internal/api/server.go index 210b78d..1826c8f 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -33,7 +33,10 @@ import ( type contextKey string -const sessionKey contextKey = "session" +const ( + sessionKey contextKey = "session" + tenantKey contextKey = "tenant_id" +) // Server is the archivmail HTTP API server. type Server struct { @@ -98,58 +101,68 @@ func New( return s } +// auth wraps a handler with authentication + tenant context propagation. +func (s *Server) auth(h http.HandlerFunc) http.HandlerFunc { + return s.authMiddleware(s.tenantMiddleware(h)) +} + +// authAdmin wraps a handler requiring at least admin role. +func (s *Server) authAdmin(h http.HandlerFunc) http.HandlerFunc { + return s.authMiddleware(s.tenantMiddleware(s.requireRole(userstore.RoleDomainAdmin, h))) +} + func (s *Server) routes() { s.mux.HandleFunc("GET /api/health", s.handleHealth) s.mux.HandleFunc("POST /api/auth/login", s.handleLogin) - s.mux.HandleFunc("GET /api/auth/me", s.authMiddleware(s.handleMe)) - s.mux.HandleFunc("POST /api/auth/logout", s.authMiddleware(s.handleLogout)) - s.mux.HandleFunc("GET /api/users", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleListUsers))) - s.mux.HandleFunc("POST /api/users", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleCreateUser))) - s.mux.HandleFunc("PATCH /api/users/{id}", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleUpdateUser))) - s.mux.HandleFunc("DELETE /api/users/{id}", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleDeleteUser))) - s.mux.HandleFunc("GET /api/search", s.authMiddleware(s.handleSearch)) - s.mux.HandleFunc("GET /api/audit", s.authMiddleware(s.requireRole(userstore.RoleAuditor, s.handleAuditLog))) - s.mux.HandleFunc("GET /api/admin/smtp/status", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleSMTPStatus))) - s.mux.HandleFunc("GET /api/admin/storage/stats", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleStorageStats))) - s.mux.HandleFunc("GET /api/mails/{id}", s.authMiddleware(s.requireMailAccess(s.handleGetMail))) - s.mux.HandleFunc("GET /api/mails/{id}/attachments/{index}", s.authMiddleware(s.requireMailAccess(s.handleGetAttachment))) - s.mux.HandleFunc("GET /api/mails/{id}/raw", s.authMiddleware(s.requireMailAccess(s.handleGetRaw))) - s.mux.HandleFunc("GET /api/admin/services", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleListServices))) - s.mux.HandleFunc("POST /api/admin/services/{name}/action", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleServiceAction))) + s.mux.HandleFunc("GET /api/auth/me", s.auth(s.handleMe)) + s.mux.HandleFunc("POST /api/auth/logout", s.auth(s.handleLogout)) + s.mux.HandleFunc("GET /api/users", s.authAdmin(s.handleListUsers)) + s.mux.HandleFunc("POST /api/users", s.authAdmin(s.handleCreateUser)) + s.mux.HandleFunc("PATCH /api/users/{id}", s.authAdmin(s.handleUpdateUser)) + s.mux.HandleFunc("DELETE /api/users/{id}", s.authAdmin(s.handleDeleteUser)) + s.mux.HandleFunc("GET /api/search", s.auth(s.handleSearch)) + s.mux.HandleFunc("GET /api/audit", s.auth(s.requireRole(userstore.RoleAuditor, s.handleAuditLog))) + s.mux.HandleFunc("GET /api/admin/smtp/status", s.authAdmin(s.handleSMTPStatus)) + s.mux.HandleFunc("GET /api/admin/storage/stats", s.authAdmin(s.handleStorageStats)) + s.mux.HandleFunc("GET /api/mails/{id}", s.auth(s.requireMailAccess(s.handleGetMail))) + s.mux.HandleFunc("GET /api/mails/{id}/attachments/{index}", s.auth(s.requireMailAccess(s.handleGetAttachment))) + s.mux.HandleFunc("GET /api/mails/{id}/raw", s.auth(s.requireMailAccess(s.handleGetRaw))) + s.mux.HandleFunc("GET /api/admin/services", s.authAdmin(s.handleListServices)) + s.mux.HandleFunc("POST /api/admin/services/{name}/action", s.authAdmin(s.handleServiceAction)) - s.mux.HandleFunc("GET /api/admin/system/stats", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleSystemStats))) - s.mux.HandleFunc("GET /api/admin/security/audit", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleSecurityAudit))) - s.mux.HandleFunc("POST /api/admin/security/fix", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleSecurityFix))) + s.mux.HandleFunc("GET /api/admin/system/stats", s.authAdmin(s.handleSystemStats)) + s.mux.HandleFunc("GET /api/admin/security/audit", s.authAdmin(s.handleSecurityAudit)) + s.mux.HandleFunc("POST /api/admin/security/fix", s.authAdmin(s.handleSecurityFix)) // Export routes - s.mux.HandleFunc("GET /api/export/pdf/{id}", s.authMiddleware(s.requireMailAccess(s.handleExportPDF))) - s.mux.HandleFunc("POST /api/export/zip", s.authMiddleware(s.requireMailAccess(s.handleExportZIP))) + s.mux.HandleFunc("GET /api/export/pdf/{id}", s.auth(s.requireMailAccess(s.handleExportPDF))) + s.mux.HandleFunc("POST /api/export/zip", s.auth(s.requireMailAccess(s.handleExportZIP))) // Upload routes (admin only) - s.mux.HandleFunc("POST /api/admin/upload", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleUpload))) - s.mux.HandleFunc("GET /api/admin/upload/{jobID}/progress", s.authMiddleware(s.requireRole(userstore.RoleAdmin, s.handleUploadProgress))) + s.mux.HandleFunc("POST /api/admin/upload", s.authAdmin(s.handleUpload)) + s.mux.HandleFunc("GET /api/admin/upload/{jobID}/progress", s.authAdmin(s.handleUploadProgress)) // Upload routes (all authenticated users) - s.mux.HandleFunc("POST /api/upload", s.authMiddleware(s.handleUpload)) - s.mux.HandleFunc("GET /api/upload/{jobID}/progress", s.authMiddleware(s.handleUploadProgress)) + s.mux.HandleFunc("POST /api/upload", s.auth(s.handleUpload)) + s.mux.HandleFunc("GET /api/upload/{jobID}/progress", s.auth(s.handleUploadProgress)) // IMAP routes (accessible to all authenticated users) - s.mux.HandleFunc("GET /api/imap", s.authMiddleware(s.handleListImap)) - s.mux.HandleFunc("POST /api/imap", s.authMiddleware(s.handleCreateImap)) - s.mux.HandleFunc("DELETE /api/imap/{id}", s.authMiddleware(s.handleDeleteImap)) - s.mux.HandleFunc("PATCH /api/imap/{id}", s.authMiddleware(s.handleUpdateImapInterval)) - s.mux.HandleFunc("POST /api/imap/test", s.authMiddleware(s.handleTestImap)) - s.mux.HandleFunc("POST /api/imap/{id}/import", s.authMiddleware(s.handleStartImport)) - s.mux.HandleFunc("GET /api/imap/{id}/progress", s.authMiddleware(s.handleImapProgress)) - s.mux.HandleFunc("POST /api/imap/{id}/sync", s.authMiddleware(s.handleSyncNow)) + s.mux.HandleFunc("GET /api/imap", s.auth(s.handleListImap)) + s.mux.HandleFunc("POST /api/imap", s.auth(s.handleCreateImap)) + s.mux.HandleFunc("DELETE /api/imap/{id}", s.auth(s.handleDeleteImap)) + s.mux.HandleFunc("PATCH /api/imap/{id}", s.auth(s.handleUpdateImapInterval)) + s.mux.HandleFunc("POST /api/imap/test", s.auth(s.handleTestImap)) + s.mux.HandleFunc("POST /api/imap/{id}/import", s.auth(s.handleStartImport)) + s.mux.HandleFunc("GET /api/imap/{id}/progress", s.auth(s.handleImapProgress)) + s.mux.HandleFunc("POST /api/imap/{id}/sync", s.auth(s.handleSyncNow)) // POP3 routes (accessible to all authenticated users) - s.mux.HandleFunc("GET /api/pop3", s.authMiddleware(s.handleListPop3)) - s.mux.HandleFunc("POST /api/pop3", s.authMiddleware(s.handleCreatePop3)) - s.mux.HandleFunc("DELETE /api/pop3/{id}", s.authMiddleware(s.handleDeletePop3)) - s.mux.HandleFunc("POST /api/pop3/test", s.authMiddleware(s.handleTestPop3)) - s.mux.HandleFunc("POST /api/pop3/{id}/import", s.authMiddleware(s.handleStartPop3Import)) - s.mux.HandleFunc("GET /api/pop3/{id}/progress", s.authMiddleware(s.handlePop3Progress)) + s.mux.HandleFunc("GET /api/pop3", s.auth(s.handleListPop3)) + s.mux.HandleFunc("POST /api/pop3", s.auth(s.handleCreatePop3)) + s.mux.HandleFunc("DELETE /api/pop3/{id}", s.auth(s.handleDeletePop3)) + s.mux.HandleFunc("POST /api/pop3/test", s.auth(s.handleTestPop3)) + s.mux.HandleFunc("POST /api/pop3/{id}/import", s.auth(s.handleStartPop3Import)) + s.mux.HandleFunc("GET /api/pop3/{id}/progress", s.auth(s.handlePop3Progress)) } // ServeHTTP implements http.Handler. @@ -286,7 +299,17 @@ func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) { - users, err := s.users.List("") + tenantID := tenantFromCtx(r.Context()) + + var ( + users []*userstore.User + err error + ) + if tenantID != nil { + users, err = s.users.ListByTenant(r.Context(), *tenantID) + } else { + users, err = s.users.List("") + } if err != nil { writeError(w, http.StatusInternalServerError, "failed to list users") return @@ -298,6 +321,7 @@ func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) { Email string `json:"email"` Role string `json:"role"` Active bool `json:"active"` + TenantID *int64 `json:"tenant_id,omitempty"` } resp := make([]userResp, 0, len(users)) @@ -308,6 +332,7 @@ func (s *Server) handleListUsers(w http.ResponseWriter, r *http.Request) { Email: u.Email, Role: u.Role, Active: u.Active, + TenantID: u.TenantID, }) } writeJSON(w, http.StatusOK, resp) @@ -572,14 +597,15 @@ func (s *Server) handleSMTPStatus(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleStorageStats(w http.ResponseWriter, r *http.Request) { - stats, err := s.store.Stats() + tenantID := tenantFromCtx(r.Context()) + stats, err := s.store.StatsByTenant(r.Context(), tenantID) if err != nil { writeError(w, http.StatusInternalServerError, "failed to read storage stats") return } writeJSON(w, http.StatusOK, map[string]interface{}{ - "total_mails": stats.TotalMails, - "total_bytes": stats.TotalBytes, + "total_mails": stats["count"], + "total_bytes": stats["total_size"], }) } @@ -684,6 +710,26 @@ func sessionFromCtx(ctx context.Context) *auth.Session { return &auth.Session{} } +// tenantMiddleware extracts the tenant_id from the session and stores it in +// the request context, making it available to all downstream handlers. +func (s *Server) tenantMiddleware(next http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + session := sessionFromCtx(r.Context()) + if session != nil && session.TenantID != nil { + ctx := context.WithValue(r.Context(), tenantKey, session.TenantID) + next(w, r.WithContext(ctx)) + return + } + next(w, r) + } +} + +// tenantFromCtx extracts the tenant_id from context. Returns nil for global (superadmin) context. +func tenantFromCtx(ctx context.Context) *int64 { + v, _ := ctx.Value(tenantKey).(*int64) + return v +} + func remoteIP(r *http.Request) string { if fwd := r.Header.Get("X-Forwarded-For"); fwd != "" { return strings.Split(fwd, ",")[0] @@ -722,8 +768,18 @@ func (s *Server) handleGetMail(w http.ResponseWriter, r *http.Request) { return } - // user role: only own mailbox sess := sessionFromCtx(r.Context()) + + // Tenant isolation: domain_admin sees only own tenant's mail + if sess.TenantID != nil { + mailTenant, _ := s.store.GetTenantForMail(r.Context(), id) + if mailTenant == nil || *mailTenant != *sess.TenantID { + writeError(w, http.StatusForbidden, "access denied") + return + } + } + + // user role: only own mailbox if sess.Role == userstore.RoleUser { u, err := s.users.GetByUsername(sess.Username) if err != nil || !mailBelongsToUser(pm, u.Email) { @@ -803,6 +859,16 @@ func (s *Server) handleGetAttachment(w http.ResponseWriter, r *http.Request) { } sess := sessionFromCtx(r.Context()) + + // Tenant isolation + if sess.TenantID != nil { + mailTenant, _ := s.store.GetTenantForMail(r.Context(), id) + if mailTenant == nil || *mailTenant != *sess.TenantID { + writeError(w, http.StatusForbidden, "access denied") + return + } + } + if sess.Role == userstore.RoleUser { u, err := s.users.GetByUsername(sess.Username) if err != nil || !mailBelongsToUser(pm, u.Email) { @@ -838,8 +904,18 @@ func (s *Server) handleGetRaw(w http.ResponseWriter, r *http.Request) { return } - // Access check for user role sess := sessionFromCtx(r.Context()) + + // Tenant isolation + if sess.TenantID != nil { + mailTenant, _ := s.store.GetTenantForMail(r.Context(), id) + if mailTenant == nil || *mailTenant != *sess.TenantID { + writeError(w, http.StatusForbidden, "access denied") + return + } + } + + // Access check for user role if sess.Role == userstore.RoleUser { pm, err := mailparser.Parse(raw) if err == nil { diff --git a/internal/api/upload.go b/internal/api/upload.go index ccd9ac2..ed3eb63 100644 --- a/internal/api/upload.go +++ b/internal/api/upload.go @@ -108,8 +108,11 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) { } s.uploadJobs.Store(jobID, job) + // Propagate tenant from session context + tenantID := tenantFromCtx(r.Context()) + // Run import in background - go s.runUploadJob(job, allMessages) + go s.runUploadJob(job, allMessages, tenantID) writeJSON(w, http.StatusAccepted, map[string]string{"job_id": jobID}) } @@ -126,11 +129,11 @@ func (s *Server) handleUploadProgress(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, job.snapshot()) } -func (s *Server) runUploadJob(job *UploadJob, messages [][]byte) { +func (s *Server) runUploadJob(job *UploadJob, messages [][]byte, tenantID *int64) { ctx := context.Background() for _, raw := range messages { - result := s.importRawMessage(ctx, raw) + result := s.importRawMessage(ctx, raw, tenantID) job.mu.Lock() switch result { case "imported": @@ -150,14 +153,14 @@ func (s *Server) runUploadJob(job *UploadJob, messages [][]byte) { // importRawMessage stores and indexes a single raw message. // Returns "imported", "skipped", or "error". -func (s *Server) importRawMessage(ctx context.Context, raw []byte) string { +func (s *Server) importRawMessage(ctx context.Context, raw []byte, tenantID *int64) string { pm, err := mailparser.Parse(raw) if err != nil { s.logger.Warn("upload: parse failed", "err", err) return "error" } - id, err := s.store.Save(raw, pm.Date) + id, err := s.store.Save(ctx, raw, pm.Date, tenantID) if err != nil { s.logger.Warn("upload: save failed", "err", err) return "error" diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 8a4f36c..e4c90e8 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -16,10 +16,12 @@ import ( // Session holds the claims extracted from a validated JWT. type Session struct { - UserID int64 - Username string - Role string - JTI string // unique JWT ID + UserID int64 + Username string + Role string + JTI string // unique JWT ID + TenantID *int64 + TenantSlug string } // Manager handles login, token issuance, validation, and logout. @@ -83,7 +85,7 @@ func (m *Manager) Login(username, password string) (string, *userstore.User, err email = username + "@ldap.local" } - ldapUser, upsertErr := m.store.UpsertLDAPUser(username, email, role) + ldapUser, upsertErr := m.store.UpsertLDAPUser(username, email, role, nil) if upsertErr == nil { return m.issueToken(ldapUser) } @@ -98,13 +100,20 @@ func (m *Manager) Login(username, password string) (string, *userstore.User, err func (m *Manager) issueToken(user *userstore.User) (string, *userstore.User, error) { jti := generateJTI() now := time.Now() + + var tenantIDVal int64 + if user.TenantID != nil { + tenantIDVal = *user.TenantID + } + claims := jwt.MapClaims{ - "sub": user.Username, - "role": user.Role, - "uid": user.ID, - "jti": jti, - "iat": now.Unix(), - "exp": now.Add(8 * time.Hour).Unix(), + "sub": user.Username, + "role": user.Role, + "uid": user.ID, + "jti": jti, + "iat": now.Unix(), + "exp": now.Add(8 * time.Hour).Unix(), + "tenant_id": tenantIDVal, } token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) @@ -156,11 +165,25 @@ func (m *Manager) ValidateToken(tokenStr string) (*Session, error) { userID = v } + var tenantID *int64 + switch v := claims["tenant_id"].(type) { + case float64: + if v != 0 { + id := int64(v) + tenantID = &id + } + case int64: + if v != 0 { + tenantID = &v + } + } + return &Session{ UserID: userID, Username: username, Role: role, JTI: jti, + TenantID: tenantID, }, nil } @@ -196,12 +219,14 @@ func (m *Manager) Logout(tokenStr string) error { } // HasRole returns true when userRole satisfies the required role level. -// Hierarchy: admin > auditor > user +// Hierarchy: superadmin > admin > domain_admin > auditor > user func HasRole(userRole, required string) bool { levels := map[string]int{ - userstore.RoleUser: 1, - userstore.RoleAuditor: 2, - userstore.RoleAdmin: 3, + userstore.RoleUser: 1, + userstore.RoleAuditor: 2, + userstore.RoleDomainAdmin: 3, + userstore.RoleAdmin: 4, + userstore.RoleSuperAdmin: 5, } return levels[userRole] >= levels[required] } diff --git a/internal/imap/importer.go b/internal/imap/importer.go index f0f9d96..3cf74d3 100644 --- a/internal/imap/importer.go +++ b/internal/imap/importer.go @@ -23,6 +23,7 @@ type Importer struct { mailStore *storage.Store idx index.Indexer logger *slog.Logger + TenantID *int64 // optional tenant assignment for stored mails } // NewImporter creates a new Importer wired to the storage and index backends. @@ -229,8 +230,9 @@ func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids // storeAndIndex saves a raw email to storage and indexes it. func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error { + ctx := context.Background() // Save to file storage (deduplicates by SHA256 automatically) - id, err := imp.mailStore.Save(raw, time.Now()) + id, err := imp.mailStore.Save(ctx, raw, time.Now(), imp.TenantID) if err != nil { return fmt.Errorf("save: %w", err) } diff --git a/internal/pop3/importer.go b/internal/pop3/importer.go index 913a270..96af291 100644 --- a/internal/pop3/importer.go +++ b/internal/pop3/importer.go @@ -18,6 +18,7 @@ type Importer struct { mailStore *storage.Store idx index.Indexer logger *slog.Logger + TenantID *int64 // optional tenant assignment for stored mails } // NewImporter creates a new Importer wired to the storage and index backends. @@ -125,8 +126,9 @@ func (imp *Importer) doImport(ctx context.Context, acc *Account, password string // storeAndIndex saves a raw email to storage and indexes it. func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error { + ctx := context.Background() // Save to file storage (deduplicates by SHA256 automatically) - id, err := imp.mailStore.Save(raw, time.Now()) + id, err := imp.mailStore.Save(ctx, raw, time.Now(), imp.TenantID) if err != nil { return fmt.Errorf("pop3 save: %w", err) } diff --git a/internal/smtpd/smtpd.go b/internal/smtpd/smtpd.go index 81a7bf4..3a6c949 100644 --- a/internal/smtpd/smtpd.go +++ b/internal/smtpd/smtpd.go @@ -5,6 +5,7 @@ package smtpd import ( "bytes" + "context" "crypto/tls" "errors" "fmt" @@ -22,6 +23,10 @@ import ( "github.com/archivmail/internal/storage" ) +// DomainToTenantFunc resolves an e-mail domain to a tenant ID. +// Returns nil if no tenant matches the domain. +type DomainToTenantFunc func(ctx context.Context, domain string) (*int64, error) + // Stats holds runtime statistics for the SMTP daemon. type Stats struct { Received atomic.Int64 // total emails successfully stored @@ -35,14 +40,16 @@ type IndexCallback func(raw []byte, id string) // Daemon is the embedded receive-only SMTP server. type Daemon struct { - cfg config.SMTPConfig - store *storage.Store - logger *slog.Logger - stats Stats - server *smtp.Server - mu sync.Mutex - running bool - indexCallback IndexCallback + cfg config.SMTPConfig + store *storage.Store + logger *slog.Logger + stats Stats + server *smtp.Server + mu sync.Mutex + running bool + indexCallback IndexCallback + domainToTenant DomainToTenantFunc // optional domain→tenant routing + defaultTenantID *int64 // fallback tenant if no domain matches } // New creates a new SMTP Daemon. Call Start() to begin accepting connections. @@ -56,6 +63,39 @@ func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daem return d } +// SetDomainToTenant wires in the domain→tenant resolution function. +func (d *Daemon) SetDomainToTenant(fn DomainToTenantFunc, defaultTenantID *int64) { + d.domainToTenant = fn + d.defaultTenantID = defaultTenantID +} + +// resolveTenantFromRcpts extracts the domain from RCPT TO addresses and +// resolves it to a tenant ID via the configured DomainToTenantFunc. +func (d *Daemon) resolveTenantFromRcpts(rcpts []string) *int64 { + if d.domainToTenant == nil { + return d.defaultTenantID + } + ctx := context.Background() + for _, rcpt := range rcpts { + // Strip angle brackets if present + addr := strings.Trim(rcpt, "<>") + at := strings.LastIndex(addr, "@") + if at < 0 { + continue + } + domain := strings.ToLower(addr[at+1:]) + tenantID, err := d.domainToTenant(ctx, domain) + if err != nil { + d.logger.Warn("SMTP: tenant lookup failed", "domain", domain, "err", err) + continue + } + if tenantID != nil { + return tenantID + } + } + return d.defaultTenantID +} + // SetIndexCallback sets the function called after each successfully stored mail. func (d *Daemon) SetIndexCallback(cb IndexCallback) { d.indexCallback = cb @@ -232,7 +272,10 @@ func (s *session) Data(r io.Reader) error { } raw := buf.Bytes() - id, err := s.daemon.store.Save(raw, time.Now()) + // Determine tenant from RCPT TO domain routing + tenantID := s.daemon.resolveTenantFromRcpts(s.rcpts) + + id, err := s.daemon.store.Save(context.Background(), raw, time.Now(), tenantID) if err != nil { s.daemon.stats.Rejected.Add(1) s.daemon.logger.Error("SMTP: storage failed", "from", s.from, "err", err) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 4c6e6fb..ceea9bc 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -193,15 +193,33 @@ func (s *Store) initSchema(ctx context.Context) error { CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from); CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject)); `) + if err != nil { + return err + } + // Phase 2b migrations: tenant isolation + _, err = s.db.Exec(ctx, ` + ALTER TABLE emails ADD COLUMN IF NOT EXISTS tenant_id BIGINT; + CREATE INDEX IF NOT EXISTS idx_emails_tenant ON emails (tenant_id); + CREATE TABLE IF NOT EXISTS email_refs ( + id BIGSERIAL PRIMARY KEY, + email_id TEXT NOT NULL REFERENCES emails(id), + tenant_id BIGINT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(email_id, tenant_id) + ); + CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id); + CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id); + `) return err } // ── Core operations ─────────────────────────────────────────────────────── // Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of -// the plaintext content. If the file already exists, Save is a no-op (dedup). -// It also inserts metadata into the emails table if a DB is configured. -func (s *Store) Save(raw []byte, _ time.Time) (string, error) { +// the plaintext content. If the file already exists, Save ensures an email_ref +// exists for the tenant (cross-tenant dedup: one file, many refs). +// tenantID may be nil for system-level ingestion without tenant assignment. +func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int64) (string, error) { // Hash plaintext for dedup (always before encryption) sum := sha256.Sum256(raw) id := fmt.Sprintf("%x", sum[:]) // 64 hex chars @@ -211,36 +229,46 @@ func (s *Store) Save(raw []byte, _ time.Time) (string, error) { return "", fmt.Errorf("storage: mkdir shard: %w", err) } - // Dedup: if file already exists, return same id + fileExists := false if _, err := os.Stat(path); err == nil { - return id, nil + fileExists = true } - // Determine what to write: encrypted or plaintext - var toWrite []byte - if s.key != nil { - encrypted, err := s.encrypt(raw) - if err != nil { - return "", err - } - toWrite = encrypted - } else { - toWrite = raw - } - - if err := os.WriteFile(path, toWrite, 0o644); err != nil { - return "", fmt.Errorf("storage: write: %w", err) - } - - // Insert metadata into DB (best-effort parse) - if s.db != nil { - pm, parseErr := mailparser.Parse(raw) - if parseErr == nil { - s.insertMeta(context.Background(), id, pm, len(raw)) + if !fileExists { + // Determine what to write: encrypted or plaintext + var toWrite []byte + if s.key != nil { + encrypted, err := s.encrypt(raw) + if err != nil { + return "", err + } + toWrite = encrypted } else { - // Insert minimal metadata even if parse fails - s.insertMetaMinimal(context.Background(), id, len(raw)) + toWrite = raw } + + if err := os.WriteFile(path, toWrite, 0o644); err != nil { + return "", fmt.Errorf("storage: write: %w", err) + } + + // Insert metadata into DB (best-effort parse) + if s.db != nil { + pm, parseErr := mailparser.Parse(raw) + if parseErr == nil { + s.insertMeta(ctx, id, pm, len(raw), tenantID) + } else { + s.insertMetaMinimal(ctx, id, len(raw), tenantID) + } + } + } + + // Ensure email_ref entry for this tenant (even if file already existed) + if s.db != nil && tenantID != nil { + _, _ = s.db.Exec(ctx, ` + INSERT INTO email_refs (email_id, tenant_id) + VALUES ($1, $2) + ON CONFLICT (email_id, tenant_id) DO NOTHING + `, id, *tenantID) } return id, nil @@ -402,24 +430,24 @@ func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) { // ── Metadata helpers ────────────────────────────────────────────────────── // insertMeta inserts parsed email metadata into the emails table. -func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) { +func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64) { mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 _, _ = s.db.Exec(ctx, ` - INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) - VALUES ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING - `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) + `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID) } // insertMetaMinimal inserts minimal metadata when parsing fails. -func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int) { +func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64) { _, _ = s.db.Exec(ctx, ` - INSERT INTO emails (id, received_at, size_bytes) - VALUES ($1, NOW(), $2) + INSERT INTO emails (id, received_at, size_bytes, tenant_id) + VALUES ($1, NOW(), $2, $3) ON CONFLICT (id) DO NOTHING - `, id, int64(size)) + `, id, int64(size), tenantID) } // SaveMeta upserts metadata for a given email ID. Used by the backfill process. @@ -602,6 +630,102 @@ func (s *Store) VerifyIntegrity(ctx context.Context, id string) (bool, error) { return ok, nil } +// GetTenantForMail returns the tenant_id stored directly on the email record. +// Returns nil if no tenant is assigned or the mail does not exist. +func (s *Store) GetTenantForMail(ctx context.Context, id string) (*int64, error) { + if s.db == nil { + return nil, nil + } + var tenantID *int64 + err := s.db.QueryRow(ctx, `SELECT tenant_id FROM emails WHERE id = $1`, id).Scan(&tenantID) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("storage: get tenant for mail: %w", err) + } + return tenantID, nil +} + +// GetAllIDsByTenant returns all email IDs visible to a tenant. +// If tenantID is nil, all IDs are returned (superadmin / no-tenant context). +func (s *Store) GetAllIDsByTenant(ctx context.Context, tenantID *int64) ([]string, error) { + if s.db != nil { + var ( + rows pgx.Rows + err error + ) + if tenantID == nil { + rows, err = s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) + } else { + rows, err = s.db.Query(ctx, + `SELECT email_id FROM email_refs WHERE tenant_id = $1`, *tenantID) + } + if err != nil { + return nil, fmt.Errorf("storage: get ids by tenant: %w", err) + } + defer rows.Close() + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + continue + } + ids = append(ids, id) + } + return ids, rows.Err() + } + // fallback: walk store (no tenant filtering possible without DB) + var ids []string + err := s.WalkStore(ctx, func(id string) error { + ids = append(ids, id) + return nil + }) + return ids, err +} + +// StatsByTenant returns mail count and total size filtered by tenant. +// If tenantID is nil, aggregate over all emails. +func (s *Store) StatsByTenant(ctx context.Context, tenantID *int64) (map[string]interface{}, error) { + if s.db == nil { + st, err := s.statsFromFS() + if err != nil { + return nil, err + } + return map[string]interface{}{ + "count": st.TotalMails, + "total_size": st.TotalBytes, + }, nil + } + + var count int64 + var totalSize int64 + + if tenantID == nil { + err := s.db.QueryRow(ctx, + `SELECT COALESCE(COUNT(*),0), COALESCE(SUM(size_bytes),0) FROM emails`, + ).Scan(&count, &totalSize) + if err != nil { + return nil, fmt.Errorf("storage: stats by tenant: %w", err) + } + } else { + err := s.db.QueryRow(ctx, ` + SELECT COALESCE(COUNT(e.id),0), COALESCE(SUM(e.size_bytes),0) + FROM email_refs r + JOIN emails e ON e.id = r.email_id + WHERE r.tenant_id = $1 + `, *tenantID).Scan(&count, &totalSize) + if err != nil { + return nil, fmt.Errorf("storage: stats by tenant: %w", err) + } + } + + return map[string]interface{}{ + "count": count, + "total_size": totalSize, + }, nil +} + // GetAllIDs returns all email IDs from the DB, or walks the store if no DB. func (s *Store) GetAllIDs(ctx context.Context) ([]string, error) { if s.db != nil { diff --git a/internal/userstore/userstore.go b/internal/userstore/userstore.go index 5769497..32b2182 100644 --- a/internal/userstore/userstore.go +++ b/internal/userstore/userstore.go @@ -12,9 +12,11 @@ import ( ) const ( - RoleUser = "user" - RoleAdmin = "admin" - RoleAuditor = "auditor" + RoleUser = "user" + RoleAdmin = "admin" + RoleAuditor = "auditor" + RoleDomainAdmin = "domain_admin" + RoleSuperAdmin = "superadmin" bcryptCost = 12 ) @@ -28,6 +30,7 @@ type User struct { Source string // "local" or "ldap" Active bool CreatedAt time.Time + TenantID *int64 `json:"tenant_id,omitempty"` } // CreateUserRequest holds parameters for creating a new user. @@ -36,6 +39,7 @@ type CreateUserRequest struct { Email string Password string Role string + TenantID *int64 } // UpdateUserRequest holds optional fields for updating a user. @@ -75,13 +79,14 @@ func (s *Store) initSchema(ctx context.Context) error { username VARCHAR(100) UNIQUE NOT NULL, email VARCHAR(255) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL DEFAULT '', - role VARCHAR(20) NOT NULL CHECK (role IN ('user','auditor','admin')), + role VARCHAR(20) NOT NULL DEFAULT 'user', source VARCHAR(20) NOT NULL DEFAULT 'local', active BOOLEAN NOT NULL DEFAULT true, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), last_login_at TIMESTAMPTZ ); ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login_at TIMESTAMPTZ; + ALTER TABLE users ADD COLUMN IF NOT EXISTS tenant_id BIGINT; CREATE TABLE IF NOT EXISTS token_blacklist ( jti VARCHAR(255) PRIMARY KEY, expires_at TIMESTAMPTZ NOT NULL @@ -92,6 +97,7 @@ func (s *Store) initSchema(ctx context.Context) error { attempted_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_login_attempts_username_time ON login_attempts (username, attempted_at); + CREATE INDEX IF NOT EXISTS idx_users_tenant ON users (tenant_id); `) return err } @@ -112,10 +118,10 @@ func (s *Store) Create(req CreateUserRequest) (*User, error) { ctx := context.Background() var id int64 err = s.pool.QueryRow(ctx, - `INSERT INTO users (username, email, password_hash, role, source, active, created_at) - VALUES ($1, $2, $3, $4, 'local', true, NOW()) + `INSERT INTO users (username, email, password_hash, role, source, active, created_at, tenant_id) + VALUES ($1, $2, $3, $4, 'local', true, NOW(), $5) RETURNING id`, - req.Username, req.Email, string(hash), req.Role, + req.Username, req.Email, string(hash), req.Role, req.TenantID, ).Scan(&id) if err != nil { return nil, fmt.Errorf("userstore: create: %w", err) @@ -128,7 +134,7 @@ func (s *Store) Create(req CreateUserRequest) (*User, error) { func (s *Store) GetByID(id int64) (*User, error) { ctx := context.Background() row := s.pool.QueryRow(ctx, - `SELECT id, username, email, role, source, active, created_at FROM users WHERE id = $1`, id, + `SELECT id, username, email, role, source, active, created_at, tenant_id FROM users WHERE id = $1`, id, ) return scanUser(row) } @@ -137,7 +143,7 @@ func (s *Store) GetByID(id int64) (*User, error) { func (s *Store) GetByUsername(username string) (*User, error) { ctx := context.Background() row := s.pool.QueryRow(ctx, - `SELECT id, username, email, role, source, active, created_at FROM users WHERE username = $1`, username, + `SELECT id, username, email, role, source, active, created_at, tenant_id FROM users WHERE username = $1`, username, ) return scanUser(row) } @@ -147,13 +153,13 @@ func (s *Store) GetByUsername(username string) (*User, error) { func (s *Store) VerifyPassword(username, password string) (*User, error) { ctx := context.Background() row := s.pool.QueryRow(ctx, - `SELECT id, username, email, role, source, active, created_at, password_hash FROM users WHERE username = $1`, + `SELECT id, username, email, role, source, active, created_at, tenant_id, password_hash FROM users WHERE username = $1`, username, ) var u User var hash string - err := row.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt, &hash) + err := row.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt, &u.TenantID, &hash) if errors.Is(err, pgx.ErrNoRows) { return nil, errors.New("userstore: user not found") } @@ -225,10 +231,10 @@ func (s *Store) List(role string) ([]*User, error) { if role == "" { rows, err = s.pool.Query(ctx, - `SELECT id, username, email, role, source, active, created_at FROM users ORDER BY id`) + `SELECT id, username, email, role, source, active, created_at, tenant_id FROM users ORDER BY id`) } else { rows, err = s.pool.Query(ctx, - `SELECT id, username, email, role, source, active, created_at FROM users WHERE role = $1 ORDER BY id`, role) + `SELECT id, username, email, role, source, active, created_at, tenant_id FROM users WHERE role = $1 ORDER BY id`, role) } if err != nil { return nil, fmt.Errorf("userstore: list: %w", err) @@ -246,6 +252,28 @@ func (s *Store) List(role string) ([]*User, error) { return users, rows.Err() } +// ListByTenant returns all users belonging to a specific tenant. +func (s *Store) ListByTenant(ctx context.Context, tenantID int64) ([]*User, error) { + rows, err := s.pool.Query(ctx, + `SELECT id, username, email, role, source, active, created_at, tenant_id FROM users WHERE tenant_id = $1 ORDER BY id`, + tenantID, + ) + if err != nil { + return nil, fmt.Errorf("userstore: list by tenant: %w", err) + } + defer rows.Close() + + var users []*User + for rows.Next() { + u, err := scanUserRow(rows) + if err != nil { + return nil, err + } + users = append(users, u) + } + return users, rows.Err() +} + // BlacklistToken adds a JWT ID to the token blacklist. func (s *Store) BlacklistToken(jti string, expires time.Time) error { ctx := context.Background() @@ -295,12 +323,12 @@ func (s *Store) CountRecentFailures(username string, window time.Duration) (int, return count, err } -// AdminCount returns the number of active admin users. +// AdminCount returns the number of active privileged users (admin, domain_admin, superadmin). func (s *Store) AdminCount() (int, error) { ctx := context.Background() var count int err := s.pool.QueryRow(ctx, - `SELECT COUNT(*) FROM users WHERE role = 'admin' AND active = true`, + `SELECT COUNT(*) FROM users WHERE role IN ('admin','domain_admin','superadmin') AND active = true`, ).Scan(&count) return count, err } @@ -311,7 +339,7 @@ func (s *Store) DeleteSafe(id int64) error { if err != nil { return err } - if user.Role == RoleAdmin { + if user.Role == RoleAdmin || user.Role == RoleDomainAdmin || user.Role == RoleSuperAdmin { count, err := s.AdminCount() if err != nil { return fmt.Errorf("userstore: admin count: %w", err) @@ -331,16 +359,18 @@ func (s *Store) CleanExpiredTokens() error { } // UpsertLDAPUser creates or updates an LDAP-sourced user. -func (s *Store) UpsertLDAPUser(username, email, role string) (*User, error) { +// tenantID may be nil for users not associated with a specific tenant. +func (s *Store) UpsertLDAPUser(username, email, role string, tenantID *int64) (*User, error) { ctx := context.Background() _, err := s.pool.Exec(ctx, ` - INSERT INTO users (username, email, password_hash, role, source, active, created_at) - VALUES ($1, $2, '', $3, 'ldap', true, NOW()) + INSERT INTO users (username, email, password_hash, role, source, active, created_at, tenant_id) + VALUES ($1, $2, '', $3, 'ldap', true, NOW(), $4) ON CONFLICT (username) DO UPDATE SET - email = EXCLUDED.email, - role = EXCLUDED.role, - source = 'ldap' - `, username, email, role) + email = EXCLUDED.email, + role = EXCLUDED.role, + source = 'ldap', + tenant_id = COALESCE(EXCLUDED.tenant_id, users.tenant_id) + `, username, email, role, tenantID) if err != nil { return nil, fmt.Errorf("userstore: upsert ldap: %w", err) } @@ -351,7 +381,7 @@ func (s *Store) UpsertLDAPUser(username, email, role string) (*User, error) { func scanUser(row pgx.Row) (*User, error) { var u User - err := row.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt) + err := row.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt, &u.TenantID) if errors.Is(err, pgx.ErrNoRows) { return nil, fmt.Errorf("userstore: not found") } @@ -363,7 +393,7 @@ func scanUser(row pgx.Row) (*User, error) { func scanUserRow(rows pgx.Rows) (*User, error) { var u User - if err := rows.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt); err != nil { + if err := rows.Scan(&u.ID, &u.Username, &u.Email, &u.Role, &u.Source, &u.Active, &u.CreatedAt, &u.TenantID); err != nil { return nil, fmt.Errorf("userstore: scan row: %w", err) } return &u, nil diff --git a/src/app/admin/upload/page.tsx b/src/app/admin/upload/page.tsx new file mode 100644 index 0000000..107909d --- /dev/null +++ b/src/app/admin/upload/page.tsx @@ -0,0 +1,314 @@ +"use client"; + +import { useState, useCallback, useRef, useEffect } from "react"; +import { useAuth } from "@/hooks/useAuth"; +import { uploadMailFiles, getUploadProgress, type UploadJob } from "@/lib/api"; +import { Navbar } from "@/components/navbar"; +import { Button } from "@/components/ui/button"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Progress } from "@/components/ui/progress"; +import { Badge } from "@/components/ui/badge"; +import { Alert, AlertDescription } from "@/components/ui/alert"; + +function formatBytes(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; +} + +export default function UploadPage() { + const { loading: authLoading } = useAuth("admin"); + + const [dragOver, setDragOver] = useState(false); + const [selectedFiles, setSelectedFiles] = useState([]); + const [uploading, setUploading] = useState(false); + const [job, setJob] = useState(null); + const [error, setError] = useState(null); + const fileInputRef = useRef(null); + const pollRef = useRef | null>(null); + + // Stop polling when job is done + useEffect(() => { + if (job && (job.status === "done" || job.status === "error")) { + if (pollRef.current) { + clearInterval(pollRef.current); + pollRef.current = null; + } + } + return () => { + if (pollRef.current) clearInterval(pollRef.current); + }; + }, [job]); + + const startPolling = useCallback((jobID: string) => { + pollRef.current = setInterval(async () => { + try { + const progress = await getUploadProgress(jobID); + setJob(progress); + } catch (e) { + console.error("poll error", e); + } + }, 1000); + }, []); + + const handleFiles = useCallback((files: FileList | null) => { + if (!files) return; + const valid = Array.from(files).filter((f) => { + const name = f.name.toLowerCase(); + return name.endsWith(".eml") || name.endsWith(".mbox"); + }); + if (valid.length === 0) { + setError("Nur .eml und .mbox Dateien werden unterstützt."); + return; + } + setError(null); + setSelectedFiles((prev) => [...prev, ...valid]); + }, []); + + const onDrop = useCallback( + (e: React.DragEvent) => { + e.preventDefault(); + setDragOver(false); + handleFiles(e.dataTransfer.files); + }, + [handleFiles] + ); + + const onDragOver = useCallback((e: React.DragEvent) => { + e.preventDefault(); + setDragOver(true); + }, []); + + const onDragLeave = useCallback(() => { + setDragOver(false); + }, []); + + const onFileInputChange = useCallback( + (e: React.ChangeEvent) => { + handleFiles(e.target.files); + // Reset input so same file can be re-selected + e.target.value = ""; + }, + [handleFiles] + ); + + const removeFile = useCallback((index: number) => { + setSelectedFiles((prev) => prev.filter((_, i) => i !== index)); + }, []); + + const startUpload = useCallback(async () => { + if (selectedFiles.length === 0) return; + setError(null); + setUploading(true); + setJob(null); + + try { + const { job_id } = await uploadMailFiles(selectedFiles); + // Fetch initial state immediately + const initial = await getUploadProgress(job_id); + setJob(initial); + startPolling(job_id); + setSelectedFiles([]); + } catch (e: unknown) { + setError(e instanceof Error ? e.message : "Upload fehlgeschlagen"); + setUploading(false); + } + }, [selectedFiles, startPolling]); + + const reset = useCallback(() => { + setJob(null); + setError(null); + setUploading(false); + setSelectedFiles([]); + }, []); + + if (authLoading) { + return ( +
+

Lade...

+
+ ); + } + + const progressPct = + job && job.total > 0 ? Math.round((job.imported + job.skipped + job.errors) / job.total * 100) : 0; + + return ( +
+ +
+

E-Mail Import

+ + {error && ( + + {error} + + )} + + {/* Drop zone */} + {!uploading && ( + fileInputRef.current?.click()} + > + + + + +

+ .eml oder .mbox Dateien hierher ziehen +
+ oder klicken zum Auswählen +

+ +
+
+ )} + + {/* Selected files list */} + {selectedFiles.length > 0 && !uploading && ( + + + + {selectedFiles.length} Datei(en) ausgewählt + + + +
    + {selectedFiles.map((f, i) => ( +
  • + + {f.name} + +
    + + {formatBytes(f.size)} + + +
    +
  • + ))} +
+ +
+
+ )} + + {/* Progress */} + {job && ( + + +
+ Import-Fortschritt + + {job.status === "running" + ? "Läuft..." + : job.status === "done" + ? "Abgeschlossen" + : "Fehler"} + +
+
+ + {job.total > 0 && ( +
+ +

+ {progressPct}% +

+
+ )} +
+
+

+ {job.imported} +

+

Importiert

+
+
+

+ {job.skipped} +

+

Übersprungen

+
+
+

+ {job.errors} +

+

Fehler

+
+
+ {job.total > 0 && ( +

+ Gesamt: {job.total} Nachrichten +

+ )} + {job.error_msg && ( + + {job.error_msg} + + )} + {job.status === "done" && ( + + )} +
+
+ )} + + {uploading && !job && ( + + +

Dateien werden hochgeladen...

+
+
+ )} +
+
+ ); +}