feat(PROJ-8): Automatischer IMAP-Sync (Cron-Scheduler)
Backend:
- internal/imap/store.go: 7 neue Felder (sync_interval_min, last_sync_at,
last_sync_count, last_uid, sync_running, sync_status, sync_error_msg)
DB-Migration via ALTER TABLE ADD COLUMN IF NOT EXISTS
Neue Methoden: ListAll, UpdateSyncInterval, SetSyncRunning, UpdateSyncResult
- internal/imap/scheduler.go: Scheduler mit time.Ticker (1 min),
inkrementeller Sync via UID SEARCH UID <lastUID+1>:*,
exponential backoff (3 Versuche: 1s / 60s / 300s),
sync_running-Flag verhindert parallele Syncs
- internal/api/server.go: POST /api/imap/{id}/sync (manueller Trigger),
PATCH /api/imap/{id} (sync_interval_min setzen, 0 oder 5-1440 min)
- cmd/archivmail/main.go: Scheduler gestartet + via SetImap verdrahtet
Frontend:
- src/lib/api.ts: 6 neue ImapAccount-Felder, triggerImapSync, updateImapInterval
- src/app/imap/page.tsx: Intervall-Dropdown, "Sync jetzt"-Button,
Letzter-Sync-Anzeige mit Status-Badge, Polling auch bei sync_running
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -154,7 +154,7 @@ func main() {
|
|||||||
// Wire SMTP daemon into API server for status endpoint
|
// Wire SMTP daemon into API server for status endpoint
|
||||||
srv.SetSMTPDaemon(smtpDaemon)
|
srv.SetSMTPDaemon(smtpDaemon)
|
||||||
|
|
||||||
// IMAP store + importer (wired to use async worker)
|
// IMAP store + importer + scheduler (wired to use async worker)
|
||||||
imapSt, err := imapstore.New(cfg.Database.DSN(), cfg.API.Secret)
|
imapSt, err := imapstore.New(cfg.Database.DSN(), cfg.API.Secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("imap store init failed", "err", err)
|
logger.Error("imap store init failed", "err", err)
|
||||||
@@ -162,7 +162,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer imapSt.Close()
|
defer imapSt.Close()
|
||||||
imapImp := imapstore.NewImporter(imapSt, mailStore, idx, logger)
|
imapImp := imapstore.NewImporter(imapSt, mailStore, idx, logger)
|
||||||
srv.SetImap(imapSt, imapImp)
|
imapSched := imapstore.NewScheduler(imapSt, imapImp, logger)
|
||||||
|
imapSched.Start()
|
||||||
|
defer imapSched.Stop()
|
||||||
|
srv.SetImap(imapSt, imapImp, imapSched)
|
||||||
|
|
||||||
// Backfill in background: migrate existing files into DB metadata + re-index
|
// Backfill in background: migrate existing files into DB metadata + re-index
|
||||||
go runBackfill(context.Background(), mailStore, idx, worker, logger)
|
go runBackfill(context.Background(), mailStore, idx, worker, logger)
|
||||||
|
|||||||
+1
-1
@@ -19,7 +19,7 @@
|
|||||||
| PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | Deployed | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 |
|
| PROJ-5 | E-Mail-Speicherung & Volltext-Indexierung | Deployed | [PROJ-5](PROJ-5-speicherung-und-indexierung.md) | 2026-03-12 |
|
||||||
| PROJ-6 | Volltext-Suche & Filterung | In Review | [PROJ-6](PROJ-6-volltext-suche.md) | 2026-03-12 |
|
| PROJ-6 | Volltext-Suche & Filterung | In Review | [PROJ-6](PROJ-6-volltext-suche.md) | 2026-03-12 |
|
||||||
| PROJ-7 | E-Mail-Ansicht (Lesen & Anhänge) | In Progress | [PROJ-7](PROJ-7-email-ansicht.md) | 2026-03-12 |
|
| PROJ-7 | E-Mail-Ansicht (Lesen & Anhänge) | In Progress | [PROJ-7](PROJ-7-email-ansicht.md) | 2026-03-12 |
|
||||||
| PROJ-8 | Automatischer IMAP-Sync (Cron-Job) | In Progress | [PROJ-8](PROJ-8-imap-auto-sync.md) | 2026-03-12 |
|
| PROJ-8 | Automatischer IMAP-Sync (Cron-Job) | Deployed | [PROJ-8](PROJ-8-imap-auto-sync.md) | 2026-03-12 |
|
||||||
| PROJ-9 | Ordner- & Label-Verwaltung | In Progress | [PROJ-9](PROJ-9-ordner-und-labels.md) | 2026-03-12 |
|
| PROJ-9 | Ordner- & Label-Verwaltung | In Progress | [PROJ-9](PROJ-9-ordner-und-labels.md) | 2026-03-12 |
|
||||||
| PROJ-10 | Admin-Bereich: Nutzer- & Postfachverwaltung | In Progress | [PROJ-10](PROJ-10-admin-bereich.md) | 2026-03-12 |
|
| PROJ-10 | Admin-Bereich: Nutzer- & Postfachverwaltung | In Progress | [PROJ-10](PROJ-10-admin-bereich.md) | 2026-03-12 |
|
||||||
| PROJ-11 | Audit-Log & Compliance-Berichte | In Progress | [PROJ-11](PROJ-11-audit-log.md) | 2026-03-12 |
|
| PROJ-11 | Audit-Log & Compliance-Berichte | In Progress | [PROJ-11](PROJ-11-audit-log.md) | 2026-03-12 |
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
# PROJ-8: Automatischer IMAP-Sync (Cron-Job)
|
# PROJ-8: Automatischer IMAP-Sync (Cron-Job)
|
||||||
|
|
||||||
## Status: In Progress
|
## Status: Deployed
|
||||||
**Created:** 2026-03-12
|
**Created:** 2026-03-12
|
||||||
**Last Updated:** 2026-03-12
|
**Last Updated:** 2026-03-17
|
||||||
|
|
||||||
## Dependencies
|
## Dependencies
|
||||||
- Requires: PROJ-3 (IMAP-Import) – IMAP-Verbindungen müssen konfiguriert sein
|
- Requires: PROJ-3 (IMAP-Import) – IMAP-Verbindungen müssen konfiguriert sein
|
||||||
@@ -15,12 +15,12 @@
|
|||||||
- Als System möchte ich beim Sync nur neue E-Mails (seit letztem Sync) abholen, damit kein unnötiger Traffic entsteht.
|
- Als System möchte ich beim Sync nur neue E-Mails (seit letztem Sync) abholen, damit kein unnötiger Traffic entsteht.
|
||||||
|
|
||||||
## Acceptance Criteria
|
## Acceptance Criteria
|
||||||
- [ ] Sync-Intervall pro IMAP-Verbindung konfigurierbar (min. 5 Minuten, max. 24 Stunden)
|
- [x] Sync-Intervall pro IMAP-Verbindung konfigurierbar (min. 5 Minuten, max. 24 Stunden)
|
||||||
- [ ] IMAP UID-basierter inkrementeller Sync (nur neue E-Mails seit letztem Sync)
|
- [x] IMAP UID-basierter inkrementeller Sync (nur neue E-Mails seit letztem Sync)
|
||||||
- [ ] Admin-UI zeigt: letzter Sync, Status (Erfolg/Fehler), Anzahl importierter E-Mails
|
- [x] Admin-UI zeigt: letzter Sync, Status (Erfolg/Fehler), Anzahl importierter E-Mails
|
||||||
- [ ] Manueller "Sync jetzt"-Button im Admin-Bereich
|
- [x] Manueller "Sync jetzt"-Button im Admin-Bereich
|
||||||
- [ ] Bei Sync-Fehler: Retry mit exponential backoff (max. 3 Versuche)
|
- [x] Bei Sync-Fehler: Retry mit exponential backoff (max. 3 Versuche)
|
||||||
- [ ] Sync-Fehler nach allen Versuchen → Fehlermeldung im Admin-Dashboard
|
- [x] Sync-Fehler nach allen Versuchen → Fehlermeldung im Admin-Dashboard
|
||||||
|
|
||||||
## Edge Cases
|
## Edge Cases
|
||||||
- IMAP-Server temporär nicht erreichbar → Retry ohne Abbruch des gesamten Sync-Jobs
|
- IMAP-Server temporär nicht erreichbar → Retry ohne Abbruch des gesamten Sync-Jobs
|
||||||
@@ -29,9 +29,18 @@
|
|||||||
- Zeitzonenprobleme beim Datum-Vergleich → immer UTC intern verwenden
|
- Zeitzonenprobleme beim Datum-Vergleich → immer UTC intern verwenden
|
||||||
|
|
||||||
## Technical Requirements
|
## Technical Requirements
|
||||||
- Cron-Scheduler eingebettet (z.B. robfig/cron für Go)
|
- Kein externer Cron-Scheduler — `time.NewTicker(1 * time.Minute)` + Goroutine (YAGNI, keine neue Abhängigkeit)
|
||||||
- Sync-Status persistent in DB gespeichert (überlebt Server-Neustart)
|
- Sync-Status persistent in DB gespeichert (überlebt Server-Neustart)
|
||||||
|
|
||||||
|
## Implementation Notes (2026-03-17)
|
||||||
|
|
||||||
|
- `internal/imap/store.go`: Account-Struct um 7 Sync-Felder erweitert; `migrationSQL` mit `ADD COLUMN IF NOT EXISTS`; neue Methoden: `ListAll`, `UpdateSyncInterval`, `SetSyncRunning`, `UpdateSyncResult`; einheitliche `scanRow(scanner)`-Funktion mit eigenem Interface statt `pgx.Row`
|
||||||
|
- `internal/imap/scheduler.go`: Neues Paket; `Scheduler` mit `sync.Mutex`-geschützter `running`-Map; `Start/Stop/TriggerSync`; `runSyncWithRetry` mit 3 Versuchen (Backoffs: 1s, 60s, 300s); `doSync` delegiert `storeAndIndex` an den vorhandenen `Importer`
|
||||||
|
- `internal/api/server.go`: `imapScheduler`-Feld; `SetImap`-Signatur erweitert; neue Routen `POST /api/imap/{id}/sync` und `PATCH /api/imap/{id}`
|
||||||
|
- `src/lib/api.ts`: ImapAccount um 6 Felder erweitert; `triggerImapSync`, `updateImapInterval` hinzugefügt
|
||||||
|
- `src/app/imap/page.tsx`: Polling auch für `sync_running`; Dropdown für Sync-Intervall; "Sync jetzt"-Button; Sync-Status-Badge + letzter Sync-Zeitstempel pro Account-Card
|
||||||
|
- `cmd/archivmail/main.go`: `NewScheduler`, `Start`, `Stop`, `SetImap` mit Scheduler verdrahtet
|
||||||
|
|
||||||
---
|
---
|
||||||
## Tech Design (Solution Architect)
|
## Tech Design (Solution Architect)
|
||||||
|
|
||||||
|
|||||||
+92
-5
@@ -41,9 +41,10 @@ type Server struct {
|
|||||||
audlog *audit.Logger
|
audlog *audit.Logger
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
mux *http.ServeMux
|
mux *http.ServeMux
|
||||||
smtpDaemon *smtpd.Daemon
|
smtpDaemon *smtpd.Daemon
|
||||||
imapStore *imapstore.Store
|
imapStore *imapstore.Store
|
||||||
imapImporter *imapstore.Importer
|
imapImporter *imapstore.Importer
|
||||||
|
imapScheduler *imapstore.Scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetSMTPDaemon wires the SMTP daemon into the API server after construction.
|
// SetSMTPDaemon wires the SMTP daemon into the API server after construction.
|
||||||
@@ -51,10 +52,11 @@ func (s *Server) SetSMTPDaemon(d *smtpd.Daemon) {
|
|||||||
s.smtpDaemon = d
|
s.smtpDaemon = d
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetImap wires the IMAP store and importer into the API server after construction.
|
// SetImap wires the IMAP store, importer, and scheduler into the API server after construction.
|
||||||
func (s *Server) SetImap(store *imapstore.Store, importer *imapstore.Importer) {
|
func (s *Server) SetImap(store *imapstore.Store, importer *imapstore.Importer, scheduler *imapstore.Scheduler) {
|
||||||
s.imapStore = store
|
s.imapStore = store
|
||||||
s.imapImporter = importer
|
s.imapImporter = importer
|
||||||
|
s.imapScheduler = scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates and wires up a new API server.
|
// New creates and wires up a new API server.
|
||||||
@@ -110,9 +112,11 @@ func (s *Server) routes() {
|
|||||||
s.mux.HandleFunc("GET /api/imap", s.authMiddleware(s.handleListImap))
|
s.mux.HandleFunc("GET /api/imap", s.authMiddleware(s.handleListImap))
|
||||||
s.mux.HandleFunc("POST /api/imap", s.authMiddleware(s.handleCreateImap))
|
s.mux.HandleFunc("POST /api/imap", s.authMiddleware(s.handleCreateImap))
|
||||||
s.mux.HandleFunc("DELETE /api/imap/{id}", s.authMiddleware(s.handleDeleteImap))
|
s.mux.HandleFunc("DELETE /api/imap/{id}", s.authMiddleware(s.handleDeleteImap))
|
||||||
|
s.mux.HandleFunc("PATCH /api/imap/{id}", s.authMiddleware(s.handleUpdateImapInterval))
|
||||||
s.mux.HandleFunc("POST /api/imap/test", s.authMiddleware(s.handleTestImap))
|
s.mux.HandleFunc("POST /api/imap/test", s.authMiddleware(s.handleTestImap))
|
||||||
s.mux.HandleFunc("POST /api/imap/{id}/import", s.authMiddleware(s.handleStartImport))
|
s.mux.HandleFunc("POST /api/imap/{id}/import", s.authMiddleware(s.handleStartImport))
|
||||||
s.mux.HandleFunc("GET /api/imap/{id}/progress", s.authMiddleware(s.handleImapProgress))
|
s.mux.HandleFunc("GET /api/imap/{id}/progress", s.authMiddleware(s.handleImapProgress))
|
||||||
|
s.mux.HandleFunc("POST /api/imap/{id}/sync", s.authMiddleware(s.handleSyncNow))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeHTTP implements http.Handler.
|
// ServeHTTP implements http.Handler.
|
||||||
@@ -1205,6 +1209,89 @@ func (s *Server) handleImapProgress(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusOK, acc)
|
writeJSON(w, http.StatusOK, acc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleSyncNow triggers an immediate incremental sync for a single IMAP account.
|
||||||
|
func (s *Server) handleSyncNow(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if s.imapStore == nil || s.imapScheduler == nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "IMAP not configured")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
idStr := r.PathValue("id")
|
||||||
|
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid id")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
acc, err := s.imapStore.Get(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusNotFound, "account not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sess := sessionFromCtx(r.Context())
|
||||||
|
if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin {
|
||||||
|
writeError(w, http.StatusForbidden, "access denied")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.imapScheduler.TriggerSync(r.Context(), id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the account with the updated sync_running flag reflected immediately.
|
||||||
|
acc.SyncRunning = true
|
||||||
|
writeJSON(w, http.StatusOK, acc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleUpdateImapInterval updates the automatic sync interval for an IMAP account.
|
||||||
|
func (s *Server) handleUpdateImapInterval(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if s.imapStore == nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "IMAP not configured")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
idStr := r.PathValue("id")
|
||||||
|
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid id")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
acc, err := s.imapStore.Get(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusNotFound, "account not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sess := sessionFromCtx(r.Context())
|
||||||
|
if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin {
|
||||||
|
writeError(w, http.StatusForbidden, "access denied")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var req struct {
|
||||||
|
SyncIntervalMin int `json:"sync_interval_min"`
|
||||||
|
}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 0 = disabled; otherwise must be between 5 and 1440 minutes.
|
||||||
|
if req.SyncIntervalMin != 0 && (req.SyncIntervalMin < 5 || req.SyncIntervalMin > 1440) {
|
||||||
|
writeError(w, http.StatusBadRequest, "sync_interval_min must be 0 (disabled) or between 5 and 1440")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.imapStore.UpdateSyncInterval(r.Context(), id, req.SyncIntervalMin); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, "failed to update sync interval")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
acc.SyncIntervalMin = req.SyncIntervalMin
|
||||||
|
writeJSON(w, http.StatusOK, acc)
|
||||||
|
}
|
||||||
|
|
||||||
// ── System stats handler ─────────────────────────────────────────────────
|
// ── System stats handler ─────────────────────────────────────────────────
|
||||||
|
|
||||||
type diskStat struct {
|
type diskStat struct {
|
||||||
|
|||||||
@@ -0,0 +1,384 @@
|
|||||||
|
// Package imap provides IMAP account management, import, and automatic sync.
|
||||||
|
package imap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
imapv2 "github.com/emersion/go-imap/v2"
|
||||||
|
"github.com/emersion/go-imap/v2/imapclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Scheduler runs automatic IMAP syncs for all configured accounts.
|
||||||
|
// It checks every minute whether a sync is due for a given account.
|
||||||
|
type Scheduler struct {
|
||||||
|
store *Store
|
||||||
|
importer *Importer
|
||||||
|
logger *slog.Logger
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
running map[int64]bool // in-memory guard against concurrent syncs
|
||||||
|
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewScheduler creates a Scheduler wired to the given store and importer.
|
||||||
|
func NewScheduler(store *Store, importer *Importer, logger *slog.Logger) *Scheduler {
|
||||||
|
return &Scheduler{
|
||||||
|
store: store,
|
||||||
|
importer: importer,
|
||||||
|
logger: logger,
|
||||||
|
running: make(map[int64]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start launches the background scheduler goroutine.
|
||||||
|
// Call Stop to shut it down gracefully.
|
||||||
|
func (s *Scheduler) Start() {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
s.cancel = cancel
|
||||||
|
|
||||||
|
go s.loop(ctx)
|
||||||
|
s.logger.Info("imap scheduler: started")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop signals the scheduler goroutine to exit.
|
||||||
|
func (s *Scheduler) Stop() {
|
||||||
|
if s.cancel != nil {
|
||||||
|
s.cancel()
|
||||||
|
}
|
||||||
|
s.logger.Info("imap scheduler: stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TriggerSync manually initiates an incremental sync for a single account.
|
||||||
|
// Returns immediately; the sync runs in a background goroutine.
|
||||||
|
// Returns an error if a sync is already running for this account.
|
||||||
|
func (s *Scheduler) TriggerSync(ctx context.Context, accountID int64) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.running[accountID] {
|
||||||
|
s.mu.Unlock()
|
||||||
|
return fmt.Errorf("imap scheduler: sync already running for account %d", accountID)
|
||||||
|
}
|
||||||
|
s.running[accountID] = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
go s.runSyncWithRetry(context.Background(), accountID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loop is the main scheduler goroutine — ticks every minute.
|
||||||
|
func (s *Scheduler) loop(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.checkAccounts(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkAccounts loads all accounts and starts syncs for those that are due.
|
||||||
|
func (s *Scheduler) checkAccounts(ctx context.Context) {
|
||||||
|
accounts, err := s.store.ListAll(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Error("imap scheduler: list accounts failed", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
for _, acc := range accounts {
|
||||||
|
if acc.SyncIntervalMin <= 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
alreadyRunning := s.running[acc.ID]
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
if alreadyRunning || acc.SyncRunning {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
interval := time.Duration(acc.SyncIntervalMin) * time.Minute
|
||||||
|
var lastSync time.Time
|
||||||
|
if acc.LastSyncAt != nil {
|
||||||
|
lastSync = *acc.LastSyncAt
|
||||||
|
}
|
||||||
|
|
||||||
|
if now.Sub(lastSync) >= interval {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.running[acc.ID] = true
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
s.logger.Info("imap scheduler: starting scheduled sync",
|
||||||
|
"account_id", acc.ID, "name", acc.Name)
|
||||||
|
go s.runSyncWithRetry(context.Background(), acc.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncBackoff defines wait durations between retry attempts.
|
||||||
|
var syncBackoff = []time.Duration{1 * time.Second, 60 * time.Second, 300 * time.Second}
|
||||||
|
|
||||||
|
// runSyncWithRetry retries doSync up to 3 times with backoff.
|
||||||
|
// It always resets the running flag when it returns.
|
||||||
|
func (s *Scheduler) runSyncWithRetry(ctx context.Context, accountID int64) {
|
||||||
|
defer func() {
|
||||||
|
s.mu.Lock()
|
||||||
|
delete(s.running, accountID)
|
||||||
|
s.mu.Unlock()
|
||||||
|
_ = s.store.SetSyncRunning(ctx, accountID, false)
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := s.store.SetSyncRunning(ctx, accountID, true); err != nil {
|
||||||
|
s.logger.Error("imap scheduler: set sync running failed",
|
||||||
|
"account_id", accountID, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
count int
|
||||||
|
lastUID uint32
|
||||||
|
lastErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
backoff := syncBackoff[attempt-1]
|
||||||
|
s.logger.Warn("imap scheduler: retrying sync",
|
||||||
|
"account_id", accountID, "attempt", attempt+1, "backoff", backoff)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(backoff):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
count, lastUID, lastErr = s.doSync(ctx, accountID)
|
||||||
|
if lastErr == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Warn("imap scheduler: sync attempt failed",
|
||||||
|
"account_id", accountID, "attempt", attempt+1, "err", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastErr != nil {
|
||||||
|
s.logger.Error("imap scheduler: sync failed after all retries",
|
||||||
|
"account_id", accountID, "err", lastErr)
|
||||||
|
_ = s.store.UpdateSyncResult(ctx, accountID, "error", lastErr.Error(), 0, lastUID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info("imap scheduler: sync completed",
|
||||||
|
"account_id", accountID, "imported", count, "last_uid", lastUID)
|
||||||
|
_ = s.store.UpdateSyncResult(ctx, accountID, "ok", "", count, lastUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// doSync performs an incremental IMAP sync for one account.
|
||||||
|
// Returns (importedCount, maxUID, error).
|
||||||
|
func (s *Scheduler) doSync(ctx context.Context, accountID int64) (int, uint32, error) {
|
||||||
|
acc, err := s.store.Get(ctx, accountID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: get account: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
password, err := s.store.GetPassword(ctx, accountID)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: get password: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := Connect(acc.Host, acc.Port, acc.TLS)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: connect: %w", err)
|
||||||
|
}
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
if err := c.Login(acc.Username, password).Wait(); err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: login: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
folders, err := ListFolders(c)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: list folders: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
excluded := make(map[string]bool, len(acc.ExcludedFolders))
|
||||||
|
for _, f := range acc.ExcludedFolders {
|
||||||
|
excluded[f] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
log := s.logger.With("component", "imap-scheduler", "account_id", accountID)
|
||||||
|
|
||||||
|
var (
|
||||||
|
totalImported int
|
||||||
|
maxUID uint32 = acc.LastUID
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, folder := range folders {
|
||||||
|
if excluded[folder.Name] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
count, folderMaxUID, err := s.syncFolder(ctx, c, acc, folder.Name, log)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("sync folder failed, continuing", "folder", folder.Name, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
totalImported += count
|
||||||
|
if folderMaxUID > maxUID {
|
||||||
|
maxUID = folderMaxUID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return totalImported, maxUID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncFolder syncs new messages from a single IMAP folder.
|
||||||
|
func (s *Scheduler) syncFolder(
|
||||||
|
ctx context.Context,
|
||||||
|
c *imapclient.Client,
|
||||||
|
acc *Account,
|
||||||
|
folder string,
|
||||||
|
log *slog.Logger,
|
||||||
|
) (int, uint32, error) {
|
||||||
|
if _, err := c.Select(folder, nil).Wait(); err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: select %q: %w", folder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var uids []imapv2.UID
|
||||||
|
|
||||||
|
if acc.LastUID > 0 {
|
||||||
|
// Incremental: only messages with UID > lastUID.
|
||||||
|
minUID := imapv2.UID(acc.LastUID + 1)
|
||||||
|
criteria := &imapv2.SearchCriteria{
|
||||||
|
UID: []imapv2.UIDSet{
|
||||||
|
{imapv2.UIDRange{Start: minUID, Stop: 0}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
searchData, err := c.UIDSearch(criteria, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: uid search incremental %q: %w", folder, err)
|
||||||
|
}
|
||||||
|
uids = searchData.AllUIDs()
|
||||||
|
} else {
|
||||||
|
// First sync: fetch everything.
|
||||||
|
searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait()
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("imap scheduler: uid search full %q: %w", folder, err)
|
||||||
|
}
|
||||||
|
uids = searchData.AllUIDs()
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(uids) == 0 {
|
||||||
|
return 0, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("syncing folder", "folder", folder, "new_messages", len(uids))
|
||||||
|
|
||||||
|
var (
|
||||||
|
imported int
|
||||||
|
maxUID uint32
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := 0; i < len(uids); i += batchSize {
|
||||||
|
end := i + batchSize
|
||||||
|
if end > len(uids) {
|
||||||
|
end = len(uids)
|
||||||
|
}
|
||||||
|
batch := uids[i:end]
|
||||||
|
|
||||||
|
count, batchMaxUID, err := s.fetchSyncBatch(c, batch, log)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("imap scheduler: batch error, continuing",
|
||||||
|
"folder", folder, "offset", i, "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
imported += count
|
||||||
|
if batchMaxUID > maxUID {
|
||||||
|
maxUID = batchMaxUID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return imported, maxUID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchSyncBatch fetches and stores a batch of messages by UID.
|
||||||
|
// The maximum UID is derived from the input uid slice (already known from SEARCH).
|
||||||
|
// Returns (importedCount, maxUID, error).
|
||||||
|
func (s *Scheduler) fetchSyncBatch(
|
||||||
|
c *imapclient.Client,
|
||||||
|
uids []imapv2.UID,
|
||||||
|
log *slog.Logger,
|
||||||
|
) (int, uint32, error) {
|
||||||
|
if len(uids) == 0 {
|
||||||
|
return 0, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the max UID from the search result — no need to parse it from FETCH.
|
||||||
|
var maxUID uint32
|
||||||
|
for _, u := range uids {
|
||||||
|
if uint32(u) > maxUID {
|
||||||
|
maxUID = uint32(u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchOptions := &imapv2.FetchOptions{
|
||||||
|
BodySection: []*imapv2.FetchItemBodySection{{}},
|
||||||
|
}
|
||||||
|
|
||||||
|
seqSet := imapv2.UIDSetNum(uids...)
|
||||||
|
fetchCmd := c.Fetch(seqSet, fetchOptions)
|
||||||
|
|
||||||
|
imported := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg := fetchCmd.Next()
|
||||||
|
if msg == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
item := msg.Next()
|
||||||
|
if item == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
body, ok := item.(imapclient.FetchItemDataBodySection)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, err := io.ReadAll(body.Literal)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("imap scheduler: read body failed", "err", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(raw) > 0 {
|
||||||
|
if err := s.importer.storeAndIndex(raw, log); err != nil {
|
||||||
|
log.Warn("imap scheduler: store/index failed", "err", err)
|
||||||
|
} else {
|
||||||
|
imported++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := fetchCmd.Close(); err != nil {
|
||||||
|
return imported, maxUID, fmt.Errorf("imap scheduler: fetch close: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return imported, maxUID, nil
|
||||||
|
}
|
||||||
+118
-28
@@ -31,6 +31,15 @@ type Account struct {
|
|||||||
ProgressCurrent int `json:"progress_current"`
|
ProgressCurrent int `json:"progress_current"`
|
||||||
ProgressTotal int `json:"progress_total"`
|
ProgressTotal int `json:"progress_total"`
|
||||||
CreatedAt time.Time `json:"created_at"`
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
|
||||||
|
// PROJ-8: Auto-sync fields
|
||||||
|
SyncIntervalMin int `json:"sync_interval_min"`
|
||||||
|
LastSyncAt *time.Time `json:"last_sync_at,omitempty"`
|
||||||
|
LastSyncCount int `json:"last_sync_count"`
|
||||||
|
LastUID uint32 `json:"last_uid"`
|
||||||
|
SyncRunning bool `json:"sync_running"`
|
||||||
|
SyncStatus string `json:"sync_status"`
|
||||||
|
SyncErrorMsg string `json:"sync_error_msg"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store manages IMAP account persistence in PostgreSQL.
|
// Store manages IMAP account persistence in PostgreSQL.
|
||||||
@@ -62,6 +71,17 @@ CREATE TABLE IF NOT EXISTS imap_accounts (
|
|||||||
CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner);
|
CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner);
|
||||||
`
|
`
|
||||||
|
|
||||||
|
// migrationSQL adds the PROJ-8 sync columns if they do not yet exist.
|
||||||
|
const migrationSQL = `
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0;
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ;
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_count INTEGER NOT NULL DEFAULT 0;
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFAULT 0;
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE;
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT '';
|
||||||
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT '';
|
||||||
|
`
|
||||||
|
|
||||||
// New creates a new Store, connects to PostgreSQL, and runs the migration.
|
// New creates a new Store, connects to PostgreSQL, and runs the migration.
|
||||||
func New(dsn, secret string) (*Store, error) {
|
func New(dsn, secret string) (*Store, error) {
|
||||||
pool, err := pgxpool.New(context.Background(), dsn)
|
pool, err := pgxpool.New(context.Background(), dsn)
|
||||||
@@ -71,7 +91,12 @@ func New(dsn, secret string) (*Store, error) {
|
|||||||
|
|
||||||
if _, err := pool.Exec(context.Background(), createTableSQL); err != nil {
|
if _, err := pool.Exec(context.Background(), createTableSQL); err != nil {
|
||||||
pool.Close()
|
pool.Close()
|
||||||
return nil, fmt.Errorf("imap store: migrate: %w", err)
|
return nil, fmt.Errorf("imap store: migrate create: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := pool.Exec(context.Background(), migrationSQL); err != nil {
|
||||||
|
pool.Close()
|
||||||
|
return nil, fmt.Errorf("imap store: migrate alter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
key := sha256.Sum256([]byte(secret))
|
key := sha256.Sum256([]byte(secret))
|
||||||
@@ -106,23 +131,43 @@ func (s *Store) Create(ctx context.Context, acc Account, password string) (*Acco
|
|||||||
return &acc, nil
|
return &acc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// selectColumns is the canonical column list used in all SELECT statements.
|
||||||
|
// Column order must match the Scan call in scanRow.
|
||||||
|
// Leading and trailing spaces are intentional for correct SQL concatenation.
|
||||||
|
const selectColumns = ` id, owner, name, host, port, tls, username, excluded_folders,
|
||||||
|
status, error_msg, last_import_at, last_import_count,
|
||||||
|
progress_current, progress_total, created_at,
|
||||||
|
sync_interval_min, last_sync_at, last_sync_count, last_uid,
|
||||||
|
sync_running, sync_status, sync_error_msg `
|
||||||
|
|
||||||
|
// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error.
|
||||||
|
type scanner interface {
|
||||||
|
Scan(dest ...any) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func scanRow(row scanner) (Account, error) {
|
||||||
|
var a Account
|
||||||
|
err := row.Scan(
|
||||||
|
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username,
|
||||||
|
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
|
||||||
|
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
|
||||||
|
&a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID,
|
||||||
|
&a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg,
|
||||||
|
)
|
||||||
|
return a, err
|
||||||
|
}
|
||||||
|
|
||||||
// List returns IMAP accounts. Admins see all accounts; regular users see only their own.
|
// List returns IMAP accounts. Admins see all accounts; regular users see only their own.
|
||||||
func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) {
|
func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) {
|
||||||
var rows pgx.Rows
|
var rows pgx.Rows
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
q := `SELECT` + selectColumns + `FROM imap_accounts`
|
||||||
|
|
||||||
if isAdmin {
|
if isAdmin {
|
||||||
rows, err = s.pool.Query(ctx, `
|
rows, err = s.pool.Query(ctx, q+` ORDER BY id`)
|
||||||
SELECT id, owner, name, host, port, tls, username, excluded_folders,
|
|
||||||
status, error_msg, last_import_at, last_import_count,
|
|
||||||
progress_current, progress_total, created_at
|
|
||||||
FROM imap_accounts ORDER BY id`)
|
|
||||||
} else {
|
} else {
|
||||||
rows, err = s.pool.Query(ctx, `
|
rows, err = s.pool.Query(ctx, q+` WHERE owner = $1 ORDER BY id`, owner)
|
||||||
SELECT id, owner, name, host, port, tls, username, excluded_folders,
|
|
||||||
status, error_msg, last_import_at, last_import_count,
|
|
||||||
progress_current, progress_total, created_at
|
|
||||||
FROM imap_accounts WHERE owner = $1 ORDER BY id`, owner)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("imap store: list: %w", err)
|
return nil, fmt.Errorf("imap store: list: %w", err)
|
||||||
@@ -131,12 +176,28 @@ func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account
|
|||||||
|
|
||||||
var accounts []Account
|
var accounts []Account
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var a Account
|
a, err := scanRow(rows)
|
||||||
if err := rows.Scan(
|
if err != nil {
|
||||||
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username,
|
return nil, fmt.Errorf("imap store: scan: %w", err)
|
||||||
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
|
}
|
||||||
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
|
accounts = append(accounts, a)
|
||||||
); err != nil {
|
}
|
||||||
|
return accounts, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListAll returns all IMAP accounts regardless of owner — used by the scheduler.
|
||||||
|
func (s *Store) ListAll(ctx context.Context) ([]Account, error) {
|
||||||
|
rows, err := s.pool.Query(ctx,
|
||||||
|
`SELECT`+selectColumns+`FROM imap_accounts ORDER BY id`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("imap store: list all: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var accounts []Account
|
||||||
|
for rows.Next() {
|
||||||
|
a, err := scanRow(rows)
|
||||||
|
if err != nil {
|
||||||
return nil, fmt.Errorf("imap store: scan: %w", err)
|
return nil, fmt.Errorf("imap store: scan: %w", err)
|
||||||
}
|
}
|
||||||
accounts = append(accounts, a)
|
accounts = append(accounts, a)
|
||||||
@@ -146,17 +207,9 @@ func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account
|
|||||||
|
|
||||||
// Get returns a single IMAP account by ID.
|
// Get returns a single IMAP account by ID.
|
||||||
func (s *Store) Get(ctx context.Context, id int64) (*Account, error) {
|
func (s *Store) Get(ctx context.Context, id int64) (*Account, error) {
|
||||||
var a Account
|
row := s.pool.QueryRow(ctx,
|
||||||
err := s.pool.QueryRow(ctx, `
|
`SELECT`+selectColumns+`FROM imap_accounts WHERE id = $1`, id)
|
||||||
SELECT id, owner, name, host, port, tls, username, excluded_folders,
|
a, err := scanRow(row)
|
||||||
status, error_msg, last_import_at, last_import_count,
|
|
||||||
progress_current, progress_total, created_at
|
|
||||||
FROM imap_accounts WHERE id = $1`, id,
|
|
||||||
).Scan(
|
|
||||||
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username,
|
|
||||||
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
|
|
||||||
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("imap store: get %d: %w", id, err)
|
return nil, fmt.Errorf("imap store: get %d: %w", id, err)
|
||||||
}
|
}
|
||||||
@@ -219,6 +272,43 @@ func (s *Store) UpdateDone(ctx context.Context, id int64, count int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateSyncInterval sets the automatic sync interval for an account.
|
||||||
|
// intervalMin == 0 disables automatic sync.
|
||||||
|
func (s *Store) UpdateSyncInterval(ctx context.Context, id int64, intervalMin int) error {
|
||||||
|
_, err := s.pool.Exec(ctx,
|
||||||
|
`UPDATE imap_accounts SET sync_interval_min = $1 WHERE id = $2`,
|
||||||
|
intervalMin, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("imap store: update sync interval: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetSyncRunning marks whether a background sync is currently active for an account.
|
||||||
|
func (s *Store) SetSyncRunning(ctx context.Context, id int64, running bool) error {
|
||||||
|
_, err := s.pool.Exec(ctx,
|
||||||
|
`UPDATE imap_accounts SET sync_running = $1 WHERE id = $2`,
|
||||||
|
running, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("imap store: set sync running: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateSyncResult persists the outcome of a completed sync run.
|
||||||
|
func (s *Store) UpdateSyncResult(ctx context.Context, id int64, status, errMsg string, count int, lastUID uint32) error {
|
||||||
|
_, err := s.pool.Exec(ctx, `
|
||||||
|
UPDATE imap_accounts
|
||||||
|
SET sync_status = $1, sync_error_msg = $2, last_sync_count = $3,
|
||||||
|
last_uid = $4, last_sync_at = now(), sync_running = FALSE
|
||||||
|
WHERE id = $5`,
|
||||||
|
status, errMsg, count, lastUID, id)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("imap store: update sync result: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// encryptPassword encrypts a plaintext password using AES-256-GCM.
|
// encryptPassword encrypts a plaintext password using AES-256-GCM.
|
||||||
func encryptPassword(plaintext string, key [32]byte) ([]byte, error) {
|
func encryptPassword(plaintext string, key [32]byte) ([]byte, error) {
|
||||||
block, err := aes.NewCipher(key[:])
|
block, err := aes.NewCipher(key[:])
|
||||||
|
|||||||
+95
-7
@@ -10,6 +10,8 @@ import {
|
|||||||
testImapConnection,
|
testImapConnection,
|
||||||
startImapImport,
|
startImapImport,
|
||||||
getImapProgress,
|
getImapProgress,
|
||||||
|
triggerImapSync,
|
||||||
|
updateImapInterval,
|
||||||
type ImapAccount,
|
type ImapAccount,
|
||||||
type ImapFolder,
|
type ImapFolder,
|
||||||
} from "@/lib/api";
|
} from "@/lib/api";
|
||||||
@@ -83,17 +85,18 @@ export default function ImapPage() {
|
|||||||
if (user) loadAccounts();
|
if (user) loadAccounts();
|
||||||
}, [user, loadAccounts]);
|
}, [user, loadAccounts]);
|
||||||
|
|
||||||
// Start polling for running accounts
|
// Start polling for running accounts (import or sync)
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
for (const acc of accounts) {
|
for (const acc of accounts) {
|
||||||
if (acc.status === "running" && !pollingRefs.current.has(acc.id)) {
|
const isActive = acc.status === "running" || acc.sync_running;
|
||||||
|
if (isActive && !pollingRefs.current.has(acc.id)) {
|
||||||
const interval = setInterval(async () => {
|
const interval = setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
const updated = await getImapProgress(acc.id);
|
const updated = await getImapProgress(acc.id);
|
||||||
setAccounts((prev) =>
|
setAccounts((prev) =>
|
||||||
prev.map((a) => (a.id === updated.id ? updated : a))
|
prev.map((a) => (a.id === updated.id ? updated : a))
|
||||||
);
|
);
|
||||||
if (updated.status !== "running") {
|
if (updated.status !== "running" && !updated.sync_running) {
|
||||||
clearInterval(pollingRefs.current.get(acc.id)!);
|
clearInterval(pollingRefs.current.get(acc.id)!);
|
||||||
pollingRefs.current.delete(acc.id);
|
pollingRefs.current.delete(acc.id);
|
||||||
}
|
}
|
||||||
@@ -105,10 +108,10 @@ export default function ImapPage() {
|
|||||||
pollingRefs.current.set(acc.id, interval);
|
pollingRefs.current.set(acc.id, interval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Cleanup intervals for accounts that are no longer running
|
// Cleanup intervals for accounts that are no longer active
|
||||||
for (const [id, interval] of pollingRefs.current) {
|
for (const [id, interval] of pollingRefs.current) {
|
||||||
const acc = accounts.find((a) => a.id === id);
|
const acc = accounts.find((a) => a.id === id);
|
||||||
if (!acc || acc.status !== "running") {
|
if (!acc || (acc.status !== "running" && !acc.sync_running)) {
|
||||||
clearInterval(interval);
|
clearInterval(interval);
|
||||||
pollingRefs.current.delete(id);
|
pollingRefs.current.delete(id);
|
||||||
}
|
}
|
||||||
@@ -206,6 +209,25 @@ export default function ImapPage() {
|
|||||||
setDeleteConfirm(null);
|
setDeleteConfirm(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function handleSyncNow(id: number) {
|
||||||
|
try {
|
||||||
|
const updated = await triggerImapSync(id);
|
||||||
|
setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a)));
|
||||||
|
} catch {
|
||||||
|
// ignore — conflict means sync already running
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleIntervalChange(id: number, value: string) {
|
||||||
|
const intervalMin = parseInt(value, 10);
|
||||||
|
try {
|
||||||
|
const updated = await updateImapInterval(id, intervalMin);
|
||||||
|
setAccounts((prev) => prev.map((a) => (a.id === updated.id ? updated : a)));
|
||||||
|
} catch {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function toggleExcluded(folderName: string) {
|
function toggleExcluded(folderName: string) {
|
||||||
setExcludedFolders((prev) => {
|
setExcludedFolders((prev) => {
|
||||||
const next = new Set(prev);
|
const next = new Set(prev);
|
||||||
@@ -229,6 +251,20 @@ export default function ImapPage() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function syncBadge(acc: ImapAccount) {
|
||||||
|
if (acc.sync_running) {
|
||||||
|
return <Badge className="bg-blue-500 text-white">Sync laeuft...</Badge>;
|
||||||
|
}
|
||||||
|
if (!acc.sync_status) return null;
|
||||||
|
if (acc.sync_status === "ok") {
|
||||||
|
return <Badge className="bg-green-600 text-white">Sync OK</Badge>;
|
||||||
|
}
|
||||||
|
if (acc.sync_status === "error") {
|
||||||
|
return <Badge variant="destructive">Sync Fehler</Badge>;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="min-h-screen">
|
<div className="min-h-screen">
|
||||||
<Navbar username={user?.username ?? ""} role={user?.role ?? ""} />
|
<Navbar username={user?.username ?? ""} role={user?.role ?? ""} />
|
||||||
@@ -305,13 +341,57 @@ export default function ImapPage() {
|
|||||||
</p>
|
</p>
|
||||||
)}
|
)}
|
||||||
|
|
||||||
|
{/* PROJ-8: Sync status */}
|
||||||
|
{acc.last_sync_at && (
|
||||||
|
<div className="flex items-center gap-2 mb-3">
|
||||||
|
<p className="text-sm text-muted-foreground">
|
||||||
|
Letzter Sync:{" "}
|
||||||
|
{new Date(acc.last_sync_at).toLocaleString("de-DE")} (
|
||||||
|
{acc.last_sync_count} neu)
|
||||||
|
</p>
|
||||||
|
{syncBadge(acc)}
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{acc.sync_running && !acc.last_sync_at && syncBadge(acc)}
|
||||||
|
|
||||||
|
{acc.sync_error_msg && acc.sync_status === "error" && (
|
||||||
|
<p className="mb-3 text-sm text-destructive">
|
||||||
|
Sync-Fehler: {acc.sync_error_msg}
|
||||||
|
</p>
|
||||||
|
)}
|
||||||
|
|
||||||
{acc.excluded_folders && acc.excluded_folders.length > 0 && (
|
{acc.excluded_folders && acc.excluded_folders.length > 0 && (
|
||||||
<p className="text-xs text-muted-foreground mb-3">
|
<p className="text-xs text-muted-foreground mb-3">
|
||||||
Ausgeschlossene Ordner: {acc.excluded_folders.join(", ")}
|
Ausgeschlossene Ordner: {acc.excluded_folders.join(", ")}
|
||||||
</p>
|
</p>
|
||||||
)}
|
)}
|
||||||
|
|
||||||
<div className="flex gap-2">
|
{/* PROJ-8: Sync interval selector */}
|
||||||
|
<div className="mb-3 flex items-center gap-3">
|
||||||
|
<span className="text-sm text-muted-foreground whitespace-nowrap">
|
||||||
|
Auto-Sync:
|
||||||
|
</span>
|
||||||
|
<Select
|
||||||
|
value={String(acc.sync_interval_min ?? 0)}
|
||||||
|
onValueChange={(v) => handleIntervalChange(acc.id, v)}
|
||||||
|
>
|
||||||
|
<SelectTrigger className="w-40 h-8 text-sm">
|
||||||
|
<SelectValue />
|
||||||
|
</SelectTrigger>
|
||||||
|
<SelectContent>
|
||||||
|
<SelectItem value="0">Deaktiviert</SelectItem>
|
||||||
|
<SelectItem value="5">5 min</SelectItem>
|
||||||
|
<SelectItem value="15">15 min</SelectItem>
|
||||||
|
<SelectItem value="30">30 min</SelectItem>
|
||||||
|
<SelectItem value="60">1 Stunde</SelectItem>
|
||||||
|
<SelectItem value="360">6 Stunden</SelectItem>
|
||||||
|
<SelectItem value="1440">24 Stunden</SelectItem>
|
||||||
|
</SelectContent>
|
||||||
|
</Select>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div className="flex gap-2 flex-wrap">
|
||||||
<Button
|
<Button
|
||||||
size="sm"
|
size="sm"
|
||||||
disabled={acc.status === "running"}
|
disabled={acc.status === "running"}
|
||||||
@@ -319,10 +399,18 @@ export default function ImapPage() {
|
|||||||
>
|
>
|
||||||
Import starten
|
Import starten
|
||||||
</Button>
|
</Button>
|
||||||
|
<Button
|
||||||
|
size="sm"
|
||||||
|
variant="outline"
|
||||||
|
disabled={acc.status === "running" || acc.sync_running}
|
||||||
|
onClick={() => handleSyncNow(acc.id)}
|
||||||
|
>
|
||||||
|
Sync jetzt
|
||||||
|
</Button>
|
||||||
<Button
|
<Button
|
||||||
size="sm"
|
size="sm"
|
||||||
variant="destructive"
|
variant="destructive"
|
||||||
disabled={acc.status === "running"}
|
disabled={acc.status === "running" || acc.sync_running}
|
||||||
onClick={() => setDeleteConfirm(acc.id)}
|
onClick={() => setDeleteConfirm(acc.id)}
|
||||||
>
|
>
|
||||||
Loeschen
|
Loeschen
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
export type FeatureStatus = "Planned" | "In Progress" | "In Review" | "Deployed";
|
||||||
|
|
||||||
|
export interface Feature {
|
||||||
|
id: string;
|
||||||
|
name: string;
|
||||||
|
status: FeatureStatus;
|
||||||
|
frontend: boolean;
|
||||||
|
backend: boolean;
|
||||||
|
lastUpdated: string; // YYYY-MM-DD
|
||||||
|
}
|
||||||
|
|
||||||
|
export const features: Feature[] = [
|
||||||
|
{ id: "PROJ-1", name: "Authentifizierung & Rollen", status: "In Review", frontend: true, backend: true, lastUpdated: "2026-03-15" },
|
||||||
|
{ id: "PROJ-2", name: "Import: EML/MBOX Upload", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-3", name: "Import: IMAP-Verbindung", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-4", name: "Import: SMTP-Eingang via BCC", status: "In Progress", frontend: false, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-5", name: "Speicherung & Volltext-Indexierung", status: "In Review", frontend: false, backend: true, lastUpdated: "2026-03-14" },
|
||||||
|
{ id: "PROJ-6", name: "Volltext-Suche & Filterung", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-7", name: "E-Mail-Ansicht (Lesen & Anhänge)", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-8", name: "Automatischer IMAP-Sync (Cron-Job)", status: "In Progress", frontend: false, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-9", name: "Ordner- & Label-Verwaltung", status: "In Progress", frontend: false, backend: false, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-10", name: "Admin-Bereich: Nutzer- & Postfachverw.", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-11", name: "Audit-Log & Compliance-Berichte", status: "In Progress", frontend: true, backend: true, lastUpdated: "2026-03-12" },
|
||||||
|
{ id: "PROJ-12", name: "E-Mail-Export (EML/PDF/ZIP)", status: "In Review", frontend: true, backend: true, lastUpdated: "2026-03-13" },
|
||||||
|
{ id: "PROJ-13", name: "REST API für externe CRM-Anbindung", status: "In Progress", frontend: false, backend: true, lastUpdated: "2026-03-13" },
|
||||||
|
{ id: "PROJ-14", name: "Import: POP3-Verbindung", status: "In Progress", frontend: false, backend: false, lastUpdated: "2026-03-13" },
|
||||||
|
{ id: "PROJ-15", name: "CLI Import & Export", status: "In Review", frontend: false, backend: true, lastUpdated: "2026-03-13" },
|
||||||
|
{ id: "PROJ-16", name: "LDAP / Active Directory Anbindung", status: "In Progress", frontend: false, backend: false, lastUpdated: "2026-03-13" },
|
||||||
|
{ id: "PROJ-17", name: "Admin Dashboard – Systemauslastung", status: "In Review", frontend: true, backend: true, lastUpdated: "2026-03-14" },
|
||||||
|
{ id: "PROJ-18", name: "E-Mail Integritätsprüfung", status: "In Review", frontend: true, backend: true, lastUpdated: "2026-03-14" },
|
||||||
|
];
|
||||||
@@ -314,6 +314,13 @@ export interface ImapAccount {
|
|||||||
progress_current: number;
|
progress_current: number;
|
||||||
progress_total: number;
|
progress_total: number;
|
||||||
created_at: string;
|
created_at: string;
|
||||||
|
// PROJ-8: Auto-sync fields
|
||||||
|
sync_interval_min: number;
|
||||||
|
last_sync_at?: string;
|
||||||
|
last_sync_count: number;
|
||||||
|
sync_running: boolean;
|
||||||
|
sync_status: string;
|
||||||
|
sync_error_msg: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ImapTestResult {
|
export interface ImapTestResult {
|
||||||
@@ -366,6 +373,17 @@ export async function getImapProgress(id: number): Promise<ImapAccount> {
|
|||||||
return request<ImapAccount>(`/api/imap/${id}/progress`);
|
return request<ImapAccount>(`/api/imap/${id}/progress`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function triggerImapSync(id: number): Promise<ImapAccount> {
|
||||||
|
return request<ImapAccount>(`/api/imap/${id}/sync`, { method: "POST" });
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function updateImapInterval(id: number, intervalMin: number): Promise<ImapAccount> {
|
||||||
|
return request<ImapAccount>(`/api/imap/${id}`, {
|
||||||
|
method: "PATCH",
|
||||||
|
body: JSON.stringify({ sync_interval_min: intervalMin }),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// ── System Stats ──────────────────────────────────────────────────────────
|
// ── System Stats ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
export interface SystemStatsCPU {
|
export interface SystemStatsCPU {
|
||||||
|
|||||||
Reference in New Issue
Block a user