4151b6f8c5
- FolderState: GetFolderState, UpsertFolderState, ListFolderStates, DecideResync - syncFolder nutzt per-folder UID-Tracking statt globalem highest_uid - UIDVALIDITY-Check loest automatisch Full-Resync aus - imap_folder_state Tabelle in initSchema (CREATE TABLE IF NOT EXISTS) - SetAuditLogger in main.go verdrahtet Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
410 lines
14 KiB
Go
410 lines
14 KiB
Go
package imap
|
|
|
|
import (
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// Account represents an IMAP account configuration stored in the database.
|
|
type Account struct {
|
|
ID int64 `json:"id"`
|
|
Owner string `json:"owner"`
|
|
Name string `json:"name"`
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
TLS string `json:"tls"`
|
|
Username string `json:"username"`
|
|
ExcludedFolders []string `json:"excluded_folders"`
|
|
Status string `json:"status"`
|
|
ErrorMsg string `json:"error_msg"`
|
|
LastImportAt *time.Time `json:"last_import_at,omitempty"`
|
|
LastImportCount int `json:"last_import_count"`
|
|
ProgressCurrent int `json:"progress_current"`
|
|
ProgressTotal int `json:"progress_total"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
|
|
// PROJ-8: Auto-sync fields
|
|
SyncIntervalMin int `json:"sync_interval_min"`
|
|
LastSyncAt *time.Time `json:"last_sync_at,omitempty"`
|
|
LastSyncCount int `json:"last_sync_count"`
|
|
LastUID uint32 `json:"last_uid"`
|
|
SyncRunning bool `json:"sync_running"`
|
|
SyncStatus string `json:"sync_status"`
|
|
SyncErrorMsg string `json:"sync_error_msg"`
|
|
|
|
// Tenant assignment — mails imported from this account are tagged with this tenant.
|
|
TenantID *int64 `json:"tenant_id,omitempty"`
|
|
}
|
|
|
|
// Store manages IMAP account persistence in PostgreSQL.
|
|
type Store struct {
|
|
pool *pgxpool.Pool
|
|
encKey [32]byte
|
|
}
|
|
|
|
const createTableSQL = `
|
|
CREATE TABLE IF NOT EXISTS imap_accounts (
|
|
id SERIAL PRIMARY KEY,
|
|
owner TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
host TEXT NOT NULL,
|
|
port INTEGER NOT NULL DEFAULT 993,
|
|
tls TEXT NOT NULL DEFAULT 'ssl',
|
|
username TEXT NOT NULL,
|
|
password_enc BYTEA NOT NULL,
|
|
excluded_folders TEXT[] NOT NULL DEFAULT '{}',
|
|
status TEXT NOT NULL DEFAULT 'idle',
|
|
error_msg TEXT NOT NULL DEFAULT '',
|
|
last_import_at TIMESTAMPTZ,
|
|
last_import_count INTEGER NOT NULL DEFAULT 0,
|
|
progress_current INTEGER NOT NULL DEFAULT 0,
|
|
progress_total INTEGER NOT NULL DEFAULT 0,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner);
|
|
`
|
|
|
|
// migrationSQL adds columns that may not exist in older installations.
|
|
const migrationSQL = `
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0;
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ;
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_count INTEGER NOT NULL DEFAULT 0;
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFAULT 0;
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE;
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT '';
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT '';
|
|
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS tenant_id INTEGER REFERENCES tenants(id);
|
|
`
|
|
|
|
// folderStateSQL creates the per-folder UID-tracking table introduced by PROJ-45.
|
|
// Replaces the previous account-global last_uid tracking which silently skipped
|
|
// folders with lower UIDs than INBOX and ignored UIDVALIDITY changes.
|
|
const folderStateSQL = `
|
|
CREATE TABLE IF NOT EXISTS imap_folder_state (
|
|
account_id BIGINT NOT NULL REFERENCES imap_accounts(id) ON DELETE CASCADE,
|
|
folder TEXT NOT NULL,
|
|
last_uid BIGINT NOT NULL DEFAULT 0,
|
|
uid_validity BIGINT NOT NULL DEFAULT 0,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
PRIMARY KEY (account_id, folder)
|
|
);
|
|
`
|
|
|
|
// New creates a new Store, connects to PostgreSQL, and runs the migration.
|
|
func New(dsn, secret string) (*Store, error) {
|
|
pool, err := pgxpool.New(context.Background(), dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: connect: %w", err)
|
|
}
|
|
|
|
if _, err := pool.Exec(context.Background(), createTableSQL); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("imap store: migrate create: %w", err)
|
|
}
|
|
|
|
if _, err := pool.Exec(context.Background(), migrationSQL); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("imap store: migrate alter: %w", err)
|
|
}
|
|
|
|
if _, err := pool.Exec(context.Background(), folderStateSQL); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("imap store: migrate folder state: %w", err)
|
|
}
|
|
|
|
key := sha256.Sum256([]byte(secret))
|
|
return &Store{pool: pool, encKey: key}, nil
|
|
}
|
|
|
|
// Close releases the database connection pool.
|
|
func (s *Store) Close() {
|
|
s.pool.Close()
|
|
}
|
|
|
|
// Create inserts a new IMAP account with an encrypted password.
|
|
func (s *Store) Create(ctx context.Context, acc Account, password string) (*Account, error) {
|
|
enc, err := encryptPassword(password, s.encKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: encrypt password: %w", err)
|
|
}
|
|
|
|
row := s.pool.QueryRow(ctx, `
|
|
INSERT INTO imap_accounts (owner, name, host, port, tls, username, password_enc, excluded_folders)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
RETURNING id, created_at`,
|
|
acc.Owner, acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, enc, acc.ExcludedFolders,
|
|
)
|
|
|
|
if err := row.Scan(&acc.ID, &acc.CreatedAt); err != nil {
|
|
return nil, fmt.Errorf("imap store: create: %w", err)
|
|
}
|
|
|
|
acc.Status = "idle"
|
|
acc.ErrorMsg = ""
|
|
return &acc, nil
|
|
}
|
|
|
|
// selectColumns is the canonical column list used in all SELECT statements.
|
|
// Column order must match the Scan call in scanRow.
|
|
// Leading and trailing spaces are intentional for correct SQL concatenation.
|
|
const selectColumns = ` id, owner, name, host, port, tls, username, excluded_folders,
|
|
status, error_msg, last_import_at, last_import_count,
|
|
progress_current, progress_total, created_at,
|
|
sync_interval_min, last_sync_at, last_sync_count, last_uid,
|
|
sync_running, sync_status, sync_error_msg, tenant_id `
|
|
|
|
// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error.
|
|
type scanner interface {
|
|
Scan(dest ...any) error
|
|
}
|
|
|
|
func scanRow(row scanner) (Account, error) {
|
|
var a Account
|
|
err := row.Scan(
|
|
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username,
|
|
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
|
|
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
|
|
&a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID,
|
|
&a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, &a.TenantID,
|
|
)
|
|
return a, err
|
|
}
|
|
|
|
// List returns IMAP accounts. Admins see all accounts; regular users see only their own.
|
|
func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) {
|
|
var rows pgx.Rows
|
|
var err error
|
|
|
|
q := `SELECT` + selectColumns + `FROM imap_accounts`
|
|
|
|
if isAdmin {
|
|
rows, err = s.pool.Query(ctx, q+` ORDER BY id`)
|
|
} else {
|
|
rows, err = s.pool.Query(ctx, q+` WHERE owner = $1 ORDER BY id`, owner)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: list: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var accounts []Account
|
|
for rows.Next() {
|
|
a, err := scanRow(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: scan: %w", err)
|
|
}
|
|
accounts = append(accounts, a)
|
|
}
|
|
return accounts, rows.Err()
|
|
}
|
|
|
|
// ListAll returns all IMAP accounts regardless of owner — used by the scheduler.
|
|
func (s *Store) ListAll(ctx context.Context) ([]Account, error) {
|
|
rows, err := s.pool.Query(ctx,
|
|
`SELECT`+selectColumns+`FROM imap_accounts ORDER BY id`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: list all: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var accounts []Account
|
|
for rows.Next() {
|
|
a, err := scanRow(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: scan: %w", err)
|
|
}
|
|
accounts = append(accounts, a)
|
|
}
|
|
return accounts, rows.Err()
|
|
}
|
|
|
|
// Get returns a single IMAP account by ID.
|
|
func (s *Store) Get(ctx context.Context, id int64) (*Account, error) {
|
|
row := s.pool.QueryRow(ctx,
|
|
`SELECT`+selectColumns+`FROM imap_accounts WHERE id = $1`, id)
|
|
a, err := scanRow(row)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("imap store: get %d: %w", id, err)
|
|
}
|
|
return &a, nil
|
|
}
|
|
|
|
// GetPassword retrieves and decrypts the stored password for an IMAP account.
|
|
func (s *Store) GetPassword(ctx context.Context, id int64) (string, error) {
|
|
var enc []byte
|
|
err := s.pool.QueryRow(ctx, `SELECT password_enc FROM imap_accounts WHERE id = $1`, id).Scan(&enc)
|
|
if err != nil {
|
|
return "", fmt.Errorf("imap store: get password: %w", err)
|
|
}
|
|
return decryptPassword(enc, s.encKey)
|
|
}
|
|
|
|
// Delete removes an IMAP account by ID.
|
|
func (s *Store) Delete(ctx context.Context, id int64) error {
|
|
tag, err := s.pool.Exec(ctx, `DELETE FROM imap_accounts WHERE id = $1`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: delete: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("imap store: account %d not found", id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteByOwner removes all IMAP accounts belonging to the given username.
|
|
// Returns the number of accounts deleted.
|
|
func (s *Store) DeleteByOwner(ctx context.Context, username string) (int, error) {
|
|
tag, err := s.pool.Exec(ctx, `DELETE FROM imap_accounts WHERE owner = $1`, username)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("imap store: delete by owner: %w", err)
|
|
}
|
|
return int(tag.RowsAffected()), nil
|
|
}
|
|
|
|
// UpdateExcluded sets the list of excluded folders for an account.
|
|
func (s *Store) UpdateExcluded(ctx context.Context, id int64, excluded []string) error {
|
|
_, err := s.pool.Exec(ctx, `UPDATE imap_accounts SET excluded_folders = $1 WHERE id = $2`, excluded, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update excluded: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateStatus updates the import progress and status of an account.
|
|
func (s *Store) UpdateStatus(ctx context.Context, id int64, status, errMsg string, current, total int) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE imap_accounts
|
|
SET status = $1, error_msg = $2, progress_current = $3, progress_total = $4
|
|
WHERE id = $5`, status, errMsg, current, total, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update status: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateDone marks an import as completed, setting status back to idle.
|
|
func (s *Store) UpdateDone(ctx context.Context, id int64, count int) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE imap_accounts
|
|
SET status = 'idle', error_msg = '', last_import_at = now(),
|
|
last_import_count = $1, progress_current = 0, progress_total = 0
|
|
WHERE id = $2`, count, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update done: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateCredentials updates the connection details and optionally the password
|
|
// of an IMAP account. Pass an empty password to leave it unchanged.
|
|
func (s *Store) UpdateCredentials(ctx context.Context, id int64, acc Account, password string) error {
|
|
if password != "" {
|
|
enc, err := encryptPassword(password, s.encKey)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: encrypt password: %w", err)
|
|
}
|
|
_, err = s.pool.Exec(ctx,
|
|
`UPDATE imap_accounts SET name=$1, host=$2, port=$3, tls=$4, username=$5, password_enc=$6,
|
|
status='idle', error_msg='', sync_running=false WHERE id=$7`,
|
|
acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, enc, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update credentials: %w", err)
|
|
}
|
|
} else {
|
|
_, err := s.pool.Exec(ctx,
|
|
`UPDATE imap_accounts SET name=$1, host=$2, port=$3, tls=$4, username=$5,
|
|
status='idle', error_msg='', sync_running=false WHERE id=$6`,
|
|
acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update credentials (no pw): %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateSyncInterval sets the automatic sync interval for an account.
|
|
// intervalMin == 0 disables automatic sync.
|
|
func (s *Store) UpdateSyncInterval(ctx context.Context, id int64, intervalMin int) error {
|
|
_, err := s.pool.Exec(ctx,
|
|
`UPDATE imap_accounts SET sync_interval_min = $1 WHERE id = $2`,
|
|
intervalMin, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update sync interval: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetSyncRunning marks whether a background sync is currently active for an account.
|
|
func (s *Store) SetSyncRunning(ctx context.Context, id int64, running bool) error {
|
|
_, err := s.pool.Exec(ctx,
|
|
`UPDATE imap_accounts SET sync_running = $1 WHERE id = $2`,
|
|
running, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: set sync running: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateSyncResult persists the outcome of a completed sync run.
|
|
func (s *Store) UpdateSyncResult(ctx context.Context, id int64, status, errMsg string, count int, lastUID uint32) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE imap_accounts
|
|
SET sync_status = $1, sync_error_msg = $2, last_sync_count = $3,
|
|
last_uid = $4, last_sync_at = now(), sync_running = FALSE
|
|
WHERE id = $5`,
|
|
status, errMsg, count, lastUID, id)
|
|
if err != nil {
|
|
return fmt.Errorf("imap store: update sync result: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// encryptPassword encrypts a plaintext password using AES-256-GCM.
|
|
func encryptPassword(plaintext string, key [32]byte) ([]byte, error) {
|
|
block, err := aes.NewCipher(key[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
nonce := make([]byte, gcm.NonceSize())
|
|
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
|
return nil, err
|
|
}
|
|
return gcm.Seal(nonce, nonce, []byte(plaintext), nil), nil
|
|
}
|
|
|
|
// decryptPassword decrypts a password previously encrypted with encryptPassword.
|
|
func decryptPassword(ciphertext []byte, key [32]byte) (string, error) {
|
|
block, err := aes.NewCipher(key[:])
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
gcm, err := cipher.NewGCM(block)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
nonceSize := gcm.NonceSize()
|
|
if len(ciphertext) < nonceSize {
|
|
return "", fmt.Errorf("ciphertext too short")
|
|
}
|
|
nonce, ct := ciphertext[:nonceSize], ciphertext[nonceSize:]
|
|
plaintext, err := gcm.Open(nil, nonce, ct, nil)
|
|
if err != nil {
|
|
return "", fmt.Errorf("decrypt failed: %w", err)
|
|
}
|
|
return string(plaintext), nil
|
|
}
|