Files
archivmail/internal/pop3/importer.go
T
sysops 479c27e5a8 feat(PROJ-21): Phase 2+3+5+8 Multi-Tenancy + PROJ-2 EML/MBOX Upload
Phase 2a: userstore domain_admin/superadmin Rollen, User.TenantID,
          ListByTenant, UpsertLDAPUser mit tenantID
Phase 2b: storage.Save() mit tenantID *int64, email_refs Tabelle,
          GetTenantForMail, GetAllIDsByTenant, StatsByTenant
Phase 2c: JWT-Claims tenant_id/tenant_slug, Session.TenantID,
          Login Domain-Erkennung via E-Mail-Domain
Phase 3:  tenantMiddleware, Handler-Filterung (Users, Mail, Stats)
Phase 5:  SMTP Domain-Routing via DomainToTenantFunc Callback,
          config smtp.tenant_routing + default_tenant_id
Phase 8:  archivmail migrate-tenants Subkommando
PROJ-2:   Upload-Seite /admin/upload mit DropZone + Progress-Polling

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 21:03:40 +01:00

171 lines
4.5 KiB
Go

package pop3
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/archivmail/internal/index"
"github.com/archivmail/internal/storage"
"github.com/archivmail/pkg/mailparser"
)
// Importer pulls messages from POP3 accounts and hands them to the mail
// storage and search index backends. Construct it with NewImporter.
type Importer struct {
	// store is the POP3 account store: it supplies account settings and
	// passwords and receives status/progress updates.
	store *Store
	// mailStore is where the raw message bytes are saved.
	mailStore *storage.Store
	// idx receives one document per successfully parsed message.
	idx index.Indexer
	// logger is the base logger; Run derives a per-account logger from it.
	logger *slog.Logger
	// TenantID is passed through to mailStore.Save for every imported
	// message. It is optional and may be left nil.
	TenantID *int64
}
// NewImporter returns an Importer that reads accounts from store, saves
// messages to mailStore, indexes them through idx and logs via logger.
// TenantID is left nil; callers that need a tenant assignment set the field
// on the returned value.
func NewImporter(store *Store, mailStore *storage.Store, idx index.Indexer, logger *slog.Logger) *Importer {
	imp := new(Importer)
	imp.store = store
	imp.mailStore = mailStore
	imp.idx = idx
	imp.logger = logger
	return imp
}
// Run performs a full POP3 import for the given account. It is designed to be
// called as a goroutine: go imp.Run(context.Background(), accountID)
//
// Run reports its outcome only through the logger and the account status in
// the POP3 store; it returns nothing. The sequence is:
//
//  1. load the account and its password,
//  2. mark the account as "running",
//  3. run the import via doImport,
//  4. record either "error" (with the error text) or the completed count.
//
// Cancelling ctx aborts the import. The terminal status writes (steps 1 and 4)
// deliberately use a context detached from ctx's cancellation, so that an
// aborted import is still recorded as "error" rather than leaving the account
// in the "running" state.
func (imp *Importer) Run(ctx context.Context, accountID int64) {
	log := imp.logger.With("component", "pop3-importer", "account_id", accountID)

	// statusCtx keeps ctx's values but is never cancelled. Without it, the
	// status write that follows a cancellation would be issued on an already
	// cancelled context and — assuming the store honours its context — fail.
	statusCtx := context.WithoutCancel(ctx)

	acc, err := imp.store.Get(ctx, accountID)
	if err != nil {
		log.Error("failed to get account", "err", err)
		return
	}

	password, err := imp.store.GetPassword(ctx, accountID)
	if err != nil {
		log.Error("failed to decrypt password", "err", err)
		if uerr := imp.store.UpdateStatus(statusCtx, accountID, "error", "failed to decrypt password", 0, 0); uerr != nil {
			log.Error("failed to update status", "err", uerr)
		}
		return
	}

	// Mark as running. If even this fails, do not start the import: progress
	// could not be reported anyway.
	if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil {
		log.Error("failed to update status", "err", err)
		return
	}

	imported, err := imp.doImport(ctx, acc, password, log)
	if err != nil {
		log.Error("import failed", "err", err)
		if uerr := imp.store.UpdateStatus(statusCtx, accountID, "error", err.Error(), 0, 0); uerr != nil {
			log.Error("failed to update status", "err", uerr)
		}
		return
	}

	// The messages are already stored at this point, so a failure to persist
	// the final state is logged but does not change the completion log line.
	if err := imp.store.UpdateDone(statusCtx, accountID, imported); err != nil {
		log.Error("failed to update done", "err", err)
	}
	log.Info("import completed", "imported", imported)
}
// doImport opens a POP3 session for acc, lists the mailbox and retrieves each
// listed message in turn, passing the raw bytes to storeAndIndex.
//
// Connect, login, STAT and LIST failures abort the import and are returned
// wrapped. A message that cannot be retrieved or stored is logged and skipped.
// After every message (skipped or not) the progress counters in the POP3
// store are refreshed; failures of those progress writes are ignored.
//
// The returned int is the number of messages for which storeAndIndex returned
// nil. If ctx is cancelled between messages, the session is closed with QUIT
// and the count so far is returned together with ctx.Err().
func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) {
	client, err := Dial(acc.Host, acc.Port, acc.TLS, acc.TLSSkipVerify)
	if err != nil {
		return 0, fmt.Errorf("pop3 connect: %w", err)
	}
	defer client.Close()

	if loginErr := client.Login(acc.Username, password); loginErr != nil {
		return 0, fmt.Errorf("pop3 login: %w", loginErr)
	}

	total, _, statErr := client.Stat()
	if statErr != nil {
		return 0, fmt.Errorf("pop3 stat: %w", statErr)
	}

	msgNums, listErr := client.List()
	if listErr != nil {
		return 0, fmt.Errorf("pop3 list: %w", listErr)
	}

	listed := len(msgNums)
	log.Info("starting pop3 import", "total", total, "listed", listed)

	// reportProgress is best-effort: a failed progress write must not stop
	// the import, so its error is intentionally dropped.
	reportProgress := func(processed int) {
		_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", processed, listed)
	}
	reportProgress(0)

	var imported int
	for pos, msgNum := range msgNums {
		// Stop between messages once the caller has cancelled.
		if ctx.Err() != nil {
			_ = client.Quit()
			return imported, ctx.Err()
		}

		raw, retrErr := client.Retr(msgNum)
		switch {
		case retrErr != nil:
			log.Warn("failed to retrieve message, skipping", "msg_num", msgNum, "err", retrErr)
		default:
			if storeErr := imp.storeAndIndex(raw, log); storeErr != nil {
				log.Warn("failed to store/index message, skipping", "msg_num", msgNum, "err", storeErr)
			} else {
				imported++
			}
		}

		// Skipped messages count as processed too.
		reportProgress(pos + 1)
	}

	_ = client.Quit()
	return imported, nil
}
// storeAndIndex persists one raw message and then tries to make it
// searchable.
//
// Only a failure to save is returned as an error (wrapped with "pop3 save").
// A message that was saved but cannot be parsed or indexed is logged at warn
// level and still reported as success, because the mail itself is stored.
//
// The save uses a fresh background context and the current time, together
// with the importer's TenantID.
func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error {
	// NOTE(review): the original comment states that Save deduplicates by
	// SHA256; that behaviour lives in the storage package — confirm there.
	id, saveErr := imp.mailStore.Save(context.Background(), raw, time.Now(), imp.TenantID)
	if saveErr != nil {
		return fmt.Errorf("pop3 save: %w", saveErr)
	}

	parsed, parseErr := mailparser.Parse(raw)
	if parseErr != nil {
		// Stored but unparseable: nothing to index.
		log.Warn("failed to parse mail for indexing", "id", id, "err", parseErr)
		return nil
	}

	// Collect the non-empty attachment file names for the index document.
	var fileNames []string
	for _, att := range parsed.Attachments {
		if att.Filename == "" {
			continue
		}
		fileNames = append(fileNames, att.Filename)
	}

	recipients := strings.Join(parsed.To, ", ")
	attachments := strings.Join(fileNames, " ")

	doc := index.MailDocument{
		ID:            id,
		From:          parsed.From,
		To:            recipients,
		Subject:       parsed.Subject,
		Body:          parsed.TextBody,
		AttachNames:   attachments,
		HasAttachment: len(parsed.Attachments) > 0,
		Date:          parsed.Date,
		Size:          int64(len(raw)),
	}

	// Indexing is non-fatal: the mail is stored, just not searchable yet.
	if indexErr := imp.idx.IndexSync(doc); indexErr != nil {
		log.Warn("failed to index mail", "id", id, "err", indexErr)
	}
	return nil
}