package storage

import (
	"context"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"archivmail/pkg/mailparser"
)

// ErrRetentionLock is returned when a mail cannot be deleted because its
// retention period has not yet expired.
var ErrRetentionLock = errors.New("storage: mail is within retention period")

// Config holds the configuration for initialising a Store.
type Config struct {
	Dir             string // base directory for file storage; New creates store/, attachments/ and meta/ below it
	Keyfile         string // path to the AES-256 key file (base64 or raw 32 bytes); if empty, $ARCHIVMAIL_KEY is used as the path, and if that is empty too no encryption is configured
	DSN             string // PostgreSQL DSN; empty = no DB
	RetentionDays   int    // 0 = no lock; >0 = GoBD retention period in days (Save applies it only to mails stored without a tenant)
	CompressEnabled bool   // gzip-compress emails and attachments before encryption
}

// Store is a file-based email storage with optional AES-256-GCM encryption
// and optional PostgreSQL metadata.
type Store struct {
	dir             string        // base directory; mail files live under dir/store
	key             []byte        // nil = no encryption
	db              *pgxpool.Pool // nil = no DB
	retentionDays   int           // 0 = no lock
	compressEnabled bool          // gzip before encryption
}

// StoreStats reports total mail count and size in bytes.
type StoreStats struct {
	TotalMails int64
	TotalBytes int64
}

// MailRef holds the ID and associated time of a stored mail.
type MailRef struct {
	ID      string
	ModTime time.Time // received_at when read from the DB, file modification time when read from disk
}

// MailWithUID holds the ID and stable IMAP UID of a stored mail.
type MailWithUID struct {
	ID  string
	UID int64
}

// New initialises the storage directory, optionally loads the encryption key
// and connects to PostgreSQL.
func New(cfg Config) (*Store, error) { for _, sub := range []string{"store", "attachments", "meta"} { if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil { return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err) } } s := &Store{dir: cfg.Dir, retentionDays: cfg.RetentionDays, compressEnabled: cfg.CompressEnabled} // Load encryption key if err := s.loadKey(cfg.Keyfile); err != nil { return nil, err } // Connect to PostgreSQL and auto-create schema if cfg.DSN != "" { pool, err := pgxpool.New(context.Background(), cfg.DSN) if err != nil { return nil, fmt.Errorf("storage: db connect: %w", err) } s.db = pool if err := s.initSchema(context.Background()); err != nil { pool.Close() return nil, fmt.Errorf("storage: init schema: %w", err) } ctx := context.Background() _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verify_ok BOOLEAN`) _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verified_at TIMESTAMPTZ`) _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS message_id TEXT`) _, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_message_id ON emails (message_id) WHERE message_id IS NOT NULL`) // PROJ-34: GoBD retention lock _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS retain_until TIMESTAMPTZ`) _, _ = s.db.Exec(ctx, `CREATE INDEX IF NOT EXISTS idx_emails_retain_until ON emails (retain_until) WHERE retain_until IS NOT NULL`) // PROJ-33: Stable IMAP UIDs _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS uid BIGSERIAL`) _, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_uid ON emails (uid)`) // 2.0: storage_objects FK on emails _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS storage_id BIGINT REFERENCES storage_objects(id)`) } return s, nil } // Close releases the database connection pool (if any). 
func (s *Store) Close() {
	if s.db != nil {
		s.db.Close()
	}
}

// ── Encryption key loading ────────────────────────────────────────────────

// loadKey reads the AES-256 key from keyfile and stores it in s.key.
//
// If keyfile is empty, the ARCHIVMAIL_KEY environment variable is used as the
// path; if that is empty too, no encryption is configured and nil is returned.
// The environment value is always treated as a file path, never as the key.
//
// Accepted file contents, tried in this order:
//  1. base64 (standard alphabet) text that decodes to exactly 32 bytes,
//     surrounding whitespace ignored;
//  2. exactly 32 raw bytes (binary key, contents taken verbatim);
//  3. 32 bytes after trimming surrounding whitespace (e.g. trailing newline).
//
// Anything else is rejected with an error.
func (s *Store) loadKey(keyfile string) error {
	// Also check environment variable as fallback
	if keyfile == "" {
		keyfile = os.Getenv("ARCHIVMAIL_KEY")
	}
	if keyfile == "" {
		return nil // no encryption configured
	}

	data, err := os.ReadFile(keyfile)
	if err != nil {
		return fmt.Errorf("storage: read keyfile %s: %w", keyfile, err)
	}

	trimmed := strings.TrimSpace(string(data))
	decoded, b64Err := base64.StdEncoding.DecodeString(trimmed)

	switch {
	case b64Err == nil && len(decoded) == 32:
		s.key = decoded
	case len(data) == 32:
		// Raw binary key: use the bytes untouched. Trimming first would
		// reject keys that happen to start or end with a whitespace byte.
		s.key = data
	case len(trimmed) == 32:
		s.key = []byte(trimmed)
	default:
		got := len(trimmed)
		if b64Err == nil {
			got = len(decoded)
		}
		return fmt.Errorf("storage: keyfile must contain exactly 32 bytes (got %d)", got)
	}
	return nil
}

// ── AES-256-GCM encryption / decryption ───────────────────────────────────

// encrypt applies AES-256-GCM encryption using s.key. A fresh random nonce is
// generated per call and prepended to the returned ciphertext.
func (s *Store) encrypt(plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(s.key)
	if err != nil {
		return nil, fmt.Errorf("storage: aes cipher: %w", err)
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, fmt.Errorf("storage: gcm: %w", err)
	}
	nonce := make([]byte, gcm.NonceSize()) // 12 bytes
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, fmt.Errorf("storage: random nonce: %w", err)
	}
	// Passing nonce as dst makes Seal append the ciphertext after the nonce.
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext.
func (s *Store) decrypt(data []byte) ([]byte, error) { block, err := aes.NewCipher(s.key) if err != nil { return nil, fmt.Errorf("storage: aes cipher: %w", err) } gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("storage: gcm: %w", err) } nonceSize := gcm.NonceSize() if len(data) < nonceSize { return nil, fmt.Errorf("storage: ciphertext too short") } nonce, ciphertext := data[:nonceSize], data[nonceSize:] plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) if err != nil { return nil, fmt.Errorf("storage: decrypt: %w", err) } return plaintext, nil } // ── Database schema ─────────────────────────────────────────────────────── func (s *Store) initSchema(ctx context.Context) error { // storage_objects must exist before emails (FK dependency) _, err := s.db.Exec(ctx, ` CREATE TABLE IF NOT EXISTS storage_objects ( id BIGSERIAL PRIMARY KEY, storage_type TEXT NOT NULL DEFAULT 'filesystem', path TEXT NOT NULL, compression TEXT NOT NULL DEFAULT 'none', size_original BIGINT, size_compressed BIGINT, checksum CHAR(64), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE TABLE IF NOT EXISTS attachments ( id BIGSERIAL PRIMARY KEY, filename TEXT, mime_type TEXT, size_bytes BIGINT, hash CHAR(64) UNIQUE NOT NULL, storage_id BIGINT REFERENCES storage_objects(id), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_attachments_hash ON attachments (hash); `) if err != nil { return err } _, err = s.db.Exec(ctx, ` CREATE TABLE IF NOT EXISTS emails ( id TEXT PRIMARY KEY, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), mail_from TEXT, mail_to TEXT, subject TEXT, size_bytes BIGINT, has_attach BOOLEAN DEFAULT FALSE, indexed_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at); CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from); CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject)); `) if err != nil { return err } // Phase 2b 
migrations: tenant isolation _, err = s.db.Exec(ctx, ` ALTER TABLE emails ADD COLUMN IF NOT EXISTS tenant_id BIGINT; CREATE INDEX IF NOT EXISTS idx_emails_tenant ON emails (tenant_id); CREATE TABLE IF NOT EXISTS email_refs ( id BIGSERIAL PRIMARY KEY, email_id TEXT NOT NULL REFERENCES emails(id), tenant_id BIGINT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(email_id, tenant_id) ); CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id); CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id); CREATE TABLE IF NOT EXISTS email_attachments ( email_id TEXT NOT NULL REFERENCES emails(id), attachment_id BIGINT NOT NULL REFERENCES attachments(id), PRIMARY KEY (email_id, attachment_id) ); CREATE INDEX IF NOT EXISTS idx_email_attachments_email ON email_attachments (email_id); `) if err != nil { return err } // PROJ-38: Mail-Threading _, err = s.db.Exec(ctx, ` ALTER TABLE emails ADD COLUMN IF NOT EXISTS thread_id TEXT; ALTER TABLE emails ADD COLUMN IF NOT EXISTS in_reply_to TEXT; CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails (thread_id); `) return err } // ── Core operations ─────────────────────────────────────────────────────── // Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of // the plaintext content. If the file already exists, Save ensures an email_ref // exists for the tenant (cross-tenant dedup: one file, many refs). // tenantID may be nil for system-level ingestion without tenant assignment. // // Dedup order: // 1. Parse mail → extract Message-ID // 2. If Message-ID present: lookup in DB → if found, return existing ID (no disk I/O) // 3. 
Fallback: SHA-256-based dedup (existing behaviour) func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int64) (string, error) { // Step 1: parse for Message-ID dedup (best-effort) var messageID string pm, parseErr := mailparser.Parse(raw) if parseErr == nil { messageID = pm.MessageID } // Step 2: Message-ID lookup — avoids disk I/O for cross-channel duplicates if s.db != nil && messageID != "" { existingID, err := s.lookupByMessageID(ctx, messageID) if err != nil { return "", fmt.Errorf("storage: message-id lookup: %w", err) } if existingID != "" { // Already archived — ensure tenant ref exists and return if tenantID != nil { _, _ = s.db.Exec(ctx, ` INSERT INTO email_refs (email_id, tenant_id) VALUES ($1, $2) ON CONFLICT (email_id, tenant_id) DO NOTHING `, existingID, *tenantID) } return existingID, nil } } // Step 3: Quota check (PROJ-29) — reject before writing if err := s.CheckQuota(ctx, tenantID); err != nil { return "", err } // Step 4: SHA-256-based dedup (fallback / no Message-ID) sum := sha256.Sum256(raw) id := fmt.Sprintf("%x", sum[:]) // 64 hex chars path := s.filePath(id) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return "", fmt.Errorf("storage: mkdir shard: %w", err) } fileExists := false if _, err := os.Stat(path); err == nil { fileExists = true } if !fileExists { // Compress before encryption (if enabled) toStore := raw compression := "none" if s.compressEnabled { compressed, cerr := compressGzip(raw) if cerr == nil && len(compressed) < len(raw) { toStore = compressed compression = "gzip" } } // Encrypt (if key configured) var toWrite []byte if s.key != nil { encrypted, err := s.encrypt(toStore) if err != nil { return "", err } toWrite = encrypted } else { toWrite = toStore } if err := os.WriteFile(path, toWrite, 0o644); err != nil { return "", fmt.Errorf("storage: write: %w", err) } // Insert metadata into DB if s.db != nil { // Register in storage_objects var storageID *int64 var sid int64 if soErr := 
s.db.QueryRow(ctx, ` INSERT INTO storage_objects (storage_type, path, compression, size_original, size_compressed, checksum) VALUES ('filesystem', $1, $2, $3, $4, $5) RETURNING id `, path, compression, int64(len(raw)), int64(len(toWrite)), id).Scan(&sid); soErr == nil { storageID = &sid } if parseErr == nil { // PROJ-38: resolve thread before inserting if pm.InReplyTo != "" || len(pm.References) > 0 { pm.MessageID = pm.MessageID // no-op; thread resolved inside insertMeta } threadID := s.resolveThreadID(ctx, pm) if err := s.insertMeta(ctx, id, pm, len(raw), tenantID, storageID, threadID); err != nil { // Race: another goroutine inserted via Message-ID UNIQUE conflict. // Resolve to the existing record's ID. if messageID != "" { if conflictID, lerr := s.lookupByMessageID(ctx, messageID); lerr == nil && conflictID != "" { if tenantID != nil { _, _ = s.db.Exec(ctx, ` INSERT INTO email_refs (email_id, tenant_id) VALUES ($1, $2) ON CONFLICT (email_id, tenant_id) DO NOTHING `, conflictID, *tenantID) } return conflictID, nil } } // Non-conflict insert error: log but continue (file is written, metadata can be backfilled) } } else { s.insertMetaMinimal(ctx, id, len(raw), tenantID, storageID) } // Sprint 2: deduplicate and store attachments if parseErr == nil { _ = s.saveAttachments(ctx, id, pm) } // PROJ-34: Set retention lock. // Mandanten-Mails: nur wenn der Mandant explizit retention_days > 0 gesetzt hat. // Globale config greift NICHT automatisch — jeder Mandant muss selbst opt-in. // Mails ohne Mandant (tenantID == nil): globale config als Fallback. 
if tenantID != nil { var tenantDays int if err := s.db.QueryRow(ctx, `SELECT retention_days FROM tenants WHERE id=$1`, *tenantID).Scan(&tenantDays); err == nil && tenantDays > 0 { until := time.Now().AddDate(0, 0, tenantDays) _, _ = s.db.Exec(ctx, `UPDATE emails SET retain_until=$1 WHERE id=$2 AND retain_until IS NULL`, until, id) } // else: tenant hat retention_days=0 → kein Lock gesetzt → keine automatische Löschung } else if s.retentionDays > 0 { until := time.Now().AddDate(0, 0, s.retentionDays) _, _ = s.db.Exec(ctx, `UPDATE emails SET retain_until=$1 WHERE id=$2 AND retain_until IS NULL`, until, id) } } } // Ensure email_ref entry for this tenant (even if file already existed) if s.db != nil && tenantID != nil { _, _ = s.db.Exec(ctx, ` INSERT INTO email_refs (email_id, tenant_id) VALUES ($1, $2) ON CONFLICT (email_id, tenant_id) DO NOTHING `, id, *tenantID) } return id, nil } // resolveThreadID determines the thread_id for a new mail by checking its // References and In-Reply-To headers against existing mails in the DB. // Returns the inherited thread_id, or the mail's own MessageID if it starts // a new thread, or "" if no MessageID is available. func (s *Store) resolveThreadID(ctx context.Context, pm *mailparser.ParsedMail) string { if s.db == nil { return pm.MessageID } // Check References in order (oldest first per RFC 5322 §3.6.4). // The first reference that has a known thread_id wins. 
for _, ref := range pm.References { var tid string err := s.db.QueryRow(ctx, `SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, ref, ).Scan(&tid) if err == nil && tid != "" { return tid } } // Fall back to In-Reply-To if pm.InReplyTo != "" { var tid string err := s.db.QueryRow(ctx, `SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, pm.InReplyTo, ).Scan(&tid) if err == nil && tid != "" { return tid } } // New thread — use own Message-ID as root return pm.MessageID } // ThreadInfo holds thread metadata for a single mail. type ThreadInfo struct { ThreadID string ThreadSize int } // GetThreadInfo returns thread_id and thread size for a batch of email IDs. // Only mails that belong to a thread (non-NULL thread_id) are enriched. func (s *Store) GetThreadInfo(ctx context.Context, ids []string) (map[string]ThreadInfo, error) { if s.db == nil || len(ids) == 0 { return nil, nil } // Step 1: get thread_ids for the given email IDs rows, err := s.db.Query(ctx, `SELECT id, thread_id FROM emails WHERE id = ANY($1) AND thread_id IS NOT NULL`, ids, ) if err != nil { return nil, err } defer rows.Close() threadByEmail := map[string]string{} var threadIDs []string for rows.Next() { var emailID, threadID string if err := rows.Scan(&emailID, &threadID); err == nil { threadByEmail[emailID] = threadID threadIDs = append(threadIDs, threadID) } } rows.Close() if len(threadIDs) == 0 { return map[string]ThreadInfo{}, nil } // Step 2: count mails per thread rows2, err := s.db.Query(ctx, `SELECT thread_id, COUNT(*) FROM emails WHERE thread_id = ANY($1) GROUP BY thread_id`, threadIDs, ) if err != nil { return nil, err } defer rows2.Close() threadSize := map[string]int{} for rows2.Next() { var tid string var cnt int if err := rows2.Scan(&tid, &cnt); err == nil { threadSize[tid] = cnt } } result := make(map[string]ThreadInfo, len(ids)) for emailID, tid := range threadByEmail { result[emailID] = ThreadInfo{ThreadID: tid, ThreadSize: 
threadSize[tid]} } return result, nil } // GetMailsByThread returns all email IDs in a thread, ordered by date ascending. func (s *Store) GetMailsByThread(ctx context.Context, threadID string, tenantID *int64) ([]string, error) { if s.db == nil { return nil, nil } var rows pgx.Rows var err error if tenantID != nil { rows, err = s.db.Query(ctx, ` SELECT e.id FROM emails e JOIN email_refs r ON r.email_id = e.id AND r.tenant_id = $2 WHERE e.thread_id = $1 ORDER BY e.received_at ASC `, threadID, *tenantID) } else { rows, err = s.db.Query(ctx, ` SELECT id FROM emails WHERE thread_id = $1 ORDER BY received_at ASC `, threadID) } if err != nil { return nil, err } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err == nil { ids = append(ids, id) } } return ids, nil } // lookupByMessageID returns the email ID for a given Message-ID header value, // or an empty string if not found. Returns an error only on unexpected DB failures. func (s *Store) lookupByMessageID(ctx context.Context, messageID string) (string, error) { var id string err := s.db.QueryRow(ctx, `SELECT id FROM emails WHERE message_id = $1 LIMIT 1`, messageID, ).Scan(&id) if errors.Is(err, pgx.ErrNoRows) { return "", nil } if err != nil { return "", err } return id, nil } // Load reads a stored email by its ID. If encryption is configured, the file // is decrypted before returning plaintext. func (s *Store) Load(id string) ([]byte, error) { path := s.filePath(id) data, err := os.ReadFile(path) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("storage: not found: %s", id) } return nil, fmt.Errorf("storage: read: %w", err) } if s.key != nil { plaintext, err := s.decrypt(data) if err != nil { // Pre-encryption era: file stored unencrypted — try decompression anyway. out, _ := maybeDecompress(data) return out, nil } data = plaintext } return maybeDecompress(data) } // Delete removes a stored email by its ID, including its DB metadata row. 
func (s *Store) Delete(id string) error { // PROJ-34: Enforce retention lock before any disk or DB operation. if s.db != nil { ctx := context.Background() var until *time.Time _ = s.db.QueryRow(ctx, `SELECT retain_until FROM emails WHERE id=$1`, id).Scan(&until) if until != nil && time.Now().Before(*until) { return ErrRetentionLock } } path := s.filePath(id) if err := os.Remove(path); err != nil { if errors.Is(err, os.ErrNotExist) { return fmt.Errorf("storage: not found: %s", id) } return fmt.Errorf("storage: delete: %w", err) } if s.db != nil { _, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id) } return nil } // Purge deletes all mails whose retain_until has passed. // Returns the number of successfully deleted mails. // Mails that fail to delete (e.g. file missing) are skipped silently. func (s *Store) Purge(ctx context.Context) (int, error) { if s.db == nil { return 0, nil } rows, err := s.db.Query(ctx, `SELECT id FROM emails WHERE retain_until IS NOT NULL AND retain_until < NOW()`) if err != nil { return 0, fmt.Errorf("storage: purge query: %w", err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err == nil { ids = append(ids, id) } } if err := rows.Err(); err != nil { return 0, fmt.Errorf("storage: purge rows: %w", err) } deleted := 0 for _, id := range ids { if err := s.Delete(id); err == nil { deleted++ } } return deleted, nil } // Stats returns aggregate statistics. Uses the DB if available (fast), otherwise // falls back to walking the file system. 
func (s *Store) Stats() (*StoreStats, error) { if s.db != nil { return s.statsFromDB() } return s.statsFromFS() } func (s *Store) statsFromDB() (*StoreStats, error) { var stats StoreStats err := s.db.QueryRow(context.Background(), `SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`, ).Scan(&stats.TotalMails, &stats.TotalBytes) if err != nil { return nil, fmt.Errorf("storage: stats from db: %w", err) } return &stats, nil } func (s *Store) statsFromFS() (*StoreStats, error) { var stats StoreStats err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } stats.TotalMails++ stats.TotalBytes += info.Size() return nil }) if err != nil { return nil, fmt.Errorf("storage: stats: %w", err) } return &stats, nil } // FirstAndLastMail returns the oldest and newest mail. Uses the DB if available // (much faster than walking the filesystem). 
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { if s.db != nil { return s.firstAndLastFromDB() } return s.firstAndLastFromFS() } func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) { ctx := context.Background() // Oldest var fID string var fTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`, ).Scan(&fID, &fTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: first mail: %w", err) } if err == nil { first = &MailRef{ID: fID, ModTime: fTime} } // Newest var lID string var lTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`, ).Scan(&lID, &lTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: last mail: %w", err) } if err == nil { last = &MailRef{ID: lID, ModTime: lTime} } return first, last, nil } func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) { err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } ref := &MailRef{ID: d.Name(), ModTime: info.ModTime()} if first == nil || ref.ModTime.Before(first.ModTime) { first = ref } if last == nil || ref.ModTime.After(last.ModTime) { last = ref } return nil }) if err != nil { return nil, nil, fmt.Errorf("storage: first/last: %w", err) } return first, last, nil } // ── Metadata helpers ────────────────────────────────────────────────────── // insertMeta inserts parsed email metadata into the emails table. // Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id. 
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64, storageID *int64, threadID string) error { mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 var msgID *string if pm.MessageID != "" { msgID = &pm.MessageID } var tid *string if threadID != "" { tid = &threadID } var inReplyTo *string if pm.InReplyTo != "" { inReplyTo = &pm.InReplyTo } receivedAt := pm.Date if receivedAt.IsZero() { receivedAt = time.Now() } _, err := s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id, storage_id, thread_id, in_reply_to) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (id) DO NOTHING `, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID, storageID, tid, inReplyTo) return err } // insertMetaMinimal inserts minimal metadata when parsing fails. func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64, storageID *int64) { _, _ = s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, size_bytes, tenant_id, storage_id) VALUES ($1, NOW(), $2, $3, $4) ON CONFLICT (id) DO NOTHING `, id, int64(size), tenantID, storageID) } // SaveMeta upserts metadata for a given email ID. Used by the backfill process. // Writes message_id when present so that backfill populates the UNIQUE index. 
func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error { if s.db == nil { return nil } mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 var msgID *string if pm.MessageID != "" { msgID = &pm.MessageID } metaDate := pm.Date if metaDate.IsZero() { metaDate = time.Now() } _, err := s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, message_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO UPDATE SET mail_from = EXCLUDED.mail_from, mail_to = EXCLUDED.mail_to, subject = EXCLUDED.subject, size_bytes = EXCLUDED.size_bytes, has_attach = EXCLUDED.has_attach, message_id = COALESCE(emails.message_id, EXCLUDED.message_id) `, id, metaDate, pm.From, mailTo, pm.Subject, int64(size), hasAttach, msgID) return err } // SetIndexedAt marks an email as indexed in the database. func (s *Store) SetIndexedAt(ctx context.Context, id string) error { if s.db == nil { return nil } _, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id) return err } // IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL). func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) { if s.db == nil { return false, nil } var indexed bool err := s.db.QueryRow(ctx, `SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id, ).Scan(&indexed) if errors.Is(err, pgx.ErrNoRows) { return false, nil } return indexed, err } // ── Backfill ────────────────────────────────────────────────────────────── // Backfill walks the store directory, parses each email, inserts missing DB // metadata rows, and re-indexes emails that have indexed_at IS NULL. func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error { // We accept a generic indexer interface to avoid import cycles. // The caller (main.go) wraps the real index.Indexer. 
return s.BackfillWithFuncs(ctx, logger) } // BackfillWithFuncs walks the store and calls the provided functions for each email. // This is the implementation used by the coordinator in main.go. func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error { if s.db == nil { logger.Info("backfill: skipping, no database configured") return nil } storeDir := filepath.Join(s.dir, "store") count := 0 indexed := 0 errCount := 0 err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } count++ id := d.Name() // Load the file (handles decryption) raw, err := s.Load(id) if err != nil { logger.Warn("backfill: load failed", "id", id, "err", err) errCount++ return nil // continue with next file } // Parse pm, err := mailparser.Parse(raw) if err != nil { logger.Warn("backfill: parse failed", "id", id, "err", err) errCount++ return nil } // Upsert metadata if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil { logger.Warn("backfill: save meta failed", "id", id, "err", err) } // Check if already indexed alreadyIndexed, err := s.IsIndexed(ctx, id) if err != nil { logger.Warn("backfill: check indexed failed", "id", id, "err", err) } if !alreadyIndexed { indexed++ } if count%100 == 0 { logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount) } return nil }) if err != nil { return fmt.Errorf("backfill: walk: %w", err) } logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount) return nil } // WalkStore iterates over all files in the store directory and calls fn for each. // The fn receives the email ID. This is used by the backfill coordinator. 
func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error { storeDir := filepath.Join(s.dir, "store") return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } return fn(d.Name()) }) } // ── IMAP UID queries ────────────────────────────────────────────────────── // GetMailsWithUID returns all email IDs with stable UIDs for a tenant, ordered by uid ASC. // Used for shared IMAP mode. func (s *Store) GetMailsWithUID(ctx context.Context, tenantID *int64) ([]MailWithUID, error) { if s.db == nil { ids, err := s.GetAllIDsByTenant(ctx, tenantID) if err != nil { return nil, err } out := make([]MailWithUID, len(ids)) for i, id := range ids { out[i] = MailWithUID{ID: id, UID: int64(i + 1)} } return out, nil } var rows pgx.Rows var err error if tenantID == nil { rows, err = s.db.Query(ctx, `SELECT id, COALESCE(uid, 0) FROM emails ORDER BY uid ASC NULLS LAST`) } else { rows, err = s.db.Query(ctx, ` SELECT e.id, COALESCE(e.uid, 0) FROM email_refs r JOIN emails e ON e.id = r.email_id WHERE r.tenant_id = $1 ORDER BY e.uid ASC NULLS LAST`, *tenantID) } if err != nil { return nil, fmt.Errorf("storage: get mails with uid: %w", err) } defer rows.Close() var result []MailWithUID for rows.Next() { var m MailWithUID if err := rows.Scan(&m.ID, &m.UID); err == nil { result = append(result, m) } } return result, rows.Err() } // GetMailsByRecipient returns mails where mail_to contains the given email address. // Used for personal IMAP mode filtering. 
func (s *Store) GetMailsByRecipient(ctx context.Context, tenantID *int64, email string) ([]MailWithUID, error) { if s.db == nil || email == "" { return nil, nil } pattern := "%" + email + "%" var rows pgx.Rows var err error if tenantID == nil { rows, err = s.db.Query(ctx, `SELECT id, COALESCE(uid, 0) FROM emails WHERE mail_to ILIKE $1 ORDER BY uid ASC NULLS LAST`, pattern) } else { rows, err = s.db.Query(ctx, ` SELECT e.id, COALESCE(e.uid, 0) FROM email_refs r JOIN emails e ON e.id = r.email_id WHERE r.tenant_id = $1 AND e.mail_to ILIKE $2 ORDER BY e.uid ASC NULLS LAST`, *tenantID, pattern) } if err != nil { return nil, fmt.Errorf("storage: get mails by recipient: %w", err) } defer rows.Close() var result []MailWithUID for rows.Next() { var m MailWithUID if err := rows.Scan(&m.ID, &m.UID); err == nil { result = append(result, m) } } return result, rows.Err() } // ── File path helper ────────────────────────────────────────────────────── // filePath returns the on-disk path for a given mail ID. // Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id} func (s *Store) filePath(id string) string { return filepath.Join(s.dir, "store", id[:2], id) } // ── Integrity verification ──────────────────────────────────────────────── // VerifyIntegrity re-computes the SHA-256 of the stored plaintext and // compares it to the file ID. Updates verify_ok and verified_at in the DB. func (s *Store) VerifyIntegrity(ctx context.Context, id string) (bool, error) { raw, err := s.Load(id) if err != nil { return false, err } sum := sha256.Sum256(raw) computed := fmt.Sprintf("%x", sum[:]) ok := computed == id if s.db != nil { s.db.Exec(ctx, `UPDATE emails SET verify_ok=$1, verified_at=NOW() WHERE id=$2`, ok, id) } return ok, nil } // GetTenantForMail returns the tenant_id stored directly on the email record. // Returns nil if no tenant is assigned or the mail does not exist. 
func (s *Store) GetTenantForMail(ctx context.Context, id string) (*int64, error) { if s.db == nil { return nil, nil } var tenantID *int64 err := s.db.QueryRow(ctx, `SELECT tenant_id FROM emails WHERE id = $1`, id).Scan(&tenantID) if errors.Is(err, pgx.ErrNoRows) { return nil, nil } if err != nil { return nil, fmt.Errorf("storage: get tenant for mail: %w", err) } return tenantID, nil } // GetAllIDsByTenant returns all email IDs visible to a tenant. // If tenantID is nil, all IDs are returned (superadmin / no-tenant context). func (s *Store) GetAllIDsByTenant(ctx context.Context, tenantID *int64) ([]string, error) { if s.db != nil { var ( rows pgx.Rows err error ) if tenantID == nil { rows, err = s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) } else { rows, err = s.db.Query(ctx, `SELECT email_id FROM email_refs WHERE tenant_id = $1`, *tenantID) } if err != nil { return nil, fmt.Errorf("storage: get ids by tenant: %w", err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, rows.Err() } // fallback: walk store (no tenant filtering possible without DB) var ids []string err := s.WalkStore(ctx, func(id string) error { ids = append(ids, id) return nil }) return ids, err } // StatsByTenant returns mail count and total size filtered by tenant. // If tenantID is nil, aggregate over all emails. 
func (s *Store) StatsByTenant(ctx context.Context, tenantID *int64) (map[string]interface{}, error) { if s.db == nil { st, err := s.statsFromFS() if err != nil { return nil, err } return map[string]interface{}{ "count": st.TotalMails, "total_size": st.TotalBytes, }, nil } var count int64 var totalSize int64 if tenantID == nil { err := s.db.QueryRow(ctx, `SELECT COALESCE(COUNT(*),0), COALESCE(SUM(size_bytes),0) FROM emails`, ).Scan(&count, &totalSize) if err != nil { return nil, fmt.Errorf("storage: stats by tenant: %w", err) } } else { err := s.db.QueryRow(ctx, ` SELECT COALESCE(COUNT(e.id),0), COALESCE(SUM(e.size_bytes),0) FROM email_refs r JOIN emails e ON e.id = r.email_id WHERE r.tenant_id = $1 `, *tenantID).Scan(&count, &totalSize) if err != nil { return nil, fmt.Errorf("storage: stats by tenant: %w", err) } } return map[string]interface{}{ "count": count, "total_size": totalSize, }, nil } // IsWithoutTenant reports whether a mail has no tenant assignment — // neither a direct tenant_id nor any email_refs entry. // Used by the auditor role to gate direct mail access. func (s *Store) IsWithoutTenant(ctx context.Context, id string) (bool, error) { if s.db == nil { return false, nil } var result bool err := s.db.QueryRow(ctx, ` SELECT (e.tenant_id IS NULL) AND NOT EXISTS (SELECT 1 FROM email_refs r WHERE r.email_id = e.id) FROM emails e WHERE e.id = $1 `, id).Scan(&result) if errors.Is(err, pgx.ErrNoRows) { return false, nil } if err != nil { return false, fmt.Errorf("storage: is without tenant: %w", err) } return result, nil } // GetAllIDsWithoutTenant returns email IDs that have no tenant assignment. // Used for the auditor role: access to global (untenanted) mails only. 
func (s *Store) GetAllIDsWithoutTenant(ctx context.Context) ([]string, error) { if s.db == nil { return nil, nil } rows, err := s.db.Query(ctx, ` SELECT e.id FROM emails e WHERE e.tenant_id IS NULL AND NOT EXISTS (SELECT 1 FROM email_refs r WHERE r.email_id = e.id) ORDER BY e.received_at `) if err != nil { return nil, fmt.Errorf("storage: get ids without tenant: %w", err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, rows.Err() } // GetAllIDs returns all email IDs from the DB, or walks the store if no DB. func (s *Store) GetAllIDs(ctx context.Context) ([]string, error) { if s.db != nil { rows, err := s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) if err != nil { return nil, err } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, nil } // fallback: walk store var ids []string err := s.WalkStore(ctx, func(id string) error { ids = append(ids, id) return nil }) return ids, err } // VerifyStatus holds the result of an integrity verification. type VerifyStatus struct { VerifyOK *bool // nil = not yet checked VerifiedAt *time.Time } // GetVerifyStatus returns the stored verification status for a given email ID. func (s *Store) GetVerifyStatus(ctx context.Context, id string) (VerifyStatus, error) { var vs VerifyStatus if s.db == nil { return vs, nil } row := s.db.QueryRow(ctx, `SELECT verify_ok, verified_at FROM emails WHERE id=$1`, id) var ok *bool var at *time.Time if err := row.Scan(&ok, &at); err != nil { return vs, nil // not found = not verified } vs.VerifyOK = ok vs.VerifiedAt = at return vs, nil }