feat(PROJ-32): Message-ID-basierte Duplikatserkennung

- message_id Spalte + UNIQUE-Index in emails-Tabelle
- Save() prüft Message-ID vor SHA-256-Flow (kein Disk-I/O bei Duplikat)
- lookupByMessageID() als private Hilfsfunktion
- insertMeta() schreibt message_id, gibt error zurück (Race-safe)
- SaveMeta() schreibt message_id idempotent (Backfill)

feat(PROJ-34): Retention-Policy + Löschsperre (GoBD)

- retain_until TIMESTAMPTZ Spalte in emails-Tabelle
- ErrRetentionLock typed error
- Delete() prüft Retention-Frist vor Löschung
- Purge() löscht alle Mails mit abgelaufener Retention
- POST /api/admin/purge Endpunkt (superadmin only)
- config: storage.retention_days

fix: Superadmin-Benutzerübersicht zeigt Mandant-Spalte

- UsersTab: Mandant-Spalte wenn isSuperAdmin
- domain_auditor Rolle im Create-Dialog ergänzt
- storage Modulversion → 1.6

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sysops
2026-03-31 01:29:25 +02:00
parent cb31c48ce8
commit b6856af2eb
9 changed files with 200 additions and 30 deletions
+153 -20
View File
@@ -23,19 +23,24 @@ import (
"github.com/archivmail/pkg/mailparser"
)
// ErrRetentionLock is returned when a mail cannot be deleted because its retention period has not yet expired.
// Delete returns it unwrapped when the mail's retain_until timestamp is still in the future,
// so callers can test for it with errors.Is(err, ErrRetentionLock).
var ErrRetentionLock = errors.New("storage: mail is within retention period")
// Config holds the configuration for initialising a Store.
//
// New copies Dir and RetentionDays into the Store and hands Keyfile to loadKey.
// When RetentionDays is greater than zero, Save stamps retain_until (now plus
// RetentionDays days) on rows that do not have one yet, and Delete returns
// ErrRetentionLock for a mail until that timestamp has passed.
type Config struct {
Dir string // base directory for file storage
Keyfile string // path to 32-byte AES key file; empty = no encryption
DSN string // PostgreSQL DSN; empty = no DB
Dir string // base directory for file storage
Keyfile string // path to 32-byte AES key file; empty = no encryption
DSN string // PostgreSQL DSN; empty = no DB
RetentionDays int // 0 = no lock; >0 = GoBD retention period in days
}
// Store is a file-based email storage with optional AES-256-GCM encryption
// and optional PostgreSQL metadata.
//
// retentionDays is taken from Config.RetentionDays by New. Methods that touch
// metadata check db for nil first, so a Store without a database skips the
// Message-ID lookup, the retention check in Delete, and Purge.
type Store struct {
dir string
key []byte // nil = no encryption
db *pgxpool.Pool // nil = no DB
dir string
key []byte // nil = no encryption
db *pgxpool.Pool // nil = no DB
retentionDays int // 0 = no lock
}
// StoreStats reports total mail count and size in bytes.
@@ -59,7 +64,7 @@ func New(cfg Config) (*Store, error) {
}
}
s := &Store{dir: cfg.Dir}
s := &Store{dir: cfg.Dir, retentionDays: cfg.RetentionDays}
// Load encryption key
if err := s.loadKey(cfg.Keyfile); err != nil {
@@ -80,6 +85,11 @@ func New(cfg Config) (*Store, error) {
ctx := context.Background()
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verify_ok BOOLEAN`)
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verified_at TIMESTAMPTZ`)
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS message_id TEXT`)
_, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_message_id ON emails (message_id) WHERE message_id IS NOT NULL`)
// PROJ-34: GoBD retention lock
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS retain_until TIMESTAMPTZ`)
_, _ = s.db.Exec(ctx, `CREATE INDEX IF NOT EXISTS idx_emails_retain_until ON emails (retain_until) WHERE retain_until IS NOT NULL`)
}
return s, nil
@@ -219,8 +229,39 @@ func (s *Store) initSchema(ctx context.Context) error {
// the plaintext content. If the file already exists, Save ensures an email_ref
// exists for the tenant (cross-tenant dedup: one file, many refs).
// tenantID may be nil for system-level ingestion without tenant assignment.
//
// Dedup order:
// 1. Parse mail → extract Message-ID
// 2. If Message-ID present: lookup in DB → if found, return existing ID (no disk I/O)
// 3. Fallback: SHA-256-based dedup (existing behaviour)
func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int64) (string, error) {
// Hash plaintext for dedup (always before encryption)
// Step 1: parse for Message-ID dedup (best-effort)
var messageID string
pm, parseErr := mailparser.Parse(raw)
if parseErr == nil {
messageID = pm.MessageID
}
// Step 2: Message-ID lookup — avoids disk I/O for cross-channel duplicates
if s.db != nil && messageID != "" {
existingID, err := s.lookupByMessageID(ctx, messageID)
if err != nil {
return "", fmt.Errorf("storage: message-id lookup: %w", err)
}
if existingID != "" {
// Already archived — ensure tenant ref exists and return
if tenantID != nil {
_, _ = s.db.Exec(ctx, `
INSERT INTO email_refs (email_id, tenant_id)
VALUES ($1, $2)
ON CONFLICT (email_id, tenant_id) DO NOTHING
`, existingID, *tenantID)
}
return existingID, nil
}
}
// Step 3: SHA-256-based dedup (fallback / no Message-ID)
sum := sha256.Sum256(raw)
id := fmt.Sprintf("%x", sum[:]) // 64 hex chars
@@ -251,14 +292,34 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
return "", fmt.Errorf("storage: write: %w", err)
}
// Insert metadata into DB (best-effort parse)
// Insert metadata into DB
if s.db != nil {
pm, parseErr := mailparser.Parse(raw)
if parseErr == nil {
s.insertMeta(ctx, id, pm, len(raw), tenantID)
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID); err != nil {
// Race: another goroutine inserted via Message-ID UNIQUE conflict.
// Resolve to the existing record's ID.
if messageID != "" {
if conflictID, lerr := s.lookupByMessageID(ctx, messageID); lerr == nil && conflictID != "" {
if tenantID != nil {
_, _ = s.db.Exec(ctx, `
INSERT INTO email_refs (email_id, tenant_id)
VALUES ($1, $2)
ON CONFLICT (email_id, tenant_id) DO NOTHING
`, conflictID, *tenantID)
}
return conflictID, nil
}
}
// Non-conflict insert error: log but continue (file is written, metadata can be backfilled)
}
} else {
s.insertMetaMinimal(ctx, id, len(raw), tenantID)
}
// PROJ-34: Set retention lock if configured
if s.retentionDays > 0 {
until := time.Now().AddDate(0, 0, s.retentionDays)
_, _ = s.db.Exec(ctx, `UPDATE emails SET retain_until=$1 WHERE id=$2 AND retain_until IS NULL`, until, id)
}
}
}
@@ -274,6 +335,22 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
return id, nil
}
// lookupByMessageID resolves a Message-ID header value to the ID of the email
// row that carries it.
//
// A missing row is not an error: the function then returns ("", nil). Any
// other failure of the query or scan is returned unchanged together with an
// empty ID. The caller must ensure s.db is non-nil.
func (s *Store) lookupByMessageID(ctx context.Context, messageID string) (string, error) {
	row := s.db.QueryRow(ctx,
		`SELECT id FROM emails WHERE message_id = $1 LIMIT 1`, messageID,
	)

	var found string
	switch scanErr := row.Scan(&found); {
	case scanErr == nil:
		return found, nil
	case errors.Is(scanErr, pgx.ErrNoRows):
		// No mail with this Message-ID has been archived yet.
		return "", nil
	default:
		return "", scanErr
	}
}
// Load reads a stored email by its ID. If encryption is configured, the file
// is decrypted before returning plaintext.
func (s *Store) Load(id string) ([]byte, error) {
@@ -301,6 +378,16 @@ func (s *Store) Load(id string) ([]byte, error) {
// Delete removes a stored email by its ID, including its DB metadata row.
func (s *Store) Delete(id string) error {
// PROJ-34: Enforce retention lock before any disk or DB operation.
// The check fails closed: if the lock state cannot be read, nothing is
// deleted. Ignoring the Scan error would leave until nil on any DB failure
// and let a mail that is still under retention be removed.
if s.db != nil {
	var until *time.Time
	err := s.db.QueryRow(context.Background(),
		`SELECT retain_until FROM emails WHERE id=$1`, id).Scan(&until)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// No metadata row means no recorded lock; continue with the deletion.
	case err != nil:
		return fmt.Errorf("storage: retention check: %w", err)
	case until != nil && time.Now().Before(*until):
		return ErrRetentionLock
	}
}
path := s.filePath(id)
if err := os.Remove(path); err != nil {
if errors.Is(err, os.ErrNotExist) {
@@ -316,6 +403,38 @@ func (s *Store) Delete(id string) error {
return nil
}
// Purge deletes all mails whose retain_until has passed.
//
// Candidate IDs are collected first and then removed one at a time through
// Delete. A mail whose Delete call fails is skipped and does not abort the
// run. Without a database there is nothing to select, so Purge returns (0, nil).
//
// It returns the number of successfully deleted mails. An error is returned
// when the candidate query fails, when a row cannot be scanned, or when ctx is
// cancelled between deletions; in the cancellation case the count of mails
// deleted so far accompanies the error.
func (s *Store) Purge(ctx context.Context) (int, error) {
	if s.db == nil {
		return 0, nil
	}

	rows, err := s.db.Query(ctx,
		`SELECT id FROM emails WHERE retain_until IS NOT NULL AND retain_until < NOW()`)
	if err != nil {
		return 0, fmt.Errorf("storage: purge query: %w", err)
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return 0, fmt.Errorf("storage: purge scan: %w", err)
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return 0, fmt.Errorf("storage: purge rows: %w", err)
	}

	deleted := 0
	for _, id := range ids {
		// Delete takes no context, so cancellation has to be observed here;
		// otherwise an aborted request would still walk the whole list.
		if err := ctx.Err(); err != nil {
			return deleted, fmt.Errorf("storage: purge: %w", err)
		}
		if err := s.Delete(id); err == nil {
			deleted++
		}
	}
	return deleted, nil
}
// Stats returns aggregate statistics. Uses the DB if available (fast), otherwise
// falls back to walking the file system.
func (s *Store) Stats() (*StoreStats, error) {
@@ -430,15 +549,22 @@ func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) {
// ── Metadata helpers ──────────────────────────────────────────────────────
// insertMeta inserts parsed email metadata into the emails table.
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64) {
// Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id.
// Only a conflict on id is absorbed by the ON CONFLICT (id) DO NOTHING clause;
// every other failure of the INSERT is handed back to the caller unchanged.
// An empty pm.MessageID is sent as a nil *string rather than as "" —
// presumably stored as NULL, which the partial unique index on message_id skips.
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64) error {
mailTo := strings.Join(pm.To, ", ")
hasAttach := len(pm.Attachments) > 0
_, _ = s.db.Exec(ctx, `
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
var msgID *string
if pm.MessageID != "" {
msgID = &pm.MessageID
}
_, err := s.db.Exec(ctx, `
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (id) DO NOTHING
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID)
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID)
return err
}
// insertMetaMinimal inserts minimal metadata when parsing fails.
@@ -451,6 +577,7 @@ func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tena
}
// SaveMeta upserts metadata for a given email ID. Used by the backfill process.
// Writes message_id when present so that backfill populates the UNIQUE index.
func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error {
if s.db == nil {
return nil
@@ -459,16 +586,22 @@ func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMa
mailTo := strings.Join(pm.To, ", ")
hasAttach := len(pm.Attachments) > 0
var msgID *string
if pm.MessageID != "" {
msgID = &pm.MessageID
}
_, err := s.db.Exec(ctx, `
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, message_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO UPDATE SET
mail_from = EXCLUDED.mail_from,
mail_to = EXCLUDED.mail_to,
subject = EXCLUDED.subject,
size_bytes = EXCLUDED.size_bytes,
has_attach = EXCLUDED.has_attach
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach)
has_attach = EXCLUDED.has_attach,
message_id = COALESCE(emails.message_id, EXCLUDED.message_id)
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach, msgID)
return err
}