Files
archivmail/internal/api/upload.go
T
sysops 479c27e5a8 feat(PROJ-21): Phase 2+3+5+8 Multi-Tenancy + PROJ-2 EML/MBOX Upload
Phase 2a: userstore domain_admin/superadmin Rollen, User.TenantID,
          ListByTenant, UpsertLDAPUser mit tenantID
Phase 2b: storage.Save() mit tenantID *int64, email_refs Tabelle,
          GetTenantForMail, GetAllIDsByTenant, StatsByTenant
Phase 2c: JWT-Claims tenant_id/tenant_slug, Session.TenantID,
          Login Domain-Erkennung via E-Mail-Domain
Phase 3:  tenantMiddleware, Handler-Filterung (Users, Mail, Stats)
Phase 5:  SMTP Domain-Routing via DomainToTenantFunc Callback,
          config smtp.tenant_routing + default_tenant_id
Phase 8:  archivmail migrate-tenants Subkommando
PROJ-2:   Upload-Seite /admin/upload mit DropZone + Progress-Polling

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 21:03:40 +01:00

215 lines
5.1 KiB
Go

package api
import (
"context"
"crypto/rand"
"encoding/hex"
"net/http"
"strings"
"sync"
"github.com/archivmail/internal/index"
"github.com/archivmail/pkg/mailparser"
)
// UploadJob tracks the progress of an EML/MBOX import job created by
// handleUpload and advanced by runUploadJob.
//
// Every field is guarded by mu: runUploadJob updates the counters and Status
// while holding it, and readers should use snapshot() instead of reading the
// fields directly. The struct contains a sync.Mutex and must not be copied;
// it is shared as *UploadJob.
type UploadJob struct {
// mu guards all fields below.
mu sync.Mutex
// ID is the random hex identifier from newJobID; it is also the key under
// which the job is stored in s.uploadJobs.
ID string `json:"id"`
// Status is "running" at creation and "done" after the last message.
// NOTE(review): no code in this file assigns "error" — confirm it is set
// elsewhere or drop it from the documented set.
Status string `json:"status"` // "running" | "done" | "error"
// Total is the number of raw messages queued for import (an .mbox file
// contributes every message mailparser.SplitMbox returned for it).
Total int `json:"total"`
// Imported counts messages that were newly indexed.
Imported int `json:"imported"`
// Skipped counts messages whose stored ID was already indexed (duplicates).
Skipped int `json:"skipped"`
// Errors counts messages that failed to parse, save or index.
Errors int `json:"errors"`
// ErrMsg is not assigned anywhere in this file.
// NOTE(review): presumably meant for a job-level failure message — confirm.
ErrMsg string `json:"error_msg,omitempty"`
}
// snapshot copies the job's public state while holding j.mu, so the copy can
// be serialized without racing the goroutine that updates the counters.
func (j *UploadJob) snapshot() uploadJobSnapshot {
	j.mu.Lock()
	snap := uploadJobSnapshot{
		ID:       j.ID,
		Status:   j.Status,
		Total:    j.Total,
		Imported: j.Imported,
		Skipped:  j.Skipped,
		Errors:   j.Errors,
		ErrMsg:   j.ErrMsg,
	}
	j.mu.Unlock()
	return snap
}
// uploadJobSnapshot is a point-in-time copy of an UploadJob's fields without
// the mutex. UploadJob.snapshot produces it and handleUploadProgress encodes
// it as the JSON response body. Its JSON tags mirror UploadJob's.
type uploadJobSnapshot struct {
ID string `json:"id"`
Status string `json:"status"`
Total int `json:"total"`
Imported int `json:"imported"`
Skipped int `json:"skipped"`
Errors int `json:"errors"`
ErrMsg string `json:"error_msg,omitempty"`
}
// handleUpload accepts a multipart upload of one or more .eml or .mbox files
// in the form field "files" and splits them into raw messages. It then starts
// a background import job and immediately responds 202 Accepted with
// {"job_id": "<id>"}; progress can be polled via handleUploadProgress.
//
// Files whose name ends in ".mbox" (case-insensitive) are split with
// mailparser.SplitMbox; any other file is imported as a single message.
// Files that cannot be opened or fully read are logged and skipped. The
// request body is capped at 512 MB.
//
// NOTE(review): jobs are stored in s.uploadJobs but never removed in this
// file — confirm whether something else evicts finished jobs.
func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
	// Hard cap on the whole request body. ParseMultipartForm's argument is
	// only the in-memory threshold (larger parts spill to temp files), so on
	// its own it does not bound the upload size.
	const maxUploadBytes = 512 << 20 // 512 MB

	// Reject a declared oversize body early. MaxBytesReader also covers
	// requests of unknown length (ContentLength == -1, e.g. chunked).
	if r.ContentLength > maxUploadBytes {
		writeError(w, http.StatusRequestEntityTooLarge, "upload exceeds 512 MB limit")
		return
	}
	r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes)
	if err := r.ParseMultipartForm(maxUploadBytes); err != nil {
		writeError(w, http.StatusBadRequest, "multipart parse failed: "+err.Error())
		return
	}
	files := r.MultipartForm.File["files"]
	if len(files) == 0 {
		writeError(w, http.StatusBadRequest, "no files uploaded")
		return
	}

	// Split everything up front so the job's Total is known before the
	// import starts.
	var allMessages [][]byte
	readable := 0
	for _, fh := range files {
		f, err := fh.Open()
		if err != nil {
			s.logger.Warn("upload: open file failed", "file", fh.Filename, "err", err)
			continue
		}
		// A single Read may legally return fewer bytes than requested, which
		// would leave zero padding in buf. ReadAt instead only returns
		// n < len(buf) together with a non-nil error.
		buf := make([]byte, fh.Size)
		n, err := f.ReadAt(buf, 0)
		f.Close()
		if n < len(buf) {
			s.logger.Warn("upload: read file failed", "file", fh.Filename, "err", err)
			continue
		}
		readable++
		if strings.HasSuffix(strings.ToLower(fh.Filename), ".mbox") {
			allMessages = append(allMessages, mailparser.SplitMbox(buf)...)
		} else {
			allMessages = append(allMessages, buf)
		}
	}
	if readable == 0 {
		writeError(w, http.StatusBadRequest, "no readable files")
		return
	}

	jobID := newJobID()
	job := &UploadJob{
		ID:     jobID,
		Status: "running",
		Total:  len(allMessages),
	}
	s.uploadJobs.Store(jobID, job)

	// Resolve the tenant while handling the request; the background job
	// does not receive r.
	tenantID := tenantFromCtx(r.Context())
	go s.runUploadJob(job, allMessages, tenantID)

	writeJSON(w, http.StatusAccepted, map[string]string{"job_id": jobID})
}
// handleUploadProgress writes, as JSON, a snapshot of the upload job whose ID
// is given by the {jobID} path value. If no job with that ID is stored in
// s.uploadJobs, it responds 404 "job not found".
//
// NOTE(review): the lookup is not scoped to the caller's tenant; job IDs are
// random 64-bit values, but confirm this is acceptable.
func (s *Server) handleUploadProgress(w http.ResponseWriter, r *http.Request) {
	stored, found := s.uploadJobs.Load(r.PathValue("jobID"))
	if !found {
		writeError(w, http.StatusNotFound, "job not found")
		return
	}
	writeJSON(w, http.StatusOK, stored.(*UploadJob).snapshot())
}
// runUploadJob imports messages one at a time for tenantID, records each
// outcome in job under job.mu, and finally marks the job "done". handleUpload
// starts it as a goroutine. It uses context.Background() because it outlives
// the HTTP request.
//
// A panic while importing a single message (for example inside the parser
// on malformed uploaded data) is recovered, logged and counted as an error.
// An unrecovered panic on any goroutine terminates the whole program, and
// net/http only recovers panics on the handler goroutine itself. Without this,
// one bad upload could take the server down.
func (s *Server) runUploadJob(job *UploadJob, messages [][]byte, tenantID *int64) {
	ctx := context.Background()

	// importOne turns a panic in importRawMessage into an "error" result.
	importOne := func(raw []byte) (result string) {
		defer func() {
			if rec := recover(); rec != nil {
				s.logger.Warn("upload: import panicked", "panic", rec)
				result = "error"
			}
		}()
		return s.importRawMessage(ctx, raw, tenantID)
	}

	for _, raw := range messages {
		result := importOne(raw)
		job.mu.Lock()
		switch result {
		case "imported":
			job.Imported++
		case "skipped":
			job.Skipped++
		default:
			job.Errors++
		}
		job.mu.Unlock()
	}

	job.mu.Lock()
	job.Status = "done"
	job.mu.Unlock()
}
// importRawMessage parses, stores and indexes a single raw message; tenantID
// is forwarded to s.store.Save.
//
// It returns one of three results:
//   - "imported" when the message was indexed.
//   - "skipped" when the store reports the ID returned by Save as already
//     indexed (Save returns the same ID for duplicate content).
//   - "error" when parsing, saving or indexing fails.
//
// Failures of the follow-up SetIndexedAt and SaveMeta calls are only logged;
// the message still counts as imported.
func (s *Server) importRawMessage(ctx context.Context, raw []byte, tenantID *int64) string {
	pm, err := mailparser.Parse(raw)
	if err != nil {
		s.logger.Warn("upload: parse failed", "err", err)
		return "error"
	}
	id, err := s.store.Save(ctx, raw, pm.Date, tenantID)
	if err != nil {
		s.logger.Warn("upload: save failed", "err", err)
		return "error"
	}

	// Dedup: an already-indexed ID is not indexed again. If the check itself
	// fails, log it and fall through to (re)indexing rather than drop the
	// message.
	already, err := s.store.IsIndexed(ctx, id)
	if err != nil {
		s.logger.Warn("upload: indexed check failed", "id", id, "err", err)
	} else if already {
		return "skipped"
	}

	attachNames := make([]string, 0, len(pm.Attachments))
	for _, a := range pm.Attachments {
		if a.Filename != "" {
			attachNames = append(attachNames, a.Filename)
		}
	}
	doc := index.MailDocument{
		ID:            id,
		From:          pm.From,
		To:            strings.Join(pm.To, " "),
		Subject:       pm.Subject,
		Body:          pm.TextBody + " " + pm.HTMLBody,
		AttachNames:   strings.Join(attachNames, " "),
		HasAttachment: len(pm.Attachments) > 0,
		Date:          pm.Date,
		Size:          int64(len(raw)),
	}
	if err := s.idx.IndexSync(doc); err != nil {
		s.logger.Warn("upload: index failed", "id", id, "err", err)
		return "error"
	}

	// NOTE(review): SetIndexedAt runs before SaveMeta. If IsIndexed reflects
	// SetIndexedAt (presumably), a SaveMeta failure here is never retried:
	// re-uploading the same content is reported as "skipped". Confirm this
	// ordering is intended.
	if err := s.store.SetIndexedAt(ctx, id); err != nil {
		s.logger.Warn("upload: set indexed_at failed", "id", id, "err", err)
	}
	if err := s.store.SaveMeta(ctx, id, pm, len(raw)); err != nil {
		s.logger.Warn("upload: save meta failed", "id", id, "err", err)
	}
	return "imported"
}
// newJobID returns a 16-character lowercase hex identifier built from 8
// bytes of crypto/rand output.
//
// The rand.Read error is deliberately ignored. Since Go 1.24 rand.Read never
// returns an error; on older toolchains a failure would leave the ID as all
// zeros.
func newJobID() string {
	var raw [8]byte
	rand.Read(raw[:]) //nolint
	return hex.EncodeToString(raw[:])
}