feat(PROJ-14): POP3-Import — Client, Store, Importer, API-Routen, Frontend-Seite
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/archivmail/internal/auth"
|
||||
imapstore "github.com/archivmail/internal/imap"
|
||||
"github.com/archivmail/internal/index"
|
||||
pop3store "github.com/archivmail/internal/pop3"
|
||||
"github.com/archivmail/internal/smtpd"
|
||||
"github.com/archivmail/internal/storage"
|
||||
"github.com/archivmail/internal/userstore"
|
||||
@@ -46,6 +47,8 @@ type Server struct {
|
||||
imapStore *imapstore.Store
|
||||
imapImporter *imapstore.Importer
|
||||
imapScheduler *imapstore.Scheduler
|
||||
pop3Store *pop3store.Store
|
||||
pop3Importer *pop3store.Importer
|
||||
uploadJobs sync.Map // jobID → *UploadJob
|
||||
}
|
||||
|
||||
@@ -61,6 +64,12 @@ func (s *Server) SetImap(store *imapstore.Store, importer *imapstore.Importer, s
|
||||
s.imapScheduler = scheduler
|
||||
}
|
||||
|
||||
// SetPop3 wires the POP3 store and importer into the API server after construction.
|
||||
func (s *Server) SetPop3(store *pop3store.Store, importer *pop3store.Importer) {
|
||||
s.pop3Store = store
|
||||
s.pop3Importer = importer
|
||||
}
|
||||
|
||||
// New creates and wires up a new API server.
|
||||
func New(
|
||||
cfg config.APIConfig,
|
||||
@@ -129,6 +138,14 @@ func (s *Server) routes() {
|
||||
s.mux.HandleFunc("POST /api/imap/{id}/import", s.authMiddleware(s.handleStartImport))
|
||||
s.mux.HandleFunc("GET /api/imap/{id}/progress", s.authMiddleware(s.handleImapProgress))
|
||||
s.mux.HandleFunc("POST /api/imap/{id}/sync", s.authMiddleware(s.handleSyncNow))
|
||||
|
||||
// POP3 routes (accessible to all authenticated users)
|
||||
s.mux.HandleFunc("GET /api/pop3", s.authMiddleware(s.handleListPop3))
|
||||
s.mux.HandleFunc("POST /api/pop3", s.authMiddleware(s.handleCreatePop3))
|
||||
s.mux.HandleFunc("DELETE /api/pop3/{id}", s.authMiddleware(s.handleDeletePop3))
|
||||
s.mux.HandleFunc("POST /api/pop3/test", s.authMiddleware(s.handleTestPop3))
|
||||
s.mux.HandleFunc("POST /api/pop3/{id}/import", s.authMiddleware(s.handleStartPop3Import))
|
||||
s.mux.HandleFunc("GET /api/pop3/{id}/progress", s.authMiddleware(s.handlePop3Progress))
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler.
|
||||
@@ -1696,6 +1713,230 @@ func (s *Server) handleSecurityFix(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"message": msg})
|
||||
}
|
||||
|
||||
// ── POP3 handlers ──────────────────────────────────────────────────────────
|
||||
|
||||
func (s *Server) handleListPop3(w http.ResponseWriter, r *http.Request) {
|
||||
if s.pop3Store == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "POP3 not configured")
|
||||
return
|
||||
}
|
||||
sess := sessionFromCtx(r.Context())
|
||||
isAdmin := sess.Role == userstore.RoleAdmin
|
||||
accounts, err := s.pop3Store.List(r.Context(), sess.Username, isAdmin)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list POP3 accounts")
|
||||
return
|
||||
}
|
||||
if accounts == nil {
|
||||
accounts = []pop3store.Account{}
|
||||
}
|
||||
writeJSON(w, http.StatusOK, accounts)
|
||||
}
|
||||
|
||||
func (s *Server) handleCreatePop3(w http.ResponseWriter, r *http.Request) {
|
||||
if s.pop3Store == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "POP3 not configured")
|
||||
return
|
||||
}
|
||||
var req struct {
|
||||
Name string `json:"name"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
TLS string `json:"tls"`
|
||||
TLSSkipVerify bool `json:"tls_skip_verify"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
if req.Name == "" || req.Host == "" || req.Username == "" || req.Password == "" {
|
||||
writeError(w, http.StatusBadRequest, "name, host, username and password are required")
|
||||
return
|
||||
}
|
||||
if req.Port <= 0 {
|
||||
req.Port = 110
|
||||
}
|
||||
if req.TLS == "" {
|
||||
req.TLS = "none"
|
||||
}
|
||||
|
||||
sess := sessionFromCtx(r.Context())
|
||||
acc := pop3store.Account{
|
||||
Owner: sess.Username,
|
||||
Name: req.Name,
|
||||
Host: req.Host,
|
||||
Port: req.Port,
|
||||
TLS: req.TLS,
|
||||
TLSSkipVerify: req.TLSSkipVerify,
|
||||
Username: req.Username,
|
||||
}
|
||||
|
||||
created, err := s.pop3Store.Create(r.Context(), acc, req.Password)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create POP3 account")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, created)
|
||||
}
|
||||
|
||||
func (s *Server) handleDeletePop3(w http.ResponseWriter, r *http.Request) {
|
||||
if s.pop3Store == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "POP3 not configured")
|
||||
return
|
||||
}
|
||||
idStr := r.PathValue("id")
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid id")
|
||||
return
|
||||
}
|
||||
|
||||
acc, err := s.pop3Store.Get(r.Context(), id)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "account not found")
|
||||
return
|
||||
}
|
||||
|
||||
sess := sessionFromCtx(r.Context())
|
||||
if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin {
|
||||
writeError(w, http.StatusForbidden, "access denied")
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.pop3Store.Delete(r.Context(), id); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to delete account")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]bool{"ok": true})
|
||||
}
|
||||
|
||||
func (s *Server) handleTestPop3(w http.ResponseWriter, r *http.Request) {
|
||||
var req struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
TLS string `json:"tls"`
|
||||
TLSSkipVerify bool `json:"tls_skip_verify"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
if req.Host == "" || req.Username == "" || req.Password == "" {
|
||||
writeError(w, http.StatusBadRequest, "host, username and password are required")
|
||||
return
|
||||
}
|
||||
if req.Port <= 0 {
|
||||
req.Port = 110
|
||||
}
|
||||
if req.TLS == "" {
|
||||
req.TLS = "none"
|
||||
}
|
||||
|
||||
c, err := pop3store.Dial(req.Host, req.Port, req.TLS, req.TLSSkipVerify)
|
||||
if err != nil {
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"ok": false,
|
||||
"message": fmt.Sprintf("Verbindung fehlgeschlagen: %v", err),
|
||||
})
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
if err := c.Login(req.Username, req.Password); err != nil {
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"ok": false,
|
||||
"message": fmt.Sprintf("Anmeldung fehlgeschlagen: %v", err),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
count, totalSize, err := c.Stat()
|
||||
if err != nil {
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"ok": false,
|
||||
"message": fmt.Sprintf("STAT fehlgeschlagen: %v", err),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
_ = c.Quit()
|
||||
writeJSON(w, http.StatusOK, map[string]interface{}{
|
||||
"ok": true,
|
||||
"message": fmt.Sprintf("Verbindung erfolgreich: %d E-Mails", count),
|
||||
"message_count": count,
|
||||
"total_size_bytes": totalSize,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Server) handleStartPop3Import(w http.ResponseWriter, r *http.Request) {
|
||||
if s.pop3Store == nil || s.pop3Importer == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "POP3 not configured")
|
||||
return
|
||||
}
|
||||
idStr := r.PathValue("id")
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid id")
|
||||
return
|
||||
}
|
||||
|
||||
acc, err := s.pop3Store.Get(r.Context(), id)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "account not found")
|
||||
return
|
||||
}
|
||||
|
||||
sess := sessionFromCtx(r.Context())
|
||||
if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin {
|
||||
writeError(w, http.StatusForbidden, "access denied")
|
||||
return
|
||||
}
|
||||
|
||||
if acc.Status == "running" {
|
||||
writeError(w, http.StatusConflict, "import already running")
|
||||
return
|
||||
}
|
||||
|
||||
go s.pop3Importer.Run(context.Background(), id)
|
||||
|
||||
// Return current account state (status will switch to "running" shortly)
|
||||
acc.Status = "running"
|
||||
writeJSON(w, http.StatusOK, acc)
|
||||
}
|
||||
|
||||
func (s *Server) handlePop3Progress(w http.ResponseWriter, r *http.Request) {
|
||||
if s.pop3Store == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "POP3 not configured")
|
||||
return
|
||||
}
|
||||
idStr := r.PathValue("id")
|
||||
id, err := strconv.ParseInt(idStr, 10, 64)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid id")
|
||||
return
|
||||
}
|
||||
|
||||
acc, err := s.pop3Store.Get(r.Context(), id)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "account not found")
|
||||
return
|
||||
}
|
||||
|
||||
sess := sessionFromCtx(r.Context())
|
||||
if acc.Owner != sess.Username && sess.Role != userstore.RoleAdmin {
|
||||
writeError(w, http.StatusForbidden, "access denied")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, acc)
|
||||
}
|
||||
|
||||
// sshConfigSet sets or replaces a directive in /etc/ssh/sshd_config.
|
||||
// Commented-out lines are left untouched; the active directive is updated or appended.
|
||||
func sshConfigSet(key, value string) error {
|
||||
|
||||
@@ -0,0 +1,246 @@
|
||||
package pop3
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const dialTimeout = 30 * time.Second
|
||||
const rwTimeout = 30 * time.Second
|
||||
|
||||
// Client is a minimal POP3 client implemented directly over net.Conn.
|
||||
// It supports SSL, STARTTLS, and plaintext connections.
|
||||
type Client struct {
|
||||
conn net.Conn
|
||||
reader *bufio.Reader
|
||||
}
|
||||
|
||||
// Dial connects to a POP3 server and reads the server greeting.
|
||||
// tlsMode must be one of: "ssl", "starttls", "none".
|
||||
func Dial(host string, port int, tlsMode string, skipVerify bool) (*Client, error) {
|
||||
addr := net.JoinHostPort(host, strconv.Itoa(port))
|
||||
tlsCfg := &tls.Config{
|
||||
ServerName: host,
|
||||
InsecureSkipVerify: skipVerify, //nolint:gosec // user-controlled opt-in
|
||||
}
|
||||
|
||||
var conn net.Conn
|
||||
var err error
|
||||
|
||||
switch tlsMode {
|
||||
case "ssl":
|
||||
dialer := &tls.Dialer{
|
||||
NetDialer: &net.Dialer{Timeout: dialTimeout},
|
||||
Config: tlsCfg,
|
||||
}
|
||||
conn, err = dialer.Dial("tcp", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 dial ssl: %w", err)
|
||||
}
|
||||
case "starttls":
|
||||
plain, err2 := net.DialTimeout("tcp", addr, dialTimeout)
|
||||
if err2 != nil {
|
||||
return nil, fmt.Errorf("pop3 dial starttls plain: %w", err2)
|
||||
}
|
||||
c := &Client{conn: plain, reader: bufio.NewReader(plain)}
|
||||
// Read server greeting before STLS
|
||||
if _, err2 := c.readLine(); err2 != nil {
|
||||
plain.Close()
|
||||
return nil, fmt.Errorf("pop3 starttls greeting: %w", err2)
|
||||
}
|
||||
if err2 := c.sendCmd("STLS"); err2 != nil {
|
||||
plain.Close()
|
||||
return nil, fmt.Errorf("pop3 starttls send: %w", err2)
|
||||
}
|
||||
if _, err2 := c.readLine(); err2 != nil {
|
||||
plain.Close()
|
||||
return nil, fmt.Errorf("pop3 starttls response: %w", err2)
|
||||
}
|
||||
tlsConn := tls.Client(plain, tlsCfg)
|
||||
if err2 := tlsConn.Handshake(); err2 != nil {
|
||||
tlsConn.Close()
|
||||
return nil, fmt.Errorf("pop3 starttls handshake: %w", err2)
|
||||
}
|
||||
// Return upgraded client — greeting already consumed
|
||||
return &Client{conn: tlsConn, reader: bufio.NewReader(tlsConn)}, nil
|
||||
default: // "none"
|
||||
conn, err = net.DialTimeout("tcp", addr, dialTimeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 dial plain: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
c := &Client{conn: conn, reader: bufio.NewReader(conn)}
|
||||
// Read and discard the server greeting (+OK ...)
|
||||
if _, err := c.readLine(); err != nil {
|
||||
conn.Close()
|
||||
return nil, fmt.Errorf("pop3 greeting: %w", err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Login authenticates with USER and PASS commands.
|
||||
func (c *Client) Login(user, pass string) error {
|
||||
if err := c.sendCmd("USER " + user); err != nil {
|
||||
return fmt.Errorf("pop3 login user: %w", err)
|
||||
}
|
||||
if _, err := c.readLine(); err != nil {
|
||||
return fmt.Errorf("pop3 login user response: %w", err)
|
||||
}
|
||||
if err := c.sendCmd("PASS " + pass); err != nil {
|
||||
return fmt.Errorf("pop3 login pass: %w", err)
|
||||
}
|
||||
if _, err := c.readLine(); err != nil {
|
||||
return fmt.Errorf("pop3 login pass response: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stat returns the message count and total mailbox size in bytes.
|
||||
func (c *Client) Stat() (count, size int, err error) {
|
||||
if err := c.sendCmd("STAT"); err != nil {
|
||||
return 0, 0, fmt.Errorf("pop3 stat send: %w", err)
|
||||
}
|
||||
line, err := c.readLine()
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("pop3 stat read: %w", err)
|
||||
}
|
||||
// Response: "+OK count size"
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) < 2 {
|
||||
return 0, 0, fmt.Errorf("pop3 stat: unexpected response: %q", line)
|
||||
}
|
||||
count, err = strconv.Atoi(parts[0])
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("pop3 stat count parse: %w", err)
|
||||
}
|
||||
if len(parts) >= 2 {
|
||||
size, _ = strconv.Atoi(parts[1])
|
||||
}
|
||||
return count, size, nil
|
||||
}
|
||||
|
||||
// List returns the message numbers available on the server.
|
||||
func (c *Client) List() ([]int, error) {
|
||||
if err := c.sendCmd("LIST"); err != nil {
|
||||
return nil, fmt.Errorf("pop3 list send: %w", err)
|
||||
}
|
||||
// Read status line
|
||||
if _, err := c.readLine(); err != nil {
|
||||
return nil, fmt.Errorf("pop3 list status: %w", err)
|
||||
}
|
||||
// Read multi-line response
|
||||
data, err := c.readMultiLine()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 list multiline: %w", err)
|
||||
}
|
||||
var nums []int
|
||||
for _, line := range strings.Split(string(data), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
// Each line: "msgnum size"
|
||||
parts := strings.Fields(line)
|
||||
if len(parts) == 0 {
|
||||
continue
|
||||
}
|
||||
n, err := strconv.Atoi(parts[0])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
nums = append(nums, n)
|
||||
}
|
||||
return nums, nil
|
||||
}
|
||||
|
||||
// Retr retrieves a message by its number and returns the raw RFC 2822 bytes.
|
||||
func (c *Client) Retr(num int) ([]byte, error) {
|
||||
if err := c.sendCmd(fmt.Sprintf("RETR %d", num)); err != nil {
|
||||
return nil, fmt.Errorf("pop3 retr send: %w", err)
|
||||
}
|
||||
// Read status line
|
||||
if _, err := c.readLine(); err != nil {
|
||||
return nil, fmt.Errorf("pop3 retr status: %w", err)
|
||||
}
|
||||
data, err := c.readMultiLine()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 retr multiline: %w", err)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// Quit sends the QUIT command and waits for the server acknowledgement.
|
||||
func (c *Client) Quit() error {
|
||||
if err := c.sendCmd("QUIT"); err != nil {
|
||||
return fmt.Errorf("pop3 quit send: %w", err)
|
||||
}
|
||||
_, err := c.readLine()
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closes the underlying network connection.
|
||||
func (c *Client) Close() {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
// sendCmd writes a POP3 command terminated with CRLF.
|
||||
func (c *Client) sendCmd(cmd string) error {
|
||||
_ = c.conn.SetWriteDeadline(time.Now().Add(rwTimeout))
|
||||
_, err := fmt.Fprintf(c.conn, "%s\r\n", cmd)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pop3 write: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readLine reads one response line from the server.
|
||||
// It strips the CRLF, verifies the +OK/-ERR prefix, and returns the
|
||||
// text after the status indicator (without the "+OK" / "-ERR" prefix).
|
||||
// An error is returned if the server replies with -ERR.
|
||||
func (c *Client) readLine() (string, error) {
|
||||
_ = c.conn.SetReadDeadline(time.Now().Add(rwTimeout))
|
||||
line, err := c.reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("pop3 read line: %w", err)
|
||||
}
|
||||
line = strings.TrimRight(line, "\r\n")
|
||||
if strings.HasPrefix(line, "-ERR") {
|
||||
return "", fmt.Errorf("pop3 server error: %s", strings.TrimPrefix(line, "-ERR "))
|
||||
}
|
||||
// Strip "+OK" prefix
|
||||
if strings.HasPrefix(line, "+OK") {
|
||||
return strings.TrimPrefix(strings.TrimPrefix(line, "+OK"), " "), nil
|
||||
}
|
||||
return line, nil
|
||||
}
|
||||
|
||||
// readMultiLine reads a dot-stuffed multi-line POP3 response until the
|
||||
// terminating ".\r\n" line. Dot-unstuffing is applied: lines beginning
|
||||
// with ".." are returned with a single leading ".".
|
||||
func (c *Client) readMultiLine() ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
for {
|
||||
_ = c.conn.SetReadDeadline(time.Now().Add(rwTimeout))
|
||||
line, err := c.reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 read multiline: %w", err)
|
||||
}
|
||||
// Terminator: a single dot on a line by itself
|
||||
if line == ".\r\n" || line == ".\n" {
|
||||
break
|
||||
}
|
||||
// Dot-unstuffing: RFC 1939 §3 — lines beginning with ".." → "."
|
||||
if strings.HasPrefix(line, "..") {
|
||||
line = line[1:]
|
||||
}
|
||||
buf.WriteString(line)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
package pop3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/archivmail/internal/index"
|
||||
"github.com/archivmail/internal/storage"
|
||||
"github.com/archivmail/pkg/mailparser"
|
||||
)
|
||||
|
||||
// Importer runs background POP3 import jobs.
|
||||
type Importer struct {
|
||||
store *Store
|
||||
mailStore *storage.Store
|
||||
idx index.Indexer
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewImporter creates a new Importer wired to the storage and index backends.
|
||||
func NewImporter(store *Store, mailStore *storage.Store, idx index.Indexer, logger *slog.Logger) *Importer {
|
||||
return &Importer{
|
||||
store: store,
|
||||
mailStore: mailStore,
|
||||
idx: idx,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Run performs a full POP3 import for the given account. It is designed to be
|
||||
// called as a goroutine: go imp.Run(context.Background(), accountID)
|
||||
func (imp *Importer) Run(ctx context.Context, accountID int64) {
|
||||
log := imp.logger.With("component", "pop3-importer", "account_id", accountID)
|
||||
|
||||
acc, err := imp.store.Get(ctx, accountID)
|
||||
if err != nil {
|
||||
log.Error("failed to get account", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
password, err := imp.store.GetPassword(ctx, accountID)
|
||||
if err != nil {
|
||||
log.Error("failed to decrypt password", "err", err)
|
||||
_ = imp.store.UpdateStatus(ctx, accountID, "error", "failed to decrypt password", 0, 0)
|
||||
return
|
||||
}
|
||||
|
||||
// Mark as running
|
||||
if err := imp.store.UpdateStatus(ctx, accountID, "running", "", 0, 0); err != nil {
|
||||
log.Error("failed to update status", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
imported, err := imp.doImport(ctx, acc, password, log)
|
||||
if err != nil {
|
||||
log.Error("import failed", "err", err)
|
||||
_ = imp.store.UpdateStatus(ctx, accountID, "error", err.Error(), 0, 0)
|
||||
return
|
||||
}
|
||||
|
||||
if err := imp.store.UpdateDone(ctx, accountID, imported); err != nil {
|
||||
log.Error("failed to update done", "err", err)
|
||||
}
|
||||
|
||||
log.Info("import completed", "imported", imported)
|
||||
}
|
||||
|
||||
// doImport handles the actual POP3 connection and message retrieval.
|
||||
func (imp *Importer) doImport(ctx context.Context, acc *Account, password string, log *slog.Logger) (int, error) {
|
||||
c, err := Dial(acc.Host, acc.Port, acc.TLS, acc.TLSSkipVerify)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("pop3 connect: %w", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
if err := c.Login(acc.Username, password); err != nil {
|
||||
return 0, fmt.Errorf("pop3 login: %w", err)
|
||||
}
|
||||
|
||||
total, _, err := c.Stat()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("pop3 stat: %w", err)
|
||||
}
|
||||
|
||||
nums, err := c.List()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("pop3 list: %w", err)
|
||||
}
|
||||
|
||||
log.Info("starting pop3 import", "total", total, "listed", len(nums))
|
||||
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", 0, len(nums))
|
||||
|
||||
imported := 0
|
||||
for i, num := range nums {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = c.Quit()
|
||||
return imported, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
raw, err := c.Retr(num)
|
||||
if err != nil {
|
||||
log.Warn("failed to retrieve message, skipping", "msg_num", num, "err", err)
|
||||
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", i+1, len(nums))
|
||||
continue
|
||||
}
|
||||
|
||||
if err := imp.storeAndIndex(raw, log); err != nil {
|
||||
log.Warn("failed to store/index message, skipping", "msg_num", num, "err", err)
|
||||
} else {
|
||||
imported++
|
||||
}
|
||||
|
||||
_ = imp.store.UpdateStatus(ctx, acc.ID, "running", "", i+1, len(nums))
|
||||
}
|
||||
|
||||
_ = c.Quit()
|
||||
return imported, nil
|
||||
}
|
||||
|
||||
// storeAndIndex saves a raw email to storage and indexes it.
|
||||
func (imp *Importer) storeAndIndex(raw []byte, log *slog.Logger) error {
|
||||
// Save to file storage (deduplicates by SHA256 automatically)
|
||||
id, err := imp.mailStore.Save(raw, time.Now())
|
||||
if err != nil {
|
||||
return fmt.Errorf("pop3 save: %w", err)
|
||||
}
|
||||
|
||||
// Parse for indexing
|
||||
pm, err := mailparser.Parse(raw)
|
||||
if err != nil {
|
||||
log.Warn("failed to parse mail for indexing", "id", id, "err", err)
|
||||
// Store succeeded — skip indexing for unparseable mails
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build attachment names string
|
||||
var attachNames []string
|
||||
for _, a := range pm.Attachments {
|
||||
if a.Filename != "" {
|
||||
attachNames = append(attachNames, a.Filename)
|
||||
}
|
||||
}
|
||||
|
||||
doc := index.MailDocument{
|
||||
ID: id,
|
||||
From: pm.From,
|
||||
To: strings.Join(pm.To, ", "),
|
||||
Subject: pm.Subject,
|
||||
Body: pm.TextBody,
|
||||
AttachNames: strings.Join(attachNames, " "),
|
||||
HasAttachment: len(pm.Attachments) > 0,
|
||||
Date: pm.Date,
|
||||
Size: int64(len(raw)),
|
||||
}
|
||||
|
||||
if err := imp.idx.IndexSync(doc); err != nil {
|
||||
log.Warn("failed to index mail", "id", id, "err", err)
|
||||
// Non-fatal: mail is stored, just not searchable yet
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,263 @@
|
||||
// Package pop3 implements POP3 account management and import.
|
||||
// It provides a DB-backed store for POP3 account configurations,
|
||||
// a raw TCP/TLS POP3 client, and an import worker.
|
||||
package pop3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/aes"
|
||||
"crypto/cipher"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Account represents a POP3 account configuration stored in the database.
|
||||
type Account struct {
|
||||
ID int64 `json:"id"`
|
||||
Owner string `json:"owner"`
|
||||
Name string `json:"name"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
TLS string `json:"tls"`
|
||||
TLSSkipVerify bool `json:"tls_skip_verify"`
|
||||
Username string `json:"username"`
|
||||
Status string `json:"status"`
|
||||
ErrorMsg string `json:"error_msg"`
|
||||
LastImportAt *time.Time `json:"last_import_at,omitempty"`
|
||||
LastImportCount int `json:"last_import_count"`
|
||||
ProgressCurrent int `json:"progress_current"`
|
||||
ProgressTotal int `json:"progress_total"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// Store manages POP3 account persistence in PostgreSQL.
|
||||
type Store struct {
|
||||
pool *pgxpool.Pool
|
||||
encKey [32]byte
|
||||
}
|
||||
|
||||
const createTableSQL = `
|
||||
CREATE TABLE IF NOT EXISTS pop3_accounts (
|
||||
id SERIAL PRIMARY KEY,
|
||||
owner TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER NOT NULL DEFAULT 110,
|
||||
tls TEXT NOT NULL DEFAULT 'none',
|
||||
tls_skip_verify BOOLEAN NOT NULL DEFAULT false,
|
||||
username TEXT NOT NULL,
|
||||
password_enc BYTEA NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'idle',
|
||||
error_msg TEXT NOT NULL DEFAULT '',
|
||||
last_import_at TIMESTAMPTZ,
|
||||
last_import_count INTEGER NOT NULL DEFAULT 0,
|
||||
progress_current INTEGER NOT NULL DEFAULT 0,
|
||||
progress_total INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_pop3_accounts_owner ON pop3_accounts (owner);
|
||||
`
|
||||
|
||||
// New creates a new Store, connects to PostgreSQL, and runs the schema migration.
|
||||
func New(dsn, secret string) (*Store, error) {
|
||||
pool, err := pgxpool.New(context.Background(), dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: connect: %w", err)
|
||||
}
|
||||
|
||||
if _, err := pool.Exec(context.Background(), createTableSQL); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("pop3 store: migrate: %w", err)
|
||||
}
|
||||
|
||||
key := sha256.Sum256([]byte(secret))
|
||||
return &Store{pool: pool, encKey: key}, nil
|
||||
}
|
||||
|
||||
// Close releases the database connection pool.
|
||||
func (s *Store) Close() {
|
||||
s.pool.Close()
|
||||
}
|
||||
|
||||
// Create inserts a new POP3 account with an encrypted password.
|
||||
func (s *Store) Create(ctx context.Context, acc Account, password string) (*Account, error) {
|
||||
enc, err := encryptPassword(password, s.encKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: encrypt password: %w", err)
|
||||
}
|
||||
|
||||
row := s.pool.QueryRow(ctx, `
|
||||
INSERT INTO pop3_accounts (owner, name, host, port, tls, tls_skip_verify, username, password_enc)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
RETURNING id, created_at`,
|
||||
acc.Owner, acc.Name, acc.Host, acc.Port, acc.TLS, acc.TLSSkipVerify, acc.Username, enc,
|
||||
)
|
||||
|
||||
if err := row.Scan(&acc.ID, &acc.CreatedAt); err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: create: %w", err)
|
||||
}
|
||||
|
||||
acc.Status = "idle"
|
||||
acc.ErrorMsg = ""
|
||||
return &acc, nil
|
||||
}
|
||||
|
||||
// selectColumns is the canonical column list used in all SELECT statements.
|
||||
const selectColumns = ` id, owner, name, host, port, tls, tls_skip_verify, username,
|
||||
status, error_msg, last_import_at, last_import_count,
|
||||
progress_current, progress_total, created_at `
|
||||
|
||||
// scanner abstracts pgx.Row and pgx.Rows — both expose Scan(...any) error.
|
||||
type scanner interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
func scanRow(row scanner) (Account, error) {
|
||||
var a Account
|
||||
err := row.Scan(
|
||||
&a.ID, &a.Owner, &a.Name, &a.Host, &a.Port, &a.TLS, &a.TLSSkipVerify, &a.Username,
|
||||
&a.Status, &a.ErrorMsg, &a.LastImportAt,
|
||||
&a.LastImportCount, &a.ProgressCurrent, &a.ProgressTotal, &a.CreatedAt,
|
||||
)
|
||||
return a, err
|
||||
}
|
||||
|
||||
// List returns POP3 accounts. Admins see all accounts; regular users see only their own.
|
||||
func (s *Store) List(ctx context.Context, owner string, isAdmin bool) ([]Account, error) {
|
||||
var rows pgx.Rows
|
||||
var err error
|
||||
|
||||
q := `SELECT` + selectColumns + `FROM pop3_accounts`
|
||||
|
||||
if isAdmin {
|
||||
rows, err = s.pool.Query(ctx, q+` ORDER BY id`)
|
||||
} else {
|
||||
rows, err = s.pool.Query(ctx, q+` WHERE owner = $1 ORDER BY id`, owner)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: list: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var accounts []Account
|
||||
for rows.Next() {
|
||||
a, err := scanRow(rows)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: scan: %w", err)
|
||||
}
|
||||
accounts = append(accounts, a)
|
||||
}
|
||||
return accounts, rows.Err()
|
||||
}
|
||||
|
||||
// Get returns a single POP3 account by ID.
|
||||
func (s *Store) Get(ctx context.Context, id int64) (*Account, error) {
|
||||
row := s.pool.QueryRow(ctx,
|
||||
`SELECT`+selectColumns+`FROM pop3_accounts WHERE id = $1`, id)
|
||||
a, err := scanRow(row)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pop3 store: get %d: %w", id, err)
|
||||
}
|
||||
return &a, nil
|
||||
}
|
||||
|
||||
// GetPassword retrieves and decrypts the stored password for a POP3 account.
|
||||
func (s *Store) GetPassword(ctx context.Context, id int64) (string, error) {
|
||||
var enc []byte
|
||||
err := s.pool.QueryRow(ctx, `SELECT password_enc FROM pop3_accounts WHERE id = $1`, id).Scan(&enc)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("pop3 store: get password: %w", err)
|
||||
}
|
||||
return decryptPassword(enc, s.encKey)
|
||||
}
|
||||
|
||||
// Delete removes a POP3 account by ID.
|
||||
func (s *Store) Delete(ctx context.Context, id int64) error {
|
||||
tag, err := s.pool.Exec(ctx, `DELETE FROM pop3_accounts WHERE id = $1`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pop3 store: delete: %w", err)
|
||||
}
|
||||
if tag.RowsAffected() == 0 {
|
||||
return fmt.Errorf("pop3 store: account %d not found", id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteByOwner removes all POP3 accounts belonging to the given username.
|
||||
func (s *Store) DeleteByOwner(ctx context.Context, username string) (int, error) {
|
||||
tag, err := s.pool.Exec(ctx, `DELETE FROM pop3_accounts WHERE owner = $1`, username)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("pop3 store: delete by owner: %w", err)
|
||||
}
|
||||
return int(tag.RowsAffected()), nil
|
||||
}
|
||||
|
||||
// UpdateStatus updates the import progress and status of an account.
|
||||
func (s *Store) UpdateStatus(ctx context.Context, id int64, status, errMsg string, current, total int) error {
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
UPDATE pop3_accounts
|
||||
SET status = $1, error_msg = $2, progress_current = $3, progress_total = $4
|
||||
WHERE id = $5`, status, errMsg, current, total, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pop3 store: update status: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateDone marks an import as completed, setting status back to idle.
|
||||
func (s *Store) UpdateDone(ctx context.Context, id int64, count int) error {
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
UPDATE pop3_accounts
|
||||
SET status = 'idle', error_msg = '', last_import_at = now(),
|
||||
last_import_count = $1, progress_current = 0, progress_total = 0
|
||||
WHERE id = $2`, count, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("pop3 store: update done: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// encryptPassword encrypts a plaintext password using AES-256-GCM.
|
||||
func encryptPassword(plaintext string, key [32]byte) ([]byte, error) {
|
||||
block, err := aes.NewCipher(key[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nonce := make([]byte, gcm.NonceSize())
|
||||
if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return gcm.Seal(nonce, nonce, []byte(plaintext), nil), nil
|
||||
}
|
||||
|
||||
// decryptPassword decrypts a password previously encrypted with encryptPassword.
|
||||
func decryptPassword(ciphertext []byte, key [32]byte) (string, error) {
|
||||
block, err := aes.NewCipher(key[:])
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
gcm, err := cipher.NewGCM(block)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
nonceSize := gcm.NonceSize()
|
||||
if len(ciphertext) < nonceSize {
|
||||
return "", fmt.Errorf("ciphertext too short")
|
||||
}
|
||||
nonce, ct := ciphertext[:nonceSize], ciphertext[nonceSize:]
|
||||
plaintext, err := gcm.Open(nil, nonce, ct, nil)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("decrypt failed: %w", err)
|
||||
}
|
||||
return string(plaintext), nil
|
||||
}
|
||||
Reference in New Issue
Block a user