feat(PROJ-36,PROJ-37): gzip-Kompression + Attachment-Deduplication
Sprint 1: Emails werden vor AES-256-GCM optional gzip-komprimiert (compress: true). Magic-Byte 0x01 als Prefix ermöglicht backward-kompatibles Load() für Legacy-Dateien. Neue DB-Tabelle storage_objects trackt Kompressions-Metadaten. Sprint 2: Attachments werden via SHA-256 dedupliziert — gleicher Anhang in N Mails wird nur einmal gespeichert. Neue Tabellen: attachments, email_attachments. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -68,6 +68,7 @@ func runExport(args []string) {
|
|||||||
Dir: cfg.Storage.StorePath,
|
Dir: cfg.Storage.StorePath,
|
||||||
Keyfile: cfg.Storage.Keyfile,
|
Keyfile: cfg.Storage.Keyfile,
|
||||||
DSN: cfg.Database.DSN(),
|
DSN: cfg.Database.DSN(),
|
||||||
|
CompressEnabled: cfg.Storage.Compress,
|
||||||
}
|
}
|
||||||
mailStore, err := storage.New(storeCfg)
|
mailStore, err := storage.New(storeCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ func runImport(args []string) {
|
|||||||
Dir: cfg.Storage.StorePath,
|
Dir: cfg.Storage.StorePath,
|
||||||
Keyfile: cfg.Storage.Keyfile,
|
Keyfile: cfg.Storage.Keyfile,
|
||||||
DSN: cfg.Database.DSN(),
|
DSN: cfg.Database.DSN(),
|
||||||
|
CompressEnabled: cfg.Storage.Compress,
|
||||||
}
|
}
|
||||||
mailStore, err := storage.New(storeCfg)
|
mailStore, err := storage.New(storeCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -66,6 +66,7 @@ func runImportPiler(args []string) {
|
|||||||
Dir: cfg.Storage.StorePath,
|
Dir: cfg.Storage.StorePath,
|
||||||
Keyfile: cfg.Storage.Keyfile,
|
Keyfile: cfg.Storage.Keyfile,
|
||||||
DSN: cfg.Database.DSN(),
|
DSN: cfg.Database.DSN(),
|
||||||
|
CompressEnabled: cfg.Storage.Compress,
|
||||||
}
|
}
|
||||||
mailStore, err := storage.New(storeCfg)
|
mailStore, err := storage.New(storeCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ func runReindex(args []string) {
|
|||||||
Dir: cfg.Storage.StorePath,
|
Dir: cfg.Storage.StorePath,
|
||||||
Keyfile: cfg.Storage.Keyfile,
|
Keyfile: cfg.Storage.Keyfile,
|
||||||
DSN: cfg.Database.DSN(),
|
DSN: cfg.Database.DSN(),
|
||||||
|
CompressEnabled: cfg.Storage.Compress,
|
||||||
}
|
}
|
||||||
mailStore, err := storage.New(storeCfg)
|
mailStore, err := storage.New(storeCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -109,6 +109,7 @@ func main() {
|
|||||||
Keyfile: cfg.Storage.Keyfile,
|
Keyfile: cfg.Storage.Keyfile,
|
||||||
DSN: cfg.Database.DSN(),
|
DSN: cfg.Database.DSN(),
|
||||||
RetentionDays: cfg.Storage.RetentionDays,
|
RetentionDays: cfg.Storage.RetentionDays,
|
||||||
|
CompressEnabled: cfg.Storage.Compress,
|
||||||
}
|
}
|
||||||
mailStore, err := storage.New(storeCfg)
|
mailStore, err := storage.New(storeCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
+1
-1
@@ -63,9 +63,9 @@ type SMTPOutConfig struct {
|
|||||||
type StorageConfig struct {
|
type StorageConfig struct {
|
||||||
StorePath string `yaml:"store_path"`
|
StorePath string `yaml:"store_path"`
|
||||||
AStorePath string `yaml:"astore_path"`
|
AStorePath string `yaml:"astore_path"`
|
||||||
XapianPath string `yaml:"xapian_path"`
|
|
||||||
Keyfile string `yaml:"keyfile"`
|
Keyfile string `yaml:"keyfile"`
|
||||||
RetentionDays int `yaml:"retention_days"` // 0 = kein Lock (GoBD-Compliance: z.B. 3650 für 10 Jahre)
|
RetentionDays int `yaml:"retention_days"` // 0 = kein Lock (GoBD-Compliance: z.B. 3650 für 10 Jahre)
|
||||||
|
Compress bool `yaml:"compress"` // gzip-Kompression vor AES-256-GCM (spart ~40-60% Disk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DatabaseConfig holds PostgreSQL connection settings.
|
// DatabaseConfig holds PostgreSQL connection settings.
|
||||||
|
|||||||
+3
-1
@@ -52,7 +52,9 @@
|
|||||||
| PROJ-34 | Retention-Policy + Löschsperre (GoBD-Compliance) | Deployed | [PROJ-34](PROJ-34-retention-policy.md) | 2026-03-31 |
|
| PROJ-34 | Retention-Policy + Löschsperre (GoBD-Compliance) | Deployed | [PROJ-34](PROJ-34-retention-policy.md) | 2026-03-31 |
|
||||||
|
|
||||||
| PROJ-35 | OCR & Anhang-Volltext-Indexierung | Planned | [PROJ-35](PROJ-35-ocr-anhang-volltext.md) | 2026-04-04 |
|
| PROJ-35 | OCR & Anhang-Volltext-Indexierung | Planned | [PROJ-35](PROJ-35-ocr-anhang-volltext.md) | 2026-04-04 |
|
||||||
|
| PROJ-36 | gzip-Kompression + storage_objects-Tabelle | In Progress | [PROJ-36](PROJ-36-compression-storage-objects.md) | 2026-04-05 |
|
||||||
|
| PROJ-37 | Attachment-Deduplication (Hash-basiert) | In Progress | [PROJ-37](PROJ-37-attachment-deduplication.md) | 2026-04-05 |
|
||||||
|
|
||||||
<!-- Add features above this line -->
|
<!-- Add features above this line -->
|
||||||
|
|
||||||
## Next Available ID: PROJ-35
|
## Next Available ID: PROJ-38
|
||||||
|
|||||||
@@ -0,0 +1,109 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/archivmail/pkg/mailparser"
|
||||||
|
)
|
||||||
|
|
||||||
|
// saveAttachments deduplicates and stores attachments from a parsed email.
|
||||||
|
// Each unique attachment (by SHA-256 hash) is stored once on disk.
|
||||||
|
// email_attachments links attachments to their email record.
|
||||||
|
func (s *Store) saveAttachments(ctx context.Context, emailID string, pm *mailparser.ParsedMail) error {
|
||||||
|
if s.db == nil || len(pm.Attachments) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, att := range pm.Attachments {
|
||||||
|
if len(att.Data) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sum := sha256.Sum256(att.Data)
|
||||||
|
hash := fmt.Sprintf("%x", sum[:])
|
||||||
|
|
||||||
|
// Check if this attachment is already stored
|
||||||
|
var attID int64
|
||||||
|
err := s.db.QueryRow(ctx, `SELECT id FROM attachments WHERE hash = $1`, hash).Scan(&attID)
|
||||||
|
if err != nil {
|
||||||
|
// Not found — compress and store
|
||||||
|
toWrite := att.Data
|
||||||
|
compression := "none"
|
||||||
|
if s.compressEnabled {
|
||||||
|
compressed, cerr := compressGzip(att.Data)
|
||||||
|
if cerr == nil && len(compressed) < len(att.Data) {
|
||||||
|
toWrite = compressed
|
||||||
|
compression = "gzip"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
attPath := s.attachmentPath(hash)
|
||||||
|
if err := os.MkdirAll(filepath.Dir(attPath), 0o755); err != nil {
|
||||||
|
return fmt.Errorf("storage: attachment mkdir: %w", err)
|
||||||
|
}
|
||||||
|
if _, statErr := os.Stat(attPath); os.IsNotExist(statErr) {
|
||||||
|
if err := os.WriteFile(attPath, toWrite, 0o644); err != nil {
|
||||||
|
return fmt.Errorf("storage: attachment write: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register in storage_objects
|
||||||
|
var soID int64
|
||||||
|
soErr := s.db.QueryRow(ctx, `
|
||||||
|
INSERT INTO storage_objects (storage_type, path, compression, size_original, size_compressed, checksum)
|
||||||
|
VALUES ('filesystem', $1, $2, $3, $4, $5)
|
||||||
|
RETURNING id
|
||||||
|
`, attPath, compression, int64(len(att.Data)), int64(len(toWrite)), hash).Scan(&soID)
|
||||||
|
|
||||||
|
// Insert attachment record
|
||||||
|
var insertErr error
|
||||||
|
if soErr == nil {
|
||||||
|
insertErr = s.db.QueryRow(ctx, `
|
||||||
|
INSERT INTO attachments (filename, mime_type, size_bytes, hash, storage_id)
|
||||||
|
VALUES ($1, $2, $3, $4, $5)
|
||||||
|
ON CONFLICT (hash) DO UPDATE SET hash = EXCLUDED.hash
|
||||||
|
RETURNING id
|
||||||
|
`, att.Filename, att.ContentType, int64(len(att.Data)), hash, soID).Scan(&attID)
|
||||||
|
} else {
|
||||||
|
insertErr = s.db.QueryRow(ctx, `
|
||||||
|
INSERT INTO attachments (filename, mime_type, size_bytes, hash)
|
||||||
|
VALUES ($1, $2, $3, $4)
|
||||||
|
ON CONFLICT (hash) DO UPDATE SET hash = EXCLUDED.hash
|
||||||
|
RETURNING id
|
||||||
|
`, att.Filename, att.ContentType, int64(len(att.Data)), hash).Scan(&attID)
|
||||||
|
}
|
||||||
|
if insertErr != nil {
|
||||||
|
continue // non-fatal: mail is saved, attachment linking is best-effort
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Link attachment to email
|
||||||
|
_, _ = s.db.Exec(ctx, `
|
||||||
|
INSERT INTO email_attachments (email_id, attachment_id)
|
||||||
|
VALUES ($1, $2)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
`, emailID, attID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadAttachment reads and decompresses an attachment by its SHA-256 hash.
|
||||||
|
func (s *Store) LoadAttachment(hash string) ([]byte, error) {
|
||||||
|
path := s.attachmentPath(hash)
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("storage: attachment not found: %s", hash)
|
||||||
|
}
|
||||||
|
return maybeDecompress(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// attachmentPath returns the on-disk path for a given attachment hash.
|
||||||
|
// Uses 2-level 2-char prefix sharding: {dir}/attachments/{ab}/{cd}/{hash}
|
||||||
|
func (s *Store) attachmentPath(hash string) string {
|
||||||
|
return filepath.Join(s.dir, "attachments", hash[:2], hash[2:4], hash)
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Compression marker byte — prepended to gzip-compressed content before encryption.
// RFC 2822 email headers always start with printable ASCII (>= 0x20), so 0x01
// never begins a legacy uncompressed email. Attachment payloads, however, are
// arbitrary binary data and may start with 0x01; maybeDecompress therefore
// also checks for the gzip stream header before treating data as compressed.
const (
	magicGzip = byte(0x01)
)

// compressGzip compresses data with gzip (best compression) and prepends the
// marker byte magicGzip to the result.
// Returns an error only on internal gzip failure (effectively never).
func compressGzip(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	buf.WriteByte(magicGzip)

	w, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
	if err != nil {
		return nil, fmt.Errorf("storage: gzip writer: %w", err)
	}
	if _, err := w.Write(data); err != nil {
		_ = w.Close() // already failing; the write error is the one to report
		return nil, fmt.Errorf("storage: gzip write: %w", err)
	}
	// Close flushes the remaining data and the gzip trailer into buf.
	if err := w.Close(); err != nil {
		return nil, fmt.Errorf("storage: gzip close: %w", err)
	}
	return buf.Bytes(), nil
}

// maybeDecompress returns the decompressed payload if data was produced by
// compressGzip, and data unchanged otherwise.
//
// Data counts as compressed only when it starts with magicGzip immediately
// followed by the two gzip identification bytes 0x1f 0x8b (RFC 1952).
// Checking the gzip header in addition to the marker keeps uncompressed
// binary content that merely happens to start with 0x01 (possible for
// attachments) from being rejected as a broken gzip stream.
//
// An error is returned when the data carries the full compressed signature
// but the gzip stream itself is corrupt.
func maybeDecompress(data []byte) ([]byte, error) {
	const (
		gzipID1 = byte(0x1f)
		gzipID2 = byte(0x8b)
	)

	// Marker + two gzip ID bytes are the minimum needed to recognise a
	// compressed payload; anything shorter is stored as-is.
	if len(data) < 3 {
		return data, nil
	}
	if data[0] != magicGzip || data[1] != gzipID1 || data[2] != gzipID2 {
		// Legacy or uncompressed content — return as-is.
		return data, nil
	}

	r, err := gzip.NewReader(bytes.NewReader(data[1:]))
	if err != nil {
		return nil, fmt.Errorf("storage: gzip reader: %w", err)
	}
	defer r.Close()

	out, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("storage: gzip decompress: %w", err)
	}
	return out, nil
}
|
||||||
+81
-19
@@ -32,6 +32,7 @@ type Config struct {
|
|||||||
Keyfile string // path to 32-byte AES key file; empty = no encryption
|
Keyfile string // path to 32-byte AES key file; empty = no encryption
|
||||||
DSN string // PostgreSQL DSN; empty = no DB
|
DSN string // PostgreSQL DSN; empty = no DB
|
||||||
RetentionDays int // 0 = no lock; >0 = GoBD retention period in days
|
RetentionDays int // 0 = no lock; >0 = GoBD retention period in days
|
||||||
|
CompressEnabled bool // gzip-compress emails and attachments before encryption
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store is a file-based email storage with optional AES-256-GCM encryption
|
// Store is a file-based email storage with optional AES-256-GCM encryption
|
||||||
@@ -41,6 +42,7 @@ type Store struct {
|
|||||||
key []byte // nil = no encryption
|
key []byte // nil = no encryption
|
||||||
db *pgxpool.Pool // nil = no DB
|
db *pgxpool.Pool // nil = no DB
|
||||||
retentionDays int // 0 = no lock
|
retentionDays int // 0 = no lock
|
||||||
|
compressEnabled bool // gzip before encryption
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreStats reports total mail count and size in bytes.
|
// StoreStats reports total mail count and size in bytes.
|
||||||
@@ -70,7 +72,7 @@ func New(cfg Config) (*Store, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &Store{dir: cfg.Dir, retentionDays: cfg.RetentionDays}
|
s := &Store{dir: cfg.Dir, retentionDays: cfg.RetentionDays, compressEnabled: cfg.CompressEnabled}
|
||||||
|
|
||||||
// Load encryption key
|
// Load encryption key
|
||||||
if err := s.loadKey(cfg.Keyfile); err != nil {
|
if err := s.loadKey(cfg.Keyfile); err != nil {
|
||||||
@@ -99,6 +101,8 @@ func New(cfg Config) (*Store, error) {
|
|||||||
// PROJ-33: Stable IMAP UIDs
|
// PROJ-33: Stable IMAP UIDs
|
||||||
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS uid BIGSERIAL`)
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS uid BIGSERIAL`)
|
||||||
_, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_uid ON emails (uid)`)
|
_, _ = s.db.Exec(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_emails_uid ON emails (uid)`)
|
||||||
|
// 2.0: storage_objects FK on emails
|
||||||
|
_, _ = s.db.Exec(ctx, `ALTER TABLE emails ADD COLUMN IF NOT EXISTS storage_id BIGINT REFERENCES storage_objects(id)`)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
@@ -197,7 +201,33 @@ func (s *Store) decrypt(data []byte) ([]byte, error) {
|
|||||||
// ── Database schema ───────────────────────────────────────────────────────
|
// ── Database schema ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
func (s *Store) initSchema(ctx context.Context) error {
|
func (s *Store) initSchema(ctx context.Context) error {
|
||||||
|
// storage_objects must exist before emails (FK dependency)
|
||||||
_, err := s.db.Exec(ctx, `
|
_, err := s.db.Exec(ctx, `
|
||||||
|
CREATE TABLE IF NOT EXISTS storage_objects (
|
||||||
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
storage_type TEXT NOT NULL DEFAULT 'filesystem',
|
||||||
|
path TEXT NOT NULL,
|
||||||
|
compression TEXT NOT NULL DEFAULT 'none',
|
||||||
|
size_original BIGINT,
|
||||||
|
size_compressed BIGINT,
|
||||||
|
checksum CHAR(64),
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
CREATE TABLE IF NOT EXISTS attachments (
|
||||||
|
id BIGSERIAL PRIMARY KEY,
|
||||||
|
filename TEXT,
|
||||||
|
mime_type TEXT,
|
||||||
|
size_bytes BIGINT,
|
||||||
|
hash CHAR(64) UNIQUE NOT NULL,
|
||||||
|
storage_id BIGINT REFERENCES storage_objects(id),
|
||||||
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_attachments_hash ON attachments (hash);
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = s.db.Exec(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS emails (
|
CREATE TABLE IF NOT EXISTS emails (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
@@ -228,6 +258,12 @@ func (s *Store) initSchema(ctx context.Context) error {
|
|||||||
);
|
);
|
||||||
CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id);
|
CREATE INDEX IF NOT EXISTS idx_email_refs_tenant ON email_refs (tenant_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id);
|
CREATE INDEX IF NOT EXISTS idx_email_refs_email ON email_refs (email_id);
|
||||||
|
CREATE TABLE IF NOT EXISTS email_attachments (
|
||||||
|
email_id TEXT NOT NULL REFERENCES emails(id),
|
||||||
|
attachment_id BIGINT NOT NULL REFERENCES attachments(id),
|
||||||
|
PRIMARY KEY (email_id, attachment_id)
|
||||||
|
);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_email_attachments_email ON email_attachments (email_id);
|
||||||
`)
|
`)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -290,16 +326,27 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !fileExists {
|
if !fileExists {
|
||||||
// Determine what to write: encrypted or plaintext
|
// Compress before encryption (if enabled)
|
||||||
|
toStore := raw
|
||||||
|
compression := "none"
|
||||||
|
if s.compressEnabled {
|
||||||
|
compressed, cerr := compressGzip(raw)
|
||||||
|
if cerr == nil && len(compressed) < len(raw) {
|
||||||
|
toStore = compressed
|
||||||
|
compression = "gzip"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Encrypt (if key configured)
|
||||||
var toWrite []byte
|
var toWrite []byte
|
||||||
if s.key != nil {
|
if s.key != nil {
|
||||||
encrypted, err := s.encrypt(raw)
|
encrypted, err := s.encrypt(toStore)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
toWrite = encrypted
|
toWrite = encrypted
|
||||||
} else {
|
} else {
|
||||||
toWrite = raw
|
toWrite = toStore
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.WriteFile(path, toWrite, 0o644); err != nil {
|
if err := os.WriteFile(path, toWrite, 0o644); err != nil {
|
||||||
@@ -308,8 +355,19 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
|
|||||||
|
|
||||||
// Insert metadata into DB
|
// Insert metadata into DB
|
||||||
if s.db != nil {
|
if s.db != nil {
|
||||||
|
// Register in storage_objects
|
||||||
|
var storageID *int64
|
||||||
|
var sid int64
|
||||||
|
if soErr := s.db.QueryRow(ctx, `
|
||||||
|
INSERT INTO storage_objects (storage_type, path, compression, size_original, size_compressed, checksum)
|
||||||
|
VALUES ('filesystem', $1, $2, $3, $4, $5)
|
||||||
|
RETURNING id
|
||||||
|
`, path, compression, int64(len(raw)), int64(len(toWrite)), id).Scan(&sid); soErr == nil {
|
||||||
|
storageID = &sid
|
||||||
|
}
|
||||||
|
|
||||||
if parseErr == nil {
|
if parseErr == nil {
|
||||||
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID); err != nil {
|
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID, storageID); err != nil {
|
||||||
// Race: another goroutine inserted via Message-ID UNIQUE conflict.
|
// Race: another goroutine inserted via Message-ID UNIQUE conflict.
|
||||||
// Resolve to the existing record's ID.
|
// Resolve to the existing record's ID.
|
||||||
if messageID != "" {
|
if messageID != "" {
|
||||||
@@ -327,7 +385,11 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
|
|||||||
// Non-conflict insert error: log but continue (file is written, metadata can be backfilled)
|
// Non-conflict insert error: log but continue (file is written, metadata can be backfilled)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
s.insertMetaMinimal(ctx, id, len(raw), tenantID)
|
s.insertMetaMinimal(ctx, id, len(raw), tenantID, storageID)
|
||||||
|
}
|
||||||
|
// Sprint 2: deduplicate and store attachments
|
||||||
|
if parseErr == nil {
|
||||||
|
_ = s.saveAttachments(ctx, id, pm)
|
||||||
}
|
}
|
||||||
// PROJ-34: Set retention lock.
|
// PROJ-34: Set retention lock.
|
||||||
// Mandanten-Mails: nur wenn der Mandant explizit retention_days > 0 gesetzt hat.
|
// Mandanten-Mails: nur wenn der Mandant explizit retention_days > 0 gesetzt hat.
|
||||||
@@ -390,14 +452,14 @@ func (s *Store) Load(id string) ([]byte, error) {
|
|||||||
if s.key != nil {
|
if s.key != nil {
|
||||||
plaintext, err := s.decrypt(data)
|
plaintext, err := s.decrypt(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If decryption fails, the file might be stored unencrypted
|
// Pre-encryption era: file stored unencrypted — try decompression anyway.
|
||||||
// (pre-encryption era). Return as-is for backwards compatibility.
|
out, _ := maybeDecompress(data)
|
||||||
return data, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
return plaintext, nil
|
data = plaintext
|
||||||
}
|
}
|
||||||
|
|
||||||
return data, nil
|
return maybeDecompress(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes a stored email by its ID, including its DB metadata row.
|
// Delete removes a stored email by its ID, including its DB metadata row.
|
||||||
@@ -574,7 +636,7 @@ func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) {
|
|||||||
|
|
||||||
// insertMeta inserts parsed email metadata into the emails table.
|
// insertMeta inserts parsed email metadata into the emails table.
|
||||||
// Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id.
|
// Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id.
|
||||||
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64) error {
|
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64, storageID *int64) error {
|
||||||
mailTo := strings.Join(pm.To, ", ")
|
mailTo := strings.Join(pm.To, ", ")
|
||||||
hasAttach := len(pm.Attachments) > 0
|
hasAttach := len(pm.Attachments) > 0
|
||||||
|
|
||||||
@@ -588,20 +650,20 @@ func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.Parsed
|
|||||||
receivedAt = time.Now()
|
receivedAt = time.Now()
|
||||||
}
|
}
|
||||||
_, err := s.db.Exec(ctx, `
|
_, err := s.db.Exec(ctx, `
|
||||||
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id)
|
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id, storage_id)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||||
ON CONFLICT (id) DO NOTHING
|
ON CONFLICT (id) DO NOTHING
|
||||||
`, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID)
|
`, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID, storageID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// insertMetaMinimal inserts minimal metadata when parsing fails.
|
// insertMetaMinimal inserts minimal metadata when parsing fails.
|
||||||
func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64) {
|
func (s *Store) insertMetaMinimal(ctx context.Context, id string, size int, tenantID *int64, storageID *int64) {
|
||||||
_, _ = s.db.Exec(ctx, `
|
_, _ = s.db.Exec(ctx, `
|
||||||
INSERT INTO emails (id, received_at, size_bytes, tenant_id)
|
INSERT INTO emails (id, received_at, size_bytes, tenant_id, storage_id)
|
||||||
VALUES ($1, NOW(), $2, $3)
|
VALUES ($1, NOW(), $2, $3, $4)
|
||||||
ON CONFLICT (id) DO NOTHING
|
ON CONFLICT (id) DO NOTHING
|
||||||
`, id, int64(size), tenantID)
|
`, id, int64(size), tenantID, storageID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveMeta upserts metadata for a given email ID. Used by the backfill process.
|
// SaveMeta upserts metadata for a given email ID. Used by the backfill process.
|
||||||
|
|||||||
Reference in New Issue
Block a user