// Package storage implements a file-based email store with optional
// AES-256-GCM encryption at rest and optional PostgreSQL metadata.
package storage

import (
	"context"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/archivmail/pkg/mailparser"
)

// Config holds the configuration for initialising a Store.
type Config struct {
	// Dir is the base directory for file storage; New creates the "store",
	// "attachments" and "meta" subdirectories beneath it.
	Dir string
	// Keyfile is the path to the AES-256 key file (32 bytes, base64-encoded or
	// raw). When empty, the path is read from the ARCHIVMAIL_KEY environment
	// variable; when that is empty too, encryption is disabled.
	Keyfile string
	// DSN is the PostgreSQL connection string; empty means no database.
	DSN string
}

// Store is a file-based email storage with optional AES-256-GCM encryption
// and optional PostgreSQL metadata.
type Store struct {
	dir string        // base directory; mails live under dir/store
	key []byte        // 32-byte AES key; nil = no encryption
	db  *pgxpool.Pool // metadata database; nil = no DB
}

// StoreStats reports total mail count and size in bytes.
type StoreStats struct {
	TotalMails int64 // number of mails (DB rows, or files when there is no DB)
	TotalBytes int64 // sum of size_bytes from the DB, or of on-disk file sizes
}

// MailRef holds the ID and associated time of a stored mail.
type MailRef struct {
	ID string
	// ModTime is received_at when the reference comes from the database and
	// the file modification time when it comes from walking the store.
	ModTime time.Time
}

// New initialises the storage directory, optionally loads the encryption key
// and connects to PostgreSQL.
func New(cfg Config) (*Store, error) { for _, sub := range []string{"store", "attachments", "meta"} { if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil { return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err) } } s := &Store{dir: cfg.Dir} // Load encryption key if err := s.loadKey(cfg.Keyfile); err != nil { return nil, err } // Connect to PostgreSQL and auto-create schema if cfg.DSN != "" { pool, err := pgxpool.New(context.Background(), cfg.DSN) if err != nil { return nil, fmt.Errorf("storage: db connect: %w", err) } s.db = pool if err := s.initSchema(context.Background()); err != nil { pool.Close() return nil, fmt.Errorf("storage: init schema: %w", err) } ctx := context.Background() _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verify_ok BOOLEAN`) _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verified_at TIMESTAMPTZ`) } return s, nil } // Close releases the database connection pool (if any). func (s *Store) Close() { if s.db != nil { s.db.Close() } } // ── Encryption key loading ──────────────────────────────────────────────── func (s *Store) loadKey(keyfile string) error { // Also check environment variable as fallback if keyfile == "" { keyfile = os.Getenv("ARCHIVMAIL_KEY") } if keyfile == "" { return nil // no encryption configured } // If the env var contains the key itself (not a path), it would be unusual. // We treat the value as a file path in all cases. 
data, err := os.ReadFile(keyfile) if err != nil { return fmt.Errorf("storage: read keyfile %s: %w", keyfile, err) } // Strip whitespace / newlines raw := strings.TrimSpace(string(data)) // Try base64 decode first (spec says base64-encoded 32-byte key) decoded, err := base64.StdEncoding.DecodeString(raw) if err != nil { // Fall back to raw bytes (for 32-byte binary key files) decoded = []byte(raw) } if len(decoded) != 32 { return fmt.Errorf("storage: keyfile must contain exactly 32 bytes (got %d)", len(decoded)) } s.key = decoded return nil } // ── AES-256-GCM encryption / decryption ─────────────────────────────────── // encrypt applies AES-256-GCM encryption. The 12-byte nonce is prepended // to the ciphertext. func (s *Store) encrypt(plaintext []byte) ([]byte, error) { block, err := aes.NewCipher(s.key) if err != nil { return nil, fmt.Errorf("storage: aes cipher: %w", err) } gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("storage: gcm: %w", err) } nonce := make([]byte, gcm.NonceSize()) // 12 bytes if _, err := io.ReadFull(rand.Reader, nonce); err != nil { return nil, fmt.Errorf("storage: random nonce: %w", err) } ciphertext := gcm.Seal(nonce, nonce, plaintext, nil) return ciphertext, nil } // decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext. 
func (s *Store) decrypt(data []byte) ([]byte, error) { block, err := aes.NewCipher(s.key) if err != nil { return nil, fmt.Errorf("storage: aes cipher: %w", err) } gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("storage: gcm: %w", err) } nonceSize := gcm.NonceSize() if len(data) < nonceSize { return nil, fmt.Errorf("storage: ciphertext too short") } nonce, ciphertext := data[:nonceSize], data[nonceSize:] plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) if err != nil { return nil, fmt.Errorf("storage: decrypt: %w", err) } return plaintext, nil } // ── Database schema ─────────────────────────────────────────────────────── func (s *Store) initSchema(ctx context.Context) error { _, err := s.db.Exec(ctx, ` CREATE TABLE IF NOT EXISTS emails ( id TEXT PRIMARY KEY, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), mail_from TEXT, mail_to TEXT, subject TEXT, size_bytes BIGINT, has_attach BOOLEAN DEFAULT FALSE, indexed_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at); CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from); CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject)); `) if err != nil { return err } // Phase 2b migrations: tenant isolation _, err = s.db.Exec(ctx, ` ALTER TABLE emails ADD COLUMN IF NOT EXISTS tenant_id BIGINT; CREATE INDEX IF NOT EXISTS idx_emails_tenant ON emails (tenant_id); CREATE TABLE IF NOT EXISTS email_refs ( id BIGSERIAL PRIMARY KEY, email_id TEXT NOT NULL REFERENCES emails(id), tenant_id BIGINT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(email_id, tenant_id) ); CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id); CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id); `) return err } // ── Core operations ─────────────────────────────────────────────────────── // Save writes raw email bytes to storage. 
The ID is the hex-encoded SHA256 of // the plaintext content. If the file already exists, Save ensures an email_ref // exists for the tenant (cross-tenant dedup: one file, many refs). // tenantID may be nil for system-level ingestion without tenant assignment. func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int64) (string, error) { // Hash plaintext for dedup (always before encryption) sum := sha256.Sum256(raw) id := fmt.Sprintf("%x", sum[:]) // 64 hex chars path := s.filePath(id) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return "", fmt.Errorf("storage: mkdir shard: %w", err) } fileExists := false if _, err := os.Stat(path); err == nil { fileExists = true } if !fileExists { // Determine what to write: encrypted or plaintext var toWrite []byte if s.key != nil { encrypted, err := s.encrypt(raw) if err != nil { return "", err } toWrite = encrypted } else { toWrite = raw } if err := os.WriteFile(path, toWrite, 0o644); err != nil { return "", fmt.Errorf("storage: write: %w", err) } // Insert metadata into DB (best-effort parse) if s.db != nil { pm, parseErr := mailparser.Parse(raw) if parseErr == nil { s.insertMeta(ctx, id, pm, len(raw), tenantID) } else { s.insertMetaMinimal(ctx, id, len(raw), tenantID) } } } // Ensure email_ref entry for this tenant (even if file already existed) if s.db != nil && tenantID != nil { _, _ = s.db.Exec(ctx, ` INSERT INTO email_refs (email_id, tenant_id) VALUES ($1, $2) ON CONFLICT (email_id, tenant_id) DO NOTHING `, id, *tenantID) } return id, nil } // Load reads a stored email by its ID. If encryption is configured, the file // is decrypted before returning plaintext. 
func (s *Store) Load(id string) ([]byte, error) { path := s.filePath(id) data, err := os.ReadFile(path) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("storage: not found: %s", id) } return nil, fmt.Errorf("storage: read: %w", err) } if s.key != nil { plaintext, err := s.decrypt(data) if err != nil { // If decryption fails, the file might be stored unencrypted // (pre-encryption era). Return as-is for backwards compatibility. return data, nil } return plaintext, nil } return data, nil } // Delete removes a stored email by its ID, including its DB metadata row. func (s *Store) Delete(id string) error { path := s.filePath(id) if err := os.Remove(path); err != nil { if errors.Is(err, os.ErrNotExist) { return fmt.Errorf("storage: not found: %s", id) } return fmt.Errorf("storage: delete: %w", err) } if s.db != nil { _, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id) } return nil } // Stats returns aggregate statistics. Uses the DB if available (fast), otherwise // falls back to walking the file system. 
func (s *Store) Stats() (*StoreStats, error) { if s.db != nil { return s.statsFromDB() } return s.statsFromFS() } func (s *Store) statsFromDB() (*StoreStats, error) { var stats StoreStats err := s.db.QueryRow(context.Background(), `SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`, ).Scan(&stats.TotalMails, &stats.TotalBytes) if err != nil { return nil, fmt.Errorf("storage: stats from db: %w", err) } return &stats, nil } func (s *Store) statsFromFS() (*StoreStats, error) { var stats StoreStats err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } stats.TotalMails++ stats.TotalBytes += info.Size() return nil }) if err != nil { return nil, fmt.Errorf("storage: stats: %w", err) } return &stats, nil } // FirstAndLastMail returns the oldest and newest mail. Uses the DB if available // (much faster than walking the filesystem). 
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { if s.db != nil { return s.firstAndLastFromDB() } return s.firstAndLastFromFS() } func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) { ctx := context.Background() // Oldest var fID string var fTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`, ).Scan(&fID, &fTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: first mail: %w", err) } if err == nil { first = &MailRef{ID: fID, ModTime: fTime} } // Newest var lID string var lTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`, ).Scan(&lID, &lTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: last mail: %w", err) } if err == nil { last = &MailRef{ID: lID, ModTime: lTime} } return first, last, nil } func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) { err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } ref := &MailRef{ID: d.Name(), ModTime: info.ModTime()} if first == nil || ref.ModTime.Before(first.ModTime) { first = ref } if last == nil || ref.ModTime.After(last.ModTime) { last = ref } return nil }) if err != nil { return nil, nil, fmt.Errorf("storage: first/last: %w", err) } return first, last, nil } // ── Metadata helpers ────────────────────────────────────────────────────── // insertMeta inserts parsed email metadata into the emails table. 
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64) { mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 _, _ = s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (id) DO NOTHING `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID) } // insertMetaMinimal inserts minimal metadata when parsing fails. func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64) { _, _ = s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, size_bytes, tenant_id) VALUES ($1, NOW(), $2, $3) ON CONFLICT (id) DO NOTHING `, id, int64(size), tenantID) } // SaveMeta upserts metadata for a given email ID. Used by the backfill process. func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error { if s.db == nil { return nil } mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 _, err := s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET mail_from = EXCLUDED.mail_from, mail_to = EXCLUDED.mail_to, subject = EXCLUDED.subject, size_bytes = EXCLUDED.size_bytes, has_attach = EXCLUDED.has_attach `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) return err } // SetIndexedAt marks an email as indexed in the database. func (s *Store) SetIndexedAt(ctx context.Context, id string) error { if s.db == nil { return nil } _, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id) return err } // IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL). 
func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) { if s.db == nil { return false, nil } var indexed bool err := s.db.QueryRow(ctx, `SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id, ).Scan(&indexed) if errors.Is(err, pgx.ErrNoRows) { return false, nil } return indexed, err } // ── Backfill ────────────────────────────────────────────────────────────── // Backfill walks the store directory, parses each email, inserts missing DB // metadata rows, and re-indexes emails that have indexed_at IS NULL. func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error { // We accept a generic indexer interface to avoid import cycles. // The caller (main.go) wraps the real index.Indexer. return s.BackfillWithFuncs(ctx, logger) } // BackfillWithFuncs walks the store and calls the provided functions for each email. // This is the implementation used by the coordinator in main.go. func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error { if s.db == nil { logger.Info("backfill: skipping, no database configured") return nil } storeDir := filepath.Join(s.dir, "store") count := 0 indexed := 0 errCount := 0 err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } count++ id := d.Name() // Load the file (handles decryption) raw, err := s.Load(id) if err != nil { logger.Warn("backfill: load failed", "id", id, "err", err) errCount++ return nil // continue with next file } // Parse pm, err := mailparser.Parse(raw) if err != nil { logger.Warn("backfill: parse failed", "id", id, "err", err) errCount++ return nil } // Upsert metadata if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil { logger.Warn("backfill: save meta failed", "id", id, "err", err) } // Check if already indexed alreadyIndexed, err := s.IsIndexed(ctx, id) if 
err != nil { logger.Warn("backfill: check indexed failed", "id", id, "err", err) } if !alreadyIndexed { indexed++ } if count%100 == 0 { logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount) } return nil }) if err != nil { return fmt.Errorf("backfill: walk: %w", err) } logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount) return nil } // WalkStore iterates over all files in the store directory and calls fn for each. // The fn receives the email ID. This is used by the backfill coordinator. func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error { storeDir := filepath.Join(s.dir, "store") return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } return fn(d.Name()) }) } // ── File path helper ────────────────────────────────────────────────────── // filePath returns the on-disk path for a given mail ID. // Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id} func (s *Store) filePath(id string) string { return filepath.Join(s.dir, "store", id[:2], id) } // ── Integrity verification ──────────────────────────────────────────────── // VerifyIntegrity re-computes the SHA-256 of the stored plaintext and // compares it to the file ID. Updates verify_ok and verified_at in the DB. func (s *Store) VerifyIntegrity(ctx context.Context, id string) (bool, error) { raw, err := s.Load(id) if err != nil { return false, err } sum := sha256.Sum256(raw) computed := fmt.Sprintf("%x", sum[:]) ok := computed == id if s.db != nil { s.db.Exec(ctx, `UPDATE emails SET verify_ok=$1, verified_at=NOW() WHERE id=$2`, ok, id) } return ok, nil } // GetTenantForMail returns the tenant_id stored directly on the email record. // Returns nil if no tenant is assigned or the mail does not exist. 
func (s *Store) GetTenantForMail(ctx context.Context, id string) (*int64, error) { if s.db == nil { return nil, nil } var tenantID *int64 err := s.db.QueryRow(ctx, `SELECT tenant_id FROM emails WHERE id = $1`, id).Scan(&tenantID) if errors.Is(err, pgx.ErrNoRows) { return nil, nil } if err != nil { return nil, fmt.Errorf("storage: get tenant for mail: %w", err) } return tenantID, nil } // GetAllIDsByTenant returns all email IDs visible to a tenant. // If tenantID is nil, all IDs are returned (superadmin / no-tenant context). func (s *Store) GetAllIDsByTenant(ctx context.Context, tenantID *int64) ([]string, error) { if s.db != nil { var ( rows pgx.Rows err error ) if tenantID == nil { rows, err = s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) } else { rows, err = s.db.Query(ctx, `SELECT email_id FROM email_refs WHERE tenant_id = $1`, *tenantID) } if err != nil { return nil, fmt.Errorf("storage: get ids by tenant: %w", err) } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, rows.Err() } // fallback: walk store (no tenant filtering possible without DB) var ids []string err := s.WalkStore(ctx, func(id string) error { ids = append(ids, id) return nil }) return ids, err } // StatsByTenant returns mail count and total size filtered by tenant. // If tenantID is nil, aggregate over all emails. 
func (s *Store) StatsByTenant(ctx context.Context, tenantID *int64) (map[string]interface{}, error) { if s.db == nil { st, err := s.statsFromFS() if err != nil { return nil, err } return map[string]interface{}{ "count": st.TotalMails, "total_size": st.TotalBytes, }, nil } var count int64 var totalSize int64 if tenantID == nil { err := s.db.QueryRow(ctx, `SELECT COALESCE(COUNT(*),0), COALESCE(SUM(size_bytes),0) FROM emails`, ).Scan(&count, &totalSize) if err != nil { return nil, fmt.Errorf("storage: stats by tenant: %w", err) } } else { err := s.db.QueryRow(ctx, ` SELECT COALESCE(COUNT(e.id),0), COALESCE(SUM(e.size_bytes),0) FROM email_refs r JOIN emails e ON e.id = r.email_id WHERE r.tenant_id = $1 `, *tenantID).Scan(&count, &totalSize) if err != nil { return nil, fmt.Errorf("storage: stats by tenant: %w", err) } } return map[string]interface{}{ "count": count, "total_size": totalSize, }, nil } // GetAllIDs returns all email IDs from the DB, or walks the store if no DB. func (s *Store) GetAllIDs(ctx context.Context) ([]string, error) { if s.db != nil { rows, err := s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) if err != nil { return nil, err } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, nil } // fallback: walk store var ids []string err := s.WalkStore(ctx, func(id string) error { ids = append(ids, id) return nil }) return ids, err } // VerifyStatus holds the result of an integrity verification. type VerifyStatus struct { VerifyOK *bool // nil = not yet checked VerifiedAt *time.Time } // GetVerifyStatus returns the stored verification status for a given email ID. 
func (s *Store) GetVerifyStatus(ctx context.Context, id string) (VerifyStatus, error) { var vs VerifyStatus if s.db == nil { return vs, nil } row := s.db.QueryRow(ctx, `SELECT verify_ok, verified_at FROM emails WHERE id=$1`, id) var ok *bool var at *time.Time if err := row.Scan(&ok, &at); err != nil { return vs, nil // not found = not verified } vs.VerifyOK = ok vs.VerifiedAt = at return vs, nil }