Files
sysops 799c828548 feat(PROJ-45): IMAP per-folder UID-tracking, UIDVALIDITY-check + reindex OCR protection
- scheduler.go: BUG-1 fix — preserve stored uid_validity when server returns 0
- scheduler.go: BUG-2 fix — replace inline switch with DecideResync() call
- scheduler.go: SetAuditLogger wired; imap_uidvalidity_reset audit event
- cmd_reindex.go: read existing attachment_text before IndexSync to prevent
  Manticore REPLACE INTO from wiping OCR text written by the OCR worker

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-11 14:56:28 +02:00

496 lines
13 KiB
Go

// Package imap provides IMAP account management, import, and automatic sync.
package imap
import (
"context"
"fmt"
"io"
"log/slog"
"sync"
"time"
"archivmail/internal/audit"
imapv2 "github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// Scheduler drives automatic IMAP synchronisation for every configured
// account. Its background loop wakes once per minute and starts a sync for
// each account whose interval has elapsed.
type Scheduler struct {
	// Collaborators injected through NewScheduler.
	store    *Store
	importer *Importer
	logger   *slog.Logger

	// audlog receives the tenant-visible audit entries written for
	// UIDVALIDITY resets (PROJ-45). It stays nil unless SetAuditLogger is
	// called.
	audlog *audit.Logger

	// mu protects running.
	mu sync.Mutex
	// running marks the accounts that currently have a sync in flight; it is
	// the in-memory guard against concurrent syncs of the same account.
	running map[int64]bool

	// cancel stops the loop goroutine launched by Start; nil before Start.
	cancel context.CancelFunc
}
// NewScheduler builds a Scheduler around the given store, importer and
// logger. The result is idle until Start is called and has no audit logger
// until SetAuditLogger is called.
func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Scheduler {
	s := new(Scheduler)
	s.store = store
	s.importer = importer
	s.logger = logger
	s.running = map[int64]bool{}
	return s
}
// SetAuditLogger attaches the audit.Logger that records UIDVALIDITY-reset
// events (PROJ-45) as tenant-visible audit entries. Passing nil (or never
// calling this) leaves auditing off; structured logs are still written.
//
// The field is assigned without taking the scheduler mutex, so call this
// before any sync can be running.
func (s *Scheduler) SetAuditLogger(a *audit.Logger) {
	s.audlog = a
}
// Start spawns the background loop goroutine and remembers its cancel
// function so that Stop can end it.
//
// Each call overwrites the stored cancel function, so a loop started by an
// earlier Start call can no longer be stopped through Stop.
func (s *Scheduler) Start() {
	loopCtx, stop := context.WithCancel(context.Background())
	s.cancel = stop
	go s.loop(loopCtx)
	s.logger.Info("imap scheduler: started")
}
// Stop cancels the loop goroutine's context, if Start has been called, and
// logs the shutdown. It does not wait for the goroutine to exit.
//
// Syncs that are already in flight are not interrupted: they are launched
// with context.Background() rather than the loop's context.
func (s *Scheduler) Stop() {
	if stop := s.cancel; stop != nil {
		stop()
	}
	s.logger.Info("imap scheduler: stopped")
}
// TriggerSync starts a manual incremental sync for one account and returns
// without waiting for it to finish.
//
// The account is claimed in the running set first; if it is already claimed
// an error is returned and nothing is started. The sync goroutine runs on
// context.Background(), so it is not tied to the lifetime of ctx.
func (s *Scheduler) TriggerSync(ctx context.Context, accountID int64) error {
	s.mu.Lock()
	busy := s.running[accountID]
	if !busy {
		s.running[accountID] = true
	}
	s.mu.Unlock()

	if busy {
		return fmt.Errorf("imap scheduler: sync already running for account %d", accountID)
	}
	go s.runSyncWithRetry(context.Background(), accountID)
	return nil
}
// loop is the body of the scheduler goroutine. Once per minute it asks
// checkAccounts to start any syncs that are due, and it returns as soon as
// ctx is cancelled.
func (s *Scheduler) loop(ctx context.Context) {
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			s.checkAccounts(ctx)
		}
	}
}
// checkAccounts loads every account and starts a background sync for each one
// that is due.
//
// An account is due when it has a positive sync interval, is not flagged as
// running in the store, and its last sync (or the zero time if it never
// synced) lies at least one interval in the past. A failure to list the
// accounts is logged and the tick is skipped.
func (s *Scheduler) checkAccounts(ctx context.Context) {
	accounts, err := s.store.ListAll(ctx)
	if err != nil {
		s.logger.Error("imap scheduler: list accounts failed", "err", err)
		return
	}
	now := time.Now()
	for _, acc := range accounts {
		if acc.SyncIntervalMin <= 0 || acc.SyncRunning {
			continue
		}
		var lastSync time.Time
		if acc.LastSyncAt != nil {
			lastSync = *acc.LastSyncAt
		}
		interval := time.Duration(acc.SyncIntervalMin) * time.Minute
		if now.Sub(lastSync) < interval {
			continue
		}

		// Test-and-set in a single critical section. Checking and claiming
		// under separate lock acquisitions would let a concurrent
		// TriggerSync claim the account in between, and two syncs would then
		// run for the same account.
		s.mu.Lock()
		claimed := !s.running[acc.ID]
		if claimed {
			s.running[acc.ID] = true
		}
		s.mu.Unlock()
		if !claimed {
			continue
		}

		s.logger.Info("imap scheduler: starting scheduled sync",
			"account_id", acc.ID, "name", acc.Name)
		// The sync deliberately runs on a background context, so it is not
		// cut short when the scheduler loop's ctx is cancelled.
		go s.runSyncWithRetry(context.Background(), acc.ID)
	}
}
// syncBackoff lists the pauses inserted before each retry of a failed sync,
// indexed by retry number: entry 0 is the wait before the second attempt.
var syncBackoff = []time.Duration{
	time.Second,
	time.Minute,
	5 * time.Minute,
}
// runSyncWithRetry runs doSync for one account, making up to three attempts
// with a pause from syncBackoff before each retry (only the first two entries
// are reached with three attempts).
//
// It marks the account as running in the store for the duration of the call
// and always clears both the in-memory running flag and the stored flag when
// it returns. The outcome ("ok" or "error") is recorded via UpdateSyncResult.
// If ctx is cancelled while waiting between attempts, the function returns
// without recording an outcome.
func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) {
	const maxAttempts = 3

	// Bookkeeping writes must still reach the store after ctx has been
	// cancelled; otherwise the stored running flag would stay set and
	// checkAccounts would skip this account from then on.
	cleanupCtx := context.WithoutCancel(ctx)

	defer func() {
		s.mu.Lock()
		delete(s.running, accountID)
		s.mu.Unlock()
		if err := s.store.SetSyncRunning(cleanupCtx, accountID, false); err != nil {
			s.logger.Error("imap scheduler: clear sync running failed",
				"account_id", accountID, "err", err)
		}
	}()

	if err := s.store.SetSyncRunning(ctx, accountID, true); err != nil {
		s.logger.Error("imap scheduler: set sync running failed",
			"account_id", accountID, "err", err)
		return
	}

	var (
		count   int
		lastUID uint32
		lastErr error
	)
	startedAt := time.Now()
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if attempt > 0 {
			backoff := syncBackoff[attempt-1]
			s.logger.Warn("imap scheduler: retrying sync",
				"account_id", accountID, "attempt", attempt+1, "backoff", backoff)
			select {
			case <-ctx.Done():
				s.logger.Warn("imap scheduler: sync cancelled during backoff",
					"account_id", accountID, "err", ctx.Err())
				return
			case <-time.After(backoff):
			}
		}
		count, lastUID, lastErr = s.doSync(ctx, accountID)
		if lastErr == nil {
			break
		}
		s.logger.Warn("imap scheduler: sync attempt failed",
			"account_id", accountID, "attempt", attempt+1, "err", lastErr)
	}
	durationMs := time.Since(startedAt).Milliseconds()

	if lastErr != nil {
		s.logger.Error("imap scheduler: sync failed after all retries",
			"account_id", accountID, "err", lastErr, "duration_ms", durationMs)
		if err := s.store.UpdateSyncResult(cleanupCtx, accountID, "error", lastErr.Error(), 0, lastUID); err != nil {
			s.logger.Error("imap scheduler: record sync result failed",
				"account_id", accountID, "err", err)
		}
		return
	}

	s.logger.Info("imap scheduler: sync completed",
		"account_id", accountID, "imported", count, "last_uid", lastUID,
		"duration_ms", durationMs)
	if err := s.store.UpdateSyncResult(cleanupCtx, accountID, "ok", "", count, lastUID); err != nil {
		s.logger.Error("imap scheduler: record sync result failed",
			"account_id", accountID, "err", err)
	}
}
// doSync runs one incremental IMAP sync pass over an account: it loads the
// account and its password, connects and logs in, lists the folders, and
// syncs every folder that is not in the account's exclusion list.
//
// It returns the number of imported messages, the largest UID reported by any
// folder, and an error. Setup failures (account lookup, password, connect,
// login, folder listing) abort the pass with an error; a failure inside a
// single folder is only logged and the remaining folders are still processed.
func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, error) {
	acc, err := s.store.Get(ctx, accountID)
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: get account: %w", err)
	}

	password, err := s.store.GetPassword(ctx, accountID)
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: get password: %w", err)
	}

	conn, err := Connect(acc.Host, acc.Port, acc.TLS)
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: connect: %w", err)
	}
	defer conn.Close()

	if err = conn.Login(acc.Username, password).Wait(); err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: login: %w", err)
	}

	folders, err := ListFolders(conn.Client)
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err)
	}

	// Set of folder names the account asked to leave out.
	skip := make(map[string]struct{}, len(acc.ExcludedFolders))
	for _, name := range acc.ExcludedFolders {
		skip[name] = struct{}{}
	}

	accLog := s.logger.With("component", "imap-scheduler", "account_id", accountID)

	imported, highestUID, synced := 0, uint32(0), 0
	for _, folder := range folders {
		if _, excluded := skip[folder.Name]; excluded {
			continue
		}
		n, folderUID, syncErr := s.syncFolder(ctx, conn, acc, folder.Name, accLog)
		if syncErr != nil {
			accLog.Warn("sync folder failed, continuing", "folder", folder.Name, "err", syncErr)
			continue
		}
		synced++
		imported += n
		if highestUID < folderUID {
			highestUID = folderUID
		}
	}

	// Account-level summary (PROJ-45 acceptance criterion). The duration is
	// left out on purpose: runSyncWithRetry logs it, since it owns the
	// wall clock that spans all retries.
	accLog.Info("account sync complete",
		"folders_synced", synced,
		"new_messages_total", imported)
	return imported, highestUID, nil
}
// syncFolder imports the messages of one IMAP folder that have not been
// synced yet.
//
// PROJ-45: progress is tracked per folder in the stored folder state rather
// than in the legacy account-wide last_uid. The UIDVALIDITY reported by
// SELECT is handed to DecideResync together with the stored state; when that
// reports a reset, the whole folder is synced again in this call (PROJ-32
// message-id dedup blocks duplicates) and an audit entry is written if an
// audit logger is set.
//
// It returns the number of imported messages, the folder's new high-water
// UID, and an error. Errors are returned for state lookup, SELECT and SEARCH
// failures; a failing fetch batch is logged and the remaining batches are
// still attempted. Once the search has succeeded, the folder state (high-water
// UID and UIDVALIDITY) is persisted on every return path.
func (s *Scheduler) syncFolder(
	ctx context.Context,
	c *Conn,
	acc *Account,
	folder string,
	log *slog.Logger,
) (int, uint32, error) {
	// Load per-folder state BEFORE selecting: the stored uid_validity is
	// compared against the value the server reports in the SELECT response.
	state, err := s.store.GetFolderState(ctx, acc.ID, folder)
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: get folder state %q: %w", folder, err)
	}
	selectData, err := c.Select(folder, nil).Wait()
	if err != nil {
		return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err)
	}

	// Snapshot the server-reported UIDVALIDITY. A value of 0 is treated as
	// "unknown / server bug": the existing high-water mark is kept instead
	// of forcing a needless full resync.
	serverUIDValidity := uint32(0)
	if selectData != nil {
		serverUIDValidity = selectData.UIDValidity
	}

	// BUG-2 fix: DecideResync keeps the branching logic in one place and
	// avoids drift with the unit-tested path.
	decision := DecideResync(state.LastUID, state.UIDValidity, serverUIDValidity)
	effectiveLastUID := decision.EffectiveStart
	if serverUIDValidity == 0 {
		log.Warn("uidvalidity zero from server, keeping stored last_uid",
			"folder", folder, "stored_uid_validity", state.UIDValidity)
	}
	if decision.UIDValidReset {
		// Mailbox was rebuilt server-side, so UIDs restart at 1 and the
		// entire folder must be resynced from scratch. PROJ-32 message-id
		// deduplication prevents duplicate writes.
		log.Warn("uidvalidity change",
			"account_id", acc.ID,
			"folder", folder,
			"old", state.UIDValidity,
			"new", serverUIDValidity)
		if s.audlog != nil {
			s.audlog.Log(audit.Entry{
				EventType: "imap_uidvalidity_reset",
				Username:  acc.Owner,
				Success:   true,
				Detail: fmt.Sprintf("account=%d folder=%q old_uidvalidity=%d new_uidvalidity=%d",
					acc.ID, folder, state.UIDValidity, serverUIDValidity),
			})
		}
	}

	var uids []imapv2.UID
	if effectiveLastUID > 0 {
		// Incremental: only messages with UID > effectiveLastUID.
		minUID := imapv2.UID(effectiveLastUID + 1)
		criteria := &imapv2.SearchCriteria{
			UID: []imapv2.UIDSet{
				{imapv2.UIDRange{Start: minUID, Stop: 0}},
			},
		}
		searchData, err := c.UIDSearch(criteria, nil).Wait()
		if err != nil {
			return 0, 0, fmt.Errorf("imap scheduler: uid search incremental %q: %w", folder, err)
		}
		// A UID range "n:*" always matches the message with the highest UID
		// in the mailbox, even when n is larger than that UID (RFC 3501
		// §6.4.8). Without this filter the last already-synced message would
		// be fetched again on every sync that finds nothing new.
		found := searchData.AllUIDs()
		uids = make([]imapv2.UID, 0, len(found))
		for _, u := range found {
			if uint32(u) > effectiveLastUID {
				uids = append(uids, u)
			}
		}
	} else {
		// First sync or post-UIDVALIDITY-reset: fetch everything.
		searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait()
		if err != nil {
			return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err)
		}
		uids = searchData.AllUIDs()
	}

	var (
		imported int
		maxUID   uint32 = effectiveLastUID
		// failedMinUID is the lowest UID contained in any batch that failed;
		// 0 means no batch failed (valid UIDs start at 1).
		failedMinUID uint32
	)

	// BUG-1 fix: when the server returns UIDVALIDITY=0 (misbehaving server),
	// keep the stored valid value so future mismatch detection still works.
	// Writing 0 would silently disable the UIDVALIDITY guard for this folder.
	persistedValidity := serverUIDValidity
	if serverUIDValidity == 0 {
		persistedValidity = state.UIDValidity
	}

	// PROJ-45: even when there are 0 new UIDs, persist the uid_validity so a
	// later UIDVALIDITY change can still be detected. The closure reads
	// maxUID when the function returns, so it sees the final value.
	defer func() {
		newState := FolderState{
			AccountID:   acc.ID,
			Folder:      folder,
			LastUID:     maxUID,
			UIDValidity: persistedValidity,
		}
		if err := s.store.UpsertFolderState(ctx, newState); err != nil {
			log.Warn("imap scheduler: persist folder state failed",
				"folder", folder, "err", err)
		}
	}()

	if len(uids) == 0 {
		log.Info("folder synced",
			"folder", folder,
			"new_messages", 0,
			"total_messages", 0,
			"uid_validity", serverUIDValidity)
		return 0, maxUID, nil
	}

	for i := 0; i < len(uids); i += batchSize {
		end := i + batchSize
		if end > len(uids) {
			end = len(uids)
		}
		batch := uids[i:end]
		c.SetFetchDeadline()
		count, batchMaxUID, err := s.fetchSyncBatch(c.Client, batch, acc.TenantID, log)
		c.ClearDeadline()
		if err != nil {
			log.Warn("imap scheduler: batch error, continuing",
				"folder", folder, "offset", i, "err", err)
			// Remember the lowest UID that was not imported so the
			// high-water mark can be held back below it.
			for _, u := range batch {
				if failedMinUID == 0 || uint32(u) < failedMinUID {
					failedMinUID = uint32(u)
				}
			}
			continue
		}
		imported += count
		if batchMaxUID > maxUID {
			maxUID = batchMaxUID
		}
	}

	// Never advance the high-water mark past a batch that failed: the next
	// incremental search starts at maxUID+1, so a higher mark would skip the
	// failed messages for good. Batches above the failed one are fetched
	// again next time and filtered by the message-id dedup. Every UID in
	// uids is greater than effectiveLastUID, so the capped value cannot fall
	// below the previous mark.
	if failedMinUID != 0 && maxUID >= failedMinUID {
		log.Warn("imap scheduler: holding back last_uid after batch failure",
			"folder", folder, "reached_uid", maxUID, "persisted_uid", failedMinUID-1)
		maxUID = failedMinUID - 1
	}

	log.Info("folder synced",
		"folder", folder,
		"new_messages", imported,
		"total_messages", len(uids),
		"uid_validity", serverUIDValidity)
	return imported, maxUID, nil
}
// fetchSyncBatch fetches the full body of every message in uids and hands
// each non-empty body to the importer.
//
// It returns the number of messages stored without error, the largest UID in
// uids, and an error. The largest UID is taken from the input slice (already
// known from SEARCH), not from the FETCH responses. Read and store failures
// for a single message are logged and skipped; only a failure to close the
// fetch command is returned as an error, together with the count reached so
// far. An empty uids slice yields (0, 0, nil) without contacting the server.
func (s *Scheduler) fetchSyncBatch(
	c *imapclient.Client,
	uids []imapv2.UID,
	tenantID *int64,
	log *slog.Logger,
) (int, uint32, error) {
	if len(uids) == 0 {
		return 0, 0, nil
	}

	// Highest requested UID; no need to parse it out of the FETCH data.
	var highest uint32
	for i := range uids {
		if v := uint32(uids[i]); v > highest {
			highest = v
		}
	}

	cmd := c.Fetch(imapv2.UIDSetNum(uids...), &imapv2.FetchOptions{
		BodySection: []*imapv2.FetchItemBodySection{{}},
	})

	stored := 0
	for msg := cmd.Next(); msg != nil; msg = cmd.Next() {
		for item := msg.Next(); item != nil; item = msg.Next() {
			section, isBody := item.(imapclient.FetchItemDataBodySection)
			if !isBody {
				continue
			}
			raw, err := io.ReadAll(section.Literal)
			if err != nil {
				log.Warn("imap scheduler: read body failed", "err", err)
				continue
			}
			if len(raw) == 0 {
				continue
			}
			if err := s.importer.storeAndIndex(raw, tenantID, log); err != nil {
				log.Warn("imap scheduler: store/index failed", "err", err)
				continue
			}
			stored++
		}
	}

	if err := cmd.Close(); err != nil {
		return stored, highest, fmt.Errorf("imap scheduler: fetch close: %w", err)
	}
	return stored, highest, nil
}