472ba6a087
- users.list_page_size (Default 25), PATCH /api/auth/preferences, Whitelist 25/50/100/200, Wert in login/me-Response - Settings-UI mit Select, /search nutzt gespeicherte Seitengröße - /api/search page_size serverseitig auf max. 500 gecappt fix(PROJ-46): login_attempts-Migration nutzte s.db statt s.pool (Backend kompilierte nicht) feat(PROJ-50): DSGVO-Löschersuchen Backend (dsgvo_requests, Handler, cc_addr/bcc_addr Indexerweiterung) — noch nicht QA'd/deployed
1407 lines
42 KiB
Go
1407 lines
42 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"archivmail/pkg/mailparser"
|
|
)
|
|
|
|
var (
	// ErrRetentionLock is the sentinel returned by Delete when the mail's
	// retain_until timestamp still lies in the future, i.e. its retention
	// period has not expired. Compare with errors.Is.
	ErrRetentionLock = errors.New("storage: mail is within retention period")
)
|
|
|
|
// Config holds the configuration for initialising a Store via New.
//
// All fields are optional; the zero value yields an unencrypted,
// uncompressed, file-only store rooted at the current directory.
type Config struct {
	Dir              string // base directory for file storage; New creates store/, attachments/ and meta/ below it
	Keyfile          string // path to a base64-encoded (or raw) 32-byte AES key; empty = fall back to $ARCHIVMAIL_KEY, then no encryption. An unreadable or wrong-sized file disables encryption instead of failing
	DSN              string // PostgreSQL DSN; empty = no DB (metadata, dedup by Message-ID, retention lock and refs are then unavailable)
	RetentionDays    int    // 0 = no lock; >0 = GoBD retention period in days
	MinRetentionDays int    // PROJ-51: global minimum retention floor (0 = none)
	CompressEnabled  bool   // gzip-compress emails and attachments before encryption
}
|
|
|
|
// Store is a file-based email storage with optional AES-256-GCM encryption
// and optional PostgreSQL metadata.
//
// Mails are content-addressed: the file name / ID is the hex SHA-256 of the
// plaintext. Construct it with New; Close releases the DB pool.
type Store struct {
	dir              string
	key              []byte        // nil = no encryption; otherwise exactly 32 bytes (AES-256)
	db               *pgxpool.Pool // nil = no DB
	retentionDays    int           // 0 = no lock
	minRetentionDays int           // PROJ-51: global minimum retention floor (0 = none)
	compressEnabled  bool          // gzip before encryption
}
|
|
|
|
// StoreStats is the aggregate returned by Store.Stats: how many mails are
// archived and how many bytes they account for (see Stats for which size is
// counted on each code path).
type StoreStats struct {
	TotalMails, TotalBytes int64
}
|
|
|
|
// MailRef holds the ID and associated time of a stored mail.
//
// ModTime is the DB received_at value when it comes from the database, or the
// file's modification time when it comes from a filesystem walk.
type MailRef struct {
	ID      string
	ModTime time.Time
}
|
|
|
|
// MailWithUID holds the ID and stable IMAP UID of a stored mail.
//
// The UID presumably comes from the emails.uid BIGSERIAL column added in New
// (PROJ-33) — confirm against the code that populates this struct.
type MailWithUID struct {
	ID  string
	UID int64
}
|
|
|
|
// New initialises the storage directory, optionally loads the encryption key
|
|
// and connects to PostgreSQL.
|
|
func New(cfg Config) (*Store, error) {
|
|
for _, sub := range []string{"store", "attachments", "meta"} {
|
|
if err := os.MkdirAll(filepath.Join(cfg.Dir, sub), 0o755); err != nil {
|
|
return nil, fmt.Errorf("storage: mkdir %s: %w", sub, err)
|
|
}
|
|
}
|
|
|
|
s := &Store{dir: cfg.Dir, retentionDays: cfg.RetentionDays, minRetentionDays: cfg.MinRetentionDays, compressEnabled: cfg.CompressEnabled}
|
|
|
|
// Load encryption key
|
|
if err := s.loadKey(cfg.Keyfile); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Connect to PostgreSQL and auto-create schema
|
|
if cfg.DSN != "" {
|
|
pool, err := pgxpool.New(context.Background(), cfg.DSN)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: db connect: %w", err)
|
|
}
|
|
s.db = pool
|
|
if err := s.initSchema(context.Background()); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("storage: init schema: %w", err)
|
|
}
|
|
ctx := context.Background()
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verify_ok BOOLEAN`)
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS verified_at TIMESTAMPTZ`)
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS message_id TEXT`)
|
|
_, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_message_id ON emails (message_id) WHERE message_id IS NOT NULL`)
|
|
// PROJ-34: GoBD retention lock
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS retain_until TIMESTAMPTZ`)
|
|
_, _ = s.db.Exec(ctx, `CREATE INDEX IF NOT EXISTS idx_emails_retain_until ON emails (retain_until) WHERE retain_until IS NOT NULL`)
|
|
// PROJ-33: Stable IMAP UIDs
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS uid BIGSERIAL`)
|
|
_, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_uid ON emails (uid)`)
|
|
// 2.0: storage_objects FK on emails
|
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS storage_id BIGINT REFERENCES storage_objects(id)`)
|
|
// PROJ-51: archiving_rules table + retain_until_source column
|
|
s.initRetentionRulesSchema(ctx)
|
|
// PROJ-50: DSGVO Löschersuchen
|
|
s.initDSGVOSchema(ctx)
|
|
}
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// EncryptionEnabled reports whether at-rest AES-256-GCM encryption is active,
|
|
// i.e. whether a valid 32-byte key was loaded (PROJ-49). Returns false if no
|
|
// keyfile was configured.
|
|
func (s *Store) EncryptionEnabled() bool {
|
|
return len(s.key) == 32
|
|
}
|
|
|
|
// Close releases the database connection pool (if any).
|
|
func (s *Store) Close() {
|
|
if s.db != nil {
|
|
s.db.Close()
|
|
}
|
|
}
|
|
|
|
// ── Encryption key loading ────────────────────────────────────────────────
|
|
|
|
func (s *Store) loadKey(keyfile string) error {
|
|
// Also check environment variable as fallback
|
|
if keyfile == "" {
|
|
keyfile = os.Getenv("ARCHIVMAIL_KEY")
|
|
}
|
|
if keyfile == "" {
|
|
return nil // no encryption configured
|
|
}
|
|
|
|
// If the env var contains the key itself (not a path), it would be unusual.
|
|
// We treat the value as a file path in all cases.
|
|
//
|
|
// PROJ-49: A configured-but-broken keyfile (unreadable / wrong size) must NOT
|
|
// hard-fail the service. The spec mandates full backwards compatibility:
|
|
// the service starts unencrypted (s.key stays nil) and the startup warning in
|
|
// cmd/archivmail/encryption_status.go surfaces the concrete reason. Hard-failing
|
|
// here would make the documented edge-case warnings unreachable.
|
|
data, err := os.ReadFile(keyfile)
|
|
if err != nil {
|
|
return nil // unreadable → run unencrypted; warnEncryptionStatus() logs the reason
|
|
}
|
|
|
|
// Strip whitespace / newlines
|
|
raw := strings.TrimSpace(string(data))
|
|
|
|
// Try base64 decode first (spec says base64-encoded 32-byte key)
|
|
decoded, derr := base64.StdEncoding.DecodeString(raw)
|
|
if derr != nil {
|
|
// Fall back to raw bytes (for 32-byte binary key files)
|
|
decoded = []byte(raw)
|
|
}
|
|
|
|
if len(decoded) != 32 {
|
|
return nil // wrong size → run unencrypted; warnEncryptionStatus() logs the reason
|
|
}
|
|
|
|
s.key = decoded
|
|
return nil
|
|
}
|
|
|
|
// ── AES-256-GCM encryption / decryption ───────────────────────────────────
|
|
|
|
// encrypt applies AES-256-GCM encryption. The 12-byte nonce is prepended
|
|
// to the ciphertext.
|
|
func (s *Store) encrypt(plaintext []byte) ([]byte, error) {
|
|
block, err := aes.NewCipher(s.key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: aes cipher: %w", err)
|
|
}
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: gcm: %w", err)
|
|
}
|
|
|
|
nonce := make([]byte, gcm.NonceSize()) // 12 bytes
|
|
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
|
return nil, fmt.Errorf("storage: random nonce: %w", err)
|
|
}
|
|
|
|
ciphertext := gcm.Seal(nonce, nonce, plaintext, nil)
|
|
return ciphertext, nil
|
|
}
|
|
|
|
// decrypt reverses AES-256-GCM encryption. Expects nonce prepended to ciphertext.
|
|
func (s *Store) decrypt(data []byte) ([]byte, error) {
|
|
block, err := aes.NewCipher(s.key)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: aes cipher: %w", err)
|
|
}
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: gcm: %w", err)
|
|
}
|
|
|
|
nonceSize := gcm.NonceSize()
|
|
if len(data) < nonceSize {
|
|
return nil, fmt.Errorf("storage: ciphertext too short")
|
|
}
|
|
|
|
nonce, ciphertext := data[:nonceSize], data[nonceSize:]
|
|
plaintext, err := gcm.Open(nil, nonce, ciphertext, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: decrypt: %w", err)
|
|
}
|
|
return plaintext, nil
|
|
}
|
|
|
|
// ── Database schema ───────────────────────────────────────────────────────
|
|
|
|
func (s *Store) initSchema(ctx context.Context) error {
|
|
// storage_objects must exist before emails (FK dependency)
|
|
_, err := s.db.Exec(ctx, `
|
|
CREATE TABLE IF NOT EXISTS storage_objects (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
storage_type TEXT NOT NULL DEFAULT 'filesystem',
|
|
path TEXT NOT NULL,
|
|
compression TEXT NOT NULL DEFAULT 'none',
|
|
size_original BIGINT,
|
|
size_compressed BIGINT,
|
|
checksum CHAR(64),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
);
|
|
CREATE TABLE IF NOT EXISTS attachments (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
filename TEXT,
|
|
mime_type TEXT,
|
|
size_bytes BIGINT,
|
|
hash CHAR(64) UNIQUE NOT NULL,
|
|
storage_id BIGINT REFERENCES storage_objects(id),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_attachments_hash ON attachments (hash);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = s.db.Exec(ctx, `
|
|
CREATE TABLE IF NOT EXISTS emails (
|
|
id TEXT PRIMARY KEY,
|
|
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
mail_from TEXT,
|
|
mail_to TEXT,
|
|
subject TEXT,
|
|
size_bytes BIGINT,
|
|
has_attach BOOLEAN DEFAULT FALSE,
|
|
indexed_at TIMESTAMPTZ
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_emails_received_at ON emails (received_at);
|
|
CREATE INDEX IF NOT EXISTS idx_emails_mail_from ON emails (mail_from);
|
|
CREATE INDEX IF NOT EXISTS idx_emails_subject ON emails USING gin (to_tsvector('simple', subject));
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Phase 2b migrations: tenant isolation
|
|
_, err = s.db.Exec(ctx, `
|
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS tenant_id BIGINT;
|
|
CREATE INDEX IF NOT EXISTS idx_emails_tenant ON emails (tenant_id);
|
|
CREATE TABLE IF NOT EXISTS email_refs (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
email_id TEXT NOT NULL REFERENCES emails(id),
|
|
tenant_id BIGINT NOT NULL,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
UNIQUE(email_id, tenant_id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id);
|
|
CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id);
|
|
CREATE TABLE IF NOT EXISTS email_attachments (
|
|
email_id TEXT NOT NULL REFERENCES emails(id),
|
|
attachment_id BIGINT NOT NULL REFERENCES attachments(id),
|
|
PRIMARY KEY (email_id, attachment_id)
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_email_attachments_email ON email_attachments (email_id);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// PROJ-38: Mail-Threading
|
|
_, err = s.db.Exec(ctx, `
|
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS thread_id TEXT;
|
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS in_reply_to TEXT;
|
|
CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails (thread_id);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// PROJ-13: API keys for external CRM integration
|
|
_, err = s.db.Exec(ctx, `
|
|
CREATE TABLE IF NOT EXISTS api_keys (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
tenant_id BIGINT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
token_hash TEXT NOT NULL UNIQUE,
|
|
role TEXT NOT NULL DEFAULT 'user',
|
|
active BOOLEAN NOT NULL DEFAULT TRUE,
|
|
rate_limit INT NOT NULL DEFAULT 60,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
last_used_at TIMESTAMPTZ
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_api_keys_token_hash ON api_keys(token_hash);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// PROJ-42: Gespeicherte Suchanfragen
|
|
_, err = s.db.Exec(ctx, `
|
|
CREATE TABLE IF NOT EXISTS saved_searches (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
user_id BIGINT NOT NULL,
|
|
tenant_id BIGINT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
query_json JSONB NOT NULL,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_saved_searches_user ON saved_searches(user_id, tenant_id);
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// PROJ-35: OCR-Status pro Mail (idempotent)
|
|
_, err = s.db.Exec(ctx, `
|
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_status TEXT DEFAULT 'pending';
|
|
CREATE INDEX IF NOT EXISTS idx_emails_ocr_status ON emails (ocr_status) WHERE ocr_status = 'pending';
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// PROJ-44: gecachte Anzahl der extrahierten OCR-Zeichen — vermeidet einen
|
|
// Manticore-Roundtrip auf der Mail-Detail-Seite. Idempotent wie ocr_status.
|
|
_, err = s.db.Exec(ctx, `
|
|
ALTER TABLE emails ADD COLUMN IF NOT EXISTS ocr_chars BIGINT DEFAULT 0;
|
|
`)
|
|
return err
|
|
}
|
|
|
|
// ── Core operations ───────────────────────────────────────────────────────
|
|
|
|
// Save writes raw email bytes to storage. The ID is the hex-encoded SHA256 of
|
|
// the plaintext content. If the file already exists, Save ensures an email_ref
|
|
// exists for the tenant (cross-tenant dedup: one file, many refs).
|
|
// tenantID may be nil for system-level ingestion without tenant assignment.
|
|
//
|
|
// Dedup order:
|
|
// 1. Parse mail → extract Message-ID
|
|
// 2. If Message-ID present: lookup in DB → if found, return existing ID (no disk I/O)
|
|
// 3. Fallback: SHA-256-based dedup (existing behaviour)
|
|
func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int64) (string, error) {
|
|
// Step 1: parse for Message-ID dedup (best-effort)
|
|
var messageID string
|
|
pm, parseErr := mailparser.Parse(raw)
|
|
if parseErr == nil {
|
|
messageID = pm.MessageID
|
|
}
|
|
|
|
// Step 2: Message-ID lookup — avoids disk I/O for cross-channel duplicates
|
|
if s.db != nil && messageID != "" {
|
|
existingID, err := s.lookupByMessageID(ctx, messageID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("storage: message-id lookup: %w", err)
|
|
}
|
|
if existingID != "" {
|
|
// Already archived — ensure tenant ref exists and return
|
|
if tenantID != nil {
|
|
_, _ = s.db.Exec(ctx, `
|
|
INSERT INTO email_refs (email_id, tenant_id)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (email_id, tenant_id) DO NOTHING
|
|
`, existingID, *tenantID)
|
|
}
|
|
return existingID, nil
|
|
}
|
|
}
|
|
|
|
// Step 3: Quota check (PROJ-29) — reject before writing
|
|
if err := s.CheckQuota(ctx, tenantID); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Step 4: SHA-256-based dedup (fallback / no Message-ID)
|
|
sum := sha256.Sum256(raw)
|
|
id := fmt.Sprintf("%x", sum[:]) // 64 hex chars
|
|
|
|
path := s.filePath(id)
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return "", fmt.Errorf("storage: mkdir shard: %w", err)
|
|
}
|
|
|
|
fileExists := false
|
|
if _, err := os.Stat(path); err == nil {
|
|
fileExists = true
|
|
}
|
|
|
|
if !fileExists {
|
|
// Compress before encryption (if enabled)
|
|
toStore := raw
|
|
compression := "none"
|
|
if s.compressEnabled {
|
|
compressed, cerr := compressGzip(raw)
|
|
if cerr == nil && len(compressed) < len(raw) {
|
|
toStore = compressed
|
|
compression = "gzip"
|
|
}
|
|
}
|
|
|
|
// Encrypt (if key configured)
|
|
var toWrite []byte
|
|
if s.key != nil {
|
|
encrypted, err := s.encrypt(toStore)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
toWrite = encrypted
|
|
} else {
|
|
toWrite = toStore
|
|
}
|
|
|
|
if err := os.WriteFile(path, toWrite, 0o644); err != nil {
|
|
return "", fmt.Errorf("storage: write: %w", err)
|
|
}
|
|
|
|
// Insert metadata into DB
|
|
if s.db != nil {
|
|
// Register in storage_objects
|
|
var storageID *int64
|
|
var sid int64
|
|
if soErr := s.db.QueryRow(ctx, `
|
|
INSERT INTO storage_objects (storage_type, path, compression, size_original, size_compressed, checksum)
|
|
VALUES ('filesystem', $1, $2, $3, $4, $5)
|
|
RETURNING id
|
|
`, path, compression, int64(len(raw)), int64(len(toWrite)), id).Scan(&sid); soErr == nil {
|
|
storageID = &sid
|
|
}
|
|
|
|
if parseErr == nil {
|
|
// PROJ-38: resolve thread before inserting
|
|
if pm.InReplyTo != "" || len(pm.References) > 0 {
|
|
pm.MessageID = pm.MessageID // no-op; thread resolved inside insertMeta
|
|
}
|
|
threadID := s.resolveThreadID(ctx, pm)
|
|
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID, storageID, threadID); err != nil {
|
|
// Race: another goroutine inserted via Message-ID UNIQUE conflict.
|
|
// Resolve to the existing record's ID.
|
|
if messageID != "" {
|
|
if conflictID, lerr := s.lookupByMessageID(ctx, messageID); lerr == nil && conflictID != "" {
|
|
if tenantID != nil {
|
|
_, _ = s.db.Exec(ctx, `
|
|
INSERT INTO email_refs (email_id, tenant_id)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (email_id, tenant_id) DO NOTHING
|
|
`, conflictID, *tenantID)
|
|
}
|
|
return conflictID, nil
|
|
}
|
|
}
|
|
// Non-conflict insert error: log but continue (file is written, metadata can be backfilled)
|
|
}
|
|
} else {
|
|
s.insertMetaMinimal(ctx, id, len(raw), tenantID, storageID)
|
|
}
|
|
// Sprint 2: deduplicate and store attachments
|
|
if parseErr == nil {
|
|
_ = s.saveAttachments(ctx, id, pm)
|
|
}
|
|
// PROJ-34 + PROJ-51: Set retention lock.
|
|
// Priority: matching archiving_rule with retention_days > tenant
|
|
// default > global default, then raised by the global minimum floor.
|
|
// Behaviour without rules/min stays identical to PROJ-34 (tenant
|
|
// opt-in; global default only for tenant-less mails).
|
|
s.applyRetention(ctx, id, pm, tenantID)
|
|
}
|
|
}
|
|
|
|
// Ensure email_ref entry for this tenant (even if file already existed)
|
|
if s.db != nil && tenantID != nil {
|
|
_, _ = s.db.Exec(ctx, `
|
|
INSERT INTO email_refs (email_id, tenant_id)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (email_id, tenant_id) DO NOTHING
|
|
`, id, *tenantID)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// resolveThreadID determines the thread_id for a new mail by checking its
|
|
// References and In-Reply-To headers against existing mails in the DB.
|
|
// Returns the inherited thread_id, or the mail's own MessageID if it starts
|
|
// a new thread, or "" if no MessageID is available.
|
|
func (s *Store) resolveThreadID(ctx context.Context, pm *mailparser.ParsedMail) string {
|
|
if s.db == nil {
|
|
return pm.MessageID
|
|
}
|
|
// Check References in order (oldest first per RFC 5322 §3.6.4).
|
|
// The first reference that has a known thread_id wins.
|
|
for _, ref := range pm.References {
|
|
var tid string
|
|
err := s.db.QueryRow(ctx,
|
|
`SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, ref,
|
|
).Scan(&tid)
|
|
if err == nil && tid != "" {
|
|
return tid
|
|
}
|
|
}
|
|
// Fall back to In-Reply-To
|
|
if pm.InReplyTo != "" {
|
|
var tid string
|
|
err := s.db.QueryRow(ctx,
|
|
`SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, pm.InReplyTo,
|
|
).Scan(&tid)
|
|
if err == nil && tid != "" {
|
|
return tid
|
|
}
|
|
}
|
|
// New thread — use own Message-ID as root
|
|
return pm.MessageID
|
|
}
|
|
|
|
// ThreadInfo holds thread metadata for a single mail, as returned by
// GetThreadInfo.
type ThreadInfo struct {
	ThreadID   string // value of emails.thread_id
	ThreadSize int    // number of emails rows sharing ThreadID
}
|
|
|
|
// GetThreadInfo returns thread_id and thread size for a batch of email IDs.
|
|
// Only mails that belong to a thread (non-NULL thread_id) are enriched.
|
|
func (s *Store) GetThreadInfo(ctx context.Context, ids []string) (map[string]ThreadInfo, error) {
|
|
if s.db == nil || len(ids) == 0 {
|
|
return nil, nil
|
|
}
|
|
// Step 1: get thread_ids for the given email IDs
|
|
rows, err := s.db.Query(ctx,
|
|
`SELECT id, thread_id FROM emails WHERE id = ANY($1) AND thread_id IS NOT NULL`, ids,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
threadByEmail := map[string]string{}
|
|
var threadIDs []string
|
|
for rows.Next() {
|
|
var emailID, threadID string
|
|
if err := rows.Scan(&emailID, &threadID); err == nil {
|
|
threadByEmail[emailID] = threadID
|
|
threadIDs = append(threadIDs, threadID)
|
|
}
|
|
}
|
|
rows.Close()
|
|
|
|
if len(threadIDs) == 0 {
|
|
return map[string]ThreadInfo{}, nil
|
|
}
|
|
|
|
// Step 2: count mails per thread
|
|
rows2, err := s.db.Query(ctx,
|
|
`SELECT thread_id, COUNT(*) FROM emails WHERE thread_id = ANY($1) GROUP BY thread_id`, threadIDs,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows2.Close()
|
|
|
|
threadSize := map[string]int{}
|
|
for rows2.Next() {
|
|
var tid string
|
|
var cnt int
|
|
if err := rows2.Scan(&tid, &cnt); err == nil {
|
|
threadSize[tid] = cnt
|
|
}
|
|
}
|
|
|
|
result := make(map[string]ThreadInfo, len(ids))
|
|
for emailID, tid := range threadByEmail {
|
|
result[emailID] = ThreadInfo{ThreadID: tid, ThreadSize: threadSize[tid]}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetMailsByThread returns all email IDs in a thread, ordered by date ascending.
|
|
func (s *Store) GetMailsByThread(ctx context.Context, threadID string, tenantID *int64) ([]string, error) {
|
|
if s.db == nil {
|
|
return nil, nil
|
|
}
|
|
var rows pgx.Rows
|
|
var err error
|
|
if tenantID != nil {
|
|
rows, err = s.db.Query(ctx, `
|
|
SELECT e.id FROM emails e
|
|
JOIN email_refs r ON r.email_id = e.id AND r.tenant_id = $2
|
|
WHERE e.thread_id = $1
|
|
ORDER BY e.received_at ASC
|
|
`, threadID, *tenantID)
|
|
} else {
|
|
rows, err = s.db.Query(ctx, `
|
|
SELECT id FROM emails WHERE thread_id = $1 ORDER BY received_at ASC
|
|
`, threadID)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err == nil {
|
|
ids = append(ids, id)
|
|
}
|
|
}
|
|
return ids, nil
|
|
}
|
|
|
|
// lookupByMessageID returns the email ID for a given Message-ID header value,
|
|
// or an empty string if not found. Returns an error only on unexpected DB failures.
|
|
func (s *Store) lookupByMessageID(ctx context.Context, messageID string) (string, error) {
|
|
var id string
|
|
err := s.db.QueryRow(ctx,
|
|
`SELECT id FROM emails WHERE message_id = $1 LIMIT 1`, messageID,
|
|
).Scan(&id)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return "", nil
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
// Load reads a stored email by its ID. If encryption is configured, the file
|
|
// is decrypted before returning plaintext.
|
|
func (s *Store) Load(id string) ([]byte, error) {
|
|
path := s.filePath(id)
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return nil, fmt.Errorf("storage: not found: %s", id)
|
|
}
|
|
return nil, fmt.Errorf("storage: read: %w", err)
|
|
}
|
|
|
|
if s.key != nil {
|
|
plaintext, err := s.decrypt(data)
|
|
if err != nil {
|
|
// Pre-encryption era: file stored unencrypted — try decompression anyway.
|
|
out, _ := maybeDecompress(data)
|
|
return out, nil
|
|
}
|
|
data = plaintext
|
|
}
|
|
|
|
return maybeDecompress(data)
|
|
}
|
|
|
|
// Delete removes a stored email by its ID, including its DB metadata row.
|
|
func (s *Store) Delete(id string) error {
|
|
// PROJ-34: Enforce retention lock before any disk or DB operation.
|
|
if s.db != nil {
|
|
ctx := context.Background()
|
|
var until *time.Time
|
|
_ = s.db.QueryRow(ctx, `SELECT retain_until FROM emails WHERE id=$1`, id).Scan(&until)
|
|
if until != nil && time.Now().Before(*until) {
|
|
return ErrRetentionLock
|
|
}
|
|
}
|
|
|
|
path := s.filePath(id)
|
|
if err := os.Remove(path); err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return fmt.Errorf("storage: not found: %s", id)
|
|
}
|
|
return fmt.Errorf("storage: delete: %w", err)
|
|
}
|
|
|
|
if s.db != nil {
|
|
_, _ = s.db.Exec(context.Background(), `DELETE FROM emails WHERE id = $1`, id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetReceivedAts returns the received_at timestamp for each mail ID in the
|
|
// provided slice. Used as a date fallback when the email Date header cannot
|
|
// be parsed. Missing IDs are silently omitted from the result map.
|
|
func (s *Store) GetReceivedAts(ctx context.Context, ids []string) map[string]time.Time {
|
|
if s.db == nil || len(ids) == 0 {
|
|
return nil
|
|
}
|
|
// pgx supports $1 = []string via ANY
|
|
rows, err := s.db.Query(ctx,
|
|
`SELECT id, received_at FROM emails WHERE id = ANY($1)`, ids)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer rows.Close()
|
|
result := make(map[string]time.Time, len(ids))
|
|
for rows.Next() {
|
|
var id string
|
|
var t time.Time
|
|
if err := rows.Scan(&id, &t); err == nil {
|
|
result[id] = t
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// Purge deletes all mails whose retain_until has passed.
|
|
// Returns the number of successfully deleted mails.
|
|
// Mails that fail to delete (e.g. file missing) are skipped silently.
|
|
func (s *Store) Purge(ctx context.Context) (int, error) {
|
|
if s.db == nil {
|
|
return 0, nil
|
|
}
|
|
rows, err := s.db.Query(ctx,
|
|
`SELECT id FROM emails WHERE retain_until IS NOT NULL AND retain_until < NOW()`)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("storage: purge query: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err == nil {
|
|
ids = append(ids, id)
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return 0, fmt.Errorf("storage: purge rows: %w", err)
|
|
}
|
|
deleted := 0
|
|
for _, id := range ids {
|
|
if err := s.Delete(id); err == nil {
|
|
deleted++
|
|
}
|
|
}
|
|
return deleted, nil
|
|
}
|
|
|
|
// Stats returns aggregate statistics. Uses the DB if available (fast), otherwise
|
|
// falls back to walking the file system.
|
|
func (s *Store) Stats() (*StoreStats, error) {
|
|
if s.db != nil {
|
|
return s.statsFromDB()
|
|
}
|
|
return s.statsFromFS()
|
|
}
|
|
|
|
func (s *Store) statsFromDB() (*StoreStats, error) {
|
|
var stats StoreStats
|
|
err := s.db.QueryRow(context.Background(),
|
|
`SELECT COALESCE(COUNT(*), 0), COALESCE(SUM(size_bytes), 0) FROM emails`,
|
|
).Scan(&stats.TotalMails, &stats.TotalBytes)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: stats from db: %w", err)
|
|
}
|
|
return &stats, nil
|
|
}
|
|
|
|
func (s *Store) statsFromFS() (*StoreStats, error) {
|
|
var stats StoreStats
|
|
err := filepath.WalkDir(filepath.Join(s.dir, "store"), func(_ string, d fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
info, err := d.Info()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stats.TotalMails++
|
|
stats.TotalBytes += info.Size()
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: stats: %w", err)
|
|
}
|
|
return &stats, nil
|
|
}
|
|
|
|
// FirstAndLastMail returns the oldest and newest mail. Uses the DB if available
|
|
// (much faster than walking the filesystem).
|
|
func (s *Store) FirstAndLastMail() (first, last *MailRef, err error) {
|
|
if s.db != nil {
|
|
return s.firstAndLastFromDB()
|
|
}
|
|
return s.firstAndLastFromFS()
|
|
}
|
|
|
|
func (s *Store) firstAndLastFromDB() (first, last *MailRef, err error) {
|
|
ctx := context.Background()
|
|
|
|
// Oldest
|
|
var fID string
|
|
var fTime time.Time
|
|
err = s.db.QueryRow(ctx,
|
|
`SELECT id, received_at FROM emails ORDER BY received_at ASC LIMIT 1`,
|
|
).Scan(&fID, &fTime)
|
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil, fmt.Errorf("storage: first mail: %w", err)
|
|
}
|
|
if err == nil {
|
|
first = &MailRef{ID: fID, ModTime: fTime}
|
|
}
|
|
|
|
// Newest
|
|
var lID string
|
|
var lTime time.Time
|
|
err = s.db.QueryRow(ctx,
|
|
`SELECT id, received_at FROM emails ORDER BY received_at DESC LIMIT 1`,
|
|
).Scan(&lID, &lTime)
|
|
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil, fmt.Errorf("storage: last mail: %w", err)
|
|
}
|
|
if err == nil {
|
|
last = &MailRef{ID: lID, ModTime: lTime}
|
|
}
|
|
|
|
return first, last, nil
|
|
}
|
|
|
|
func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) {
|
|
err = filepath.WalkDir(filepath.Join(s.dir, "store"), func(path string, d fs.DirEntry, werr error) error {
|
|
if werr != nil {
|
|
return werr
|
|
}
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
info, err := d.Info()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ref := &MailRef{ID: d.Name(), ModTime: info.ModTime()}
|
|
if first == nil || ref.ModTime.Before(first.ModTime) {
|
|
first = ref
|
|
}
|
|
if last == nil || ref.ModTime.After(last.ModTime) {
|
|
last = ref
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("storage: first/last: %w", err)
|
|
}
|
|
return first, last, nil
|
|
}
|
|
|
|
// ── Metadata helpers ──────────────────────────────────────────────────────
|
|
|
|
// insertMeta inserts parsed email metadata into the emails table.
|
|
// Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id.
|
|
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64, storageID *int64, threadID string) error {
|
|
mailTo := strings.Join(pm.To, ", ")
|
|
hasAttach := len(pm.Attachments) > 0
|
|
|
|
var msgID *string
|
|
if pm.MessageID != "" {
|
|
msgID = &pm.MessageID
|
|
}
|
|
var tid *string
|
|
if threadID != "" {
|
|
tid = &threadID
|
|
}
|
|
var inReplyTo *string
|
|
if pm.InReplyTo != "" {
|
|
inReplyTo = &pm.InReplyTo
|
|
}
|
|
|
|
receivedAt := pm.Date
|
|
if receivedAt.IsZero() {
|
|
receivedAt = time.Now()
|
|
}
|
|
_, err := s.db.Exec(ctx, `
|
|
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id, storage_id, thread_id, in_reply_to)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
|
ON CONFLICT (id) DO NOTHING
|
|
`, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID, storageID, tid, inReplyTo)
|
|
return err
|
|
}
|
|
|
|
// insertMetaMinimal inserts minimal metadata when parsing fails.
|
|
func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64, storageID *int64) {
|
|
_, _ = s.db.Exec(ctx, `
|
|
INSERT INTO emails (id, received_at, size_bytes, tenant_id, storage_id)
|
|
VALUES ($1, NOW(), $2, $3, $4)
|
|
ON CONFLICT (id) DO NOTHING
|
|
`, id, int64(size), tenantID, storageID)
|
|
}
|
|
|
|
// SaveMeta upserts metadata for a given email ID. Used by the backfill process.
|
|
// Writes message_id when present so that backfill populates the UNIQUE index.
|
|
func (s *Store) SaveMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int) error {
|
|
if s.db == nil {
|
|
return nil
|
|
}
|
|
|
|
mailTo := strings.Join(pm.To, ", ")
|
|
hasAttach := len(pm.Attachments) > 0
|
|
|
|
var msgID *string
|
|
if pm.MessageID != "" {
|
|
msgID = &pm.MessageID
|
|
}
|
|
|
|
metaDate := pm.Date
|
|
if metaDate.IsZero() {
|
|
metaDate = time.Now()
|
|
}
|
|
_, err := s.db.Exec(ctx, `
|
|
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, message_id)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
ON CONFLICT (id) DO UPDATE SET
|
|
mail_from = EXCLUDED.mail_from,
|
|
mail_to = EXCLUDED.mail_to,
|
|
subject = EXCLUDED.subject,
|
|
size_bytes = EXCLUDED.size_bytes,
|
|
has_attach = EXCLUDED.has_attach,
|
|
message_id = COALESCE(emails.message_id, EXCLUDED.message_id)
|
|
`, id, metaDate, pm.From, mailTo, pm.Subject, int64(size), hasAttach, msgID)
|
|
return err
|
|
}
|
|
|
|
// SetIndexedAt marks an email as indexed in the database.
|
|
func (s *Store) SetIndexedAt(ctx context.Context, id string) error {
|
|
if s.db == nil {
|
|
return nil
|
|
}
|
|
_, err := s.db.Exec(ctx, `UPDATE emails SET indexed_at = NOW() WHERE id = $1`, id)
|
|
return err
|
|
}
|
|
|
|
// IsIndexed checks whether a given email has been indexed (indexed_at IS NOT NULL).
|
|
func (s *Store) IsIndexed(ctx context.Context, id string) (bool, error) {
|
|
if s.db == nil {
|
|
return false, nil
|
|
}
|
|
var indexed bool
|
|
err := s.db.QueryRow(ctx,
|
|
`SELECT indexed_at IS NOT NULL FROM emails WHERE id = $1`, id,
|
|
).Scan(&indexed)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return false, nil
|
|
}
|
|
return indexed, err
|
|
}
|
|
|
|
// ── Backfill ──────────────────────────────────────────────────────────────
|
|
|
|
// Backfill walks the store directory, parses each email, inserts missing DB
|
|
// metadata rows, and re-indexes emails that have indexed_at IS NULL.
|
|
func (s *Store) Backfill(ctx context.Context, idx interface{ IndexSync(doc interface{}) error }, logger *slog.Logger) error {
|
|
// We accept a generic indexer interface to avoid import cycles.
|
|
// The caller (main.go) wraps the real index.Indexer.
|
|
return s.BackfillWithFuncs(ctx, logger)
|
|
}
|
|
|
|
// BackfillWithFuncs walks the store and calls the provided functions for each email.
|
|
// This is the implementation used by the coordinator in main.go.
|
|
func (s *Store) BackfillWithFuncs(ctx context.Context, logger *slog.Logger) error {
|
|
if s.db == nil {
|
|
logger.Info("backfill: skipping, no database configured")
|
|
return nil
|
|
}
|
|
|
|
storeDir := filepath.Join(s.dir, "store")
|
|
count := 0
|
|
indexed := 0
|
|
errCount := 0
|
|
|
|
err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error {
|
|
if werr != nil {
|
|
return werr
|
|
}
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
count++
|
|
id := d.Name()
|
|
|
|
// Load the file (handles decryption)
|
|
raw, err := s.Load(id)
|
|
if err != nil {
|
|
logger.Warn("backfill: load failed", "id", id, "err", err)
|
|
errCount++
|
|
return nil // continue with next file
|
|
}
|
|
|
|
// Parse
|
|
pm, err := mailparser.Parse(raw)
|
|
if err != nil {
|
|
logger.Warn("backfill: parse failed", "id", id, "err", err)
|
|
errCount++
|
|
return nil
|
|
}
|
|
|
|
// Upsert metadata
|
|
if err := s.SaveMeta(ctx, id, pm, len(raw)); err != nil {
|
|
logger.Warn("backfill: save meta failed", "id", id, "err", err)
|
|
}
|
|
|
|
// Check if already indexed
|
|
alreadyIndexed, err := s.IsIndexed(ctx, id)
|
|
if err != nil {
|
|
logger.Warn("backfill: check indexed failed", "id", id, "err", err)
|
|
}
|
|
|
|
if !alreadyIndexed {
|
|
indexed++
|
|
}
|
|
|
|
if count%100 == 0 {
|
|
logger.Info("backfill: progress", "processed", count, "need_index", indexed, "errors", errCount)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("backfill: walk: %w", err)
|
|
}
|
|
|
|
logger.Info("backfill: complete", "total", count, "need_index", indexed, "errors", errCount)
|
|
return nil
|
|
}
|
|
|
|
// WalkStore iterates over all files in the store directory and calls fn for each.
|
|
// The fn receives the email ID. This is used by the backfill coordinator.
|
|
func (s *Store) WalkStore(ctx context.Context, fn func(id string) error) error {
|
|
storeDir := filepath.Join(s.dir, "store")
|
|
return filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error {
|
|
if werr != nil {
|
|
return werr
|
|
}
|
|
if d.IsDir() {
|
|
return nil
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
return fn(d.Name())
|
|
})
|
|
}
|
|
|
|
// ── IMAP UID queries ──────────────────────────────────────────────────────
|
|
|
|
// GetMailsWithUID returns all email IDs with stable UIDs for a tenant, ordered by uid ASC.
|
|
// Used for shared IMAP mode.
|
|
func (s *Store) GetMailsWithUID(ctx context.Context, tenantID *int64) ([]MailWithUID, error) {
|
|
if s.db == nil {
|
|
ids, err := s.GetAllIDsByTenant(ctx, tenantID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]MailWithUID, len(ids))
|
|
for i, id := range ids {
|
|
out[i] = MailWithUID{ID: id, UID: int64(i + 1)}
|
|
}
|
|
return out, nil
|
|
}
|
|
var rows pgx.Rows
|
|
var err error
|
|
if tenantID == nil {
|
|
rows, err = s.db.Query(ctx,
|
|
`SELECT id, COALESCE(uid, 0) FROM emails ORDER BY uid ASC NULLS LAST`)
|
|
} else {
|
|
rows, err = s.db.Query(ctx, `
|
|
SELECT e.id, COALESCE(e.uid, 0)
|
|
FROM email_refs r
|
|
JOIN emails e ON e.id = r.email_id
|
|
WHERE r.tenant_id = $1
|
|
ORDER BY e.uid ASC NULLS LAST`, *tenantID)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: get mails with uid: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var result []MailWithUID
|
|
for rows.Next() {
|
|
var m MailWithUID
|
|
if err := rows.Scan(&m.ID, &m.UID); err == nil {
|
|
result = append(result, m)
|
|
}
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// GetMailsByRecipient returns mails where mail_to or mail_from contains the given email address.
|
|
// Used for personal IMAP mode filtering — includes both received and sent mails.
|
|
func (s *Store) GetMailsByRecipient(ctx context.Context, tenantID *int64, email string) ([]MailWithUID, error) {
|
|
if s.db == nil || email == "" {
|
|
return nil, nil
|
|
}
|
|
pattern := "%" + email + "%"
|
|
var rows pgx.Rows
|
|
var err error
|
|
if tenantID == nil {
|
|
rows, err = s.db.Query(ctx,
|
|
`SELECT id, COALESCE(uid, 0) FROM emails WHERE mail_to ILIKE $1 OR mail_from ILIKE $1 ORDER BY uid ASC NULLS LAST`,
|
|
pattern)
|
|
} else {
|
|
rows, err = s.db.Query(ctx, `
|
|
SELECT DISTINCT e.id, COALESCE(e.uid, 0)
|
|
FROM email_refs r
|
|
JOIN emails e ON e.id = r.email_id
|
|
WHERE r.tenant_id = $1
|
|
AND (e.mail_to ILIKE $2 OR e.mail_from ILIKE $2)
|
|
ORDER BY e.uid ASC NULLS LAST`, *tenantID, pattern)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: get mails by recipient: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var result []MailWithUID
|
|
for rows.Next() {
|
|
var m MailWithUID
|
|
if err := rows.Scan(&m.ID, &m.UID); err == nil {
|
|
result = append(result, m)
|
|
}
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// ── File path helper ──────────────────────────────────────────────────────
|
|
|
|
// filePath returns the on-disk path for a given mail ID.
|
|
// Uses 2-char prefix sharding: {dir}/store/{id[:2]}/{id}
|
|
func (s *Store) filePath(id string) string {
|
|
return filepath.Join(s.dir, "store", id[:2], id)
|
|
}
|
|
|
|
// ── Integrity verification ────────────────────────────────────────────────
|
|
|
|
// VerifyIntegrity re-computes the SHA-256 of the stored plaintext and
|
|
// compares it to the file ID. Updates verify_ok and verified_at in the DB.
|
|
func (s *Store) VerifyIntegrity(ctx context.Context, id string) (bool, error) {
|
|
raw, err := s.Load(id)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
sum := sha256.Sum256(raw)
|
|
computed := fmt.Sprintf("%x", sum[:])
|
|
ok := computed == id
|
|
if s.db != nil {
|
|
s.db.Exec(ctx,
|
|
`UPDATE emails SET verify_ok=$1, verified_at=NOW() WHERE id=$2`,
|
|
ok, id)
|
|
}
|
|
return ok, nil
|
|
}
|
|
|
|
// GetTenantForMail returns the tenant_id stored directly on the email record.
|
|
// Returns nil if no tenant is assigned or the mail does not exist.
|
|
func (s *Store) GetTenantForMail(ctx context.Context, id string) (*int64, error) {
|
|
if s.db == nil {
|
|
return nil, nil
|
|
}
|
|
var tenantID *int64
|
|
err := s.db.QueryRow(ctx, `SELECT tenant_id FROM emails WHERE id = $1`, id).Scan(&tenantID)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: get tenant for mail: %w", err)
|
|
}
|
|
return tenantID, nil
|
|
}
|
|
|
|
// GetAllIDsByTenant returns all email IDs visible to a tenant.
|
|
// If tenantID is nil, all IDs are returned (superadmin / no-tenant context).
|
|
func (s *Store) GetAllIDsByTenant(ctx context.Context, tenantID *int64) ([]string, error) {
|
|
if s.db != nil {
|
|
var (
|
|
rows pgx.Rows
|
|
err error
|
|
)
|
|
if tenantID == nil {
|
|
rows, err = s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`)
|
|
} else {
|
|
rows, err = s.db.Query(ctx,
|
|
`SELECT email_id FROM email_refs WHERE tenant_id = $1`, *tenantID)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: get ids by tenant: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
continue
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
return ids, rows.Err()
|
|
}
|
|
// fallback: walk store (no tenant filtering possible without DB)
|
|
var ids []string
|
|
err := s.WalkStore(ctx, func(id string) error {
|
|
ids = append(ids, id)
|
|
return nil
|
|
})
|
|
return ids, err
|
|
}
|
|
|
|
// StatsByTenant returns mail count and total size filtered by tenant.
|
|
// If tenantID is nil, aggregate over all emails.
|
|
func (s *Store) StatsByTenant(ctx context.Context, tenantID *int64) (map[string]interface{}, error) {
|
|
if s.db == nil {
|
|
st, err := s.statsFromFS()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]interface{}{
|
|
"count": st.TotalMails,
|
|
"total_size": st.TotalBytes,
|
|
}, nil
|
|
}
|
|
|
|
var count int64
|
|
var totalSize int64
|
|
|
|
if tenantID == nil {
|
|
err := s.db.QueryRow(ctx,
|
|
`SELECT COALESCE(COUNT(*),0), COALESCE(SUM(size_bytes),0) FROM emails`,
|
|
).Scan(&count, &totalSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: stats by tenant: %w", err)
|
|
}
|
|
} else {
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT COALESCE(COUNT(e.id),0), COALESCE(SUM(e.size_bytes),0)
|
|
FROM email_refs r
|
|
JOIN emails e ON e.id = r.email_id
|
|
WHERE r.tenant_id = $1
|
|
`, *tenantID).Scan(&count, &totalSize)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: stats by tenant: %w", err)
|
|
}
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"count": count,
|
|
"total_size": totalSize,
|
|
}, nil
|
|
}
|
|
|
|
// IsWithoutTenant reports whether a mail has no tenant assignment —
|
|
// neither a direct tenant_id nor any email_refs entry.
|
|
// Used by the auditor role to gate direct mail access.
|
|
func (s *Store) IsWithoutTenant(ctx context.Context, id string) (bool, error) {
|
|
if s.db == nil {
|
|
return false, nil
|
|
}
|
|
var result bool
|
|
err := s.db.QueryRow(ctx, `
|
|
SELECT (e.tenant_id IS NULL)
|
|
AND NOT EXISTS (SELECT 1 FROM email_refs r WHERE r.email_id = e.id)
|
|
FROM emails e WHERE e.id = $1
|
|
`, id).Scan(&result)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return false, nil
|
|
}
|
|
if err != nil {
|
|
return false, fmt.Errorf("storage: is without tenant: %w", err)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetAllIDsWithoutTenant returns email IDs that have no tenant assignment.
|
|
// Used for the auditor role: access to global (untenanted) mails only.
|
|
func (s *Store) GetAllIDsWithoutTenant(ctx context.Context) ([]string, error) {
|
|
if s.db == nil {
|
|
return nil, nil
|
|
}
|
|
rows, err := s.db.Query(ctx, `
|
|
SELECT e.id FROM emails e
|
|
WHERE e.tenant_id IS NULL
|
|
AND NOT EXISTS (SELECT 1 FROM email_refs r WHERE r.email_id = e.id)
|
|
ORDER BY e.received_at
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storage: get ids without tenant: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
continue
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
return ids, rows.Err()
|
|
}
|
|
|
|
// GetAllIDs returns all email IDs from the DB, or walks the store if no DB.
|
|
func (s *Store) GetAllIDs(ctx context.Context) ([]string, error) {
|
|
if s.db != nil {
|
|
rows, err := s.db.Query(ctx, `SELECT id FROM emails ORDER BY received_at`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var ids []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
continue
|
|
}
|
|
ids = append(ids, id)
|
|
}
|
|
return ids, nil
|
|
}
|
|
// fallback: walk store
|
|
var ids []string
|
|
err := s.WalkStore(ctx, func(id string) error {
|
|
ids = append(ids, id)
|
|
return nil
|
|
})
|
|
return ids, err
|
|
}
|
|
|
|
// VerifyStatus is the persisted outcome of a mail's most recent integrity
// check, as stored in the verify_ok and verified_at columns.
type VerifyStatus struct {
	VerifyOK   *bool      // nil = not yet checked
	VerifiedAt *time.Time // time of the last check; nil if never checked
}
|
|
|
|
// GetVerifyStatus returns the stored verification status for a given email ID.
|
|
func (s *Store) GetVerifyStatus(ctx context.Context, id string) (VerifyStatus, error) {
|
|
var vs VerifyStatus
|
|
if s.db == nil {
|
|
return vs, nil
|
|
}
|
|
row := s.db.QueryRow(ctx,
|
|
`SELECT verify_ok, verified_at FROM emails WHERE id=$1`, id)
|
|
var ok *bool
|
|
var at *time.Time
|
|
if err := row.Scan(&ok, &at); err != nil {
|
|
return vs, nil // not found = not verified
|
|
}
|
|
vs.VerifyOK = ok
|
|
vs.VerifiedAt = at
|
|
return vs, nil
|
|
}
|
|
|
|
// DBQueryRow exposes a single DB query row for use by the API metrics handler.
|
|
// Returns a no-op row if no DB is configured.
|
|
func (s *Store) DBQueryRow(ctx context.Context, sql string, args ...interface{}) interface {
|
|
Scan(dest ...interface{}) error
|
|
} {
|
|
if s.db == nil {
|
|
return &noopRow{}
|
|
}
|
|
return s.db.QueryRow(ctx, sql, args...)
|
|
}
|
|
|
|
// noopRow is the placeholder row DBQueryRow returns when no database is
// configured.
type noopRow struct{}

// Scan ignores dest, leaving every destination unchanged, and returns nil.
func (*noopRow) Scan(dest ...interface{}) error {
	return nil
}
|
|
|
|
// DBExec exposes a single DB exec for use by API handlers (e.g., API key management).
|
|
// Returns the number of rows affected. Returns 0 if no DB is configured.
|
|
func (s *Store) DBExec(ctx context.Context, sql string, args ...interface{}) (int64, error) {
|
|
if s.db == nil {
|
|
return 0, nil
|
|
}
|
|
tag, err := s.db.Exec(ctx, sql, args...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return tag.RowsAffected(), nil
|
|
}
|
|
|
|
// DBQuery exposes a multi-row DB query for use by API handlers (e.g., API key listing).
|
|
// Returns nil rows if no DB is configured.
|
|
func (s *Store) DBQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
|
|
if s.db == nil {
|
|
return nil, fmt.Errorf("storage: no database configured")
|
|
}
|
|
return s.db.Query(ctx, sql, args...)
|
|
}
|