// Package storage implements a file-based email store with optional
// AES-256-GCM encryption of the stored files and optional PostgreSQL metadata.
package storage

import (
	"context"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/archivmail/pkg/mailparser"
)

// Config holds the configuration for initialising a Store.
type Config struct {
	// Dir is the base directory for file storage. New creates the "store",
	// "attachments" and "meta" subdirectories beneath it.
	Dir string
	// Keyfile is the path to the AES key file, which must yield exactly 32
	// bytes. When empty, the path is taken from the ARCHIVMAIL_KEY environment
	// variable; if that is empty as well, encryption is disabled.
	Keyfile string
	// DSN is the PostgreSQL connection string; empty = no DB.
	DSN string
}

// Store is a file-based email storage with optional AES-256-GCM encryption
// and optional PostgreSQL metadata.
type Store struct {
	dir string        // base directory; mails live under dir/store
	key []byte        // 32-byte AES key; nil = no encryption
	db  *pgxpool.Pool // metadata database; nil = no DB
}

// StoreStats reports total mail count and size in bytes.
type StoreStats struct {
	// TotalMails is the number of rows in the emails table when a DB is
	// configured, otherwise the number of files under dir/store.
	TotalMails int64
	// TotalBytes is the sum of size_bytes from the emails table when a DB is
	// configured, otherwise the sum of the on-disk file sizes.
	TotalBytes int64
}

// MailRef holds the ID and associated time of a stored mail.
type MailRef struct {
	ID string
	// ModTime is received_at from the emails table when a DB is configured,
	// otherwise the modification time of the stored file.
	ModTime time.Time
}

// New initialises the storage directory, optionally loads the encryption key
// and connects to PostgreSQL.
func New(cfg Config) (*Store, error) { for _, sub := range []string{"store", "attachments", "meta"} { if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil { return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err) } } s := &Store{dir: cfg.Dir} // Load encryption key if err := s.loadKey(cfg.Keyfile); err != nil { return nil, err } // Connect to PostgreSQL and auto-create schema if cfg.DSN != "" { pool, err := pgxpool.New(context.Background(), cfg.DSN) if err != nil { return nil, fmt.Errorf("storage: db connect: %w", err) } s.db = pool if err := s.initSchema(context.Background()); err != nil { pool.Close() return nil, fmt.Errorf("storage: init schema: %w", err) } ctx := context.Background() _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verify_ok BOOLEAN`) _, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verified_at TIMESTAMPTZ`) } return s, nil } // Close releases the database connection pool (if any). func (s *Store) Close() { if s.db != nil { s.db.Close() } } // ── Encryption key loading ──────────────────────────────────────────────── func (s *Store) loadKey(keyfile string) error { // Also check environment variable as fallback if keyfile == "" { keyfile = os.Getenv("ARCHIVMAIL_KEY") } if keyfile == "" { return nil // no encryption configured } // If the env var contains the key itself (not a path), it would be unusual. // We treat the value as a file path in all cases. 
data, err := os.ReadFile(keyfile) if err != nil { return fmt.Errorf("storage: read keyfile %s: %w", keyfile, err) } // Strip whitespace / newlines raw := strings.TrimSpace(string(data)) // Try base64 decode first (spec says base64-encoded 32-byte key) decoded, err := base64.StdEncoding.DecodeString(raw) if err != nil { // Fall back to raw bytes (for 32-byte binary key files) decoded = []byte(raw) } if len(decoded) != 32 { return fmt.Errorf("storage: keyfile must contain exactly 32 bytes (got %d)", len(decoded)) } s.key = decoded return nil } // ── AES-256-GCM encryption / decryption ─────────────────────────────────── // encrypt applies AES-256-GCM encryption. The 12-byte nonce is prepended // to the ciphertext. func (s *Store) encrypt(plaintext []byte) ([]byte, error) { block, err := aes.NewCipher(s.key) if err != nil { return nil, fmt.Errorf("storage: aes cipher: %w", err) } gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("storage: gcm: %w", err) } nonce := make([]byte, gcm.NonceSize()) // 12 bytes if _, err := io.ReadFull(rand.Reader, nonce); err != nil { return nil, fmt.Errorf("storage: random nonce: %w", err) } ciphertext := gcm.Seal(nonce, nonce, plaintext, nil) return ciphertext, nil } // decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext. 
func (s *Store) decrypt(data []byte) ([]byte, error) { block, err := aes.NewCipher(s.key) if err != nil { return nil, fmt.Errorf("storage: aes cipher: %w", err) } gcm, err := cipher.NewGCM(block) if err != nil { return nil, fmt.Errorf("storage: gcm: %w", err) } nonceSize := gcm.NonceSize() if len(data) < nonceSize { return nil, fmt.Errorf("storage: ciphertext too short") } nonce, ciphertext := data[:nonceSize], data[nonceSize:] plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) if err != nil { return nil, fmt.Errorf("storage: decrypt: %w", err) } return plaintext, nil } // ── Database schema ─────────────────────────────────────────────────────── func (s *Store) initSchema(ctx context.Context) error { _, err := s.db.Exec(ctx, ` CREATE TABLE IF NOT EXISTS emails ( id TEXT PRIMARY KEY, received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), mail_from TEXT, mail_to TEXT, subject TEXT, size_bytes BIGINT, has_attach BOOLEAN DEFAULT FALSE, indexed_at TIMESTAMPTZ ); CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at); CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from); CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject)); `) return err } // ── Core operations ─────────────────────────────────────────────────────── // Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of // the plaintext content. If the file already exists, Save is a no-op (dedup). // It also inserts metadata into the emails table if a DB is configured. 
func (s *Store) Save(raw []byte, _ time.Time) (string, error) { // Hash plaintext for dedup (always before encryption) sum := sha256.Sum256(raw) id := fmt.Sprintf("%x", sum[:]) // 64 hex chars path := s.filePath(id) if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { return "", fmt.Errorf("storage: mkdir shard: %w", err) } // Dedup: if file already exists, return same id if _, err := os.Stat(path); err == nil { return id, nil } // Determine what to write: encrypted or plaintext var toWrite []byte if s.key != nil { encrypted, err := s.encrypt(raw) if err != nil { return "", err } toWrite = encrypted } else { toWrite = raw } if err := os.WriteFile(path, toWrite, 0o644); err != nil { return "", fmt.Errorf("storage: write: %w", err) } // Insert metadata into DB (best-effort parse) if s.db != nil { pm, parseErr := mailparser.Parse(raw) if parseErr == nil { s.insertMeta(context.Background(), id, pm, len(raw)) } else { // Insert minimal metadata even if parse fails s.insertMetaMinimal(context.Background(), id, len(raw)) } } return id, nil } // Load reads a stored email by its ID. If encryption is configured, the file // is decrypted before returning plaintext. func (s *Store) Load(id string) ([]byte, error) { path := s.filePath(id) data, err := os.ReadFile(path) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("storage: not found: %s", id) } return nil, fmt.Errorf("storage: read: %w", err) } if s.key != nil { plaintext, err := s.decrypt(data) if err != nil { // If decryption fails, the file might be stored unencrypted // (pre-encryption era). Return as-is for backwards compatibility. return data, nil } return plaintext, nil } return data, nil } // Delete removes a stored email by its ID, including its DB metadata row. 
func (s *Store) Delete(id string) error { path := s.filePath(id) if err := os.Remove(path); err != nil { if errors.Is(err, os.ErrNotExist) { return fmt.Errorf("storage: not found: %s", id) } return fmt.Errorf("storage: delete: %w", err) } if s.db != nil { _, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id) } return nil } // Stats returns aggregate statistics. Uses the DB if available (fast), otherwise // falls back to walking the file system. func (s *Store) Stats() (*StoreStats, error) { if s.db != nil { return s.statsFromDB() } return s.statsFromFS() } func (s *Store) statsFromDB() (*StoreStats, error) { var stats StoreStats err := s.db.QueryRow(context.Background(), `SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`, ).Scan(&stats.TotalMails, &stats.TotalBytes) if err != nil { return nil, fmt.Errorf("storage: stats from db: %w", err) } return &stats, nil } func (s *Store) statsFromFS() (*StoreStats, error) { var stats StoreStats err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } stats.TotalMails++ stats.TotalBytes += info.Size() return nil }) if err != nil { return nil, fmt.Errorf("storage: stats: %w", err) } return &stats, nil } // FirstAndLastMail returns the oldest and newest mail. Uses the DB if available // (much faster than walking the filesystem). 
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { if s.db != nil { return s.firstAndLastFromDB() } return s.firstAndLastFromFS() } func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) { ctx := context.Background() // Oldest var fID string var fTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`, ).Scan(&fID, &fTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: first mail: %w", err) } if err == nil { first = &MailRef{ID: fID, ModTime: fTime} } // Newest var lID string var lTime time.Time err = s.db.QueryRow(ctx, `SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`, ).Scan(&lID, &lTime) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return nil, nil, fmt.Errorf("storage: last mail: %w", err) } if err == nil { last = &MailRef{ID: lID, ModTime: lTime} } return first, last, nil } func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) { err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } ref := &MailRef{ID: d.Name(), ModTime: info.ModTime()} if first == nil || ref.ModTime.Before(first.ModTime) { first = ref } if last == nil || ref.ModTime.After(last.ModTime) { last = ref } return nil }) if err != nil { return nil, nil, fmt.Errorf("storage: first/last: %w", err) } return first, last, nil } // ── Metadata helpers ────────────────────────────────────────────────────── // insertMeta inserts parsed email metadata into the emails table. 
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) { mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 _, _ = s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) } // insertMetaMinimal inserts minimal metadata when parsing fails. func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int) { _, _ = s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, size_bytes) VALUES ($1, NOW(), $2) ON CONFLICT (id) DO NOTHING `, id, int64(size)) } // SaveMeta upserts metadata for a given email ID. Used by the backfill process. func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error { if s.db == nil { return nil } mailTo := strings.Join(pm.To, ", ") hasAttach := len(pm.Attachments) > 0 _, err := s.db.Exec(ctx, ` INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET mail_from = EXCLUDED.mail_from, mail_to = EXCLUDED.mail_to, subject = EXCLUDED.subject, size_bytes = EXCLUDED.size_bytes, has_attach = EXCLUDED.has_attach `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) return err } // SetIndexedAt marks an email as indexed in the database. func (s *Store) SetIndexedAt(ctx context.Context, id string) error { if s.db == nil { return nil } _, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id) return err } // IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL). 
func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) { if s.db == nil { return false, nil } var indexed bool err := s.db.QueryRow(ctx, `SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id, ).Scan(&indexed) if errors.Is(err, pgx.ErrNoRows) { return false, nil } return indexed, err } // ── Backfill ────────────────────────────────────────────────────────────── // Backfill walks the store directory, parses each email, inserts missing DB // metadata rows, and re-indexes emails that have indexed_at IS NULL. func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error { // We accept a generic indexer interface to avoid import cycles. // The caller (main.go) wraps the real index.Indexer. return s.BackfillWithFuncs(ctx, logger) } // BackfillWithFuncs walks the store and calls the provided functions for each email. // This is the implementation used by the coordinator in main.go. func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error { if s.db == nil { logger.Info("backfill: skipping, no database configured") return nil } storeDir := filepath.Join(s.dir, "store") count := 0 indexed := 0 errCount := 0 err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } count++ id := d.Name() // Load the file (handles decryption) raw, err := s.Load(id) if err != nil { logger.Warn("backfill: load failed", "id", id, "err", err) errCount++ return nil // continue with next file } // Parse pm, err := mailparser.Parse(raw) if err != nil { logger.Warn("backfill: parse failed", "id", id, "err", err) errCount++ return nil } // Upsert metadata if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil { logger.Warn("backfill: save meta failed", "id", id, "err", err) } // Check if already indexed alreadyIndexed, err := s.IsIndexed(ctx, id) if 
err != nil { logger.Warn("backfill: check indexed failed", "id", id, "err", err) } if !alreadyIndexed { indexed++ } if count%100 == 0 { logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount) } return nil }) if err != nil { return fmt.Errorf("backfill: walk: %w", err) } logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount) return nil } // WalkStore iterates over all files in the store directory and calls fn for each. // The fn receives the email ID. This is used by the backfill coordinator. func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error { storeDir := filepath.Join(s.dir, "store") return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr } if d.IsDir() { return nil } select { case <-ctx.Done(): return ctx.Err() default: } return fn(d.Name()) }) } // ── File path helper ────────────────────────────────────────────────────── // filePath returns the on-disk path for a given mail ID. // Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id} func (s *Store) filePath(id string) string { return filepath.Join(s.dir, "store", id[:2], id) } // ── Integrity verification ──────────────────────────────────────────────── // VerifyIntegrity re-computes the SHA-256 of the stored plaintext and // compares it to the file ID. Updates verify_ok and verified_at in the DB. func (s *Store) VerifyIntegrity(ctx context.Context, id string) (bool, error) { raw, err := s.Load(id) if err != nil { return false, err } sum := sha256.Sum256(raw) computed := fmt.Sprintf("%x", sum[:]) ok := computed == id if s.db != nil { s.db.Exec(ctx, `UPDATE emails SET verify_ok=$1, verified_at=NOW() WHERE id=$2`, ok, id) } return ok, nil } // GetAllIDs returns all email IDs from the DB, or walks the store if no DB. 
func (s *Store) GetAllIDs(ctx context.Context) ([]string, error) { if s.db != nil { rows, err := s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`) if err != nil { return nil, err } defer rows.Close() var ids []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { continue } ids = append(ids, id) } return ids, nil } // fallback: walk store var ids []string err := s.WalkStore(ctx, func(id string) error { ids = append(ids, id) return nil }) return ids, err } // VerifyStatus holds the result of an integrity verification. type VerifyStatus struct { VerifyOK *bool // nil = not yet checked VerifiedAt *time.Time } // GetVerifyStatus returns the stored verification status for a given email ID. func (s *Store) GetVerifyStatus(ctx context.Context, id string) (VerifyStatus, error) { var vs VerifyStatus if s.db == nil { return vs, nil } row := s.db.QueryRow(ctx, `SELECT verify_ok, verified_at FROM emails WHERE id=$1`, id) var ok *bool var at *time.Time if err := row.Scan(&ok, &at); err != nil { return vs, nil // not found = not verified } vs.VerifyOK = ok vs.VerifiedAt = at return vs, nil }