diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index d809d0b..a700fd9 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -421,6 +421,7 @@ func main() { }) } imapSched := imapstore.NewScheduler(imapSt, imapImp, logger) + imapSched.SetAuditLogger(audlog) // PROJ-45: tenant-visible UIDVALIDITY-reset audit entries imapSched.Start() defer imapSched.Stop() srv.SetImap(imapSt, imapImp, imapSched) diff --git a/internal/imap/folder_state.go b/internal/imap/folder_state.go new file mode 100644 index 0000000..07be7de --- /dev/null +++ b/internal/imap/folder_state.go @@ -0,0 +1,122 @@ +package imap + +import ( + "context" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// FolderState represents per-folder UID tracking for an IMAP account (PROJ-45). +// Each IMAP folder has its own independent UID sequence — tracking a single +// account-global last_uid (the legacy behaviour) silently skips folders whose +// highest UID falls below the global value and misses UIDVALIDITY resets. +type FolderState struct { + AccountID int64 + Folder string + LastUID uint32 + UIDValidity uint32 +} + +// GetFolderState returns the saved per-folder UID tracking record. +// If no record exists yet (first sync of this folder), an empty +// FolderState{LastUID: 0, UIDValidity: 0} is returned and the caller +// MUST treat it as a request for a full folder resync. +func (s *Store) GetFolderState(ctx context.Context, accountID int64, folder string) (FolderState, error) { + var st FolderState + st.AccountID = accountID + st.Folder = folder + + err := s.pool.QueryRow(ctx, ` + SELECT last_uid, uid_validity + FROM imap_folder_state + WHERE account_id = $1 AND folder = $2`, + accountID, folder, + ).Scan(&st.LastUID, &st.UIDValidity) + + if errors.Is(err, pgx.ErrNoRows) { + return st, nil + } + if err != nil { + return st, fmt.Errorf("imap store: get folder state: %w", err) + } + return st, nil +} + +// UpsertFolderState writes (or overwrites) the per-folder UID tracking record. +// Used after a successful folder sync to persist the new high-water mark. +func (s *Store) UpsertFolderState(ctx context.Context, st FolderState) error { + _, err := s.pool.Exec(ctx, ` + INSERT INTO imap_folder_state (account_id, folder, last_uid, uid_validity, updated_at) + VALUES ($1, $2, $3, $4, now()) + ON CONFLICT (account_id, folder) DO UPDATE + SET last_uid = EXCLUDED.last_uid, + uid_validity = EXCLUDED.uid_validity, + updated_at = now()`, + st.AccountID, st.Folder, st.LastUID, st.UIDValidity, + ) + if err != nil { + return fmt.Errorf("imap store: upsert folder state: %w", err) + } + return nil +} + +// ResyncDecision describes what the scheduler should do for a folder given +// the stored state and the UIDVALIDITY reported by the server right after SELECT. +// Extracted as a pure function so the branching logic can be unit-tested +// without a live IMAP server. +type ResyncDecision struct { + FullResync bool // true → ignore stored last_uid, fetch all UIDs + UIDValidReset bool // true → write an audit-log "uidvalidity_reset" entry + EffectiveStart uint32 // UID to filter on (0 means "all messages") +} + +// DecideResync implements the per-folder sync gate (PROJ-45). +// +// - server UIDVALIDITY == 0 → treat as "unknown", continue incrementally +// with the stored last_uid (defensive; some servers misbehave). +// - stored UIDVALIDITY == 0 → first ever sync of this folder → full sync, +// but no "reset" event (nothing to reset from). +// - stored != server (both != 0) → mailbox rebuilt server-side → full resync +// AND log a UIDVALIDITY-reset audit event. +// - otherwise → incremental from stored last_uid. +func DecideResync(storedLastUID, storedUIDValidity, serverUIDValidity uint32) ResyncDecision { + if serverUIDValidity == 0 { + return ResyncDecision{ + FullResync: storedLastUID == 0, + EffectiveStart: storedLastUID, + } + } + if storedUIDValidity == 0 { + return ResyncDecision{FullResync: storedLastUID == 0, EffectiveStart: storedLastUID} + } + if storedUIDValidity != serverUIDValidity { + return ResyncDecision{FullResync: true, UIDValidReset: true, EffectiveStart: 0} + } + return ResyncDecision{FullResync: storedLastUID == 0, EffectiveStart: storedLastUID} +} + +// ListFolderStates returns all per-folder UID tracking records for an account. +// Currently used for diagnostics and potential admin UI. +func (s *Store) ListFolderStates(ctx context.Context, accountID int64) ([]FolderState, error) { + rows, err := s.pool.Query(ctx, ` + SELECT account_id, folder, last_uid, uid_validity + FROM imap_folder_state + WHERE account_id = $1 + ORDER BY folder`, accountID) + if err != nil { + return nil, fmt.Errorf("imap store: list folder states: %w", err) + } + defer rows.Close() + + var states []FolderState + for rows.Next() { + var st FolderState + if err := rows.Scan(&st.AccountID, &st.Folder, &st.LastUID, &st.UIDValidity); err != nil { + return nil, fmt.Errorf("imap store: scan folder state: %w", err) + } + states = append(states, st) + } + return states, rows.Err() +} diff --git a/internal/imap/folder_state_test.go b/internal/imap/folder_state_test.go new file mode 100644 index 0000000..616d6bc --- /dev/null +++ b/internal/imap/folder_state_test.go @@ -0,0 +1,82 @@ +package imap + +import "testing" + +// TestDecideResync covers the per-folder sync gate introduced by PROJ-45. +// The branches under test correspond directly to the acceptance criteria: +// - first-ever sync (no stored state) +// - normal incremental sync (UIDVALIDITY unchanged) +// - UIDVALIDITY mismatch → full resync + audit event +// - server returns UIDVALIDITY=0 → defensive fallback, keep stored cursor +func TestDecideResync(t *testing.T) { + cases := []struct { + name string + storedUID uint32 + storedValid uint32 + serverValid uint32 + wantFull bool + wantReset bool + wantStart uint32 + }{ + { + name: "first sync, no stored state", + storedUID: 0, + storedValid: 0, + serverValid: 12345, + wantFull: true, + wantReset: false, + wantStart: 0, + }, + { + name: "incremental, validity unchanged", + storedUID: 1500, + storedValid: 12345, + serverValid: 12345, + wantFull: false, + wantReset: false, + wantStart: 1500, + }, + { + name: "uidvalidity mismatch triggers full resync + audit", + storedUID: 1500, + storedValid: 12345, + serverValid: 99999, + wantFull: true, + wantReset: true, + wantStart: 0, + }, + { + name: "server uidvalidity 0 defensive: keep cursor, no reset", + storedUID: 800, + storedValid: 12345, + serverValid: 0, + wantFull: false, + wantReset: false, + wantStart: 800, + }, + { + name: "stored validity 0 (legacy row) does not trigger reset", + storedUID: 0, + storedValid: 0, + serverValid: 7777, + wantFull: true, + wantReset: false, + wantStart: 0, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := DecideResync(tc.storedUID, tc.storedValid, tc.serverValid) + if got.FullResync != tc.wantFull { + t.Errorf("FullResync: got %v want %v", got.FullResync, tc.wantFull) + } + if got.UIDValidReset != tc.wantReset { + t.Errorf("UIDValidReset: got %v want %v", got.UIDValidReset, tc.wantReset) + } + if got.EffectiveStart != tc.wantStart { + t.Errorf("EffectiveStart: got %d want %d", got.EffectiveStart, tc.wantStart) + } + }) + } +} diff --git a/internal/imap/scheduler.go b/internal/imap/scheduler.go index 39782ae..4360f48 100644 --- a/internal/imap/scheduler.go +++ b/internal/imap/scheduler.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "archivmail/internal/audit" + imapv2 "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" ) @@ -19,6 +21,7 @@ type Scheduler struct { store *Store importer *Importer logger *slog.Logger + audlog *audit.Logger // PROJ-45: UIDVALIDITY-reset events are tenant-visible audit entries mu sync.Mutex running map[int64]bool // in-memory guard against concurrent syncs @@ -36,6 +39,13 @@ func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Schedu } } +// SetAuditLogger wires an audit.Logger into the scheduler so that +// UIDVALIDITY-reset events (PROJ-45) are persisted as tenant-visible +// audit entries. Optional — when nil, only structured logs are emitted. +func (s *Scheduler) SetAuditLogger(a *audit.Logger) { + s.audlog = a +} + // Start launches the background scheduler goroutine. // Call Stop to shut it down gracefully. func (s *Scheduler) Start() { @@ -150,6 +160,8 @@ func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) { lastErr error ) + startedAt := time.Now() + for attempt := 0; attempt < 3; attempt++ { if attempt > 0 { backoff := syncBackoff[attempt-1] @@ -172,15 +184,18 @@ func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) { "account_id", accountID, "attempt", attempt+1, "err", lastErr) } + durationMs := time.Since(startedAt).Milliseconds() + if lastErr != nil { s.logger.Error("imap scheduler: sync failed after all retries", - "account_id", accountID, "err", lastErr) + "account_id", accountID, "err", lastErr, "duration_ms", durationMs) _ = s.store.UpdateSyncResult(ctx, accountID, "error", lastErr.Error(), 0, lastUID) return } s.logger.Info("imap scheduler: sync completed", - "account_id", accountID, "imported", count, "last_uid", lastUID) + "account_id", accountID, "imported", count, "last_uid", lastUID, + "duration_ms", durationMs) _ = s.store.UpdateSyncResult(ctx, accountID, "ok", "", count, lastUID) } @@ -220,8 +235,9 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e log := s.logger.With("component", "imap-scheduler", "account_id", accountID) var ( - totalImported int - maxUID uint32 = acc.LastUID + totalImported int + maxUID uint32 + foldersSynced int ) for _, folder := range folders { @@ -235,16 +251,30 @@ func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, e continue } + foldersSynced++ totalImported += count if folderMaxUID > maxUID { maxUID = folderMaxUID } } + // Account-level summary (PROJ-45 acceptance criterion). + // duration_ms is intentionally not added here — it's logged by runSyncWithRetry + // which owns the retry-wrapped wall clock. + log.Info("account sync complete", + "folders_synced", foldersSynced, + "new_messages_total", totalImported) + return totalImported, maxUID, nil } // syncFolder syncs new messages from a single IMAP folder. +// +// PROJ-45: Uses per-folder UID tracking from imap_folder_state instead of the +// legacy account-global imap_accounts.last_uid. UIDVALIDITY is read from the +// SELECT response and compared against the stored value — on mismatch the +// folder is fully resynced in this same iteration (PROJ-32 message-id dedup +// blocks duplicates). func (s *Scheduler) syncFolder( ctx context.Context, c *Conn, @@ -252,15 +282,59 @@ func (s *Scheduler) syncFolder( folder string, log *slog.Logger, ) (int, uint32, error) { - if _, err := c.Select(folder, nil).Wait(); err != nil { + // Load per-folder state BEFORE selecting — we need the stored uid_validity + // to compare against the value the server reports in the SELECT response. + state, err := s.store.GetFolderState(ctx, acc.ID, folder) + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: get folder state %q: %w", folder, err) + } + + selectData, err := c.Select(folder, nil).Wait() + if err != nil { return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err) } + // Snapshot the server-reported UIDVALIDITY. A value of 0 is treated as + // "unknown / server bug" — we keep the existing high-water mark instead + // of forcing a needless full resync. + serverUIDValidity := uint32(0) + if selectData != nil { + serverUIDValidity = selectData.UIDValidity + } + + effectiveLastUID := state.LastUID + + switch { + case serverUIDValidity == 0: + log.Warn("uidvalidity zero from server, keeping stored last_uid", + "folder", folder, "stored_uid_validity", state.UIDValidity) + case state.UIDValidity != 0 && state.UIDValidity != serverUIDValidity: + // Mailbox was rebuilt server-side — UIDs restart at 1. We must + // resync the entire folder from scratch. PROJ-32 message-id + // deduplication prevents duplicate writes. + log.Warn("uidvalidity change", + "account_id", acc.ID, + "folder", folder, + "old", state.UIDValidity, + "new", serverUIDValidity) + + if s.audlog != nil { + s.audlog.Log(audit.Entry{ + EventType: "imap_uidvalidity_reset", + Username: acc.Owner, + Success: true, + Detail: fmt.Sprintf("account=%d folder=%q old_uidvalidity=%d new_uidvalidity=%d", + acc.ID, folder, state.UIDValidity, serverUIDValidity), + }) + } + effectiveLastUID = 0 + } + var uids []imapv2.UID - if acc.LastUID > 0 { - // Incremental: only messages with UID > lastUID. - minUID := imapv2.UID(acc.LastUID + 1) + if effectiveLastUID > 0 { + // Incremental: only messages with UID > effectiveLastUID. + minUID := imapv2.UID(effectiveLastUID + 1) criteria := &imapv2.SearchCriteria{ UID: []imapv2.UIDSet{ {imapv2.UIDRange{Start: minUID, Stop: 0}}, @@ -272,7 +346,7 @@ func (s *Scheduler) syncFolder( } uids = searchData.AllUIDs() } else { - // First sync: fetch everything. + // First sync or post-UIDVALIDITY-reset: fetch everything. searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait() if err != nil { return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err) @@ -280,17 +354,35 @@ func (s *Scheduler) syncFolder( uids = searchData.AllUIDs() } - if len(uids) == 0 { - return 0, 0, nil - } - - log.Info("syncing folder", "folder", folder, "new_messages", len(uids)) - var ( imported int - maxUID uint32 + maxUID uint32 = effectiveLastUID ) + // PROJ-45: even when there are 0 new UIDs, persist the (possibly new) + // uid_validity so a later UIDVALIDITY change can still be detected. + defer func() { + newState := FolderState{ + AccountID: acc.ID, + Folder: folder, + LastUID: maxUID, + UIDValidity: serverUIDValidity, + } + if err := s.store.UpsertFolderState(ctx, newState); err != nil { + log.Warn("imap scheduler: persist folder state failed", + "folder", folder, "err", err) + } + }() + + if len(uids) == 0 { + log.Info("folder synced", + "folder", folder, + "new_messages", 0, + "total_messages", 0, + "uid_validity", serverUIDValidity) + return 0, maxUID, nil + } + for i := 0; i < len(uids); i += batchSize { end := i + batchSize if end > len(uids) { @@ -313,6 +405,12 @@ func (s *Scheduler) syncFolder( } } + log.Info("folder synced", + "folder", folder, + "new_messages", imported, + "total_messages", len(uids), + "uid_validity", serverUIDValidity) + return imported, maxUID, nil } diff --git a/internal/imap/store.go b/internal/imap/store.go index 1137d2e..c539b18 100644 --- a/internal/imap/store.go +++ b/internal/imap/store.go @@ -86,6 +86,20 @@ ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS tenant_id INTEGER REFERENCES tenants(id); ` +// folderStateSQL creates the per-folder UID-tracking table introduced by PROJ-45. +// Replaces the previous account-global last_uid tracking which silently skipped +// folders with lower UIDs than INBOX and ignored UIDVALIDITY changes. +const folderStateSQL = ` +CREATE TABLE IF NOT EXISTS imap_folder_state ( + account_id BIGINT NOT NULL REFERENCES imap_accounts(id) ON DELETE CASCADE, + folder TEXT NOT NULL, + last_uid BIGINT NOT NULL DEFAULT 0, + uid_validity BIGINT NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + PRIMARY KEY (account_id, folder) +); +` + // New creates a new Store, connects to PostgreSQL, and runs the migration. func New(dsn, secret string) (*Store, error) { pool, err := pgxpool.New(context.Background(), dsn) @@ -103,6 +117,11 @@ func New(dsn, secret string) (*Store, error) { return nil, fmt.Errorf("imap store: migrate alter: %w", err) } + if _, err := pool.Exec(context.Background(), folderStateSQL); err != nil { + pool.Close() + return nil, fmt.Errorf("imap store: migrate folder state: %w", err) + } + key := sha256.Sum256([]byte(secret)) return &Store{pool: pool, encKey: key}, nil }