Files
sysops 2bab61209c chore: Modulname github.com/archivmail → archivmail
Go-Modul in go.mod und allen 45 Go-Dateien umbenannt.
2026-04-05 20:37:35 +02:00

119 lines
2.7 KiB
Go

package storage
import (
"context"
"fmt"
"log/slog"
"archivmail/pkg/mailparser"
)
// RethreadStats holds counters from a Rethread run.
//
// Every mail that Rethread processes increments exactly one of Updated,
// Skipped or Errors. If the run is cancelled, their sum may be below Total.
type RethreadStats struct {
	Total   int // mails without a thread_id found when the run started
	Updated int // mails whose thread_id was set (in dry-run mode: would have been set)
	Skipped int // mails with no Message-ID/In-Reply-To/References, or whose thread could not be resolved
	Errors  int // mails that failed to load, parse or update
}
// Rethread walks all emails without a thread_id (oldest first) and sets
// thread_id + in_reply_to by parsing each raw mail and resolving its thread.
//
// Mails are processed in ascending received_at order so that parent mails
// are handled before their replies. Presumably resolveThreadID looks up the
// thread_id already stored for a parent, so children pick up the parent's
// thread_id in a single pass — TODO confirm against resolveThreadID.
//
// If dryRun is true, no DB updates are written and Updated counts the mails
// that would have been updated.
// NOTE(review): because nothing is written in dry-run mode, a reply whose
// parent is also unthreaded may resolve differently than in a real run.
//
// A nil logger falls back to slog.Default(). Per-mail load/parse/update
// failures are logged and counted in Errors rather than aborting the run.
// The returned error is non-nil only when the initial query fails or ctx is
// cancelled; on cancellation the partial stats are returned alongside it.
func (s *Store) Rethread(ctx context.Context, dryRun bool, logger *slog.Logger) (RethreadStats, error) {
	// Emit a progress line every this many processed mails.
	const progressEvery = 500

	if s.db == nil {
		return RethreadStats{}, fmt.Errorf("rethread: no database configured")
	}
	if logger == nil {
		logger = slog.Default()
	}

	ids, err := s.rethreadCandidateIDs(ctx)
	if err != nil {
		return RethreadStats{}, err
	}

	stats := RethreadStats{Total: len(ids)}
	logger.Info("rethread: starting", "total_mails", stats.Total, "dry_run", dryRun)

	for i, id := range ids {
		if err := ctx.Err(); err != nil {
			return stats, err
		}

		s.rethreadOne(ctx, id, dryRun, logger, &stats)

		// Checked for every mail (not only updated ones) so that progress is
		// reported even when the Nth mail is skipped or fails.
		if (i+1)%progressEvery == 0 {
			logger.Info("rethread: progress",
				"processed", i+1,
				"total", stats.Total,
				"updated", stats.Updated,
				"skipped", stats.Skipped,
				"errors", stats.Errors,
			)
		}
	}

	logger.Info("rethread: finished",
		"total", stats.Total,
		"updated", stats.Updated,
		"skipped", stats.Skipped,
		"errors", stats.Errors,
		"dry_run", dryRun,
	)
	return stats, nil
}

// rethreadCandidateIDs returns the ids of all emails whose thread_id is NULL,
// ordered by received_at ascending. The result set is fully drained and
// closed before returning, and scan/iteration errors are reported instead of
// silently producing a truncated list.
func (s *Store) rethreadCandidateIDs(ctx context.Context) ([]string, error) {
	rows, err := s.db.Query(ctx, `
		SELECT id FROM emails
		WHERE thread_id IS NULL
		ORDER BY received_at ASC
	`)
	if err != nil {
		return nil, fmt.Errorf("rethread: query: %w", err)
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, fmt.Errorf("rethread: scan: %w", err)
		}
		ids = append(ids, id)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rethread: rows: %w", err)
	}
	return ids, nil
}

// rethreadOne loads, parses and (unless dryRun) updates a single mail,
// incrementing exactly one of stats.Updated, stats.Skipped or stats.Errors.
// Failures are logged at Warn level and counted; they never abort the run.
func (s *Store) rethreadOne(ctx context.Context, id string, dryRun bool, logger *slog.Logger, stats *RethreadStats) {
	raw, err := s.Load(id)
	if err != nil {
		logger.Warn("rethread: load failed", "id", id, "err", err)
		stats.Errors++
		return
	}

	pm, err := mailparser.Parse(raw)
	if err != nil {
		logger.Warn("rethread: parse failed", "id", id, "err", err)
		stats.Errors++
		return
	}

	// No message-id and no reply headers → nothing to thread.
	if pm.MessageID == "" && pm.InReplyTo == "" && len(pm.References) == 0 {
		stats.Skipped++
		return
	}

	threadID := s.resolveThreadID(ctx, pm)
	if threadID == "" {
		stats.Skipped++
		return
	}

	if !dryRun {
		// Store NULL rather than an empty string when there is no In-Reply-To.
		var inReplyTo *string
		if pm.InReplyTo != "" {
			inReplyTo = &pm.InReplyTo
		}
		// The thread_id IS NULL guard avoids overwriting a value set
		// concurrently. NOTE(review): a guarded no-op (0 rows) is still
		// counted as Updated below.
		if _, err := s.db.Exec(ctx, `
			UPDATE emails SET thread_id = $1, in_reply_to = $2
			WHERE id = $3 AND thread_id IS NULL
		`, threadID, inReplyTo, id); err != nil {
			logger.Warn("rethread: update failed", "id", id, "err", err)
			stats.Errors++
			return
		}
	}
	stats.Updated++
}