// Package imap provides IMAP account management, import, and automatic sync. package imap import ( "context" "fmt" "io" "log/slog" "sync" "time" "archivmail/internal/audit" imapv2 "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" ) // Scheduler runs automatic IMAP syncs for all configured accounts. // It checks every minute whether a sync is due for a given account. type Scheduler struct { store *Store importer *Importer logger *slog.Logger audlog *audit.Logger // PROJ-45: UIDVALIDITY-reset events are tenant-visible audit entries mu sync.Mutex running map[int64]bool // in-memory guard against concurrent syncs cancel context.CancelFunc } // NewScheduler creates a Scheduler wired to the given store and importer. func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Scheduler { return &Scheduler{ store: store, importer: importer, logger: logger, running: make(map[int64]bool), } } // SetAuditLogger wires an audit.Logger into the scheduler so that // UIDVALIDITY-reset events (PROJ-45) are persisted as tenant-visible // audit entries. Optional — when nil, only structured logs are emitted. func (s *Scheduler) SetAuditLogger(a *audit.Logger) { s.audlog = a } // Start launches the background scheduler goroutine. // Call Stop to shut it down gracefully. func (s *Scheduler) Start() { ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel go s.loop(ctx) s.logger.Info("imap scheduler: started") } // Stop signals the scheduler goroutine to exit. func (s *Scheduler) Stop() { if s.cancel != nil { s.cancel() } s.logger.Info("imap scheduler: stopped") } // TriggerSync manually initiates an incremental sync for a single account. // Returns immediately; the sync runs in a background goroutine. // Returns an error if a sync is already running for this account. func (s *Scheduler) TriggerSync(ctx context.Context, accountID int64) error { s.mu.Lock() if s.running[accountID] { s.mu.Unlock() return fmt.Errorf("imap scheduler: sync already running for account %d", accountID) } s.running[accountID] = true s.mu.Unlock() go s.runSyncWithRetry(context.Background(), accountID) return nil } // loop is the main scheduler goroutine — ticks every minute. func (s *Scheduler) loop(ctx context.Context) { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.checkAccounts(ctx) } } } // checkAccounts loads all accounts and starts syncs for those that are due. func (s *Scheduler) checkAccounts(ctx context.Context) { accounts, err := s.store.ListAll(ctx) if err != nil { s.logger.Error("imap scheduler: list accounts failed", "err", err) return } now := time.Now() for _, acc := range accounts { if acc.SyncIntervalMin <= 0 { continue } s.mu.Lock() alreadyRunning := s.running[acc.ID] s.mu.Unlock() if alreadyRunning || acc.SyncRunning { continue } interval := time.Duration(acc.SyncIntervalMin) * time.Minute var lastSync time.Time if acc.LastSyncAt != nil { lastSync = *acc.LastSyncAt } if now.Sub(lastSync) >= interval { s.mu.Lock() s.running[acc.ID] = true s.mu.Unlock() s.logger.Info("imap scheduler: starting scheduled sync", "account_id", acc.ID, "name", acc.Name) go s.runSyncWithRetry(context.Background(), acc.ID) } } } // syncBackoff defines wait durations between retry attempts. var syncBackoff = []time.Duration{1 * time.Second, 60 * time.Second, 300 * time.Second} // runSyncWithRetry retries doSync up to 3 times with backoff. // It always resets the running flag when it returns. func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) { defer func() { s.mu.Lock() delete(s.running, accountID) s.mu.Unlock() _ = s.store.SetSyncRunning(ctx, accountID, false) }() if err := s.store.SetSyncRunning(ctx, accountID, true); err != nil { s.logger.Error("imap scheduler: set sync running failed", "account_id", accountID, "err", err) return } var ( count int lastUID uint32 lastErr error ) startedAt := time.Now() for attempt := 0; attempt < 3; attempt++ { if attempt > 0 { backoff := syncBackoff[attempt-1] s.logger.Warn("imap scheduler: retrying sync", "account_id", accountID, "attempt", attempt+1, "backoff", backoff) select { case <-ctx.Done(): return case <-time.After(backoff): } } count, lastUID, lastErr = s.doSync(ctx, accountID) if lastErr == nil { break } s.logger.Warn("imap scheduler: sync attempt failed", "account_id", accountID, "attempt", attempt+1, "err", lastErr) } durationMs := time.Since(startedAt).Milliseconds() if lastErr != nil { s.logger.Error("imap scheduler: sync failed after all retries", "account_id", accountID, "err", lastErr, "duration_ms", durationMs) _ = s.store.UpdateSyncResult(ctx, accountID, "error", lastErr.Error(), 0, lastUID) return } s.logger.Info("imap scheduler: sync completed", "account_id", accountID, "imported", count, "last_uid", lastUID, "duration_ms", durationMs) _ = s.store.UpdateSyncResult(ctx, accountID, "ok", "", count, lastUID) } // doSync performs an incremental IMAP sync for one account. // Returns (importedCount, maxUID, error). func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, error) { acc, err := s.store.Get(ctx, accountID) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: get account: %w", err) } password, err := s.store.GetPassword(ctx, accountID) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: get password: %w", err) } c, err := Connect(acc.Host, acc.Port, acc.TLS) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: connect: %w", err) } defer c.Close() if err := c.Login(acc.Username, password).Wait(); err != nil { return 0, 0, fmt.Errorf("imap scheduler: login: %w", err) } folders, err := ListFolders(c.Client) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err) } excluded := make(map[string]bool, len(acc.ExcludedFolders)) for _, f := range acc.ExcludedFolders { excluded[f] = true } log := s.logger.With("component", "imap-scheduler", "account_id", accountID) var ( totalImported int maxUID uint32 foldersSynced int ) for _, folder := range folders { if excluded[folder.Name] { continue } count, folderMaxUID, err := s.syncFolder(ctx, c, acc, folder.Name, log) if err != nil { log.Warn("sync folder failed, continuing", "folder", folder.Name, "err", err) continue } foldersSynced++ totalImported += count if folderMaxUID > maxUID { maxUID = folderMaxUID } } // Account-level summary (PROJ-45 acceptance criterion). // duration_ms is intentionally not added here — it's logged by runSyncWithRetry // which owns the retry-wrapped wall clock. log.Info("account sync complete", "folders_synced", foldersSynced, "new_messages_total", totalImported) return totalImported, maxUID, nil } // syncFolder syncs new messages from a single IMAP folder. // // PROJ-45: Uses per-folder UID tracking from imap_folder_state instead of the // legacy account-global imap_accounts.last_uid. UIDVALIDITY is read from the // SELECT response and compared against the stored value — on mismatch the // folder is fully resynced in this same iteration (PROJ-32 message-id dedup // blocks duplicates). func (s *Scheduler) syncFolder( ctx context.Context, c *Conn, acc *Account, folder string, log *slog.Logger, ) (int, uint32, error) { // Load per-folder state BEFORE selecting — we need the stored uid_validity // to compare against the value the server reports in the SELECT response. state, err := s.store.GetFolderState(ctx, acc.ID, folder) if err != nil { return 0, 0, fmt.Errorf("imap scheduler: get folder state %q: %w", folder, err) } selectData, err := c.Select(folder, nil).Wait() if err != nil { return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err) } // Snapshot the server-reported UIDVALIDITY. A value of 0 is treated as // "unknown / server bug" — we keep the existing high-water mark instead // of forcing a needless full resync. serverUIDValidity := uint32(0) if selectData != nil { serverUIDValidity = selectData.UIDValidity } effectiveLastUID := state.LastUID switch { case serverUIDValidity == 0: log.Warn("uidvalidity zero from server, keeping stored last_uid", "folder", folder, "stored_uid_validity", state.UIDValidity) case state.UIDValidity != 0 && state.UIDValidity != serverUIDValidity: // Mailbox was rebuilt server-side — UIDs restart at 1. We must // resync the entire folder from scratch. PROJ-32 message-id // deduplication prevents duplicate writes. log.Warn("uidvalidity change", "account_id", acc.ID, "folder", folder, "old", state.UIDValidity, "new", serverUIDValidity) if s.audlog != nil { s.audlog.Log(audit.Entry{ EventType: "imap_uidvalidity_reset", Username: acc.Owner, Success: true, Detail: fmt.Sprintf("account=%d folder=%q old_uidvalidity=%d new_uidvalidity=%d", acc.ID, folder, state.UIDValidity, serverUIDValidity), }) } effectiveLastUID = 0 } var uids []imapv2.UID if effectiveLastUID > 0 { // Incremental: only messages with UID > effectiveLastUID. minUID := imapv2.UID(effectiveLastUID + 1) criteria := &imapv2.SearchCriteria{ UID: []imapv2.UIDSet{ {imapv2.UIDRange{Start: minUID, Stop: 0}}, }, } searchData, err := c.UIDSearch(criteria, nil).Wait() if err != nil { return 0, 0, fmt.Errorf("imap scheduler: uid search incremental %q: %w", folder, err) } uids = searchData.AllUIDs() } else { // First sync or post-UIDVALIDITY-reset: fetch everything. searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait() if err != nil { return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err) } uids = searchData.AllUIDs() } var ( imported int maxUID uint32 = effectiveLastUID ) // PROJ-45: even when there are 0 new UIDs, persist the (possibly new) // uid_validity so a later UIDVALIDITY change can still be detected. defer func() { newState := FolderState{ AccountID: acc.ID, Folder: folder, LastUID: maxUID, UIDValidity: serverUIDValidity, } if err := s.store.UpsertFolderState(ctx, newState); err != nil { log.Warn("imap scheduler: persist folder state failed", "folder", folder, "err", err) } }() if len(uids) == 0 { log.Info("folder synced", "folder", folder, "new_messages", 0, "total_messages", 0, "uid_validity", serverUIDValidity) return 0, maxUID, nil } for i := 0; i < len(uids); i += batchSize { end := i + batchSize if end > len(uids) { end = len(uids) } batch := uids[i:end] c.SetFetchDeadline() count, batchMaxUID, err := s.fetchSyncBatch(c.Client, batch, acc.TenantID, log) c.ClearDeadline() if err != nil { log.Warn("imap scheduler: batch error, continuing", "folder", folder, "offset", i, "err", err) continue } imported += count if batchMaxUID > maxUID { maxUID = batchMaxUID } } log.Info("folder synced", "folder", folder, "new_messages", imported, "total_messages", len(uids), "uid_validity", serverUIDValidity) return imported, maxUID, nil } // fetchSyncBatch fetches and stores a batch of messages by UID. // The maximum UID is derived from the input uid slice (already known from SEARCH). // Returns (importedCount, maxUID, error). func (s *Scheduler) fetchSyncBatch( c *imapclient.Client, uids []imapv2.UID, tenantID *int64, log *slog.Logger, ) (int, uint32, error) { if len(uids) == 0 { return 0, 0, nil } // Compute the max UID from the search result — no need to parse it from FETCH. var maxUID uint32 for _, u := range uids { if uint32(u) > maxUID { maxUID = uint32(u) } } fetchOptions := &imapv2.FetchOptions{ BodySection: []*imapv2.FetchItemBodySection{{}}, } seqSet := imapv2.UIDSetNum(uids...) fetchCmd := c.Fetch(seqSet, fetchOptions) imported := 0 for { msg := fetchCmd.Next() if msg == nil { break } for { item := msg.Next() if item == nil { break } body, ok := item.(imapclient.FetchItemDataBodySection) if !ok { continue } raw, err := io.ReadAll(body.Literal) if err != nil { log.Warn("imap scheduler: read body failed", "err", err) continue } if len(raw) > 0 { if err := s.importer.storeAndIndex(raw, tenantID, log); err != nil { log.Warn("imap scheduler: store/index failed", "err", err) } else { imported++ } } } } if err := fetchCmd.Close(); err != nil { return imported, maxUID, fmt.Errorf("imap scheduler: fetch close: %w", err) } return imported, maxUID, nil }