032892bc2b
Bisher haben nur der SMTP-Pfad und der Boot-Backfill ocrWorker.Submit aufgerufen. IMAP- und POP3-Importer riefen nur idx.IndexSync auf — neue Mails blieben dadurch dauerhaft in ocr_status='pending' (auf 132 sind so innerhalb von 44 Tagen 54 Mails haengen geblieben). Fix: Die Importer-Structs bekommen einen optionalen ocrSubmit-Callback, der in main.go via SetOCRSubmit gehookt wird. Kein Import von internal/ocr in die Importer-Packages -> kein Risiko von Import-Cycles. Submit ist non-blocking; bei Mails ohne Attachments markiert der Worker selbst 'skipped'.
184 lines
5.0 KiB
Go
184 lines
5.0 KiB
Go
package pop3
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"archivmail/internal/index"
|
|
"archivmail/internal/storage"
|
|
"archivmail/pkg/mailparser"
|
|
)
|
|
|
|
// Importer runs background POP3 import jobs.
|
|
type Importer struct {
|
|
store *Store
|
|
mailStore *storage.Store
|
|
idx index.Indexer
|
|
logger *slog.Logger
|
|
TenantID *int64 // optional tenant assignment for stored mails
|
|
// PROJ-44: optional OCR enqueue hook, wired from main.go.
|
|
ocrSubmit func(mailID string, tenantID *int64)
|
|
}
|
|
|
|
// NewImporter creates a new Importer wired to the storage and index backends.
|
|
func NewImporter(store *Store, mailStore *storage.Store, idx index.Indexer, logger *slog.Logger) *Importer {
|
|
return &Importer{
|
|
store: store,
|
|
mailStore: mailStore,
|
|
idx: idx,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// SetOCRSubmit installs a non-blocking callback that enqueues a mail for
|
|
// OCR processing. See imap.Importer.SetOCRSubmit for rationale (PROJ-44).
|
|
func (imp *Importer) SetOCRSubmit(fn func(mailID string, tenantID *int64)) {
|
|
imp.ocrSubmit = fn
|
|
}
|
|
|
|
// Run performs a full POP3 import for the given account. It is designed to be
|
|
// called as a goroutine: go imp.Run(context.Background(), accountID)
|
|
func (imp *Importer) Run(ctx context.Context, accountID int64) {
|
|
log := imp.logger.With("component", "pop3-importer", "account_id", accountID)
|
|
|
|
acc, err := imp.store.Get(ctx, accountID)
|
|
if err != nil {
|
|
log.Error("failed to get account", "err", err)
|
|
return
|
|
}
|
|
|
|
password, err := imp.store.GetPassword(ctx, accountID)
|
|
if err != nil {
|
|
log.Error("failed to decrypt password", "err", err)
|
|
_ = imp.store.UpdateStatus(ctx, accountID, "error", "failed to decrypt password", 0, 0)
|
|
return
|
|
}
|
|
|
|
// Mark as running
|
|
if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil {
|
|
log.Error("failed to update status", "err", err)
|
|
return
|
|
}
|
|
|
|
imported, err := imp.doImport(ctx, acc, password, log)
|
|
if err != nil {
|
|
log.Error("import failed", "err", err)
|
|
_ = imp.store.UpdateStatus(ctx, accountID, "error", err.Error(), 0, 0)
|
|
return
|
|
}
|
|
|
|
if err := imp.store.UpdateDone(ctx, accountID, imported); err != nil {
|
|
log.Error("failed to update done", "err", err)
|
|
}
|
|
|
|
log.Info("import completed", "imported", imported)
|
|
}
|
|
|
|
// doImport handles the actual POP3 connection and message retrieval.
|
|
func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) {
|
|
c, err := Dial(acc.Host, acc.Port, acc.TLS, acc.TLSSkipVerify)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("pop3 connect: %w", err)
|
|
}
|
|
defer c.Close()
|
|
|
|
if err := c.Login(acc.Username, password); err != nil {
|
|
return 0, fmt.Errorf("pop3 login: %w", err)
|
|
}
|
|
|
|
total, _, err := c.Stat()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("pop3 stat: %w", err)
|
|
}
|
|
|
|
nums, err := c.List()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("pop3 list: %w", err)
|
|
}
|
|
|
|
log.Info("starting pop3 import", "total", total, "listed", len(nums))
|
|
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", 0, len(nums))
|
|
|
|
imported := 0
|
|
for i, num := range nums {
|
|
// Check context cancellation
|
|
select {
|
|
case <-ctx.Done():
|
|
_ = c.Quit()
|
|
return imported, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
raw, err := c.Retr(num)
|
|
if err != nil {
|
|
log.Warn("failed to retrieve message, skipping", "msg_num", num, "err", err)
|
|
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", i+1, len(nums))
|
|
continue
|
|
}
|
|
|
|
if err := imp.storeAndIndex(raw, log); err != nil {
|
|
log.Warn("failed to store/index message, skipping", "msg_num", num, "err", err)
|
|
} else {
|
|
imported++
|
|
}
|
|
|
|
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", i+1, len(nums))
|
|
}
|
|
|
|
_ = c.Quit()
|
|
return imported, nil
|
|
}
|
|
|
|
// storeAndIndex saves a raw email to storage and indexes it.
|
|
func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error {
|
|
ctx := context.Background()
|
|
// Save to file storage (deduplicates by SHA256 automatically)
|
|
id, err := imp.mailStore.Save(ctx, raw, time.Now(), imp.TenantID)
|
|
if err != nil {
|
|
return fmt.Errorf("pop3 save: %w", err)
|
|
}
|
|
|
|
// Parse for indexing
|
|
pm, err := mailparser.Parse(raw)
|
|
if err != nil {
|
|
log.Warn("failed to parse mail for indexing", "id", id, "err", err)
|
|
// Store succeeded — skip indexing for unparseable mails
|
|
return nil
|
|
}
|
|
|
|
// Build attachment names string
|
|
var attachNames []string
|
|
for _, a := range pm.Attachments {
|
|
if a.Filename != "" {
|
|
attachNames = append(attachNames, a.Filename)
|
|
}
|
|
}
|
|
|
|
doc := index.MailDocument{
|
|
ID: id,
|
|
From: pm.From,
|
|
To: strings.Join(pm.To, ", "),
|
|
Subject: pm.Subject,
|
|
Body: pm.TextBody,
|
|
AttachNames: strings.Join(attachNames, " "),
|
|
HasAttachment: len(pm.Attachments) > 0,
|
|
Date: pm.Date,
|
|
Size: int64(len(raw)),
|
|
}
|
|
|
|
if err := imp.idx.IndexSync(doc); err != nil {
|
|
log.Warn("failed to index mail", "id", id, "err", err)
|
|
// Non-fatal: mail is stored, just not searchable yet
|
|
}
|
|
|
|
// PROJ-44: enqueue OCR job for any mail with attachments.
|
|
if imp.ocrSubmit != nil && len(pm.Attachments) > 0 {
|
|
imp.ocrSubmit(id, imp.TenantID)
|
|
}
|
|
|
|
return nil
|
|
}
|