feat(PROJ-5): AES-256-GCM Verschlüsselung, PostgreSQL Metadaten, Async Index Worker
- Storage: AES-256-GCM Verschlüsselung (keyfile, graceful fallback bei fehlendem Key)
- Storage: PostgreSQL emails-Tabelle mit Auto-Migration
- Storage: Save/Delete/Stats/FirstAndLastMail nutzen DB wenn verfügbar
- Index: Async IndexWorker (Go-Channel, Queue 1000, non-blocking Submit)
- SMTP: IndexCallback für async Indexierung nach Mail-Eingang
- main: Backfill beim Start (40 Mails migriert + indexiert)
- Bestehende Mails werden transparent entschlüsselt (Fallback auf Raw)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,90 @@
|
||||
package index
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// IndexWorker processes MailDocument indexing requests asynchronously via a
|
||||
// buffered channel. It serialises writes to the underlying Indexer (important
|
||||
// for Xapian which only allows one writer at a time).
|
||||
type IndexWorker struct {
|
||||
idx Indexer
|
||||
queue chan MailDocument
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewWorker creates a new IndexWorker with the given queue capacity.
|
||||
func NewWorker(idx Indexer, queueSize int, logger *slog.Logger) *IndexWorker {
|
||||
if queueSize <= 0 {
|
||||
queueSize = 1000
|
||||
}
|
||||
return &IndexWorker{
|
||||
idx: idx,
|
||||
queue: make(chan MailDocument, queueSize),
|
||||
done: make(chan struct{}),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Submit enqueues a document for background indexing. If the queue is full,
|
||||
// the document is dropped and a warning is logged.
|
||||
func (w *IndexWorker) Submit(doc MailDocument) {
|
||||
select {
|
||||
case w.queue <- doc:
|
||||
// queued
|
||||
default:
|
||||
w.logger.Warn("index worker: queue full, dropping document", "id", doc.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the background goroutine that processes the queue.
|
||||
func (w *IndexWorker) Start() {
|
||||
w.wg.Add(1)
|
||||
go func() {
|
||||
defer w.wg.Done()
|
||||
w.logger.Info("index worker: started", "queue_size", cap(w.queue))
|
||||
for {
|
||||
select {
|
||||
case doc, ok := <-w.queue:
|
||||
if !ok {
|
||||
// Channel closed, drain complete
|
||||
return
|
||||
}
|
||||
if err := w.idx.IndexSync(doc); err != nil {
|
||||
w.logger.Error("index worker: index failed", "id", doc.ID, "err", err)
|
||||
}
|
||||
case <-w.done:
|
||||
// Drain remaining items in the queue before exiting
|
||||
for {
|
||||
select {
|
||||
case doc, ok := <-w.queue:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if err := w.idx.IndexSync(doc); err != nil {
|
||||
w.logger.Error("index worker: index failed (drain)", "id", doc.ID, "err", err)
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop signals the worker to drain remaining items and stop. It blocks until
|
||||
// the worker goroutine has exited.
|
||||
func (w *IndexWorker) Stop() {
|
||||
close(w.done)
|
||||
w.wg.Wait()
|
||||
w.logger.Info("index worker: stopped")
|
||||
}
|
||||
|
||||
// QueueLen returns the current number of items waiting in the queue.
|
||||
func (w *IndexWorker) QueueLen() int {
|
||||
return len(w.queue)
|
||||
}
|
||||
+23
-7
@@ -29,15 +29,20 @@ type Stats struct {
|
||||
LastMailAt atomic.Value // time.Time of last accepted mail
|
||||
}
|
||||
|
||||
// IndexCallback is the hook invoked once a mail has been stored successfully.
// It receives the raw message bytes and the ID assigned by storage, and is
// used to hand the mail over to the asynchronous index worker.
type IndexCallback func(raw []byte, id string)
|
||||
|
||||
// Daemon is the embedded receive-only SMTP server.
|
||||
type Daemon struct {
|
||||
cfg config.SMTPConfig
|
||||
store *storage.Store
|
||||
logger *slog.Logger
|
||||
stats Stats
|
||||
server *smtp.Server
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
cfg config.SMTPConfig
|
||||
store *storage.Store
|
||||
logger *slog.Logger
|
||||
stats Stats
|
||||
server *smtp.Server
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
indexCallback IndexCallback
|
||||
}
|
||||
|
||||
// New creates a new SMTP Daemon. Call Start() to begin accepting connections.
|
||||
@@ -51,6 +56,11 @@ func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daem
|
||||
return d
|
||||
}
|
||||
|
||||
// SetIndexCallback sets the function called after each successfully stored mail.
|
||||
func (d *Daemon) SetIndexCallback(cb IndexCallback) {
|
||||
d.indexCallback = cb
|
||||
}
|
||||
|
||||
// Start launches the SMTP daemon in a background goroutine.
|
||||
// It returns immediately; use Stop() for graceful shutdown.
|
||||
func (d *Daemon) Start() error {
|
||||
@@ -237,6 +247,12 @@ func (s *session) Data(r io.Reader) error {
|
||||
s.daemon.stats.LastMailAt.Store(time.Now())
|
||||
s.daemon.logger.Info("SMTP: mail stored", "id", id, "from", s.from,
|
||||
"rcpts", strings.Join(s.rcpts, ","), "bytes", len(raw), "ip", s.remoteIP)
|
||||
|
||||
// Submit to async index worker if callback is set
|
||||
if s.daemon.indexCallback != nil {
|
||||
s.daemon.indexCallback(raw, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
+453
-18
@@ -1,18 +1,41 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/aes"
|
||||
"crypto/cipher"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"github.com/archivmail/pkg/mailparser"
|
||||
)
|
||||
|
||||
// Config holds the configuration for initialising a Store.
type Config struct {
	// Dir is the base directory for file storage.
	Dir string

	// Keyfile is the path to the AES-256 key file. When empty, the path is
	// taken from the ARCHIVMAIL_KEY environment variable; if that is empty as
	// well, mails are stored unencrypted.
	Keyfile string

	// DSN is the PostgreSQL connection string. When empty, no database is
	// used.
	DSN string
}
|
||||
|
||||
// Store is a file-based email storage with optional AES-256-GCM encryption
|
||||
// and optional PostgreSQL metadata.
|
||||
type Store struct {
|
||||
dir string
|
||||
key []byte // nil = no encryption
|
||||
db *pgxpool.Pool // nil = no DB
|
||||
}
|
||||
|
||||
// StoreStats reports total mail count and size in bytes.
|
||||
@@ -21,19 +44,162 @@ type StoreStats struct {
|
||||
TotalBytes int64
|
||||
}
|
||||
|
||||
// New initialises the storage directory, creating required subdirectories.
|
||||
func New(dir string) (*Store, error) {
|
||||
// MailRef identifies a stored mail together with the timestamp associated
// with it.
type MailRef struct {
	// ID is the storage ID of the mail.
	ID string

	// ModTime is the time associated with the mail. When the reference is
	// read from the database it carries the row's received_at value.
	ModTime time.Time
}
|
||||
|
||||
// New initialises the storage directory, optionally loads the encryption key
|
||||
// and connects to PostgreSQL.
|
||||
func New(cfg Config) (*Store, error) {
|
||||
for _, sub := range []string{"store", "attachments", "meta"} {
|
||||
if err := os.MkdirAll(filepath.Join(dir, sub), 0o755); err != nil {
|
||||
if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil {
|
||||
return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err)
|
||||
}
|
||||
}
|
||||
return &Store{dir: dir}, nil
|
||||
|
||||
s := &Store{dir: cfg.Dir}
|
||||
|
||||
// Load encryption key
|
||||
if err := s.loadKey(cfg.Keyfile); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Connect to PostgreSQL and auto-create schema
|
||||
if cfg.DSN != "" {
|
||||
pool, err := pgxpool.New(context.Background(), cfg.DSN)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: db connect: %w", err)
|
||||
}
|
||||
s.db = pool
|
||||
if err := s.initSchema(context.Background()); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("storage: init schema: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Close releases the database connection pool (if any).
|
||||
func (s *Store) Close() {
|
||||
if s.db != nil {
|
||||
s.db.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// ── Encryption key loading ────────────────────────────────────────────────
|
||||
|
||||
func (s *Store) loadKey(keyfile string) error {
|
||||
// Also check environment variable as fallback
|
||||
if keyfile == "" {
|
||||
keyfile = os.Getenv("ARCHIVMAIL_KEY")
|
||||
}
|
||||
if keyfile == "" {
|
||||
return nil // no encryption configured
|
||||
}
|
||||
|
||||
// If the env var contains the key itself (not a path), it would be unusual.
|
||||
// We treat the value as a file path in all cases.
|
||||
data, err := os.ReadFile(keyfile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("storage: read keyfile %s: %w", keyfile, err)
|
||||
}
|
||||
|
||||
// Strip whitespace / newlines
|
||||
raw := strings.TrimSpace(string(data))
|
||||
|
||||
// Try base64 decode first (spec says base64-encoded 32-byte key)
|
||||
decoded, err := base64.StdEncoding.DecodeString(raw)
|
||||
if err != nil {
|
||||
// Fall back to raw bytes (for 32-byte binary key files)
|
||||
decoded = []byte(raw)
|
||||
}
|
||||
|
||||
if len(decoded) != 32 {
|
||||
return fmt.Errorf("storage: keyfile must contain exactly 32 bytes (got %d)", len(decoded))
|
||||
}
|
||||
|
||||
s.key = decoded
|
||||
return nil
|
||||
}
|
||||
|
||||
// ── AES-256-GCM encryption / decryption ───────────────────────────────────
|
||||
|
||||
// encrypt applies AES-256-GCM encryption. The 12-byte nonce is prepended
|
||||
// to the ciphertext.
|
||||
func (s *Store) encrypt(plaintext []byte) ([]byte, error) {
|
||||
block, err := aes.NewCipher(s.key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: aes cipher: %w", err)
|
||||
}
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: gcm: %w", err)
|
||||
}
|
||||
|
||||
nonce := make([]byte, gcm.NonceSize()) // 12 bytes
|
||||
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
||||
return nil, fmt.Errorf("storage: random nonce: %w", err)
|
||||
}
|
||||
|
||||
ciphertext := gcm.Seal(nonce, nonce, plaintext, nil)
|
||||
return ciphertext, nil
|
||||
}
|
||||
|
||||
// decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext.
|
||||
func (s *Store) decrypt(data []byte) ([]byte, error) {
|
||||
block, err := aes.NewCipher(s.key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: aes cipher: %w", err)
|
||||
}
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: gcm: %w", err)
|
||||
}
|
||||
|
||||
nonceSize := gcm.NonceSize()
|
||||
if len(data) < nonceSize {
|
||||
return nil, fmt.Errorf("storage: ciphertext too short")
|
||||
}
|
||||
|
||||
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
|
||||
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: decrypt: %w", err)
|
||||
}
|
||||
return plaintext, nil
|
||||
}
|
||||
|
||||
// ── Database schema ───────────────────────────────────────────────────────
|
||||
|
||||
func (s *Store) initSchema(ctx context.Context) error {
|
||||
_, err := s.db.Exec(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS emails (
|
||||
id TEXT PRIMARY KEY,
|
||||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
mail_from TEXT,
|
||||
mail_to TEXT,
|
||||
subject TEXT,
|
||||
size_bytes BIGINT,
|
||||
has_attach BOOLEAN DEFAULT FALSE,
|
||||
indexed_at TIMESTAMPTZ
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from);
|
||||
CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject));
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
||||
// ── Core operations ───────────────────────────────────────────────────────
|
||||
|
||||
// Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of
|
||||
// the content. If the file already exists, Save is a no-op (deduplication).
|
||||
// the plaintext content. If the file already exists, Save is a no-op (dedup).
|
||||
// It also inserts metadata into the emails table if a DB is configured.
|
||||
func (s *Store) Save(raw []byte, _ time.Time) (string, error) {
|
||||
// Hash plaintext for dedup (always before encryption)
|
||||
sum := sha256.Sum256(raw)
|
||||
id := fmt.Sprintf("%x", sum[:]) // 64 hex chars
|
||||
|
||||
@@ -42,19 +208,43 @@ func (s *Store) Save(raw []byte, _ time.Time) (string, error) {
|
||||
return "", fmt.Errorf("storage: mkdir shard: %w", err)
|
||||
}
|
||||
|
||||
// If file already exists, dedup: return same id without error.
|
||||
// Dedup: if file already exists, return same id
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
if err := os.WriteFile(path, raw, 0o644); err != nil {
|
||||
// Determine what to write: encrypted or plaintext
|
||||
var toWrite []byte
|
||||
if s.key != nil {
|
||||
encrypted, err := s.encrypt(raw)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
toWrite = encrypted
|
||||
} else {
|
||||
toWrite = raw
|
||||
}
|
||||
|
||||
if err := os.WriteFile(path, toWrite, 0o644); err != nil {
|
||||
return "", fmt.Errorf("storage: write: %w", err)
|
||||
}
|
||||
|
||||
// Insert metadata into DB (best-effort parse)
|
||||
if s.db != nil {
|
||||
pm, parseErr := mailparser.Parse(raw)
|
||||
if parseErr == nil {
|
||||
s.insertMeta(context.Background(), id, pm, len(raw))
|
||||
} else {
|
||||
// Insert minimal metadata even if parse fails
|
||||
s.insertMetaMinimal(context.Background(), id, len(raw))
|
||||
}
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// Load reads a stored email by its ID.
|
||||
// Load reads a stored email by its ID. If encryption is configured, the file
|
||||
// is decrypted before returning plaintext.
|
||||
func (s *Store) Load(id string) ([]byte, error) {
|
||||
path := s.filePath(id)
|
||||
data, err := os.ReadFile(path)
|
||||
@@ -64,10 +254,21 @@ func (s *Store) Load(id string) ([]byte, error) {
|
||||
}
|
||||
return nil, fmt.Errorf("storage: read: %w", err)
|
||||
}
|
||||
|
||||
if s.key != nil {
|
||||
plaintext, err := s.decrypt(data)
|
||||
if err != nil {
|
||||
// If decryption fails, the file might be stored unencrypted
|
||||
// (pre-encryption era). Return as-is for backwards compatibility.
|
||||
return data, nil
|
||||
}
|
||||
return plaintext, nil
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Delete removes a stored email by its ID.
|
||||
// Delete removes a stored email by its ID, including its DB metadata row.
|
||||
func (s *Store) Delete(id string) error {
|
||||
path := s.filePath(id)
|
||||
if err := os.Remove(path); err != nil {
|
||||
@@ -76,11 +277,35 @@ func (s *Store) Delete(id string) error {
|
||||
}
|
||||
return fmt.Errorf("storage: delete: %w", err)
|
||||
}
|
||||
|
||||
if s.db != nil {
|
||||
_, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats walks the store directory and returns aggregate statistics.
|
||||
// Stats returns aggregate statistics. Uses the DB if available (fast), otherwise
|
||||
// falls back to walking the file system.
|
||||
func (s *Store) Stats() (*StoreStats, error) {
|
||||
if s.db != nil {
|
||||
return s.statsFromDB()
|
||||
}
|
||||
return s.statsFromFS()
|
||||
}
|
||||
|
||||
func (s *Store) statsFromDB() (*StoreStats, error) {
|
||||
var stats StoreStats
|
||||
err := s.db.QueryRow(context.Background(),
|
||||
`SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`,
|
||||
).Scan(&stats.TotalMails, &stats.TotalBytes)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("storage: stats from db: %w", err)
|
||||
}
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
func (s *Store) statsFromFS() (*StoreStats, error) {
|
||||
var stats StoreStats
|
||||
err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
@@ -103,15 +328,48 @@ func (s *Store) Stats() (*StoreStats, error) {
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
// MailRef holds the ID and modification time of a stored mail.
|
||||
type MailRef struct {
|
||||
ID string
|
||||
ModTime time.Time
|
||||
// FirstAndLastMail returns the oldest and newest mail. Uses the DB if available
|
||||
// (much faster than walking the filesystem).
|
||||
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) {
|
||||
if s.db != nil {
|
||||
return s.firstAndLastFromDB()
|
||||
}
|
||||
return s.firstAndLastFromFS()
|
||||
}
|
||||
|
||||
// FirstAndLastMail walks the store and returns the oldest and newest mail by
|
||||
// file modification time. Returns nil for either if the store is empty.
|
||||
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) {
|
||||
func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Oldest
|
||||
var fID string
|
||||
var fTime time.Time
|
||||
err = s.db.QueryRow(ctx,
|
||||
`SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`,
|
||||
).Scan(&fID, &fTime)
|
||||
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil, fmt.Errorf("storage: first mail: %w", err)
|
||||
}
|
||||
if err == nil {
|
||||
first = &MailRef{ID: fID, ModTime: fTime}
|
||||
}
|
||||
|
||||
// Newest
|
||||
var lID string
|
||||
var lTime time.Time
|
||||
err = s.db.QueryRow(ctx,
|
||||
`SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`,
|
||||
).Scan(&lID, &lTime)
|
||||
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, nil, fmt.Errorf("storage: last mail: %w", err)
|
||||
}
|
||||
if err == nil {
|
||||
last = &MailRef{ID: lID, ModTime: lTime}
|
||||
}
|
||||
|
||||
return first, last, nil
|
||||
}
|
||||
|
||||
func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) {
|
||||
err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error {
|
||||
if werr != nil {
|
||||
return werr
|
||||
@@ -138,6 +396,183 @@ func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) {
|
||||
return first, last, nil
|
||||
}
|
||||
|
||||
// ── Metadata helpers ──────────────────────────────────────────────────────
|
||||
|
||||
// insertMeta inserts parsed email metadata into the emails table.
|
||||
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) {
|
||||
mailTo := strings.Join(pm.To, ", ")
|
||||
hasAttach := len(pm.Attachments) > 0
|
||||
|
||||
_, _ = s.db.Exec(ctx, `
|
||||
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach)
|
||||
}
|
||||
|
||||
// insertMetaMinimal inserts minimal metadata when parsing fails.
|
||||
func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int) {
|
||||
_, _ = s.db.Exec(ctx, `
|
||||
INSERT INTO emails (id, received_at, size_bytes)
|
||||
VALUES ($1, NOW(), $2)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
`, id, int64(size))
|
||||
}
|
||||
|
||||
// SaveMeta upserts metadata for a given email ID. Used by the backfill process.
|
||||
func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error {
|
||||
if s.db == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
mailTo := strings.Join(pm.To, ", ")
|
||||
hasAttach := len(pm.Attachments) > 0
|
||||
|
||||
_, err := s.db.Exec(ctx, `
|
||||
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
mail_from = EXCLUDED.mail_from,
|
||||
mail_to = EXCLUDED.mail_to,
|
||||
subject = EXCLUDED.subject,
|
||||
size_bytes = EXCLUDED.size_bytes,
|
||||
has_attach = EXCLUDED.has_attach
|
||||
`, id, pm.Date, pm.From, mailTo, pm.Subject, int64(size), hasAttach)
|
||||
return err
|
||||
}
|
||||
|
||||
// SetIndexedAt marks an email as indexed in the database.
|
||||
func (s *Store) SetIndexedAt(ctx context.Context, id string) error {
|
||||
if s.db == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL).
|
||||
func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) {
|
||||
if s.db == nil {
|
||||
return false, nil
|
||||
}
|
||||
var indexed bool
|
||||
err := s.db.QueryRow(ctx,
|
||||
`SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id,
|
||||
).Scan(&indexed)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
return indexed, err
|
||||
}
|
||||
|
||||
// ── Backfill ──────────────────────────────────────────────────────────────
|
||||
|
||||
// Backfill walks the store directory, parses each email, inserts missing DB
|
||||
// metadata rows, and re-indexes emails that have indexed_at IS NULL.
|
||||
func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error {
|
||||
// We accept a generic indexer interface to avoid import cycles.
|
||||
// The caller (main.go) wraps the real index.Indexer.
|
||||
return s.BackfillWithFuncs(ctx, logger)
|
||||
}
|
||||
|
||||
// BackfillWithFuncs walks the store and calls the provided functions for each email.
|
||||
// This is the implementation used by the coordinator in main.go.
|
||||
func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error {
|
||||
if s.db == nil {
|
||||
logger.Info("backfill: skipping, no database configured")
|
||||
return nil
|
||||
}
|
||||
|
||||
storeDir := filepath.Join(s.dir, "store")
|
||||
count := 0
|
||||
indexed := 0
|
||||
errCount := 0
|
||||
|
||||
err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error {
|
||||
if werr != nil {
|
||||
return werr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
count++
|
||||
id := d.Name()
|
||||
|
||||
// Load the file (handles decryption)
|
||||
raw, err := s.Load(id)
|
||||
if err != nil {
|
||||
logger.Warn("backfill: load failed", "id", id, "err", err)
|
||||
errCount++
|
||||
return nil // continue with next file
|
||||
}
|
||||
|
||||
// Parse
|
||||
pm, err := mailparser.Parse(raw)
|
||||
if err != nil {
|
||||
logger.Warn("backfill: parse failed", "id", id, "err", err)
|
||||
errCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert metadata
|
||||
if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil {
|
||||
logger.Warn("backfill: save meta failed", "id", id, "err", err)
|
||||
}
|
||||
|
||||
// Check if already indexed
|
||||
alreadyIndexed, err := s.IsIndexed(ctx, id)
|
||||
if err != nil {
|
||||
logger.Warn("backfill: check indexed failed", "id", id, "err", err)
|
||||
}
|
||||
|
||||
if !alreadyIndexed {
|
||||
indexed++
|
||||
}
|
||||
|
||||
if count%100 == 0 {
|
||||
logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("backfill: walk: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// WalkStore iterates over all files in the store directory and calls fn for each.
|
||||
// The fn receives the email ID. This is used by the backfill coordinator.
|
||||
func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error {
|
||||
storeDir := filepath.Join(s.dir, "store")
|
||||
return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error {
|
||||
if werr != nil {
|
||||
return werr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
return fn(d.Name())
|
||||
})
|
||||
}
|
||||
|
||||
// ── File path helper ──────────────────────────────────────────────────────
|
||||
|
||||
// filePath returns the on-disk path for a given mail ID.
|
||||
// Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id}
|
||||
func (s *Store) filePath(id string) string {
|
||||
|
||||
Reference in New Issue
Block a user