feat(PROJ-36): archivmail recompress — Nachkomprimierung bestehender Mails
Neuer CLI-Subcommand: archivmail recompress [--dry-run]. Komprimiert alle unkomprimierten Dateien im Store atomisch (temp + rename). Überspringt bereits komprimierte Dateien (Magic-Byte 0x01). Aktualisiert storage_objects und emails.storage_id in der DB. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,180 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// RecompressStats summarises the outcome of a single Recompress run.
type RecompressStats struct {
	// Total is the number of non-directory entries visited in the store.
	Total int

	// Compressed is the number of files compressed in this run (in dry-run
	// mode: the number that would have been compressed).
	Compressed int

	// AlreadyCompressed is the number of files left untouched because they
	// were already compressed or because gzip did not make them smaller.
	AlreadyCompressed int

	// Errors is the number of files skipped because a processing step failed.
	Errors int

	// BytesSaved is the accumulated size reduction in bytes (original size
	// minus compressed size); with encryption enabled it is an estimate
	// based on the plaintext sizes.
	BytesSaved int64
}
|
||||
|
||||
// Recompress walks the store directory and gzip-compresses any mail file that
|
||||
// is not yet compressed. Files are replaced atomically via a temp file + rename.
|
||||
// Already-compressed files (magic byte 0x01) are skipped.
|
||||
// If dryRun is true, no files are written but stats are calculated.
|
||||
func (s *Store) Recompress(ctx context.Context, dryRun bool, logger *slog.Logger) (RecompressStats, error) {
|
||||
storeDir := filepath.Join(s.dir, "store")
|
||||
var stats RecompressStats
|
||||
|
||||
err := filepath.WalkDir(storeDir, func(path string, d fs.DirEntry, werr error) error {
|
||||
if werr != nil {
|
||||
return werr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
stats.Total++
|
||||
id := d.Name()
|
||||
|
||||
// Read raw file bytes (encrypted or not)
|
||||
raw, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
logger.Warn("recompress: read failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Decrypt to get plaintext
|
||||
plaintext := raw
|
||||
if s.key != nil {
|
||||
pt, err := s.decrypt(raw)
|
||||
if err != nil {
|
||||
// Pre-encryption file — treat raw bytes as plaintext
|
||||
pt = raw
|
||||
}
|
||||
plaintext = pt
|
||||
}
|
||||
|
||||
// Already compressed?
|
||||
if len(plaintext) > 0 && plaintext[0] == magicGzip {
|
||||
stats.AlreadyCompressed++
|
||||
if stats.AlreadyCompressed%1000 == 0 {
|
||||
logger.Info("recompress: progress", "total", stats.Total, "already_compressed", stats.AlreadyCompressed)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compress
|
||||
compressed, err := compressGzip(plaintext)
|
||||
if err != nil {
|
||||
logger.Warn("recompress: compress failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip if compression doesn't help
|
||||
if len(compressed) >= len(plaintext) {
|
||||
stats.AlreadyCompressed++
|
||||
return nil
|
||||
}
|
||||
|
||||
bytesSaved := int64(len(raw)) - int64(len(compressed))
|
||||
if s.key != nil {
|
||||
// After encryption the size changes; estimate savings from plaintext difference
|
||||
bytesSaved = int64(len(plaintext)) - int64(len(compressed))
|
||||
}
|
||||
|
||||
if dryRun {
|
||||
stats.Compressed++
|
||||
stats.BytesSaved += bytesSaved
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encrypt compressed content
|
||||
toWrite := compressed
|
||||
if s.key != nil {
|
||||
enc, err := s.encrypt(compressed)
|
||||
if err != nil {
|
||||
logger.Warn("recompress: encrypt failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
toWrite = enc
|
||||
}
|
||||
|
||||
// Atomic write: temp file in same directory, then rename
|
||||
dir := filepath.Dir(path)
|
||||
tmp, err := os.CreateTemp(dir, ".recompress-*")
|
||||
if err != nil {
|
||||
logger.Warn("recompress: temp file failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
tmpName := tmp.Name()
|
||||
|
||||
if _, err := tmp.Write(toWrite); err != nil {
|
||||
_ = tmp.Close()
|
||||
_ = os.Remove(tmpName)
|
||||
logger.Warn("recompress: write temp failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
if err := tmp.Close(); err != nil {
|
||||
_ = os.Remove(tmpName)
|
||||
logger.Warn("recompress: close temp failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpName, path); err != nil {
|
||||
_ = os.Remove(tmpName)
|
||||
logger.Warn("recompress: rename failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update storage_objects record if DB available
|
||||
if s.db != nil {
|
||||
origSize := int64(len(plaintext))
|
||||
compSize := int64(len(toWrite))
|
||||
_, _ = s.db.Exec(ctx, `
|
||||
INSERT INTO storage_objects (storage_type, path, compression, size_original, size_compressed, checksum)
|
||||
VALUES ('filesystem', $1, 'gzip', $2, $3, $4)
|
||||
ON CONFLICT DO NOTHING
|
||||
`, path, origSize, compSize, id)
|
||||
// Also update emails.storage_id if the record exists but storage_id is null
|
||||
_, _ = s.db.Exec(ctx, `
|
||||
UPDATE emails SET storage_id = (
|
||||
SELECT id FROM storage_objects WHERE checksum = $1 LIMIT 1
|
||||
) WHERE id = $1 AND storage_id IS NULL
|
||||
`, id)
|
||||
}
|
||||
|
||||
stats.Compressed++
|
||||
stats.BytesSaved += bytesSaved
|
||||
|
||||
if stats.Total%500 == 0 {
|
||||
logger.Info("recompress: progress",
|
||||
"total", stats.Total,
|
||||
"compressed", stats.Compressed,
|
||||
"already_compressed", stats.AlreadyCompressed,
|
||||
"bytes_saved", fmt.Sprintf("%.1f MB", float64(stats.BytesSaved)/1024/1024),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return stats, fmt.Errorf("recompress: walk: %w", err)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
Reference in New Issue
Block a user