package imap import ( "context" "fmt" "io" "log/slog" "strings" "time" imapv2 "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" "archivmail/internal/index" "archivmail/internal/storage" "archivmail/pkg/mailparser" ) const batchSize = 50 // Importer runs background IMAP import jobs. type Importer struct { store *Store mailStore *storage.Store idxMgr index.TenantIndexer logger *slog.Logger // PROJ-44: optional hook into the async OCR worker. Wired in main.go // via SetOCRSubmit so the imap package does not import internal/ocr. ocrSubmit func(mailID string, tenantID *int64) } // NewImporter creates a new Importer wired to the storage and index backends. func NewImporter(store *Store, mailStore *storage.Store, idxMgr index.TenantIndexer, logger *slog.Logger) *Importer { return &Importer{ store: store, mailStore: mailStore, idxMgr: idxMgr, logger: logger, } } // SetOCRSubmit installs a non-blocking callback that enqueues a mail for // OCR processing. If never called, IMAP-imported mails are not OCR'd — // they remain in ocr_status='pending' forever (PROJ-44 fix). func (imp *Importer) SetOCRSubmit(fn func(mailID string, tenantID *int64)) { imp.ocrSubmit = fn } // Run performs a full IMAP import for the given account. It is designed to be // called as a goroutine: go imp.Run(context.Background(), accountID) func (imp *Importer) Run(ctx context.Context, accountID int64) { log := imp.logger.With("component", "imap-importer", "account_id", accountID) acc, err := imp.store.Get(ctx, accountID) if err != nil { log.Error("failed to get account", "err", err) return } password, err := imp.store.GetPassword(ctx, accountID) if err != nil { log.Error("failed to decrypt password", "err", err) _ = imp.store.UpdateStatus(ctx, accountID, "error", "failed to decrypt password", 0, 0) return } // Mark as running if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil { log.Error("failed to update status", "err", err) return } imported, err := imp.doImport(ctx, acc, password, log) if err != nil { log.Error("import failed", "err", err) _ = imp.store.UpdateStatus(ctx, accountID, "error", err.Error(), 0, 0) return } if err := imp.store.UpdateDone(ctx, accountID, imported); err != nil { log.Error("failed to update done", "err", err) } log.Info("import completed", "imported", imported) } // doImport handles the actual IMAP connection, folder iteration, and message fetching. func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) { c, err := Connect(acc.Host, acc.Port, acc.TLS) if err != nil { return 0, fmt.Errorf("connect: %w", err) } defer c.Close() // Login if err := c.Login(acc.Username, password).Wait(); err != nil { return 0, fmt.Errorf("login: %w", err) } // List all folders folders, err := ListFolders(c.Client) if err != nil { return 0, fmt.Errorf("list folders: %w", err) } // Build excluded set from account config excluded := make(map[string]bool) for _, f := range acc.ExcludedFolders { excluded[f] = true } // Collect included folders var includedFolders []string for _, f := range folders { if !excluded[f.Name] { includedFolders = append(includedFolders, f.Name) } } // Count total messages across all folders first totalMsgs := 0 folderUIDs := make(map[string][]imapv2.UID) for _, folder := range includedFolders { selectData, err := c.Select(folder, nil).Wait() if err != nil { log.Warn("failed to select folder, skipping", "folder", folder, "err", err) continue } _ = selectData searchCmd := c.UIDSearch(&imapv2.SearchCriteria{}, nil) searchData, err := searchCmd.Wait() if err != nil { log.Warn("failed to search folder, skipping", "folder", folder, "err", err) continue } uids := searchData.AllUIDs() folderUIDs[folder] = uids totalMsgs += len(uids) } log.Info("starting import", "folders", len(includedFolders), "total_messages", totalMsgs) _ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", 0, totalMsgs) imported := 0 processed := 0 for _, folder := range includedFolders { uids, ok := folderUIDs[folder] if !ok || len(uids) == 0 { continue } // Need to re-select the folder before fetching if _, err := c.Select(folder, nil).Wait(); err != nil { log.Warn("failed to re-select folder", "folder", folder, "err", err) continue } log.Info("importing folder", "folder", folder, "messages", len(uids)) // Process in batches for i := 0; i < len(uids); i += batchSize { end := i + batchSize if end > len(uids) { end = len(uids) } batch := uids[i:end] // Set per-batch deadline to prevent indefinite blocking on stalled connections. c.SetFetchDeadline() count, err := imp.fetchBatch(ctx, c.Client, batch, acc.TenantID, log) c.ClearDeadline() if err != nil { log.Error("batch fetch error — aborting import", "folder", folder, "offset", i, "err", err) return imported, fmt.Errorf("fetch batch %d in %q: %w", i, folder, err) } imported += count processed += len(batch) _ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", processed, totalMsgs) } } return imported, nil } // fetchBatch fetches and stores a batch of messages by UID. func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, tenantID *int64, log *slog.Logger) (int, error) { if len(uids) == 0 { return 0, nil } fetchOptions := &imapv2.FetchOptions{ UID: true, BodySection: []*imapv2.FetchItemBodySection{{}}, } seqSet := imapv2.UIDSetNum(uids...) fetchCmd := c.Fetch(seqSet, fetchOptions) imported := 0 for { msg := fetchCmd.Next() if msg == nil { break } // Collect body sections from this message for { item := msg.Next() if item == nil { break } switch body := item.(type) { case imapclient.FetchItemDataBodySection: raw, err := io.ReadAll(body.Literal) if err != nil { log.Warn("failed to read message body", "err", err) continue } if err := imp.storeAndIndex(raw, tenantID, log); err != nil { log.Warn("failed to store/index message", "err", err) continue } imported++ } } } if err := fetchCmd.Close(); err != nil { return imported, fmt.Errorf("fetch close: %w", err) } return imported, nil } // storeAndIndex saves a raw email to storage and indexes it. func (imp *Importer) storeAndIndex(raw []byte, tenantID *int64, log *slog.Logger) error { ctx := context.Background() // Save to file storage (deduplicates by SHA256 automatically) id, err := imp.mailStore.Save(ctx, raw, time.Now(), tenantID) if err != nil { return fmt.Errorf("save: %w", err) } // Parse for indexing pm, err := mailparser.Parse(raw) if err != nil { log.Warn("failed to parse mail for indexing", "id", id, "err", err) // Store succeeded, just skip indexing for unparseable mails return nil } // Build attachment names string var attachNames []string for _, a := range pm.Attachments { if a.Filename != "" { attachNames = append(attachNames, a.Filename) } } doc := index.MailDocument{ ID: id, From: pm.From, To: strings.Join(pm.To, ", "), Subject: pm.Subject, Body: pm.TextBody, AttachNames: strings.Join(attachNames, " "), HasAttachment: len(pm.Attachments) > 0, Date: pm.Date, Size: int64(len(raw)), TenantID: tenantID, } if err := imp.idxMgr.ForTenant(tenantID).IndexSync(doc); err != nil { log.Warn("failed to index mail", "id", id, "err", err) // Non-fatal: mail is stored, just not searchable yet } // PROJ-44: enqueue OCR job for any mail with attachments. Submit is // non-blocking; mails with no OCR-eligible parts get marked 'skipped' // by the worker, so the queue stays in sync regardless. if imp.ocrSubmit != nil && len(pm.Attachments) > 0 { imp.ocrSubmit(id, tenantID) } return nil }