package imap import ( "context" "fmt" "io" "log/slog" "strings" "time" imapv2 "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" "github.com/archivmail/internal/index" "github.com/archivmail/internal/storage" "github.com/archivmail/pkg/mailparser" ) const batchSize = 50 // Importer runs background IMAP import jobs. type Importer struct { store *Store mailStore *storage.Store idx index.Indexer logger *slog.Logger } // NewImporter creates a new Importer wired to the storage and index backends. func NewImporter(store *Store, mailStore *storage.Store, idx index.Indexer, logger *slog.Logger) *Importer { return &Importer{ store: store, mailStore: mailStore, idx: idx, logger: logger, } } // Run performs a full IMAP import for the given account. It is designed to be // called as a goroutine: go imp.Run(context.Background(), accountID) func (imp *Importer) Run(ctx context.Context, accountID int64) { log := imp.logger.With("component", "imap-importer", "account_id", accountID) acc, err := imp.store.Get(ctx, accountID) if err != nil { log.Error("failed to get account", "err", err) return } password, err := imp.store.GetPassword(ctx, accountID) if err != nil { log.Error("failed to decrypt password", "err", err) _ = imp.store.UpdateStatus(ctx, accountID, "error", "failed to decrypt password", 0, 0) return } // Mark as running if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil { log.Error("failed to update status", "err", err) return } imported, err := imp.doImport(ctx, acc, password, log) if err != nil { log.Error("import failed", "err", err) _ = imp.store.UpdateStatus(ctx, accountID, "error", err.Error(), 0, 0) return } if err := imp.store.UpdateDone(ctx, accountID, imported); err != nil { log.Error("failed to update done", "err", err) } log.Info("import completed", "imported", imported) } // doImport handles the actual IMAP connection, folder iteration, and message fetching. func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) { c, err := Connect(acc.Host, acc.Port, acc.TLS) if err != nil { return 0, fmt.Errorf("connect: %w", err) } defer c.Close() // Login if err := c.Login(acc.Username, password).Wait(); err != nil { return 0, fmt.Errorf("login: %w", err) } // List all folders folders, err := ListFolders(c) if err != nil { return 0, fmt.Errorf("list folders: %w", err) } // Build excluded set from account config excluded := make(map[string]bool) for _, f := range acc.ExcludedFolders { excluded[f] = true } // Collect included folders var includedFolders []string for _, f := range folders { if !excluded[f.Name] { includedFolders = append(includedFolders, f.Name) } } // Count total messages across all folders first totalMsgs := 0 folderUIDs := make(map[string][]imapv2.UID) for _, folder := range includedFolders { selectData, err := c.Select(folder, nil).Wait() if err != nil { log.Warn("failed to select folder, skipping", "folder", folder, "err", err) continue } _ = selectData searchCmd := c.UIDSearch(&imapv2.SearchCriteria{}, nil) searchData, err := searchCmd.Wait() if err != nil { log.Warn("failed to search folder, skipping", "folder", folder, "err", err) continue } uids := searchData.AllUIDs() folderUIDs[folder] = uids totalMsgs += len(uids) } log.Info("starting import", "folders", len(includedFolders), "total_messages", totalMsgs) _ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", 0, totalMsgs) imported := 0 processed := 0 for _, folder := range includedFolders { uids, ok := folderUIDs[folder] if !ok || len(uids) == 0 { continue } // Need to re-select the folder before fetching if _, err := c.Select(folder, nil).Wait(); err != nil { log.Warn("failed to re-select folder", "folder", folder, "err", err) continue } log.Info("importing folder", "folder", folder, "messages", len(uids)) // Process in batches for i := 0; i < len(uids); i += batchSize { end := i + batchSize if end > len(uids) { end = len(uids) } batch := uids[i:end] count, err := imp.fetchBatch(ctx, c, batch, log) if err != nil { log.Error("batch fetch error", "folder", folder, "offset", i, "err", err) // Continue with the next batch rather than aborting entirely continue } imported += count processed += len(batch) _ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", processed, totalMsgs) } } return imported, nil } // fetchBatch fetches and stores a batch of messages by UID. func (imp *Importer) fetchBatch(ctx context.Context, c *imapclient.Client, uids []imapv2.UID, log *slog.Logger) (int, error) { if len(uids) == 0 { return 0, nil } fetchOptions := &imapv2.FetchOptions{ UID: true, BodySection: []*imapv2.FetchItemBodySection{{}}, } seqSet := imapv2.UIDSetNum(uids...) fetchCmd := c.Fetch(seqSet, fetchOptions) imported := 0 for { msg := fetchCmd.Next() if msg == nil { break } // Collect body sections from this message for { item := msg.Next() if item == nil { break } switch body := item.(type) { case imapclient.FetchItemDataBodySection: raw, err := io.ReadAll(body.Literal) if err != nil { log.Warn("failed to read message body", "err", err) continue } if err := imp.storeAndIndex(raw, log); err != nil { log.Warn("failed to store/index message", "err", err) continue } imported++ } } } if err := fetchCmd.Close(); err != nil { return imported, fmt.Errorf("fetch close: %w", err) } return imported, nil } // storeAndIndex saves a raw email to storage and indexes it. func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error { // Save to file storage (deduplicates by SHA256 automatically) id, err := imp.mailStore.Save(raw, time.Now()) if err != nil { return fmt.Errorf("save: %w", err) } // Parse for indexing pm, err := mailparser.Parse(raw) if err != nil { log.Warn("failed to parse mail for indexing", "id", id, "err", err) // Store succeeded, just skip indexing for unparseable mails return nil } // Build attachment names string var attachNames []string for _, a := range pm.Attachments { if a.Filename != "" { attachNames = append(attachNames, a.Filename) } } doc := index.MailDocument{ ID: id, From: pm.From, To: strings.Join(pm.To, ", "), Subject: pm.Subject, Body: pm.TextBody, AttachNames: strings.Join(attachNames, " "), HasAttachment: len(pm.Attachments) > 0, Date: pm.Date, Size: int64(len(raw)), } if err := imp.idx.IndexSync(doc); err != nil { log.Warn("failed to index mail", "id", id, "err", err) // Non-fatal: mail is stored, just not searchable yet } return nil }