feat(PROJ-38): rethread — rückwirkendes Mail-Threading
This commit is contained in:
@@ -0,0 +1,118 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"github.com/archivmail/pkg/mailparser"
|
||||
)
|
||||
|
||||
// RethreadStats holds counters from a Rethread run.
|
||||
type RethreadStats struct {
|
||||
Total int
|
||||
Updated int // mails whose thread_id was set
|
||||
Skipped int // mails already had thread_id or no message_id
|
||||
Errors int
|
||||
}
|
||||
|
||||
// Rethread walks all emails without a thread_id (oldest first) and sets
|
||||
// thread_id + in_reply_to by parsing each raw mail and resolving its thread.
|
||||
//
|
||||
// Mails are processed in ascending received_at order so that parent mails
|
||||
// are resolved before their replies — this ensures children inherit the
|
||||
// correct thread_id in a single pass.
|
||||
//
|
||||
// If dryRun is true, no DB updates are written.
|
||||
func (s *Store) Rethread(ctx context.Context, dryRun bool, logger *slog.Logger) (RethreadStats, error) {
|
||||
if s.db == nil {
|
||||
return RethreadStats{}, fmt.Errorf("rethread: no database configured")
|
||||
}
|
||||
|
||||
// Load all mails without thread_id, oldest first.
|
||||
rows, err := s.db.Query(ctx, `
|
||||
SELECT id FROM emails
|
||||
WHERE thread_id IS NULL
|
||||
ORDER BY received_at ASC
|
||||
`)
|
||||
if err != nil {
|
||||
return RethreadStats{}, fmt.Errorf("rethread: query: %w", err)
|
||||
}
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err == nil {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
var stats RethreadStats
|
||||
stats.Total = len(ids)
|
||||
|
||||
logger.Info("rethread: starting", "total_mails", stats.Total, "dry_run", dryRun)
|
||||
|
||||
for i, id := range ids {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return stats, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
raw, err := s.Load(id)
|
||||
if err != nil {
|
||||
logger.Warn("rethread: load failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
continue
|
||||
}
|
||||
|
||||
pm, err := mailparser.Parse(raw)
|
||||
if err != nil {
|
||||
logger.Warn("rethread: parse failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
continue
|
||||
}
|
||||
|
||||
// No message-id and no reply headers → nothing to thread.
|
||||
if pm.MessageID == "" && pm.InReplyTo == "" && len(pm.References) == 0 {
|
||||
stats.Skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
threadID := s.resolveThreadID(ctx, pm)
|
||||
if threadID == "" {
|
||||
stats.Skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
if !dryRun {
|
||||
var inReplyTo *string
|
||||
if pm.InReplyTo != "" {
|
||||
inReplyTo = &pm.InReplyTo
|
||||
}
|
||||
_, err = s.db.Exec(ctx, `
|
||||
UPDATE emails SET thread_id = $1, in_reply_to = $2
|
||||
WHERE id = $3 AND thread_id IS NULL
|
||||
`, threadID, inReplyTo, id)
|
||||
if err != nil {
|
||||
logger.Warn("rethread: update failed", "id", id, "err", err)
|
||||
stats.Errors++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
stats.Updated++
|
||||
|
||||
if (i+1)%500 == 0 {
|
||||
logger.Info("rethread: progress",
|
||||
"processed", i+1,
|
||||
"total", stats.Total,
|
||||
"updated", stats.Updated,
|
||||
"skipped", stats.Skipped,
|
||||
"errors", stats.Errors,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
Reference in New Issue
Block a user