799c828548
- scheduler.go: BUG-1 fix — preserve stored uid_validity when server returns 0 - scheduler.go: BUG-2 fix — replace inline switch with DecideResync() call - scheduler.go: SetAuditLogger wired; imap_uidvalidity_reset audit event - cmd_reindex.go: read existing attachment_text before IndexSync to prevent Manticore REPLACE INTO from wiping OCR text written by the OCR worker Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
496 lines
13 KiB
Go
496 lines
13 KiB
Go
// Package imap provides IMAP account management, import, and automatic sync.
|
|
package imap
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"archivmail/internal/audit"
|
|
|
|
imapv2 "github.com/emersion/go-imap/v2"
|
|
"github.com/emersion/go-imap/v2/imapclient"
|
|
)
|
|
|
|
// Scheduler runs automatic IMAP syncs for all configured accounts.
|
|
// It checks every minute whether a sync is due for a given account.
|
|
type Scheduler struct {
|
|
store *Store
|
|
importer *Importer
|
|
logger *slog.Logger
|
|
audlog *audit.Logger // PROJ-45: UIDVALIDITY-reset events are tenant-visible audit entries
|
|
|
|
mu sync.Mutex
|
|
running map[int64]bool // in-memory guard against concurrent syncs
|
|
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewScheduler creates a Scheduler wired to the given store and importer.
|
|
func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Scheduler {
|
|
return &Scheduler{
|
|
store: store,
|
|
importer: importer,
|
|
logger: logger,
|
|
running: make(map[int64]bool),
|
|
}
|
|
}
|
|
|
|
// SetAuditLogger wires an audit.Logger into the scheduler so that
|
|
// UIDVALIDITY-reset events (PROJ-45) are persisted as tenant-visible
|
|
// audit entries. Optional — when nil, only structured logs are emitted.
|
|
func (s *Scheduler) SetAuditLogger(a *audit.Logger) {
|
|
s.audlog = a
|
|
}
|
|
|
|
// Start launches the background scheduler goroutine.
|
|
// Call Stop to shut it down gracefully.
|
|
func (s *Scheduler) Start() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.cancel = cancel
|
|
|
|
go s.loop(ctx)
|
|
s.logger.Info("imap scheduler: started")
|
|
}
|
|
|
|
// Stop signals the scheduler goroutine to exit.
|
|
func (s *Scheduler) Stop() {
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
s.logger.Info("imap scheduler: stopped")
|
|
}
|
|
|
|
// TriggerSync manually initiates an incremental sync for a single account.
|
|
// Returns immediately; the sync runs in a background goroutine.
|
|
// Returns an error if a sync is already running for this account.
|
|
func (s *Scheduler) TriggerSync(ctx context.Context, accountID int64) error {
|
|
s.mu.Lock()
|
|
if s.running[accountID] {
|
|
s.mu.Unlock()
|
|
return fmt.Errorf("imap scheduler: sync already running for account %d", accountID)
|
|
}
|
|
s.running[accountID] = true
|
|
s.mu.Unlock()
|
|
|
|
go s.runSyncWithRetry(context.Background(), accountID)
|
|
return nil
|
|
}
|
|
|
|
// loop is the main scheduler goroutine — ticks every minute.
|
|
func (s *Scheduler) loop(ctx context.Context) {
|
|
ticker := time.NewTicker(1 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.checkAccounts(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkAccounts loads all accounts and starts syncs for those that are due.
|
|
func (s *Scheduler) checkAccounts(ctx context.Context) {
|
|
accounts, err := s.store.ListAll(ctx)
|
|
if err != nil {
|
|
s.logger.Error("imap scheduler: list accounts failed", "err", err)
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
for _, acc := range accounts {
|
|
if acc.SyncIntervalMin <= 0 {
|
|
continue
|
|
}
|
|
|
|
s.mu.Lock()
|
|
alreadyRunning := s.running[acc.ID]
|
|
s.mu.Unlock()
|
|
|
|
if alreadyRunning || acc.SyncRunning {
|
|
continue
|
|
}
|
|
|
|
interval := time.Duration(acc.SyncIntervalMin) * time.Minute
|
|
var lastSync time.Time
|
|
if acc.LastSyncAt != nil {
|
|
lastSync = *acc.LastSyncAt
|
|
}
|
|
|
|
if now.Sub(lastSync) >= interval {
|
|
s.mu.Lock()
|
|
s.running[acc.ID] = true
|
|
s.mu.Unlock()
|
|
|
|
s.logger.Info("imap scheduler: starting scheduled sync",
|
|
"account_id", acc.ID, "name", acc.Name)
|
|
go s.runSyncWithRetry(context.Background(), acc.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// syncBackoff lists the pauses inserted between consecutive sync attempts:
// entry i is the wait before retry i+1.
var syncBackoff = []time.Duration{
	time.Second,
	time.Minute,
	5 * time.Minute,
}
|
|
|
|
// runSyncWithRetry retries doSync up to 3 times with backoff.
|
|
// It always resets the running flag when it returns.
|
|
func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) {
|
|
defer func() {
|
|
s.mu.Lock()
|
|
delete(s.running, accountID)
|
|
s.mu.Unlock()
|
|
_ = s.store.SetSyncRunning(ctx, accountID, false)
|
|
}()
|
|
|
|
if err := s.store.SetSyncRunning(ctx, accountID, true); err != nil {
|
|
s.logger.Error("imap scheduler: set sync running failed",
|
|
"account_id", accountID, "err", err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
count int
|
|
lastUID uint32
|
|
lastErr error
|
|
)
|
|
|
|
startedAt := time.Now()
|
|
|
|
for attempt := 0; attempt < 3; attempt++ {
|
|
if attempt > 0 {
|
|
backoff := syncBackoff[attempt-1]
|
|
s.logger.Warn("imap scheduler: retrying sync",
|
|
"account_id", accountID, "attempt", attempt+1, "backoff", backoff)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(backoff):
|
|
}
|
|
}
|
|
|
|
count, lastUID, lastErr = s.doSync(ctx, accountID)
|
|
if lastErr == nil {
|
|
break
|
|
}
|
|
|
|
s.logger.Warn("imap scheduler: sync attempt failed",
|
|
"account_id", accountID, "attempt", attempt+1, "err", lastErr)
|
|
}
|
|
|
|
durationMs := time.Since(startedAt).Milliseconds()
|
|
|
|
if lastErr != nil {
|
|
s.logger.Error("imap scheduler: sync failed after all retries",
|
|
"account_id", accountID, "err", lastErr, "duration_ms", durationMs)
|
|
_ = s.store.UpdateSyncResult(ctx, accountID, "error", lastErr.Error(), 0, lastUID)
|
|
return
|
|
}
|
|
|
|
s.logger.Info("imap scheduler: sync completed",
|
|
"account_id", accountID, "imported", count, "last_uid", lastUID,
|
|
"duration_ms", durationMs)
|
|
_ = s.store.UpdateSyncResult(ctx, accountID, "ok", "", count, lastUID)
|
|
}
|
|
|
|
// doSync performs an incremental IMAP sync for one account.
|
|
// Returns (importedCount, maxUID, error).
|
|
func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, error) {
|
|
acc, err := s.store.Get(ctx, accountID)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: get account: %w", err)
|
|
}
|
|
|
|
password, err := s.store.GetPassword(ctx, accountID)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: get password: %w", err)
|
|
}
|
|
|
|
c, err := Connect(acc.Host, acc.Port, acc.TLS)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: connect: %w", err)
|
|
}
|
|
defer c.Close()
|
|
|
|
if err := c.Login(acc.Username, password).Wait(); err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: login: %w", err)
|
|
}
|
|
|
|
folders, err := ListFolders(c.Client)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err)
|
|
}
|
|
|
|
excluded := make(map[string]bool, len(acc.ExcludedFolders))
|
|
for _, f := range acc.ExcludedFolders {
|
|
excluded[f] = true
|
|
}
|
|
|
|
log := s.logger.With("component", "imap-scheduler", "account_id", accountID)
|
|
|
|
var (
|
|
totalImported int
|
|
maxUID uint32
|
|
foldersSynced int
|
|
)
|
|
|
|
for _, folder := range folders {
|
|
if excluded[folder.Name] {
|
|
continue
|
|
}
|
|
|
|
count, folderMaxUID, err := s.syncFolder(ctx, c, acc, folder.Name, log)
|
|
if err != nil {
|
|
log.Warn("sync folder failed, continuing", "folder", folder.Name, "err", err)
|
|
continue
|
|
}
|
|
|
|
foldersSynced++
|
|
totalImported += count
|
|
if folderMaxUID > maxUID {
|
|
maxUID = folderMaxUID
|
|
}
|
|
}
|
|
|
|
// Account-level summary (PROJ-45 acceptance criterion).
|
|
// duration_ms is intentionally not added here — it's logged by runSyncWithRetry
|
|
// which owns the retry-wrapped wall clock.
|
|
log.Info("account sync complete",
|
|
"folders_synced", foldersSynced,
|
|
"new_messages_total", totalImported)
|
|
|
|
return totalImported, maxUID, nil
|
|
}
|
|
|
|
// syncFolder syncs new messages from a single IMAP folder.
|
|
//
|
|
// PROJ-45: Uses per-folder UID tracking from imap_folder_state instead of the
|
|
// legacy account-global imap_accounts.last_uid. UIDVALIDITY is read from the
|
|
// SELECT response and compared against the stored value — on mismatch the
|
|
// folder is fully resynced in this same iteration (PROJ-32 message-id dedup
|
|
// blocks duplicates).
|
|
func (s *Scheduler) syncFolder(
|
|
ctx context.Context,
|
|
c *Conn,
|
|
acc *Account,
|
|
folder string,
|
|
log *slog.Logger,
|
|
) (int, uint32, error) {
|
|
// Load per-folder state BEFORE selecting — we need the stored uid_validity
|
|
// to compare against the value the server reports in the SELECT response.
|
|
state, err := s.store.GetFolderState(ctx, acc.ID, folder)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: get folder state %q: %w", folder, err)
|
|
}
|
|
|
|
selectData, err := c.Select(folder, nil).Wait()
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err)
|
|
}
|
|
|
|
// Snapshot the server-reported UIDVALIDITY. A value of 0 is treated as
|
|
// "unknown / server bug" — we keep the existing high-water mark instead
|
|
// of forcing a needless full resync.
|
|
serverUIDValidity := uint32(0)
|
|
if selectData != nil {
|
|
serverUIDValidity = selectData.UIDValidity
|
|
}
|
|
|
|
// BUG-2 fix: use DecideResync instead of inline switch to keep the
|
|
// branching logic in one place and avoid drift with the unit-tested path.
|
|
decision := DecideResync(state.LastUID, state.UIDValidity, serverUIDValidity)
|
|
effectiveLastUID := decision.EffectiveStart
|
|
|
|
if serverUIDValidity == 0 {
|
|
log.Warn("uidvalidity zero from server, keeping stored last_uid",
|
|
"folder", folder, "stored_uid_validity", state.UIDValidity)
|
|
}
|
|
if decision.UIDValidReset {
|
|
// Mailbox was rebuilt server-side — UIDs restart at 1. We must
|
|
// resync the entire folder from scratch. PROJ-32 message-id
|
|
// deduplication prevents duplicate writes.
|
|
log.Warn("uidvalidity change",
|
|
"account_id", acc.ID,
|
|
"folder", folder,
|
|
"old", state.UIDValidity,
|
|
"new", serverUIDValidity)
|
|
|
|
if s.audlog != nil {
|
|
s.audlog.Log(audit.Entry{
|
|
EventType: "imap_uidvalidity_reset",
|
|
Username: acc.Owner,
|
|
Success: true,
|
|
Detail: fmt.Sprintf("account=%d folder=%q old_uidvalidity=%d new_uidvalidity=%d",
|
|
acc.ID, folder, state.UIDValidity, serverUIDValidity),
|
|
})
|
|
}
|
|
}
|
|
|
|
var uids []imapv2.UID
|
|
|
|
if effectiveLastUID > 0 {
|
|
// Incremental: only messages with UID > effectiveLastUID.
|
|
minUID := imapv2.UID(effectiveLastUID + 1)
|
|
criteria := &imapv2.SearchCriteria{
|
|
UID: []imapv2.UIDSet{
|
|
{imapv2.UIDRange{Start: minUID, Stop: 0}},
|
|
},
|
|
}
|
|
searchData, err := c.UIDSearch(criteria, nil).Wait()
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: uid search incremental %q: %w", folder, err)
|
|
}
|
|
uids = searchData.AllUIDs()
|
|
} else {
|
|
// First sync or post-UIDVALIDITY-reset: fetch everything.
|
|
searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait()
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err)
|
|
}
|
|
uids = searchData.AllUIDs()
|
|
}
|
|
|
|
var (
|
|
imported int
|
|
maxUID uint32 = effectiveLastUID
|
|
)
|
|
|
|
// BUG-1 fix: when the server returns UIDVALIDITY=0 (misbehaving server),
|
|
// keep the stored valid value so future mismatch detection still works.
|
|
// Writing 0 would silently disable the UIDVALIDITY guard for this folder.
|
|
persistedValidity := serverUIDValidity
|
|
if serverUIDValidity == 0 {
|
|
persistedValidity = state.UIDValidity
|
|
}
|
|
|
|
// PROJ-45: even when there are 0 new UIDs, persist the uid_validity
|
|
// so a later UIDVALIDITY change can still be detected.
|
|
defer func() {
|
|
newState := FolderState{
|
|
AccountID: acc.ID,
|
|
Folder: folder,
|
|
LastUID: maxUID,
|
|
UIDValidity: persistedValidity,
|
|
}
|
|
if err := s.store.UpsertFolderState(ctx, newState); err != nil {
|
|
log.Warn("imap scheduler: persist folder state failed",
|
|
"folder", folder, "err", err)
|
|
}
|
|
}()
|
|
|
|
if len(uids) == 0 {
|
|
log.Info("folder synced",
|
|
"folder", folder,
|
|
"new_messages", 0,
|
|
"total_messages", 0,
|
|
"uid_validity", serverUIDValidity)
|
|
return 0, maxUID, nil
|
|
}
|
|
|
|
for i := 0; i < len(uids); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(uids) {
|
|
end = len(uids)
|
|
}
|
|
batch := uids[i:end]
|
|
|
|
c.SetFetchDeadline()
|
|
count, batchMaxUID, err := s.fetchSyncBatch(c.Client, batch, acc.TenantID, log)
|
|
c.ClearDeadline()
|
|
if err != nil {
|
|
log.Warn("imap scheduler: batch error, continuing",
|
|
"folder", folder, "offset", i, "err", err)
|
|
continue
|
|
}
|
|
|
|
imported += count
|
|
if batchMaxUID > maxUID {
|
|
maxUID = batchMaxUID
|
|
}
|
|
}
|
|
|
|
log.Info("folder synced",
|
|
"folder", folder,
|
|
"new_messages", imported,
|
|
"total_messages", len(uids),
|
|
"uid_validity", serverUIDValidity)
|
|
|
|
return imported, maxUID, nil
|
|
}
|
|
|
|
// fetchSyncBatch fetches and stores a batch of messages by UID.
|
|
// The maximum UID is derived from the input uid slice (already known from SEARCH).
|
|
// Returns (importedCount, maxUID, error).
|
|
func (s *Scheduler) fetchSyncBatch(
|
|
c *imapclient.Client,
|
|
uids []imapv2.UID,
|
|
tenantID *int64,
|
|
log *slog.Logger,
|
|
) (int, uint32, error) {
|
|
if len(uids) == 0 {
|
|
return 0, 0, nil
|
|
}
|
|
|
|
// Compute the max UID from the search result — no need to parse it from FETCH.
|
|
var maxUID uint32
|
|
for _, u := range uids {
|
|
if uint32(u) > maxUID {
|
|
maxUID = uint32(u)
|
|
}
|
|
}
|
|
|
|
fetchOptions := &imapv2.FetchOptions{
|
|
BodySection: []*imapv2.FetchItemBodySection{{}},
|
|
}
|
|
|
|
seqSet := imapv2.UIDSetNum(uids...)
|
|
fetchCmd := c.Fetch(seqSet, fetchOptions)
|
|
|
|
imported := 0
|
|
|
|
for {
|
|
msg := fetchCmd.Next()
|
|
if msg == nil {
|
|
break
|
|
}
|
|
|
|
for {
|
|
item := msg.Next()
|
|
if item == nil {
|
|
break
|
|
}
|
|
|
|
body, ok := item.(imapclient.FetchItemDataBodySection)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
raw, err := io.ReadAll(body.Literal)
|
|
if err != nil {
|
|
log.Warn("imap scheduler: read body failed", "err", err)
|
|
continue
|
|
}
|
|
|
|
if len(raw) > 0 {
|
|
if err := s.importer.storeAndIndex(raw, tenantID, log); err != nil {
|
|
log.Warn("imap scheduler: store/index failed", "err", err)
|
|
} else {
|
|
imported++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := fetchCmd.Close(); err != nil {
|
|
return imported, maxUID, fmt.Errorf("imap scheduler: fetch close: %w", err)
|
|
}
|
|
|
|
return imported, maxUID, nil
|
|
}
|