diff --git a/cmd/archivmail/main.go b/cmd/archivmail/main.go index 6e16938..fceeac4 100644 --- a/cmd/archivmail/main.go +++ b/cmd/archivmail/main.go @@ -154,7 +154,7 @@ func main() { // Wire SMTP daemon into API server for status endpoint srv.SetSMTPDaemon(smtpDaemon) - // IMAP store + importer (wired to use async worker) + // IMAP store + importer + scheduler (wired to use async worker) imapSt, err := imapstore.New(cfg.Database.DSN(), cfg.API.Secret) if err != nil { logger.Error("imap store init failed", "err", err) @@ -162,7 +162,10 @@ func main() { } defer imapSt.Close() imapImp := imapstore.NewImporter(imapSt, mailStore, idx, logger) - srv.SetImap(imapSt, imapImp) + imapSched := imapstore.NewScheduler(imapSt, imapImp, logger) + imapSched.Start() + defer imapSched.Stop() + srv.SetImap(imapSt, imapImp, imapSched) // Backfill in background: migrate existing files into DB metadata + re-index go runBackfill(context.Background(), mailStore, idx, worker, logger) diff --git a/features/INDEX.md b/features/INDEX.md index 6fa7dd0..7122c73 100644 --- a/features/INDEX.md +++ b/features/INDEX.md @@ -19,7 +19,7 @@ | PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | Deployed | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 | | PROJ-6 | Volltext-Suche & Filterung | In Review | [PROJ-6](PROJ-6-volltext-suche.md) | 2026-03-12 | | PROJ-7 | E-Mail-Ansicht (Lesen & Anhänge) | In Progress | [PROJ-7](PROJ-7-email-ansicht.md) | 2026-03-12 | -| PROJ-8 | Automatischer IMAP-Sync (Cron-Job) | In Progress | [PROJ-8](PROJ-8-imap-auto-sync.md) | 2026-03-12 | +| PROJ-8 | Automatischer IMAP-Sync (Cron-Job) | Deployed | [PROJ-8](PROJ-8-imap-auto-sync.md) | 2026-03-12 | | PROJ-9 | Ordner- & Label-Verwaltung | In Progress | [PROJ-9](PROJ-9-ordner-und-labels.md) | 2026-03-12 | | PROJ-10 | Admin-Bereich: Nutzer- & Postfachverwaltung | In Progress | [PROJ-10](PROJ-10-admin-bereich.md) | 2026-03-12 | | PROJ-11 | Audit-Log & Compliance-Berichte | In Progress | [PROJ-11](PROJ-11-audit-log.md) | 2026-03-12 | diff --git a/features/PROJ-8-imap-auto-sync.md b/features/PROJ-8-imap-auto-sync.md index 85c9a20..21ccfe4 100644 --- a/features/PROJ-8-imap-auto-sync.md +++ b/features/PROJ-8-imap-auto-sync.md @@ -1,8 +1,8 @@ # PROJ-8: Automatischer IMAP-Sync (Cron-Job) -## Status: In Progress +## Status: Deployed **Created:** 2026-03-12 -**Last Updated:** 2026-03-12 +**Last Updated:** 2026-03-17 ## Dependencies - Requires: PROJ-3 (IMAP-Import) – IMAP-Verbindungen müssen konfiguriert sein @@ -15,12 +15,12 @@ - Als System möchte ich beim Sync nur neue E-Mails (seit letztem Sync) abholen, damit kein unnötiger Traffic entsteht. ## Acceptance Criteria -- [ ] Sync-Intervall pro IMAP-Verbindung konfigurierbar (min. 5 Minuten, max. 24 Stunden) -- [ ] IMAP UID-basierter inkrementeller Sync (nur neue E-Mails seit letztem Sync) -- [ ] Admin-UI zeigt: letzter Sync, Status (Erfolg/Fehler), Anzahl importierter E-Mails -- [ ] Manueller "Sync jetzt"-Button im Admin-Bereich -- [ ] Bei Sync-Fehler: Retry mit exponential backoff (max. 3 Versuche) -- [ ] Sync-Fehler nach allen Versuchen → Fehlermeldung im Admin-Dashboard +- [x] Sync-Intervall pro IMAP-Verbindung konfigurierbar (min. 5 Minuten, max. 24 Stunden) +- [x] IMAP UID-basierter inkrementeller Sync (nur neue E-Mails seit letztem Sync) +- [x] Admin-UI zeigt: letzter Sync, Status (Erfolg/Fehler), Anzahl importierter E-Mails +- [x] Manueller "Sync jetzt"-Button im Admin-Bereich +- [x] Bei Sync-Fehler: Retry mit exponential backoff (max. 3 Versuche) +- [x] Sync-Fehler nach allen Versuchen → Fehlermeldung im Admin-Dashboard ## Edge Cases - IMAP-Server temporär nicht erreichbar → Retry ohne Abbruch des gesamten Sync-Jobs @@ -29,9 +29,18 @@ - Zeitzonenprobleme beim Datum-Vergleich → immer UTC intern verwenden ## Technical Requirements -- Cron-Scheduler eingebettet (z.B. robfig/cron für Go) +- Kein externer Cron-Scheduler — `time.NewTicker(1 * time.Minute)` + Goroutine (YAGNI, keine neue Abhängigkeit) - Sync-Status persistent in DB gespeichert (überlebt Server-Neustart) +## Implementation Notes (2026-03-17) + +- `internal/imap/store.go`: Account-Struct um 7 Sync-Felder erweitert; `migrationSQL` mit `ADD COLUMN IF NOT EXISTS`; neue Methoden: `ListAll`, `UpdateSyncInterval`, `SetSyncRunning`, `UpdateSyncResult`; einheitliche `scanRow(scanner)`-Funktion mit eigenem Interface statt `pgx.Row` +- `internal/imap/scheduler.go`: Neues Paket; `Scheduler` mit `sync.Mutex`-geschützter `running`-Map; `Start/Stop/TriggerSync`; `runSyncWithRetry` mit 3 Versuchen (Backoffs: 1s, 60s, 300s); `doSync` delegiert `storeAndIndex` an den vorhandenen `Importer` +- `internal/api/server.go`: `imapScheduler`-Feld; `SetImap`-Signatur erweitert; neue Routen `POST /api/imap/{id}/sync` und `PATCH /api/imap/{id}` +- `src/lib/api.ts`: ImapAccount um 6 Felder erweitert; `triggerImapSync`, `updateImapInterval` hinzugefügt +- `src/app/imap/page.tsx`: Polling auch für `sync_running`; Dropdown für Sync-Intervall; "Sync jetzt"-Button; Sync-Status-Badge + letzter Sync-Zeitstempel pro Account-Card +- `cmd/archivmail/main.go`: `NewScheduler`, `Start`, `Stop`, `SetImap` mit Scheduler verdrahtet + --- ## Tech Design (Solution Architect) diff --git a/internal/api/server.go b/internal/api/server.go index 56e44bf..c92d98f 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -41,9 +41,10 @@ type Server struct { audlog *audit.Logger logger *slog.Logger mux *http.ServeMux - smtpDaemon *smtpd.Daemon - imapStore *imapstore.Store - imapImporter *imapstore.Importer + smtpDaemon *smtpd.Daemon + imapStore *imapstore.Store + imapImporter *imapstore.Importer + imapScheduler *imapstore.Scheduler } // SetSMTPDaemon wires the SMTP daemon into the API server after construction. @@ -51,10 +52,11 @@ func (s *Server) SetSMTPDaemon(d *smtpd.Daemon) { s.smtpDaemon = d } -// SetImap wires the IMAP store and importer into the API server after construction. -func (s *Server) SetImap(store *imapstore.Store, importer *imapstore.Importer) { +// SetImap wires the IMAP store, importer, and scheduler into the API server after construction. +func (s *Server) SetImap(store *imapstore.Store, importer *imapstore.Importer, scheduler *imapstore.Scheduler) { s.imapStore = store s.imapImporter = importer + s.imapScheduler = scheduler } // New creates and wires up a new API server. @@ -110,9 +112,11 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /api/imap", s.authMiddleware(s.handleListImap)) s.mux.HandleFunc("POST /api/imap", s.authMiddleware(s.handleCreateImap)) s.mux.HandleFunc("DELETE /api/imap/{id}", s.authMiddleware(s.handleDeleteImap)) + s.mux.HandleFunc("PATCH /api/imap/{id}", s.authMiddleware(s.handleUpdateImapInterval)) s.mux.HandleFunc("POST /api/imap/test", s.authMiddleware(s.handleTestImap)) s.mux.HandleFunc("POST /api/imap/{id}/import", s.authMiddleware(s.handleStartImport)) s.mux.HandleFunc("GET /api/imap/{id}/progress", s.authMiddleware(s.handleImapProgress)) + s.mux.HandleFunc("POST /api/imap/{id}/sync", s.authMiddleware(s.handleSyncNow)) } // ServeHTTP implements http.Handler. @@ -1205,6 +1209,89 @@ func (s *Server) handleImapProgress(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, acc) } +// handleSyncNow triggers an immediate incremental sync for a single IMAP account. +func (s *Server) handleSyncNow(w http.ResponseWriter, r *http.Request) { + if s.imapStore == nil || s.imapScheduler == nil { + writeError(w, http.StatusServiceUnavailable, "IMAP not configured") + return + } + idStr := r.PathValue("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid id") + return + } + + acc, err := s.imapStore.Get(r.Context(), id) + if err != nil { + writeError(w, http.StatusNotFound, "account not found") + return + } + + sess := sessionFromCtx(r.Context()) + if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin { + writeError(w, http.StatusForbidden, "access denied") + return + } + + if err := s.imapScheduler.TriggerSync(r.Context(), id); err != nil { + writeError(w, http.StatusConflict, err.Error()) + return + } + + // Return the account with the updated sync_running flag reflected immediately. + acc.SyncRunning = true + writeJSON(w, http.StatusOK, acc) +} + +// handleUpdateImapInterval updates the automatic sync interval for an IMAP account. +func (s *Server) handleUpdateImapInterval(w http.ResponseWriter, r *http.Request) { + if s.imapStore == nil { + writeError(w, http.StatusServiceUnavailable, "IMAP not configured") + return + } + idStr := r.PathValue("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid id") + return + } + + acc, err := s.imapStore.Get(r.Context(), id) + if err != nil { + writeError(w, http.StatusNotFound, "account not found") + return + } + + sess := sessionFromCtx(r.Context()) + if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin { + writeError(w, http.StatusForbidden, "access denied") + return + } + + var req struct { + SyncIntervalMin int `json:"sync_interval_min"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid request body") + return + } + + // 0 = disabled; otherwise must be between 5 and 1440 minutes. + if req.SyncIntervalMin != 0 && (req.SyncIntervalMin < 5 || req.SyncIntervalMin > 1440) { + writeError(w, http.StatusBadRequest, "sync_interval_min must be 0 (disabled) or between 5 and 1440") + return + } + + if err := s.imapStore.UpdateSyncInterval(r.Context(), id, req.SyncIntervalMin); err != nil { + writeError(w, http.StatusInternalServerError, "failed to update sync interval") + return + } + + acc.SyncIntervalMin = req.SyncIntervalMin + writeJSON(w, http.StatusOK, acc) +} + // ── System stats handler ───────────────────────────────────────────────── type diskStat struct { diff --git a/internal/imap/scheduler.go b/internal/imap/scheduler.go new file mode 100644 index 0000000..2058eb8 --- /dev/null +++ b/internal/imap/scheduler.go @@ -0,0 +1,384 @@ +// Package imap provides IMAP account management, import, and automatic sync. +package imap + +import ( + "context" + "fmt" + "io" + "log/slog" + "sync" + "time" + + imapv2 "github.com/emersion/go-imap/v2" + "github.com/emersion/go-imap/v2/imapclient" +) + +// Scheduler runs automatic IMAP syncs for all configured accounts. +// It checks every minute whether a sync is due for a given account. +type Scheduler struct { + store *Store + importer *Importer + logger *slog.Logger + + mu sync.Mutex + running map[int64]bool // in-memory guard against concurrent syncs + + cancel context.CancelFunc +} + +// NewScheduler creates a Scheduler wired to the given store and importer. +func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Scheduler { + return &Scheduler{ + store: store, + importer: importer, + logger: logger, + running: make(map[int64]bool), + } +} + +// Start launches the background scheduler goroutine. +// Call Stop to shut it down gracefully. +func (s *Scheduler) Start() { + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + go s.loop(ctx) + s.logger.Info("imap scheduler: started") +} + +// Stop signals the scheduler goroutine to exit. +func (s *Scheduler) Stop() { + if s.cancel != nil { + s.cancel() + } + s.logger.Info("imap scheduler: stopped") +} + +// TriggerSync manually initiates an incremental sync for a single account. +// Returns immediately; the sync runs in a background goroutine. +// Returns an error if a sync is already running for this account. +func (s *Scheduler) TriggerSync(ctx context.Context, accountID int64) error { + s.mu.Lock() + if s.running[accountID] { + s.mu.Unlock() + return fmt.Errorf("imap scheduler: sync already running for account %d", accountID) + } + s.running[accountID] = true + s.mu.Unlock() + + go s.runSyncWithRetry(context.Background(), accountID) + return nil +} + +// loop is the main scheduler goroutine — ticks every minute. +func (s *Scheduler) loop(ctx context.Context) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.checkAccounts(ctx) + } + } +} + +// checkAccounts loads all accounts and starts syncs for those that are due. +func (s *Scheduler) checkAccounts(ctx context.Context) { + accounts, err := s.store.ListAll(ctx) + if err != nil { + s.logger.Error("imap scheduler: list accounts failed", "err", err) + return + } + + now := time.Now() + for _, acc := range accounts { + if acc.SyncIntervalMin <= 0 { + continue + } + + s.mu.Lock() + alreadyRunning := s.running[acc.ID] + s.mu.Unlock() + + if alreadyRunning || acc.SyncRunning { + continue + } + + interval := time.Duration(acc.SyncIntervalMin) * time.Minute + var lastSync time.Time + if acc.LastSyncAt != nil { + lastSync = *acc.LastSyncAt + } + + if now.Sub(lastSync) >= interval { + s.mu.Lock() + s.running[acc.ID] = true + s.mu.Unlock() + + s.logger.Info("imap scheduler: starting scheduled sync", + "account_id", acc.ID, "name", acc.Name) + go s.runSyncWithRetry(context.Background(), acc.ID) + } + } +} + +// syncBackoff defines wait durations between retry attempts. +var syncBackoff = []time.Duration{1 * time.Second, 60 * time.Second, 300 * time.Second} + +// runSyncWithRetry retries doSync up to 3 times with backoff. +// It always resets the running flag when it returns. +func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) { + defer func() { + s.mu.Lock() + delete(s.running, accountID) + s.mu.Unlock() + _ = s.store.SetSyncRunning(ctx, accountID, false) + }() + + if err := s.store.SetSyncRunning(ctx, accountID, true); err != nil { + s.logger.Error("imap scheduler: set sync running failed", + "account_id", accountID, "err", err) + return + } + + var ( + count int + lastUID uint32 + lastErr error + ) + + for attempt := 0; attempt < 3; attempt++ { + if attempt > 0 { + backoff := syncBackoff[attempt-1] + s.logger.Warn("imap scheduler: retrying sync", + "account_id", accountID, "attempt", attempt+1, "backoff", backoff) + + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + } + + count, lastUID, lastErr = s.doSync(ctx, accountID) + if lastErr == nil { + break + } + + s.logger.Warn("imap scheduler: sync attempt failed", + "account_id", accountID, "attempt", attempt+1, "err", lastErr) + } + + if lastErr != nil { + s.logger.Error("imap scheduler: sync failed after all retries", + "account_id", accountID, "err", lastErr) + _ = s.store.UpdateSyncResult(ctx, accountID, "error", lastErr.Error(), 0, lastUID) + return + } + + s.logger.Info("imap scheduler: sync completed", + "account_id", accountID, "imported", count, "last_uid", lastUID) + _ = s.store.UpdateSyncResult(ctx, accountID, "ok", "", count, lastUID) +} + +// doSync performs an incremental IMAP sync for one account. +// Returns (importedCount, maxUID, error). +func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, error) { + acc, err := s.store.Get(ctx, accountID) + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: get account: %w", err) + } + + password, err := s.store.GetPassword(ctx, accountID) + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: get password: %w", err) + } + + c, err := Connect(acc.Host, acc.Port, acc.TLS) + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: connect: %w", err) + } + defer c.Close() + + if err := c.Login(acc.Username, password).Wait(); err != nil { + return 0, 0, fmt.Errorf("imap scheduler: login: %w", err) + } + + folders, err := ListFolders(c) + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err) + } + + excluded := make(map[string]bool, len(acc.ExcludedFolders)) + for _, f := range acc.ExcludedFolders { + excluded[f] = true + } + + log := s.logger.With("component", "imap-scheduler", "account_id", accountID) + + var ( + totalImported int + maxUID uint32 = acc.LastUID + ) + + for _, folder := range folders { + if excluded[folder.Name] { + continue + } + + count, folderMaxUID, err := s.syncFolder(ctx, c, acc, folder.Name, log) + if err != nil { + log.Warn("sync folder failed, continuing", "folder", folder.Name, "err", err) + continue + } + + totalImported += count + if folderMaxUID > maxUID { + maxUID = folderMaxUID + } + } + + return totalImported, maxUID, nil +} + +// syncFolder syncs new messages from a single IMAP folder. +func (s *Scheduler) syncFolder( + ctx context.Context, + c *imapclient.Client, + acc *Account, + folder string, + log *slog.Logger, +) (int, uint32, error) { + if _, err := c.Select(folder, nil).Wait(); err != nil { + return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err) + } + + var uids []imapv2.UID + + if acc.LastUID > 0 { + // Incremental: only messages with UID > lastUID. + minUID := imapv2.UID(acc.LastUID + 1) + criteria := &imapv2.SearchCriteria{ + UID: []imapv2.UIDSet{ + {imapv2.UIDRange{Start: minUID, Stop: 0}}, + }, + } + searchData, err := c.UIDSearch(criteria, nil).Wait() + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: uid search incremental %q: %w", folder, err) + } + uids = searchData.AllUIDs() + } else { + // First sync: fetch everything. + searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait() + if err != nil { + return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err) + } + uids = searchData.AllUIDs() + } + + if len(uids) == 0 { + return 0, 0, nil + } + + log.Info("syncing folder", "folder", folder, "new_messages", len(uids)) + + var ( + imported int + maxUID uint32 + ) + + for i := 0; i < len(uids); i += batchSize { + end := i + batchSize + if end > len(uids) { + end = len(uids) + } + batch := uids[i:end] + + count, batchMaxUID, err := s.fetchSyncBatch(c, batch, log) + if err != nil { + log.Warn("imap scheduler: batch error, continuing", + "folder", folder, "offset", i, "err", err) + continue + } + + imported += count + if batchMaxUID > maxUID { + maxUID = batchMaxUID + } + } + + return imported, maxUID, nil +} + +// fetchSyncBatch fetches and stores a batch of messages by UID. +// The maximum UID is derived from the input uid slice (already known from SEARCH). +// Returns (importedCount, maxUID, error). +func (s *Scheduler) fetchSyncBatch( + c *imapclient.Client, + uids []imapv2.UID, + log *slog.Logger, +) (int, uint32, error) { + if len(uids) == 0 { + return 0, 0, nil + } + + // Compute the max UID from the search result — no need to parse it from FETCH. + var maxUID uint32 + for _, u := range uids { + if uint32(u) > maxUID { + maxUID = uint32(u) + } + } + + fetchOptions := &imapv2.FetchOptions{ + BodySection: []*imapv2.FetchItemBodySection{{}}, + } + + seqSet := imapv2.UIDSetNum(uids...) + fetchCmd := c.Fetch(seqSet, fetchOptions) + + imported := 0 + + for { + msg := fetchCmd.Next() + if msg == nil { + break + } + + for { + item := msg.Next() + if item == nil { + break + } + + body, ok := item.(imapclient.FetchItemDataBodySection) + if !ok { + continue + } + + raw, err := io.ReadAll(body.Literal) + if err != nil { + log.Warn("imap scheduler: read body failed", "err", err) + continue + } + + if len(raw) > 0 { + if err := s.importer.storeAndIndex(raw, log); err != nil { + log.Warn("imap scheduler: store/index failed", "err", err) + } else { + imported++ + } + } + } + } + + if err := fetchCmd.Close(); err != nil { + return imported, maxUID, fmt.Errorf("imap scheduler: fetch close: %w", err) + } + + return imported, maxUID, nil +} diff --git a/internal/imap/store.go b/internal/imap/store.go index 02544da..a43ba59 100644 --- a/internal/imap/store.go +++ b/internal/imap/store.go @@ -31,6 +31,15 @@ type Account struct { ProgressCurrent int `json:"progress_current"` ProgressTotal int `json:"progress_total"` CreatedAt time.Time `json:"created_at"` + + // PROJ-8: Auto-sync fields + SyncIntervalMin int `json:"sync_interval_min"` + LastSyncAt *time.Time `json:"last_sync_at,omitempty"` + LastSyncCount int `json:"last_sync_count"` + LastUID uint32 `json:"last_uid"` + SyncRunning bool `json:"sync_running"` + SyncStatus string `json:"sync_status"` + SyncErrorMsg string `json:"sync_error_msg"` } // Store manages IMAP account persistence in PostgreSQL. @@ -62,6 +71,17 @@ CREATE TABLE IF NOT EXISTS imap_accounts ( CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner); ` +// migrationSQL adds the PROJ-8 sync columns if they do not yet exist. +const migrationSQL = ` +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_count INTEGER NOT NULL DEFAULT 0; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFAULT 0; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT ''; +ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT ''; +` + // New creates a new Store, connects to PostgreSQL, and runs the migration. func New(dsn, secret string) (*Store, error) { pool, err := pgxpool.New(context.Background(), dsn) @@ -71,7 +91,12 @@ func New(dsn, secret string) (*Store, error) { if _, err := pool.Exec(context.Background(), createTableSQL); err != nil { pool.Close() - return nil, fmt.Errorf("imap store: migrate: %w", err) + return nil, fmt.Errorf("imap store: migrate create: %w", err) + } + + if _, err := pool.Exec(context.Background(), migrationSQL); err != nil { + pool.Close() + return nil, fmt.Errorf("imap store: migrate alter: %w", err) } key := sha256.Sum256([]byte(secret)) @@ -106,23 +131,43 @@ func (s *Store) Create(ctx context.Context, acc Account, password string) (*Acco return &acc, nil } +// selectColumns is the canonical column list used in all SELECT statements. +// Column order must match the Scan call in scanRow. +// Leading and trailing spaces are intentional for correct SQL concatenation. +const selectColumns = ` id, owner, name, host, port, tls, username, excluded_folders, + status, error_msg, last_import_at, last_import_count, + progress_current, progress_total, created_at, + sync_interval_min, last_sync_at, last_sync_count, last_uid, + sync_running, sync_status, sync_error_msg ` + +// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error. +type scanner interface { + Scan(dest ...any) error +} + +func scanRow(row scanner) (Account, error) { + var a Account + err := row.Scan( + &a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username, + &a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt, + &a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt, + &a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID, + &a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, + ) + return a, err +} + // List returns IMAP accounts. Admins see all accounts; regular users see only their own. func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) { var rows pgx.Rows var err error + q := `SELECT` + selectColumns + `FROM imap_accounts` + if isAdmin { - rows, err = s.pool.Query(ctx, ` - SELECT id, owner, name, host, port, tls, username, excluded_folders, - status, error_msg, last_import_at, last_import_count, - progress_current, progress_total, created_at - FROM imap_accounts ORDER BY id`) + rows, err = s.pool.Query(ctx, q+` ORDER BY id`) } else { - rows, err = s.pool.Query(ctx, ` - SELECT id, owner, name, host, port, tls, username, excluded_folders, - status, error_msg, last_import_at, last_import_count, - progress_current, progress_total, created_at - FROM imap_accounts WHERE owner = $1 ORDER BY id`, owner) + rows, err = s.pool.Query(ctx, q+` WHERE owner = $1 ORDER BY id`, owner) } if err != nil { return nil, fmt.Errorf("imap store: list: %w", err) @@ -131,12 +176,28 @@ func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account var accounts []Account for rows.Next() { - var a Account - if err := rows.Scan( - &a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username, - &a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt, - &a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt, - ); err != nil { + a, err := scanRow(rows) + if err != nil { + return nil, fmt.Errorf("imap store: scan: %w", err) + } + accounts = append(accounts, a) + } + return accounts, rows.Err() +} + +// ListAll returns all IMAP accounts regardless of owner — used by the scheduler. +func (s *Store) ListAll(ctx context.Context) ([]Account, error) { + rows, err := s.pool.Query(ctx, + `SELECT`+selectColumns+`FROM imap_accounts ORDER BY id`) + if err != nil { + return nil, fmt.Errorf("imap store: list all: %w", err) + } + defer rows.Close() + + var accounts []Account + for rows.Next() { + a, err := scanRow(rows) + if err != nil { return nil, fmt.Errorf("imap store: scan: %w", err) } accounts = append(accounts, a) @@ -146,17 +207,9 @@ func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account // Get returns a single IMAP account by ID. func (s *Store) Get(ctx context.Context, id int64) (*Account, error) { - var a Account - err := s.pool.QueryRow(ctx, ` - SELECT id, owner, name, host, port, tls, username, excluded_folders, - status, error_msg, last_import_at, last_import_count, - progress_current, progress_total, created_at - FROM imap_accounts WHERE id = $1`, id, - ).Scan( - &a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username, - &a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt, - &a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt, - ) + row := s.pool.QueryRow(ctx, + `SELECT`+selectColumns+`FROM imap_accounts WHERE id = $1`, id) + a, err := scanRow(row) if err != nil { return nil, fmt.Errorf("imap store: get %d: %w", id, err) } @@ -219,6 +272,43 @@ func (s *Store) UpdateDone(ctx context.Context, id int64, count int) error { return nil } +// UpdateSyncInterval sets the automatic sync interval for an account. +// intervalMin == 0 disables automatic sync. +func (s *Store) UpdateSyncInterval(ctx context.Context, id int64, intervalMin int) error { + _, err := s.pool.Exec(ctx, + `UPDATE imap_accounts SET sync_interval_min = $1 WHERE id = $2`, + intervalMin, id) + if err != nil { + return fmt.Errorf("imap store: update sync interval: %w", err) + } + return nil +} + +// SetSyncRunning marks whether a background sync is currently active for an account. +func (s *Store) SetSyncRunning(ctx context.Context, id int64, running bool) error { + _, err := s.pool.Exec(ctx, + `UPDATE imap_accounts SET sync_running = $1 WHERE id = $2`, + running, id) + if err != nil { + return fmt.Errorf("imap store: set sync running: %w", err) + } + return nil +} + +// UpdateSyncResult persists the outcome of a completed sync run. +func (s *Store) UpdateSyncResult(ctx context.Context, id int64, status, errMsg string, count int, lastUID uint32) error { + _, err := s.pool.Exec(ctx, ` + UPDATE imap_accounts + SET sync_status = $1, sync_error_msg = $2, last_sync_count = $3, + last_uid = $4, last_sync_at = now(), sync_running = FALSE + WHERE id = $5`, + status, errMsg, count, lastUID, id) + if err != nil { + return fmt.Errorf("imap store: update sync result: %w", err) + } + return nil +} + // encryptPassword encrypts a plaintext password using AES-256-GCM. func encryptPassword(plaintext string, key [32]byte) ([]byte, error) { block, err := aes.NewCipher(key[:]) diff --git a/src/app/imap/page.tsx b/src/app/imap/page.tsx index 1d0ba3c..53c767e 100644 --- a/src/app/imap/page.tsx +++ b/src/app/imap/page.tsx @@ -10,6 +10,8 @@ import { testImapConnection, startImapImport, getImapProgress, + triggerImapSync, + updateImapInterval, type ImapAccount, type ImapFolder, } from "@/lib/api"; @@ -83,17 +85,18 @@ export default function ImapPage() { if (user) loadAccounts(); }, [user, loadAccounts]); - // Start polling for running accounts + // Start polling for running accounts (import or sync) useEffect(() => { for (const acc of accounts) { - if (acc.status === "running" && !pollingRefs.current.has(acc.id)) { + const isActive = acc.status === "running" || acc.sync_running; + if (isActive && !pollingRefs.current.has(acc.id)) { const interval = setInterval(async () => { try { const updated = await getImapProgress(acc.id); setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a)) ); - if (updated.status !== "running") { + if (updated.status !== "running" && !updated.sync_running) { clearInterval(pollingRefs.current.get(acc.id)!); pollingRefs.current.delete(acc.id); } @@ -105,10 +108,10 @@ export default function ImapPage() { pollingRefs.current.set(acc.id, interval); } } - // Cleanup intervals for accounts that are no longer running + // Cleanup intervals for accounts that are no longer active for (const [id, interval] of pollingRefs.current) { const acc = accounts.find((a) => a.id === id); - if (!acc || acc.status !== "running") { + if (!acc || (acc.status !== "running" && !acc.sync_running)) { clearInterval(interval); pollingRefs.current.delete(id); } @@ -206,6 +209,25 @@ export default function ImapPage() { setDeleteConfirm(null); } + async function handleSyncNow(id: number) { + try { + const updated = await triggerImapSync(id); + setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a))); + } catch { + // ignore — conflict means sync already running + } + } + + async function handleIntervalChange(id: number, value: string) { + const intervalMin = parseInt(value, 10); + try { + const updated = await updateImapInterval(id, intervalMin); + setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a))); + } catch { + // ignore + } + } + function toggleExcluded(folderName: string) { setExcludedFolders((prev) => { const next = new Set(prev); @@ -229,6 +251,20 @@ export default function ImapPage() { } } + function syncBadge(acc: ImapAccount) { + if (acc.sync_running) { + return Sync laeuft...; + } + if (!acc.sync_status) return null; + if (acc.sync_status === "ok") { + return Sync OK; + } + if (acc.sync_status === "error") { + return Sync Fehler; + } + return null; + } + return (
@@ -305,13 +341,57 @@ export default function ImapPage() {

)} + {/* PROJ-8: Sync status */} + {acc.last_sync_at && ( +
+

+ Letzter Sync:{" "} + {new Date(acc.last_sync_at).toLocaleString("de-DE")} ( + {acc.last_sync_count} neu) +

+ {syncBadge(acc)} +
+ )} + + {acc.sync_running && !acc.last_sync_at && syncBadge(acc)} + + {acc.sync_error_msg && acc.sync_status === "error" && ( +

+ Sync-Fehler: {acc.sync_error_msg} +

+ )} + {acc.excluded_folders && acc.excluded_folders.length > 0 && (

Ausgeschlossene Ordner: {acc.excluded_folders.join(", ")}

)} -
+ {/* PROJ-8: Sync interval selector */} +
+ + Auto-Sync: + + +
+ +
+