Files
archivmail/internal/imap/importer.go
T
sysops 472ba6a087 feat(PROJ-53): Konfigurierbare Listenanzahl pro Seite
- users.list_page_size (Default 25), PATCH /api/auth/preferences,
  Whitelist 25/50/100/200, Wert in login/me-Response
- Settings-UI mit Select, /search nutzt gespeicherte Seitengröße
- /api/search page_size serverseitig auf max. 500 gecappt

fix(PROJ-46): login_attempts-Migration nutzte s.db statt s.pool
(Backend kompilierte nicht)

feat(PROJ-50): DSGVO-Löschersuchen Backend (dsgvo_requests, Handler,
cc_addr/bcc_addr Indexerweiterung) — noch nicht QA'd/deployed
2026-06-14 22:25:02 +02:00

295 lines
7.9 KiB
Go

package imap
import (
"context"
"fmt"
"io"
"log/slog"
"strings"
"time"
imapv2 "github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
"archivmail/internal/index"
"archivmail/internal/storage"
"archivmail/pkg/mailparser"
)
// batchSize is the number of UIDs requested per FETCH command during import.
const batchSize = 50

// Importer executes IMAP import jobs in the background.
type Importer struct {
	store     *Store
	mailStore *storage.Store
	idxMgr    index.TenantIndexer
	logger    *slog.Logger

	// ocrSubmit is an optional hook into the asynchronous OCR worker
	// (PROJ-44). main.go installs it through SetOCRSubmit, which keeps this
	// package from having to import internal/ocr.
	ocrSubmit func(mailID string, tenantID *int64)
}
// NewImporter returns an Importer that reads accounts from store, saves
// imported mails to mailStore and indexes them through idxMgr. No OCR hook
// is installed; use SetOCRSubmit for that.
func NewImporter(store *Store, mailStore *storage.Store, idxMgr index.TenantIndexer, logger *slog.Logger) *Importer {
	imp := &Importer{}
	imp.store = store
	imp.mailStore = mailStore
	imp.idxMgr = idxMgr
	imp.logger = logger
	return imp
}
// SetOCRSubmit registers fn as the callback that enqueues an imported mail
// for OCR; fn is expected not to block. While no callback is registered,
// mails imported over IMAP are never handed to OCR and stay at
// ocr_status='pending' (PROJ-44 fix).
func (imp *Importer) SetOCRSubmit(fn func(mailID string, tenantID *int64)) {
	imp.ocrSubmit = fn
}
// Run performs a full IMAP import for the given account and records the
// outcome through the account store: status "running" while the import is
// in progress, UpdateDone with the imported count on success, or status
// "error" with the error text on failure. It is designed to be started as
// a goroutine:
//
//	go imp.Run(context.Background(), accountID)
//
// The final status writes use a context detached from ctx's cancellation,
// so cancelling ctx mid-import still records "error" instead of leaving the
// account stuck in "running".
func (imp *Importer) Run(ctx context.Context, accountID int64) {
	log := imp.logger.With("component", "imap-importer", "account_id", accountID)

	// statusCtx keeps ctx's values but ignores its cancellation and deadline.
	statusCtx := context.WithoutCancel(ctx)

	acc, err := imp.store.Get(ctx, accountID)
	if err != nil {
		log.Error("failed to get account", "err", err)
		return
	}

	password, err := imp.store.GetPassword(ctx, accountID)
	if err != nil {
		log.Error("failed to decrypt password", "err", err)
		if uerr := imp.store.UpdateStatus(statusCtx, accountID, "error", "failed to decrypt password", 0, 0); uerr != nil {
			log.Error("failed to update status", "err", uerr)
		}
		return
	}

	// Mark as running.
	if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil {
		log.Error("failed to update status", "err", err)
		return
	}

	imported, err := imp.doImport(ctx, acc, password, log)
	if err != nil {
		log.Error("import failed", "err", err, "imported", imported)
		if uerr := imp.store.UpdateStatus(statusCtx, accountID, "error", err.Error(), 0, 0); uerr != nil {
			log.Error("failed to update status", "err", uerr)
		}
		return
	}

	if err := imp.store.UpdateDone(statusCtx, accountID, imported); err != nil {
		log.Error("failed to update done", "err", err)
	}
	log.Info("import completed", "imported", imported)
}
// doImport connects and logs in to the account's IMAP server. It imports
// every message from all folders except those in acc.ExcludedFolders, in
// batches of batchSize UIDs. It returns the number of messages successfully
// stored. Progress (processed/total) is written to the account after each
// batch on a best-effort basis.
//
// Folders that cannot be selected or searched are logged and skipped. The
// import aborts with an error when ctx is cancelled (checked per folder
// while counting and per batch while importing) or when a batch fetch
// fails. The count of messages imported so far is returned alongside that
// error.
func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) {
	c, err := Connect(acc.Host, acc.Port, acc.TLS)
	if err != nil {
		return 0, fmt.Errorf("connect: %w", err)
	}
	defer c.Close()

	if err := c.Login(acc.Username, password).Wait(); err != nil {
		return 0, fmt.Errorf("login: %w", err)
	}

	folders, err := ListFolders(c.Client)
	if err != nil {
		return 0, fmt.Errorf("list folders: %w", err)
	}

	// Filter out the folders the account is configured to skip.
	excluded := make(map[string]bool, len(acc.ExcludedFolders))
	for _, f := range acc.ExcludedFolders {
		excluded[f] = true
	}
	includedFolders := make([]string, 0, len(folders))
	for _, f := range folders {
		if !excluded[f.Name] {
			includedFolders = append(includedFolders, f.Name)
		}
	}

	// Collect the UIDs of every included folder up front so progress can be
	// reported against a fixed total.
	totalMsgs := 0
	folderUIDs := make(map[string][]imapv2.UID, len(includedFolders))
	for _, folder := range includedFolders {
		if err := ctx.Err(); err != nil {
			return 0, fmt.Errorf("counting messages: %w", err)
		}
		if _, err := c.Select(folder, nil).Wait(); err != nil {
			log.Warn("failed to select folder, skipping", "folder", folder, "err", err)
			continue
		}
		searchData, err := c.UIDSearch(&imapv2.SearchCriteria{}, nil).Wait()
		if err != nil {
			log.Warn("failed to search folder, skipping", "folder", folder, "err", err)
			continue
		}
		uids := searchData.AllUIDs()
		folderUIDs[folder] = uids
		totalMsgs += len(uids)
	}

	log.Info("starting import", "folders", len(includedFolders), "total_messages", totalMsgs)
	// Progress writes are best-effort: a failed update must not abort the import.
	_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", 0, totalMsgs)

	imported, processed := 0, 0
	for _, folder := range includedFolders {
		// Missing entries (skipped folders) yield nil and are skipped here too.
		uids := folderUIDs[folder]
		if len(uids) == 0 {
			continue
		}

		// The counting pass has since selected other folders; select this one again.
		if _, err := c.Select(folder, nil).Wait(); err != nil {
			log.Warn("failed to re-select folder", "folder", folder, "err", err)
			continue
		}
		log.Info("importing folder", "folder", folder, "messages", len(uids))

		for i := 0; i < len(uids); i += batchSize {
			if err := ctx.Err(); err != nil {
				return imported, fmt.Errorf("import cancelled in %q at offset %d: %w", folder, i, err)
			}
			end := i + batchSize
			if end > len(uids) {
				end = len(uids)
			}
			batch := uids[i:end]

			// Per-batch deadline so a stalled connection cannot block indefinitely.
			c.SetFetchDeadline()
			count, err := imp.fetchBatch(ctx, c.Client, batch, acc.TenantID, log)
			c.ClearDeadline()
			if err != nil {
				// Not logged here: the caller logs the returned error, which
				// already carries folder and offset.
				return imported, fmt.Errorf("fetch batch %d in %q: %w", i, folder, err)
			}

			imported += count
			processed += len(batch)
			_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", processed, totalMsgs)
		}
	}
	return imported, nil
}
// fetchBatch fetches the complete raw content (BODY[]) of the messages with
// the given UIDs from the currently selected folder. Each message is handed
// to storeAndIndex. It returns how many messages were stored successfully.
//
// Per-message problems (missing or unreadable body, store/index errors) are
// logged and skipped. Only an error reported when closing the FETCH command
// is returned. ctx is currently not consulted.
func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, tenantID *int64, log *slog.Logger) (int, error) {
	if len(uids) == 0 {
		return 0, nil
	}

	fetchOptions := &imapv2.FetchOptions{
		UID: true,
		// A single empty section requests the whole message, headers included.
		BodySection: []*imapv2.FetchItemBodySection{{}},
	}
	fetchCmd := c.Fetch(imapv2.UIDSetNum(uids...), fetchOptions)

	imported := 0
	for msg := fetchCmd.Next(); msg != nil; msg = fetchCmd.Next() {
		for item := msg.Next(); item != nil; item = msg.Next() {
			body, ok := item.(imapclient.FetchItemDataBodySection)
			if !ok {
				continue
			}
			// go-imap documents that Literal may be nil (e.g. the server
			// answered NIL for the section); io.ReadAll(nil) would panic.
			if body.Literal == nil {
				log.Warn("message has no body", "seq", msg.SeqNum)
				continue
			}
			raw, err := io.ReadAll(body.Literal)
			if err != nil {
				log.Warn("failed to read message body", "seq", msg.SeqNum, "err", err)
				continue
			}
			if err := imp.storeAndIndex(raw, tenantID, log); err != nil {
				log.Warn("failed to store/index message", "seq", msg.SeqNum, "err", err)
				continue
			}
			imported++
		}
	}

	if err := fetchCmd.Close(); err != nil {
		return imported, fmt.Errorf("fetch close: %w", err)
	}
	return imported, nil
}
// storeAndIndex persists raw in the mail store and then indexes it for the
// tenant. Only a storage failure is returned as an error. If the mail cannot
// be parsed, or indexing fails, that is logged and the mail still counts as
// stored. After indexing, a mail with at least one attachment is passed to
// the OCR hook, if one is installed.
func (imp *Importer) storeAndIndex(raw []byte, tenantID *int64, log *slog.Logger) error {
	// Save deduplicates by SHA256 on its own.
	id, err := imp.mailStore.Save(context.Background(), raw, time.Now(), tenantID)
	if err != nil {
		return fmt.Errorf("save: %w", err)
	}

	pm, err := mailparser.Parse(raw)
	if err != nil {
		// The mail is already stored; it just cannot be made searchable.
		log.Warn("failed to parse mail for indexing", "id", id, "err", err)
		return nil
	}

	hasAttachments := len(pm.Attachments) > 0

	// Only named attachments contribute to the searchable name list.
	var names []string
	for _, att := range pm.Attachments {
		if att.Filename == "" {
			continue
		}
		names = append(names, att.Filename)
	}

	doc := index.MailDocument{
		ID:            id,
		TenantID:      tenantID,
		Date:          pm.Date,
		From:          pm.From,
		To:            strings.Join(pm.To, ", "),
		CC:            strings.Join(pm.CC, ", "),
		Subject:       pm.Subject,
		Body:          pm.TextBody,
		AttachNames:   strings.Join(names, " "),
		HasAttachment: hasAttachments,
		Size:          int64(len(raw)),
	}

	indexErr := imp.idxMgr.ForTenant(tenantID).IndexSync(doc)
	if indexErr != nil {
		// Not fatal: the mail is stored, it is just not searchable yet.
		log.Warn("failed to index mail", "id", id, "err", indexErr)
	}

	// PROJ-44: queue every mail that has attachments for OCR. The hook does
	// not block, and the worker marks mails without OCR-eligible parts as
	// 'skipped', so the queue stays consistent either way.
	if hasAttachments && imp.ocrSubmit != nil {
		imp.ocrSubmit(id, tenantID)
	}
	return nil
}