2bab61209c
Go-Modul in go.mod und allen 45 Go-Dateien umbenannt.
216 lines
5.1 KiB
Go
216 lines
5.1 KiB
Go
package api
|
|
|
|
import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"io"
	"net/http"
	"strings"
	"sync"

	"archivmail/internal/index"
	"archivmail/pkg/mailparser"
)
|
|
|
|
// UploadJob tracks the progress of an EML/MBOX import job.
//
// The progress fields (Status, the counters and ErrMsg) are updated by the
// background importer while HTTP handlers read them, so access them only
// while holding mu; use snapshot to obtain a consistent copy.
type UploadJob struct {
	mu sync.Mutex // guards all fields below once the job is shared

	ID       string `json:"id"`
	Status   string `json:"status"` // "running" | "done" | "error"
	Total    int    `json:"total"`    // number of messages queued for import
	Imported int    `json:"imported"` // messages stored and newly indexed
	Skipped  int    `json:"skipped"`  // messages whose stored copy was already indexed
	Errors   int    `json:"errors"`   // messages that failed to parse, save or index
	ErrMsg   string `json:"error_msg,omitempty"`
}
|
|
|
|
func (j *UploadJob) snapshot() uploadJobSnapshot {
|
|
j.mu.Lock()
|
|
defer j.mu.Unlock()
|
|
return uploadJobSnapshot{
|
|
ID: j.ID,
|
|
Status: j.Status,
|
|
Total: j.Total,
|
|
Imported: j.Imported,
|
|
Skipped: j.Skipped,
|
|
Errors: j.Errors,
|
|
ErrMsg: j.ErrMsg,
|
|
}
|
|
}
|
|
|
|
// uploadJobSnapshot is a plain, mutex-free copy of an UploadJob's progress
// fields, produced by UploadJob.snapshot for JSON responses. Its JSON tags
// mirror those on UploadJob.
type uploadJobSnapshot struct {
	ID       string `json:"id"`
	Status   string `json:"status"`
	Total    int    `json:"total"`
	Imported int    `json:"imported"`
	Skipped  int    `json:"skipped"`
	Errors   int    `json:"errors"`
	ErrMsg   string `json:"error_msg,omitempty"`
}
|
|
|
|
// handleUpload accepts a multipart upload of one or more .eml or .mbox files
// sent in the "files" form field, starts a background import job and returns
// its ID immediately as {"job_id": "..."} with status 202 Accepted.
//
// Files whose name ends in ".mbox" (case-insensitive) are split into
// individual messages with mailparser.SplitMbox; any other file is treated
// as a single message. Files that cannot be opened or fully read are logged
// and skipped; if none of them is readable the request fails with 400.
func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
	// 512 MB max total upload. ParseMultipartForm's argument only limits how
	// much file data is kept in memory (the remainder spills to temporary
	// files), so the total body size has to be enforced separately.
	const maxUploadBytes = 512 << 20
	r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes)
	if err := r.ParseMultipartForm(maxUploadBytes); err != nil {
		s.logger.Error("multipart parse failed", "err", err)
		writeError(w, http.StatusBadRequest, "multipart parse failed")
		return
	}

	files := r.MultipartForm.File["files"]
	if len(files) == 0 {
		writeError(w, http.StatusBadRequest, "no files uploaded")
		return
	}

	// Expand every readable file into its raw messages. One bad file does
	// not fail the whole upload (best effort), but it is logged.
	var allMessages [][]byte
	readable := 0
	for _, fh := range files {
		f, err := fh.Open()
		if err != nil {
			s.logger.Warn("upload: open failed", "file", fh.Filename, "err", err)
			continue
		}
		// A single Read may legally return fewer bytes than requested;
		// io.ReadFull either fills buf completely or reports an error.
		buf := make([]byte, fh.Size)
		_, err = io.ReadFull(f, buf)
		f.Close() // close error ignored: the data has already been read
		if err != nil {
			s.logger.Warn("upload: read failed", "file", fh.Filename, "err", err)
			continue
		}
		readable++

		if strings.HasSuffix(strings.ToLower(fh.Filename), ".mbox") {
			allMessages = append(allMessages, mailparser.SplitMbox(buf)...)
		} else {
			allMessages = append(allMessages, buf)
		}
	}

	if readable == 0 {
		writeError(w, http.StatusBadRequest, "no readable files")
		return
	}

	jobID := newJobID()
	job := &UploadJob{
		ID:     jobID,
		Status: "running",
		Total:  len(allMessages),
	}
	s.uploadJobs.Store(jobID, job)

	// Propagate tenant from session context; the goroutine must not use
	// r.Context(), which ends when this handler returns.
	tenantID := tenantFromCtx(r.Context())

	// Run import in background
	go s.runUploadJob(job, allMessages, tenantID)

	writeJSON(w, http.StatusAccepted, map[string]string{"job_id": jobID})
}
|
|
|
|
// handleUploadProgress reports the current progress of the upload job named
// by the {jobID} path value as JSON, or responds 404 if no such job is
// registered in s.uploadJobs.
func (s *Server) handleUploadProgress(w http.ResponseWriter, r *http.Request) {
	val, found := s.uploadJobs.Load(r.PathValue("jobID"))
	if found {
		// Only *UploadJob values are ever stored under upload job IDs.
		writeJSON(w, http.StatusOK, val.(*UploadJob).snapshot())
		return
	}
	writeError(w, http.StatusNotFound, "job not found")
}
|
|
|
|
// runUploadJob imports messages one at a time on behalf of job, counting each
// result of importRawMessage as imported, skipped or error under job.mu.
// When every message has been processed, job.Status becomes "done".
//
// It is started as its own goroutine by handleUpload. A panic raised while
// importing (for example while parsing untrusted mail) is recovered so that
// it cannot terminate the whole server process. The job is then marked
// "error" instead of staying "running" forever.
func (s *Server) runUploadJob(job *UploadJob, messages [][]byte, tenantID *int64) {
	// Deliberately detached from the request: the HTTP handler returns
	// before the import finishes.
	ctx := context.Background()

	defer func() {
		if rec := recover(); rec != nil {
			s.logger.Error("upload: import job panicked", "job_id", job.ID, "panic", rec)
			job.mu.Lock()
			job.Status = "error"
			job.ErrMsg = "internal error during import"
			job.mu.Unlock()
		}
	}()

	for _, raw := range messages {
		result := s.importRawMessage(ctx, raw, tenantID)

		job.mu.Lock()
		switch result {
		case "imported":
			job.Imported++
		case "skipped":
			job.Skipped++
		default:
			job.Errors++
		}
		job.mu.Unlock()
	}

	job.mu.Lock()
	job.Status = "done"
	job.mu.Unlock()
}
|
|
|
|
// importRawMessage stores and indexes a single raw message.
|
|
// Returns "imported", "skipped", or "error".
|
|
func (s *Server) importRawMessage(ctx context.Context, raw []byte, tenantID *int64) string {
|
|
pm, err := mailparser.Parse(raw)
|
|
if err != nil {
|
|
s.logger.Warn("upload: parse failed", "err", err)
|
|
return "error"
|
|
}
|
|
|
|
id, err := s.store.Save(ctx, raw, pm.Date, tenantID)
|
|
if err != nil {
|
|
s.logger.Warn("upload: save failed", "err", err)
|
|
return "error"
|
|
}
|
|
|
|
// Check dedup: storage.Save returns same id for duplicate content.
|
|
// If already indexed, skip indexing.
|
|
if already, _ := s.store.IsIndexed(ctx, id); already {
|
|
return "skipped"
|
|
}
|
|
|
|
var attachNames []string
|
|
for _, a := range pm.Attachments {
|
|
if a.Filename != "" {
|
|
attachNames = append(attachNames, a.Filename)
|
|
}
|
|
}
|
|
|
|
doc := index.MailDocument{
|
|
ID: id,
|
|
From: pm.From,
|
|
To: strings.Join(pm.To, " "),
|
|
Subject: pm.Subject,
|
|
Body: pm.TextBody + " " + pm.HTMLBody,
|
|
AttachNames: strings.Join(attachNames, " "),
|
|
HasAttachment: len(pm.Attachments) > 0,
|
|
Date: pm.Date,
|
|
Size: int64(len(raw)),
|
|
}
|
|
|
|
if err := s.idx.IndexSync(doc); err != nil {
|
|
s.logger.Warn("upload: index failed", "id", id, "err", err)
|
|
return "error"
|
|
}
|
|
|
|
if err := s.store.SetIndexedAt(ctx, id); err != nil {
|
|
s.logger.Warn("upload: set indexed_at failed", "id", id, "err", err)
|
|
}
|
|
|
|
if err := s.store.SaveMeta(ctx, id, pm, len(raw)); err != nil {
|
|
s.logger.Warn("upload: save meta failed", "id", id, "err", err)
|
|
}
|
|
|
|
return "imported"
|
|
}
|
|
|
|
func newJobID() string {
|
|
b := make([]byte, 8)
|
|
rand.Read(b) //nolint
|
|
return hex.EncodeToString(b)
|
|
}
|