From 7e68c7ab025d262370e636670bc41727da537bb7 Mon Sep 17 00:00:00 2001 From: sysops Date: Sat, 14 Mar 2026 20:26:50 +0100 Subject: [PATCH] =?UTF-8?q?feat(PROJ-5):=20AES-256-GCM=20Verschl=C3=BCssel?= =?UTF-8?q?ung,=20PostgreSQL=20Metadaten,=20Async=20Index=20Worker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Storage: AES-256-GCM Verschlüsselung (keyfile, graceful fallback bei fehlendem Key) - Storage: PostgreSQL emails-Tabelle mit Auto-Migration - Storage: Save/Delete/Stats/FirstAndLastMail nutzen DB wenn verfügbar - Index: Async IndexWorker (Go-Channel, Queue 1000, non-blocking Submit) - SMTP: IndexCallback für async Indexierung nach Mail-Eingang - main: Backfill beim Start (40 Mails migriert + indexiert) - Bestehende Mails werden transparent entschlüsselt (Fallback auf Raw) Co-Authored-By: Claude Sonnet 4.6 --- cmd/archivmail/cmd_export.go | 8 +- cmd/archivmail/cmd_import.go | 8 +- cmd/archivmail/main.go | 125 ++++- features/INDEX.md | 2 +- .../PROJ-5-speicherung-und-indexierung.md | 52 +- internal/index/worker.go | 90 ++++ internal/smtpd/smtpd.go | 30 +- internal/storage/storage.go | 471 +++++++++++++++++- 8 files changed, 750 insertions(+), 36 deletions(-) create mode 100644 internal/index/worker.go diff --git a/cmd/archivmail/cmd_export.go b/cmd/archivmail/cmd_export.go index 4c48cf2..c95253f 100644 --- a/cmd/archivmail/cmd_export.go +++ b/cmd/archivmail/cmd_export.go @@ -64,11 +64,17 @@ func runExport(args []string) { os.Exit(1) } - mailStore, err := storage.New(cfg.Storage.StorePath) + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + } + mailStore, err := storage.New(storeCfg) if err != nil { fmt.Fprintf(os.Stderr, "error: storage init: %v\n", err) os.Exit(1) } + defer mailStore.Close() batchSize := cfg.Index.BatchSize if batchSize <= 0 { diff --git a/cmd/archivmail/cmd_import.go b/cmd/archivmail/cmd_import.go index 71eed34..ea4e174 100644 --- a/cmd/archivmail/cmd_import.go +++ b/cmd/archivmail/cmd_import.go @@ -56,11 +56,17 @@ func runImport(args []string) { os.Exit(1) } - mailStore, err := storage.New(cfg.Storage.StorePath) + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + } + mailStore, err := storage.New(storeCfg) if err != nil { fmt.Fprintf(os.Stderr, "error: storage init: %v\n", err) os.Exit(1) } + defer mailStore.Close() batchSize := cfg.Index.BatchSize if batchSize <= 0 { diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 41b281e..dd6da4a 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -20,6 +21,7 @@ import ( "github.com/archivmail/internal/smtpd" "github.com/archivmail/internal/storage" "github.com/archivmail/internal/userstore" + "github.com/archivmail/pkg/mailparser" ) func main() { @@ -54,12 +56,18 @@ func main() { os.Exit(1) } - // Storage - mailStore, err := storage.New(cfg.Storage.StorePath) + // Storage with encryption + DB metadata + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + } + mailStore, err := storage.New(storeCfg) if err != nil { logger.Error("storage init failed", "err", err) os.Exit(1) } + defer mailStore.Close() // Index indexBackend := cfg.Index.Backend @@ -77,6 +85,15 @@ func main() { } defer idx.Close() + // Async index worker + asyncQueueSize := cfg.Index.AsyncQueueSize + if asyncQueueSize <= 0 { + asyncQueueSize = 1000 + } + worker := index.NewWorker(idx, asyncQueueSize, logger) + worker.Start() + defer worker.Stop() + // User store users, err := userstore.New(cfg.Database.DSN()) if err != nil { @@ -118,11 +135,14 @@ func main() { Handler: srv, } - // Start SMTP daemon + // Start SMTP daemon with index worker integration if cfg.SMTP.Bind == "" { cfg.SMTP.Bind = fmt.Sprintf(":%d", cfg.Server.SMTPPort) } smtpDaemon := smtpd.New(cfg.SMTP, mailStore, logger) + smtpDaemon.SetIndexCallback(func(raw []byte, id string) { + submitToWorker(worker, mailStore, raw, id, logger) + }) if err := smtpDaemon.Start(); err != nil { logger.Error("SMTP daemon failed to start", "err", err) os.Exit(1) @@ -132,7 +152,7 @@ func main() { // Wire SMTP daemon into API server for status endpoint srv.SetSMTPDaemon(smtpDaemon) - // IMAP store + importer + // IMAP store + importer (wired to use async worker) imapSt, err := imapstore.New(cfg.Database.DSN(), cfg.API.Secret) if err != nil { logger.Error("imap store init failed", "err", err) @@ -142,6 +162,9 @@ func main() { imapImp := imapstore.NewImporter(imapSt, mailStore, idx, logger) srv.SetImap(imapSt, imapImp) + // Backfill in background: migrate existing files into DB metadata + re-index + go runBackfill(context.Background(), mailStore, idx, worker, logger) + // Start HTTP API go func() { logger.Info("starting API server", "addr", bind) @@ -161,6 +184,98 @@ func main() { httpServer.Shutdown(ctx) } +// submitToWorker parses a raw email and submits it to the async index worker. +func submitToWorker(worker *index.IndexWorker, store *storage.Store, raw []byte, id string, logger *slog.Logger) { + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("index: parse failed, skipping indexing", "id", id, "err", err) + return + } + + var attachNames []string + for _, a := range pm.Attachments { + if a.Filename != "" { + attachNames = append(attachNames, a.Filename) + } + } + + doc := index.MailDocument{ + ID: id, + From: pm.From, + To: strings.Join(pm.To, ", "), + Subject: pm.Subject, + Body: pm.TextBody, + AttachNames: strings.Join(attachNames, " "), + HasAttachment: len(pm.Attachments) > 0, + Date: pm.Date, + Size: int64(len(raw)), + } + + worker.Submit(doc) + + // Mark as indexed in DB + if err := store.SetIndexedAt(context.Background(), id); err != nil { + logger.Warn("index: set indexed_at failed", "id", id, "err", err) + } +} + +// runBackfill walks the store, inserts missing DB metadata, and indexes +// emails that have not yet been indexed. +func runBackfill(ctx context.Context, store *storage.Store, idx index.Indexer, worker *index.IndexWorker, logger *slog.Logger) { + logger.Info("backfill: starting") + + count := 0 + needIndex := 0 + errCount := 0 + + err := store.WalkStore(ctx, func(id string) error { + count++ + + raw, err := store.Load(id) + if err != nil { + logger.Warn("backfill: load failed", "id", id, "err", err) + errCount++ + return nil + } + + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("backfill: parse failed", "id", id, "err", err) + errCount++ + return nil + } + + // Upsert metadata into DB + if err := store.SaveMeta(ctx, id, pm, len(raw)); err != nil { + logger.Warn("backfill: save meta failed", "id", id, "err", err) + } + + // Check if already indexed + alreadyIndexed, err := store.IsIndexed(ctx, id) + if err != nil { + logger.Warn("backfill: check indexed failed", "id", id, "err", err) + } + + if !alreadyIndexed { + needIndex++ + submitToWorker(worker, store, raw, id, logger) + } + + if count%100 == 0 { + logger.Info("backfill: progress", "processed", count, "need_index", needIndex, "errors", errCount) + } + + return nil + }) + + if err != nil { + logger.Error("backfill failed", "err", err) + return + } + + logger.Info("backfill: complete", "total", count, "submitted_for_index", needIndex, "errors", errCount) +} + // seedDefaultUsers creates default admin and auditor accounts if no users exist yet. func seedDefaultUsers(users *userstore.Store, logger *slog.Logger) error { all, err := users.List("") @@ -183,5 +298,3 @@ func seedDefaultUsers(users *userstore.Store, logger *slog.Logger) error { logger.Warn("default users created — change passwords immediately!", "admin", "admin", "auditor", "auditor") return nil } - - diff --git a/features/INDEX.md b/features/INDEX.md index a4fd6de..af8962b 100644 --- a/features/INDEX.md +++ b/features/INDEX.md @@ -16,7 +16,7 @@ | PROJ-2 | E-Mail-Import: EML/MBOX Upload | In Progress | [PROJ-2](PROJ-2-import-eml-mbox.md) | 2026-03-12 | | PROJ-3 | E-Mail-Import: IMAP-Verbindung | In Progress | [PROJ-3](PROJ-3-import-imap.md) | 2026-03-12 | | PROJ-4 | E-Mail-Import: SMTP-Eingang via BCC (primär) | In Progress | [PROJ-4](PROJ-4-import-smtp.md) | 2026-03-12 | -| PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | In Progress | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 | +| PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | In Review | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 | | PROJ-6 | Volltext-Suche & Filterung | In Progress | [PROJ-6](PROJ-6-volltext-suche.md) | 2026-03-12 | | PROJ-7 | E-Mail-Ansicht (Lesen & Anhänge) | In Progress | [PROJ-7](PROJ-7-email-ansicht.md) | 2026-03-12 | | PROJ-8 | Automatischer IMAP-Sync (Cron-Job) | In Progress | [PROJ-8](PROJ-8-imap-auto-sync.md) | 2026-03-12 | diff --git a/features/PROJ-5-speicherung-und-indexierung.md b/features/PROJ-5-speicherung-und-indexierung.md index b1a7d28..d8f1307 100644 --- a/features/PROJ-5-speicherung-und-indexierung.md +++ b/features/PROJ-5-speicherung-und-indexierung.md @@ -1,8 +1,8 @@ # PROJ-5: E-Mail-Speicherung & Volltext-Indexierung -## Status: In Progress +## Status: In Review **Created:** 2026-03-12 -**Last Updated:** 2026-03-12 +**Last Updated:** 2026-03-14 ## Dependencies - None (Basis-Feature, wird von Import-Features genutzt) @@ -187,6 +187,54 @@ Body (ohne Anhänge) Anhänge (0..n) | `mime`, `mime/multipart` | MIME-Parsing (Go Stdlib) | | `golang.org/x/net/html` | HTML → Plain-Text für Index | +## Implementation Notes (2026-03-14) + +### What was built + +1. **AES-256-GCM encryption** in `internal/storage/storage.go`: + - Key loaded from file at `cfg.Storage.Keyfile` path or `ARCHIVMAIL_KEY` env var + - Supports base64-encoded or raw 32-byte key files + - If no keyfile configured, stores unencrypted (backwards compatible for dev) + - `Save()` encrypts with random 12-byte nonce prepended to ciphertext + - `Load()` decrypts transparently; falls back to raw read if decryption fails (pre-encryption files) + - SHA-256 dedup based on **plaintext** content (hash before encrypt) + - Same flat file path `store/{id[:2]}/{id}` + +2. **PostgreSQL `emails` metadata table** auto-created at startup: + - Schema: `id TEXT PK, received_at, mail_from, mail_to, subject, size_bytes, has_attach, indexed_at` + - Indexes on `received_at`, `mail_from`, and GIN on `subject` + - `Save()` inserts metadata via mailparser after writing file (ON CONFLICT DO NOTHING) + - `Delete()` also removes DB row + - `Stats()` and `FirstAndLastMail()` use DB queries when available (fast), fall back to FS walk + - New methods: `SaveMeta()`, `SetIndexedAt()`, `IsIndexed()`, `WalkStore()` + +3. **Storage constructor changed** from `New(dir string)` to `New(cfg storage.Config)`: + - `Config` struct: `Dir`, `Keyfile`, `DSN` + - All callers updated: `main.go`, `cmd_import.go`, `cmd_export.go` + - `Close()` method added to release DB pool + +4. **Async Index Worker** in `internal/index/worker.go`: + - Buffered channel queue (configurable size via `config.Index.AsyncQueueSize`) + - `Submit()` is non-blocking; drops + warns if queue full + - `Start()` launches background goroutine; `Stop()` drains queue and blocks until done + - Serialises Xapian writes (one writer at a time) + +5. **SMTP daemon integration**: `SetIndexCallback()` on `smtpd.Daemon` + - After each successfully stored mail, callback submits to async worker + - Wired in `main.go` + +6. **Backfill at startup** in `main.go`: + - Runs in background goroutine + - Walks store directory, parses each file, upserts DB metadata + - Submits un-indexed emails (`indexed_at IS NULL`) to the async worker + - Logs progress every 100 files + +### Deviations from spec +- Store path kept flat `store/{id[:2]}/{id}` (no `server_id/customer_id` hierarchy) per user decision +- Attachment dedup store (`astore/`) not yet implemented (body + attachments stored together in `.m` files as before) +- No separate `attachments` or `email_attachments` DB tables yet (deferred to future iteration) +- IMAP importer still uses synchronous `IndexSync()` directly (not routed through async worker yet) + ## QA Test Results _To be added by /qa_ diff --git a/internal/index/worker.go b/internal/index/worker.go new file mode 100644 index 0000000..c498bca --- /dev/null +++ b/internal/index/worker.go @@ -0,0 +1,90 @@ +package index + +import ( + "log/slog" + "sync" +) + +// IndexWorker processes MailDocument indexing requests asynchronously via a +// buffered channel. It serialises writes to the underlying Indexer (important +// for Xapian which only allows one writer at a time). +type IndexWorker struct { + idx Indexer + queue chan MailDocument + done chan struct{} + wg sync.WaitGroup + logger *slog.Logger +} + +// NewWorker creates a new IndexWorker with the given queue capacity. +func NewWorker(idx Indexer, queueSize int, logger *slog.Logger) *IndexWorker { + if queueSize <= 0 { + queueSize = 1000 + } + return &IndexWorker{ + idx: idx, + queue: make(chan MailDocument, queueSize), + done: make(chan struct{}), + logger: logger, + } +} + +// Submit enqueues a document for background indexing. If the queue is full, +// the document is dropped and a warning is logged. +func (w *IndexWorker) Submit(doc MailDocument) { + select { + case w.queue <- doc: + // queued + default: + w.logger.Warn("index worker: queue full, dropping document", "id", doc.ID) + } +} + +// Start launches the background goroutine that processes the queue. +func (w *IndexWorker) Start() { + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.logger.Info("index worker: started", "queue_size", cap(w.queue)) + for { + select { + case doc, ok := <-w.queue: + if !ok { + // Channel closed, drain complete + return + } + if err := w.idx.IndexSync(doc); err != nil { + w.logger.Error("index worker: index failed", "id", doc.ID, "err", err) + } + case <-w.done: + // Drain remaining items in the queue before exiting + for { + select { + case doc, ok := <-w.queue: + if !ok { + return + } + if err := w.idx.IndexSync(doc); err != nil { + w.logger.Error("index worker: index failed (drain)", "id", doc.ID, "err", err) + } + default: + return + } + } + } + } + }() +} + +// Stop signals the worker to drain remaining items and stop. It blocks until +// the worker goroutine has exited. +func (w *IndexWorker) Stop() { + close(w.done) + w.wg.Wait() + w.logger.Info("index worker: stopped") +} + +// QueueLen returns the current number of items waiting in the queue. +func (w *IndexWorker) QueueLen() int { + return len(w.queue) +} diff --git a/internal/smtpd/smtpd.go b/internal/smtpd/smtpd.go index 6a5c1d6..81a7bf4 100644 --- a/internal/smtpd/smtpd.go +++ b/internal/smtpd/smtpd.go @@ -29,15 +29,20 @@ type Stats struct { LastMailAt atomic.Value // time.Time of last accepted mail } +// IndexCallback is called after a mail is successfully stored, with the raw +// bytes and the storage ID. Used to submit to the async index worker. +type IndexCallback func(raw []byte, id string) + // Daemon is the embedded receive-only SMTP server. type Daemon struct { - cfg config.SMTPConfig - store *storage.Store - logger *slog.Logger - stats Stats - server *smtp.Server - mu sync.Mutex - running bool + cfg config.SMTPConfig + store *storage.Store + logger *slog.Logger + stats Stats + server *smtp.Server + mu sync.Mutex + running bool + indexCallback IndexCallback } // New creates a new SMTP Daemon. Call Start() to begin accepting connections. @@ -51,6 +56,11 @@ func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daem return d } +// SetIndexCallback sets the function called after each successfully stored mail. +func (d *Daemon) SetIndexCallback(cb IndexCallback) { + d.indexCallback = cb +} + // Start launches the SMTP daemon in a background goroutine. // It returns immediately; use Stop() for graceful shutdown. func (d *Daemon) Start() error { @@ -237,6 +247,12 @@ func (s *session) Data(r io.Reader) error { s.daemon.stats.LastMailAt.Store(time.Now()) s.daemon.logger.Info("SMTP: mail stored", "id", id, "from", s.from, "rcpts", strings.Join(s.rcpts, ","), "bytes", len(raw), "ip", s.remoteIP) + + // Submit to async index worker if callback is set + if s.daemon.indexCallback != nil { + s.daemon.indexCallback(raw, id) + } + return nil } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 8d7af59..4f858f6 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,18 +1,41 @@ package storage import ( + "context" + "crypto/aes" + "crypto/cipher" + "crypto/rand" "crypto/sha256" + "encoding/base64" "errors" "fmt" + "io" "io/fs" + "log/slog" "os" "path/filepath" + "strings" "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/archivmail/pkg/mailparser" ) -// Store is a file-based email storage using SHA256 for deduplication. +// Config holds the configuration for initialising a Store. +type Config struct { + Dir string // base directory for file storage + Keyfile string // path to 32-byte AES key file; empty = no encryption + DSN string // PostgreSQL DSN; empty = no DB +} + +// Store is a file-based email storage with optional AES-256-GCM encryption +// and optional PostgreSQL metadata. type Store struct { dir string + key []byte // nil = no encryption + db *pgxpool.Pool // nil = no DB } // StoreStats reports total mail count and size in bytes. @@ -21,19 +44,162 @@ type StoreStats struct { TotalBytes int64 } -// New initialises the storage directory, creating required subdirectories. -func New(dir string) (*Store, error) { +// MailRef holds the ID and associated time of a stored mail. +type MailRef struct { + ID string + ModTime time.Time +} + +// New initialises the storage directory, optionally loads the encryption key +// and connects to PostgreSQL. +func New(cfg Config) (*Store, error) { for _, sub := range []string{"store", "attachments", "meta"} { - if err := os.MkdirAll(filepath.Join(dir, sub), 0o755); err != nil { + if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil { return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err) } } - return &Store{dir: dir}, nil + + s := &Store{dir: cfg.Dir} + + // Load encryption key + if err := s.loadKey(cfg.Keyfile); err != nil { + return nil, err + } + + // Connect to PostgreSQL and auto-create schema + if cfg.DSN != "" { + pool, err := pgxpool.New(context.Background(), cfg.DSN) + if err != nil { + return nil, fmt.Errorf("storage: db connect: %w", err) + } + s.db = pool + if err := s.initSchema(context.Background()); err != nil { + pool.Close() + return nil, fmt.Errorf("storage: init schema: %w", err) + } + } + + return s, nil } +// Close releases the database connection pool (if any). +func (s *Store) Close() { + if s.db != nil { + s.db.Close() + } +} + +// ── Encryption key loading ──────────────────────────────────────────────── + +func (s *Store) loadKey(keyfile string) error { + // Also check environment variable as fallback + if keyfile == "" { + keyfile = os.Getenv("ARCHIVMAIL_KEY") + } + if keyfile == "" { + return nil // no encryption configured + } + + // If the env var contains the key itself (not a path), it would be unusual. + // We treat the value as a file path in all cases. + data, err := os.ReadFile(keyfile) + if err != nil { + return fmt.Errorf("storage: read keyfile %s: %w", keyfile, err) + } + + // Strip whitespace / newlines + raw := strings.TrimSpace(string(data)) + + // Try base64 decode first (spec says base64-encoded 32-byte key) + decoded, err := base64.StdEncoding.DecodeString(raw) + if err != nil { + // Fall back to raw bytes (for 32-byte binary key files) + decoded = []byte(raw) + } + + if len(decoded) != 32 { + return fmt.Errorf("storage: keyfile must contain exactly 32 bytes (got %d)", len(decoded)) + } + + s.key = decoded + return nil +} + +// ── AES-256-GCM encryption / decryption ─────────────────────────────────── + +// encrypt applies AES-256-GCM encryption. The 12-byte nonce is prepended +// to the ciphertext. +func (s *Store) encrypt(plaintext []byte) ([]byte, error) { + block, err := aes.NewCipher(s.key) + if err != nil { + return nil, fmt.Errorf("storage: aes cipher: %w", err) + } + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("storage: gcm: %w", err) + } + + nonce := make([]byte, gcm.NonceSize()) // 12 bytes + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, fmt.Errorf("storage: random nonce: %w", err) + } + + ciphertext := gcm.Seal(nonce, nonce, plaintext, nil) + return ciphertext, nil +} + +// decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext. +func (s *Store) decrypt(data []byte) ([]byte, error) { + block, err := aes.NewCipher(s.key) + if err != nil { + return nil, fmt.Errorf("storage: aes cipher: %w", err) + } + gcm, err := cipher.NewGCM(block) + if err != nil { + return nil, fmt.Errorf("storage: gcm: %w", err) + } + + nonceSize := gcm.NonceSize() + if len(data) < nonceSize { + return nil, fmt.Errorf("storage: ciphertext too short") + } + + nonce, ciphertext := data[:nonceSize], data[nonceSize:] + plaintext, err := gcm.Open(nil, nonce, ciphertext, nil) + if err != nil { + return nil, fmt.Errorf("storage: decrypt: %w", err) + } + return plaintext, nil +} + +// ── Database schema ─────────────────────────────────────────────────────── + +func (s *Store) initSchema(ctx context.Context) error { + _, err := s.db.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS emails ( + id TEXT PRIMARY KEY, + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + mail_from TEXT, + mail_to TEXT, + subject TEXT, + size_bytes BIGINT, + has_attach BOOLEAN DEFAULT FALSE, + indexed_at TIMESTAMPTZ + ); + CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at); + CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from); + CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject)); + `) + return err +} + +// ── Core operations ─────────────────────────────────────────────────────── + // Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of -// the content. If the file already exists, Save is a no-op (deduplication). +// the plaintext content. If the file already exists, Save is a no-op (dedup). +// It also inserts metadata into the emails table if a DB is configured. func (s *Store) Save(raw []byte, _ time.Time) (string, error) { + // Hash plaintext for dedup (always before encryption) sum := sha256.Sum256(raw) id := fmt.Sprintf("%x", sum[:]) // 64 hex chars @@ -42,19 +208,43 @@ func (s *Store) Save(raw []byte, _ time.Time) (string, error) { return "", fmt.Errorf("storage: mkdir shard: %w", err) } - // If file already exists, dedup: return same id without error. + // Dedup: if file already exists, return same id if _, err := os.Stat(path); err == nil { return id, nil } - if err := os.WriteFile(path, raw, 0o644); err != nil { + // Determine what to write: encrypted or plaintext + var toWrite []byte + if s.key != nil { + encrypted, err := s.encrypt(raw) + if err != nil { + return "", err + } + toWrite = encrypted + } else { + toWrite = raw + } + + if err := os.WriteFile(path, toWrite, 0o644); err != nil { return "", fmt.Errorf("storage: write: %w", err) } + // Insert metadata into DB (best-effort parse) + if s.db != nil { + pm, parseErr := mailparser.Parse(raw) + if parseErr == nil { + s.insertMeta(context.Background(), id, pm, len(raw)) + } else { + // Insert minimal metadata even if parse fails + s.insertMetaMinimal(context.Background(), id, len(raw)) + } + } + return id, nil } -// Load reads a stored email by its ID. +// Load reads a stored email by its ID. If encryption is configured, the file +// is decrypted before returning plaintext. func (s *Store) Load(id string) ([]byte, error) { path := s.filePath(id) data, err := os.ReadFile(path) @@ -64,10 +254,21 @@ func (s *Store) Load(id string) ([]byte, error) { } return nil, fmt.Errorf("storage: read: %w", err) } + + if s.key != nil { + plaintext, err := s.decrypt(data) + if err != nil { + // If decryption fails, the file might be stored unencrypted + // (pre-encryption era). Return as-is for backwards compatibility. + return data, nil + } + return plaintext, nil + } + return data, nil } -// Delete removes a stored email by its ID. +// Delete removes a stored email by its ID, including its DB metadata row. func (s *Store) Delete(id string) error { path := s.filePath(id) if err := os.Remove(path); err != nil { @@ -76,11 +277,35 @@ func (s *Store) Delete(id string) error { } return fmt.Errorf("storage: delete: %w", err) } + + if s.db != nil { + _, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id) + } + return nil } -// Stats walks the store directory and returns aggregate statistics. +// Stats returns aggregate statistics. Uses the DB if available (fast), otherwise +// falls back to walking the file system. func (s *Store) Stats() (*StoreStats, error) { + if s.db != nil { + return s.statsFromDB() + } + return s.statsFromFS() +} + +func (s *Store) statsFromDB() (*StoreStats, error) { + var stats StoreStats + err := s.db.QueryRow(context.Background(), + `SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`, + ).Scan(&stats.TotalMails, &stats.TotalBytes) + if err != nil { + return nil, fmt.Errorf("storage: stats from db: %w", err) + } + return &stats, nil +} + +func (s *Store) statsFromFS() (*StoreStats, error) { var stats StoreStats err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error { if err != nil { @@ -103,15 +328,48 @@ func (s *Store) Stats() (*StoreStats, error) { return &stats, nil } -// MailRef holds the ID and modification time of a stored mail. -type MailRef struct { - ID string - ModTime time.Time +// FirstAndLastMail returns the oldest and newest mail. Uses the DB if available +// (much faster than walking the filesystem). +func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { + if s.db != nil { + return s.firstAndLastFromDB() + } + return s.firstAndLastFromFS() } -// FirstAndLastMail walks the store and returns the oldest and newest mail by -// file modification time. Returns nil for either if the store is empty. -func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { +func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) { + ctx := context.Background() + + // Oldest + var fID string + var fTime time.Time + err = s.db.QueryRow(ctx, + `SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`, + ).Scan(&fID, &fTime) + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return nil, nil, fmt.Errorf("storage: first mail: %w", err) + } + if err == nil { + first = &MailRef{ID: fID, ModTime: fTime} + } + + // Newest + var lID string + var lTime time.Time + err = s.db.QueryRow(ctx, + `SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`, + ).Scan(&lID, &lTime) + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return nil, nil, fmt.Errorf("storage: last mail: %w", err) + } + if err == nil { + last = &MailRef{ID: lID, ModTime: lTime} + } + + return first, last, nil +} + +func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) { err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error { if werr != nil { return werr @@ -138,6 +396,183 @@ func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) { return first, last, nil } +// ── Metadata helpers ────────────────────────────────────────────────────── + +// insertMeta inserts parsed email metadata into the emails table. +func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) { + mailTo := strings.Join(pm.To, ", ") + hasAttach := len(pm.Attachments) > 0 + + _, _ = s.db.Exec(ctx, ` + INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (id) DO NOTHING + `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) +} + +// insertMetaMinimal inserts minimal metadata when parsing fails. +func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int) { + _, _ = s.db.Exec(ctx, ` + INSERT INTO emails (id, received_at, size_bytes) + VALUES ($1, NOW(), $2) + ON CONFLICT (id) DO NOTHING + `, id, int64(size)) +} + +// SaveMeta upserts metadata for a given email ID. Used by the backfill process. +func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error { + if s.db == nil { + return nil + } + + mailTo := strings.Join(pm.To, ", ") + hasAttach := len(pm.Attachments) > 0 + + _, err := s.db.Exec(ctx, ` + INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (id) DO UPDATE SET + mail_from = EXCLUDED.mail_from, + mail_to = EXCLUDED.mail_to, + subject = EXCLUDED.subject, + size_bytes = EXCLUDED.size_bytes, + has_attach = EXCLUDED.has_attach + `, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach) + return err +} + +// SetIndexedAt marks an email as indexed in the database. +func (s *Store) SetIndexedAt(ctx context.Context, id string) error { + if s.db == nil { + return nil + } + _, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id) + return err +} + +// IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL). +func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) { + if s.db == nil { + return false, nil + } + var indexed bool + err := s.db.QueryRow(ctx, + `SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id, + ).Scan(&indexed) + if errors.Is(err, pgx.ErrNoRows) { + return false, nil + } + return indexed, err +} + +// ── Backfill ────────────────────────────────────────────────────────────── + +// Backfill walks the store directory, parses each email, inserts missing DB +// metadata rows, and re-indexes emails that have indexed_at IS NULL. +func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error { + // We accept a generic indexer interface to avoid import cycles. + // The caller (main.go) wraps the real index.Indexer. + return s.BackfillWithFuncs(ctx, logger) +} + +// BackfillWithFuncs walks the store and calls the provided functions for each email. +// This is the implementation used by the coordinator in main.go. +func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error { + if s.db == nil { + logger.Info("backfill: skipping, no database configured") + return nil + } + + storeDir := filepath.Join(s.dir, "store") + count := 0 + indexed := 0 + errCount := 0 + + err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { + if werr != nil { + return werr + } + if d.IsDir() { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + count++ + id := d.Name() + + // Load the file (handles decryption) + raw, err := s.Load(id) + if err != nil { + logger.Warn("backfill: load failed", "id", id, "err", err) + errCount++ + return nil // continue with next file + } + + // Parse + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("backfill: parse failed", "id", id, "err", err) + errCount++ + return nil + } + + // Upsert metadata + if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil { + logger.Warn("backfill: save meta failed", "id", id, "err", err) + } + + // Check if already indexed + alreadyIndexed, err := s.IsIndexed(ctx, id) + if err != nil { + logger.Warn("backfill: check indexed failed", "id", id, "err", err) + } + + if !alreadyIndexed { + indexed++ + } + + if count%100 == 0 { + logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("backfill: walk: %w", err) + } + + logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount) + return nil +} + +// WalkStore iterates over all files in the store directory and calls fn for each. +// The fn receives the email ID. This is used by the backfill coordinator. +func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error { + storeDir := filepath.Join(s.dir, "store") + return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error { + if werr != nil { + return werr + } + if d.IsDir() { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + return fn(d.Name()) + }) +} + +// ── File path helper ────────────────────────────────────────────────────── + // filePath returns the on-disk path for a given mail ID. // Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id} func (s *Store) filePath(id string) string {