From eb48081c5ef4617f4fb2af9bee93f42d9581fa0d Mon Sep 17 00:00:00 2001 From: sysops Date: Sun, 5 Apr 2026 20:28:50 +0200 Subject: [PATCH] =?UTF-8?q?feat(PROJ-38):=20rethread=20=E2=80=94=20r=C3=BC?= =?UTF-8?q?ckwirkendes=20Mail-Threading?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/archivmail/cmd_import.go | 5 ++ cmd/archivmail/cmd_rethread.go | 65 ++++++++++++++++++ cmd/archivmail/main.go | 3 + internal/storage/rethread.go | 118 +++++++++++++++++++++++++++++++++ 4 files changed, 191 insertions(+) create mode 100644 cmd/archivmail/cmd_rethread.go create mode 100644 internal/storage/rethread.go diff --git a/cmd/archivmail/cmd_import.go b/cmd/archivmail/cmd_import.go index 7755e34..fb9730e 100644 --- a/cmd/archivmail/cmd_import.go +++ b/cmd/archivmail/cmd_import.go @@ -285,6 +285,7 @@ Commands: export E-Mails exportieren (EML, MBOX) reindex Index neu aufbauen (alle oder pro Mandant) recompress Bestehende Mails nachträglich gzip-komprimieren + rethread Thread-IDs rückwirkend aus In-Reply-To/References befüllen version Version anzeigen help Diese Hilfe anzeigen @@ -327,5 +328,9 @@ archivmail reindex [flags] archivmail recompress [flags] --config Pfad zur Konfigurationsdatei (Standard: /etc/archivmail/config.yml) --dry-run Simulation: zeigt wie viel gespart würde, ohne Dateien zu ändern + +archivmail rethread [flags] + --config Pfad zur Konfigurationsdatei (Standard: /etc/archivmail/config.yml) + --dry-run Simulation: zeigt wie viele Mails gethreaded würden, ohne DB zu ändern `, AppVersion) } diff --git a/cmd/archivmail/cmd_rethread.go b/cmd/archivmail/cmd_rethread.go new file mode 100644 index 0000000..cd569ad --- /dev/null +++ b/cmd/archivmail/cmd_rethread.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "os" + + "github.com/archivmail/config" + "github.com/archivmail/internal/storage" +) + +// runRethread walks all emails without thread_id and sets thread relationships +// by parsing In-Reply-To / References headers. +// +// Usage: archivmail rethread [--config path] [--dry-run] +func runRethread(args []string) { + fset := flag.NewFlagSet("rethread", flag.ExitOnError) + configPath := fset.String("config", "/etc/archivmail/config.yml", "path to config file") + dryRun := fset.Bool("dry-run", false, "simulate without writing changes") + _ = fset.Parse(args) + + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + + cfg, err := config.Load(*configPath) + if err != nil { + fmt.Fprintf(os.Stderr, "error: load config: %v\n", err) + os.Exit(1) + } + + storeCfg := storage.Config{ + Dir: cfg.Storage.StorePath, + Keyfile: cfg.Storage.Keyfile, + DSN: cfg.Database.DSN(), + CompressEnabled: cfg.Storage.Compress, + } + mailStore, err := storage.New(storeCfg) + if err != nil { + fmt.Fprintf(os.Stderr, "error: storage init: %v\n", err) + os.Exit(1) + } + defer mailStore.Close() + + if *dryRun { + logger.Info("rethread: DRY-RUN — keine Änderungen werden gespeichert") + } + + stats, err := mailStore.Rethread(context.Background(), *dryRun, logger) + if err != nil { + fmt.Fprintf(os.Stderr, "error: rethread: %v\n", err) + os.Exit(1) + } + + logger.Info("rethread: abgeschlossen", + "total", stats.Total, + "updated", stats.Updated, + "skipped", stats.Skipped, + "errors", stats.Errors, + ) + + if stats.Errors > 0 { + os.Exit(1) + } +} diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 922cdde..19fbcc8 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -60,6 +60,9 @@ func main() { case "recompress": runRecompress(os.Args[2:]) return + case "rethread": + runRethread(os.Args[2:]) + return case "version": fmt.Printf("archivmail %s\n", AppVersion) for mod, ver := range Modules { diff --git a/internal/storage/rethread.go b/internal/storage/rethread.go new file mode 100644 index 0000000..3315fc7 --- /dev/null +++ b/internal/storage/rethread.go @@ -0,0 +1,118 @@ +package storage + +import ( + "context" + "fmt" + "log/slog" + + "github.com/archivmail/pkg/mailparser" +) + +// RethreadStats holds counters from a Rethread run. +type RethreadStats struct { + Total int + Updated int // mails whose thread_id was set + Skipped int // mails already had thread_id or no message_id + Errors int +} + +// Rethread walks all emails without a thread_id (oldest first) and sets +// thread_id + in_reply_to by parsing each raw mail and resolving its thread. +// +// Mails are processed in ascending received_at order so that parent mails +// are resolved before their replies — this ensures children inherit the +// correct thread_id in a single pass. +// +// If dryRun is true, no DB updates are written. +func (s *Store) Rethread(ctx context.Context, dryRun bool, logger *slog.Logger) (RethreadStats, error) { + if s.db == nil { + return RethreadStats{}, fmt.Errorf("rethread: no database configured") + } + + // Load all mails without thread_id, oldest first. + rows, err := s.db.Query(ctx, ` + SELECT id FROM emails + WHERE thread_id IS NULL + ORDER BY received_at ASC + `) + if err != nil { + return RethreadStats{}, fmt.Errorf("rethread: query: %w", err) + } + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err == nil { + ids = append(ids, id) + } + } + rows.Close() + + var stats RethreadStats + stats.Total = len(ids) + + logger.Info("rethread: starting", "total_mails", stats.Total, "dry_run", dryRun) + + for i, id := range ids { + select { + case <-ctx.Done(): + return stats, ctx.Err() + default: + } + + raw, err := s.Load(id) + if err != nil { + logger.Warn("rethread: load failed", "id", id, "err", err) + stats.Errors++ + continue + } + + pm, err := mailparser.Parse(raw) + if err != nil { + logger.Warn("rethread: parse failed", "id", id, "err", err) + stats.Errors++ + continue + } + + // No message-id and no reply headers → nothing to thread. + if pm.MessageID == "" && pm.InReplyTo == "" && len(pm.References) == 0 { + stats.Skipped++ + continue + } + + threadID := s.resolveThreadID(ctx, pm) + if threadID == "" { + stats.Skipped++ + continue + } + + if !dryRun { + var inReplyTo *string + if pm.InReplyTo != "" { + inReplyTo = &pm.InReplyTo + } + _, err = s.db.Exec(ctx, ` + UPDATE emails SET thread_id = $1, in_reply_to = $2 + WHERE id = $3 AND thread_id IS NULL + `, threadID, inReplyTo, id) + if err != nil { + logger.Warn("rethread: update failed", "id", id, "err", err) + stats.Errors++ + continue + } + } + + stats.Updated++ + + if (i+1)%500 == 0 { + logger.Info("rethread: progress", + "processed", i+1, + "total", stats.Total, + "updated", stats.Updated, + "skipped", stats.Skipped, + "errors", stats.Errors, + ) + } + } + + return stats, nil +}