Files
archivmail/internal/imap/store.go
T
sysops a93a843506 feat(PROJ-30): Xapian → Manticore Search Migration
- internal/index/manticore.go: ManticoreTenantManager + manticoreIndex (RT-Indizes, CGO-frei)
- internal/index/index.go: TenantIndexer Interface (Xapian + Manticore)
- internal/index/tenant_worker.go: mgr-Typ auf TenantIndexer Interface
- internal/api/server.go: idxMgr auf TenantIndexer Interface
- config/config.go: IndexConfig.ManticoreDSN Feld
- cmd/archivmail/cmd_reindex.go: reindex Subkommando
- cmd/archivmail/main.go: Manticore-Branch + reindex Case
- go.mod: github.com/go-sql-driver/mysql v1.8.1
- update.sh: Manticore auto-install, CGO_ENABLED=0, config.yml migration, auto-reindex

fix(IMAP): TCP-Deadline-Wrapper für steckengebliebene Imports
fix(auth): Email-Claim in JWT für User-Isolation
fix(search): User-Isolation via sess.Email (fail-safe)
fix(ui): Admin-Login Auth-Cache, Logout-Redirect, IMAP-Polling-Resilienz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-03 21:19:36 +02:00

391 lines
13 KiB
Go

package imap
import (
"context"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha256"
"fmt"
"io"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// Account represents an IMAP account configuration stored in the database.
type Account struct {
ID int64 `json:"id"`
Owner string `json:"owner"`
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
TLS string `json:"tls"`
Username string `json:"username"`
ExcludedFolders []string `json:"excluded_folders"`
Status string `json:"status"`
ErrorMsg string `json:"error_msg"`
LastImportAt *time.Time `json:"last_import_at,omitempty"`
LastImportCount int `json:"last_import_count"`
ProgressCurrent int `json:"progress_current"`
ProgressTotal int `json:"progress_total"`
CreatedAt time.Time `json:"created_at"`
// PROJ-8: Auto-sync fields
SyncIntervalMin int `json:"sync_interval_min"`
LastSyncAt *time.Time `json:"last_sync_at,omitempty"`
LastSyncCount int `json:"last_sync_count"`
LastUID uint32 `json:"last_uid"`
SyncRunning bool `json:"sync_running"`
SyncStatus string `json:"sync_status"`
SyncErrorMsg string `json:"sync_error_msg"`
// Tenant assignment — mails imported from this account are tagged with this tenant.
TenantID *int64 `json:"tenant_id,omitempty"`
}
// Store manages IMAP account persistence in PostgreSQL.
type Store struct {
pool *pgxpool.Pool
encKey [32]byte
}
const createTableSQL = `
CREATE TABLE IF NOT EXISTS imap_accounts (
id SERIAL PRIMARY KEY,
owner TEXT NOT NULL,
name TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 993,
tls TEXT NOT NULL DEFAULT 'ssl',
username TEXT NOT NULL,
password_enc BYTEA NOT NULL,
excluded_folders TEXT[] NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'idle',
error_msg TEXT NOT NULL DEFAULT '',
last_import_at TIMESTAMPTZ,
last_import_count INTEGER NOT NULL DEFAULT 0,
progress_current INTEGER NOT NULL DEFAULT 0,
progress_total INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_imap_accounts_owner ON imap_accounts (owner);
`
// migrationSQL adds columns that may not exist in older installations.
const migrationSQL = `
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_interval_min INTEGER NOT NULL DEFAULT 0;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_at TIMESTAMPTZ;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_sync_count INTEGER NOT NULL DEFAULT 0;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS last_uid BIGINT NOT NULL DEFAULT 0;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_running BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_status TEXT NOT NULL DEFAULT '';
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS sync_error_msg TEXT NOT NULL DEFAULT '';
ALTER TABLE imap_accounts ADD COLUMN IF NOT EXISTS tenant_id INTEGER REFERENCES tenants(id);
`
// New creates a new Store, connects to PostgreSQL, and runs the migration.
func New(dsn, secret string) (*Store, error) {
pool, err := pgxpool.New(context.Background(), dsn)
if err != nil {
return nil, fmt.Errorf("imap store: connect: %w", err)
}
if _, err := pool.Exec(context.Background(), createTableSQL); err != nil {
pool.Close()
return nil, fmt.Errorf("imap store: migrate create: %w", err)
}
if _, err := pool.Exec(context.Background(), migrationSQL); err != nil {
pool.Close()
return nil, fmt.Errorf("imap store: migrate alter: %w", err)
}
key := sha256.Sum256([]byte(secret))
return &Store{pool: pool, encKey: key}, nil
}
// Close releases the database connection pool.
func (s *Store) Close() {
s.pool.Close()
}
// Create inserts a new IMAP account with an encrypted password.
func (s *Store) Create(ctx context.Context, acc Account, password string) (*Account, error) {
enc, err := encryptPassword(password, s.encKey)
if err != nil {
return nil, fmt.Errorf("imap store: encrypt password: %w", err)
}
row := s.pool.QueryRow(ctx, `
INSERT INTO imap_accounts (owner, name, host, port, tls, username, password_enc, excluded_folders)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id, created_at`,
acc.Owner, acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, enc, acc.ExcludedFolders,
)
if err := row.Scan(&acc.ID, &acc.CreatedAt); err != nil {
return nil, fmt.Errorf("imap store: create: %w", err)
}
acc.Status = "idle"
acc.ErrorMsg = ""
return &acc, nil
}
// selectColumns is the canonical column list used in all SELECT statements.
// Column order must match the Scan call in scanRow.
// Leading and trailing spaces are intentional for correct SQL concatenation.
const selectColumns = ` id, owner, name, host, port, tls, username, excluded_folders,
status, error_msg, last_import_at, last_import_count,
progress_current, progress_total, created_at,
sync_interval_min, last_sync_at, last_sync_count, last_uid,
sync_running, sync_status, sync_error_msg, tenant_id `
// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error.
type scanner interface {
Scan(dest ...any) error
}
func scanRow(row scanner) (Account, error) {
var a Account
err := row.Scan(
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.Username,
&a.ExcludedFolders, &a.Status, &a.ErrorMsg, &a.LastImportAt,
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
&a.SyncIntervalMin, &a.LastSyncAt, &a.LastSyncCount, &a.LastUID,
&a.SyncRunning, &a.SyncStatus, &a.SyncErrorMsg, &a.TenantID,
)
return a, err
}
// List returns IMAP accounts. Admins see all accounts; regular users see only their own.
func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) {
var rows pgx.Rows
var err error
q := `SELECT` + selectColumns + `FROM imap_accounts`
if isAdmin {
rows, err = s.pool.Query(ctx, q+` ORDER BY id`)
} else {
rows, err = s.pool.Query(ctx, q+` WHERE owner = $1 ORDER BY id`, owner)
}
if err != nil {
return nil, fmt.Errorf("imap store: list: %w", err)
}
defer rows.Close()
var accounts []Account
for rows.Next() {
a, err := scanRow(rows)
if err != nil {
return nil, fmt.Errorf("imap store: scan: %w", err)
}
accounts = append(accounts, a)
}
return accounts, rows.Err()
}
// ListAll returns all IMAP accounts regardless of owner — used by the scheduler.
func (s *Store) ListAll(ctx context.Context) ([]Account, error) {
rows, err := s.pool.Query(ctx,
`SELECT`+selectColumns+`FROM imap_accounts ORDER BY id`)
if err != nil {
return nil, fmt.Errorf("imap store: list all: %w", err)
}
defer rows.Close()
var accounts []Account
for rows.Next() {
a, err := scanRow(rows)
if err != nil {
return nil, fmt.Errorf("imap store: scan: %w", err)
}
accounts = append(accounts, a)
}
return accounts, rows.Err()
}
// Get returns a single IMAP account by ID.
func (s *Store) Get(ctx context.Context, id int64) (*Account, error) {
row := s.pool.QueryRow(ctx,
`SELECT`+selectColumns+`FROM imap_accounts WHERE id = $1`, id)
a, err := scanRow(row)
if err != nil {
return nil, fmt.Errorf("imap store: get %d: %w", id, err)
}
return &a, nil
}
// GetPassword retrieves and decrypts the stored password for an IMAP account.
func (s *Store) GetPassword(ctx context.Context, id int64) (string, error) {
var enc []byte
err := s.pool.QueryRow(ctx, `SELECT password_enc FROM imap_accounts WHERE id = $1`, id).Scan(&enc)
if err != nil {
return "", fmt.Errorf("imap store: get password: %w", err)
}
return decryptPassword(enc, s.encKey)
}
// Delete removes an IMAP account by ID.
func (s *Store) Delete(ctx context.Context, id int64) error {
tag, err := s.pool.Exec(ctx, `DELETE FROM imap_accounts WHERE id = $1`, id)
if err != nil {
return fmt.Errorf("imap store: delete: %w", err)
}
if tag.RowsAffected() == 0 {
return fmt.Errorf("imap store: account %d not found", id)
}
return nil
}
// DeleteByOwner removes all IMAP accounts belonging to the given username.
// Returns the number of accounts deleted.
func (s *Store) DeleteByOwner(ctx context.Context, username string) (int, error) {
tag, err := s.pool.Exec(ctx, `DELETE FROM imap_accounts WHERE owner = $1`, username)
if err != nil {
return 0, fmt.Errorf("imap store: delete by owner: %w", err)
}
return int(tag.RowsAffected()), nil
}
// UpdateExcluded sets the list of excluded folders for an account.
func (s *Store) UpdateExcluded(ctx context.Context, id int64, excluded []string) error {
_, err := s.pool.Exec(ctx, `UPDATE imap_accounts SET excluded_folders = $1 WHERE id = $2`, excluded, id)
if err != nil {
return fmt.Errorf("imap store: update excluded: %w", err)
}
return nil
}
// UpdateStatus updates the import progress and status of an account.
func (s *Store) UpdateStatus(ctx context.Context, id int64, status, errMsg string, current, total int) error {
_, err := s.pool.Exec(ctx, `
UPDATE imap_accounts
SET status = $1, error_msg = $2, progress_current = $3, progress_total = $4
WHERE id = $5`, status, errMsg, current, total, id)
if err != nil {
return fmt.Errorf("imap store: update status: %w", err)
}
return nil
}
// UpdateDone marks an import as completed, setting status back to idle.
func (s *Store) UpdateDone(ctx context.Context, id int64, count int) error {
_, err := s.pool.Exec(ctx, `
UPDATE imap_accounts
SET status = 'idle', error_msg = '', last_import_at = now(),
last_import_count = $1, progress_current = 0, progress_total = 0
WHERE id = $2`, count, id)
if err != nil {
return fmt.Errorf("imap store: update done: %w", err)
}
return nil
}
// UpdateCredentials updates the connection details and optionally the password
// of an IMAP account. Pass an empty password to leave it unchanged.
func (s *Store) UpdateCredentials(ctx context.Context, id int64, acc Account, password string) error {
if password != "" {
enc, err := encryptPassword(password, s.encKey)
if err != nil {
return fmt.Errorf("imap store: encrypt password: %w", err)
}
_, err = s.pool.Exec(ctx,
`UPDATE imap_accounts SET name=$1, host=$2, port=$3, tls=$4, username=$5, password_enc=$6,
status='idle', error_msg='', sync_running=false WHERE id=$7`,
acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, enc, id)
if err != nil {
return fmt.Errorf("imap store: update credentials: %w", err)
}
} else {
_, err := s.pool.Exec(ctx,
`UPDATE imap_accounts SET name=$1, host=$2, port=$3, tls=$4, username=$5,
status='idle', error_msg='', sync_running=false WHERE id=$6`,
acc.Name, acc.Host, acc.Port, acc.TLS, acc.Username, id)
if err != nil {
return fmt.Errorf("imap store: update credentials (no pw): %w", err)
}
}
return nil
}
// UpdateSyncInterval sets the automatic sync interval for an account.
// intervalMin == 0 disables automatic sync.
func (s *Store) UpdateSyncInterval(ctx context.Context, id int64, intervalMin int) error {
_, err := s.pool.Exec(ctx,
`UPDATE imap_accounts SET sync_interval_min = $1 WHERE id = $2`,
intervalMin, id)
if err != nil {
return fmt.Errorf("imap store: update sync interval: %w", err)
}
return nil
}
// SetSyncRunning marks whether a background sync is currently active for an account.
func (s *Store) SetSyncRunning(ctx context.Context, id int64, running bool) error {
_, err := s.pool.Exec(ctx,
`UPDATE imap_accounts SET sync_running = $1 WHERE id = $2`,
running, id)
if err != nil {
return fmt.Errorf("imap store: set sync running: %w", err)
}
return nil
}
// UpdateSyncResult persists the outcome of a completed sync run.
func (s *Store) UpdateSyncResult(ctx context.Context, id int64, status, errMsg string, count int, lastUID uint32) error {
_, err := s.pool.Exec(ctx, `
UPDATE imap_accounts
SET sync_status = $1, sync_error_msg = $2, last_sync_count = $3,
last_uid = $4, last_sync_at = now(), sync_running = FALSE
WHERE id = $5`,
status, errMsg, count, lastUID, id)
if err != nil {
return fmt.Errorf("imap store: update sync result: %w", err)
}
return nil
}
// encryptPassword encrypts a plaintext password using AES-256-GCM.
func encryptPassword(plaintext string, key [32]byte) ([]byte, error) {
block, err := aes.NewCipher(key[:])
if err != nil {
return nil, err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return nil, err
}
nonce := make([]byte, gcm.NonceSize())
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
return nil, err
}
return gcm.Seal(nonce, nonce, []byte(plaintext), nil), nil
}
// decryptPassword decrypts a password previously encrypted with encryptPassword.
func decryptPassword(ciphertext []byte, key [32]byte) (string, error) {
block, err := aes.NewCipher(key[:])
if err != nil {
return "", err
}
gcm, err := cipher.NewGCM(block)
if err != nil {
return "", err
}
nonceSize := gcm.NonceSize()
if len(ciphertext) < nonceSize {
return "", fmt.Errorf("ciphertext too short")
}
nonce, ct := ciphertext[:nonceSize], ciphertext[nonceSize:]
plaintext, err := gcm.Open(nil, nonce, ct, nil)
if err != nil {
return "", fmt.Errorf("decrypt failed: %w", err)
}
return string(plaintext), nil
}