7e68c7ab02
- Storage: AES-256-GCM Verschlüsselung (keyfile, graceful fallback bei fehlendem Key) - Storage: PostgreSQL emails-Tabelle mit Auto-Migration - Storage: Save/Delete/Stats/FirstAndLastMail nutzen DB wenn verfügbar - Index: Async IndexWorker (Go-Channel, Queue 1000, non-blocking Submit) - SMTP: IndexCallback für async Indexierung nach Mail-Eingang - main: Backfill beim Start (40 Mails migriert + indexiert) - Bestehende Mails werden transparent entschlüsselt (Fallback auf Raw) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
300 lines
7.5 KiB
Go
300 lines
7.5 KiB
Go
// Package smtpd implements an embedded receive-only SMTP daemon for archivmail.
|
||
// It accepts incoming emails (e.g. from Postfix via always_bcc) and hands them
|
||
// off to the storage coordinator. No AUTH, no relay, no outbound mail.
|
||
package smtpd
|
||
|
||
import (
|
||
"bytes"
|
||
"crypto/tls"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/emersion/go-smtp"
|
||
|
||
"github.com/archivmail/config"
|
||
"github.com/archivmail/internal/storage"
|
||
)
|
||
|
||
// Stats collects the daemon's runtime counters. All fields are atomics, so
// they can be updated from concurrent SMTP sessions without extra locking.
type Stats struct {
	// Received counts mails that were stored successfully.
	Received atomic.Int64
	// Rejected counts refused connections and failed deliveries
	// (IP not allowed, read error, storage error).
	Rejected atomic.Int64
	// LastMailAt holds the time.Time at which the most recent mail was accepted.
	LastMailAt atomic.Value
}
|
||
|
||
// IndexCallback is the hook invoked once a mail has been stored successfully.
// It receives the raw message bytes together with the ID returned by the
// store, and is intended for handing the mail to the asynchronous index worker.
type IndexCallback func(raw []byte, id string)
|
||
|
||
// Daemon is the embedded receive-only SMTP server.
|
||
type Daemon struct {
|
||
cfg config.SMTPConfig
|
||
store *storage.Store
|
||
logger *slog.Logger
|
||
stats Stats
|
||
server *smtp.Server
|
||
mu sync.Mutex
|
||
running bool
|
||
indexCallback IndexCallback
|
||
}
|
||
|
||
// New creates a new SMTP Daemon. Call Start() to begin accepting connections.
|
||
func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daemon {
|
||
d := &Daemon{
|
||
cfg: cfg,
|
||
store: store,
|
||
logger: logger,
|
||
}
|
||
d.stats.LastMailAt.Store(time.Time{})
|
||
return d
|
||
}
|
||
|
||
// SetIndexCallback sets the function called after each successfully stored mail.
|
||
func (d *Daemon) SetIndexCallback(cb IndexCallback) {
|
||
d.indexCallback = cb
|
||
}
|
||
|
||
// Start launches the SMTP daemon in a background goroutine.
|
||
// It returns immediately; use Stop() for graceful shutdown.
|
||
func (d *Daemon) Start() error {
|
||
if !d.cfg.Enabled {
|
||
d.logger.Info("SMTP daemon disabled via config")
|
||
return nil
|
||
}
|
||
|
||
bind := d.cfg.Bind
|
||
if bind == "" {
|
||
bind = ":2525"
|
||
}
|
||
domain := d.cfg.Domain
|
||
if domain == "" {
|
||
domain = "archivmail"
|
||
}
|
||
maxBytes := int64(d.cfg.MaxSizeMB) * 1024 * 1024
|
||
if maxBytes <= 0 {
|
||
maxBytes = 50 * 1024 * 1024 // 50 MB default
|
||
}
|
||
|
||
backend := &backend{daemon: d}
|
||
srv := smtp.NewServer(backend)
|
||
srv.Addr = bind
|
||
srv.Domain = domain
|
||
srv.MaxMessageBytes = maxBytes
|
||
srv.ReadTimeout = 5 * time.Minute
|
||
srv.WriteTimeout = 30 * time.Second
|
||
srv.AllowInsecureAuth = false // no AUTH offered at all
|
||
|
||
// TLS / STARTTLS
|
||
if d.cfg.TLSCert != "" && d.cfg.TLSKey != "" {
|
||
cert, err := tls.LoadX509KeyPair(d.cfg.TLSCert, d.cfg.TLSKey)
|
||
if err != nil {
|
||
return fmt.Errorf("smtpd: load TLS cert: %w", err)
|
||
}
|
||
srv.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
|
||
}
|
||
|
||
d.mu.Lock()
|
||
d.server = srv
|
||
d.running = true
|
||
d.mu.Unlock()
|
||
|
||
go func() {
|
||
d.logger.Info("SMTP daemon starting", "addr", bind, "domain", domain,
|
||
"max_size_mb", d.cfg.MaxSizeMB, "tls", d.cfg.TLSCert != "")
|
||
if err := srv.ListenAndServe(); err != nil {
|
||
if !errors.Is(err, smtp.ErrServerClosed) {
|
||
d.logger.Error("SMTP daemon error", "err", err)
|
||
}
|
||
}
|
||
d.mu.Lock()
|
||
d.running = false
|
||
d.mu.Unlock()
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// Stop shuts down the SMTP daemon gracefully.
|
||
func (d *Daemon) Stop() {
|
||
d.mu.Lock()
|
||
srv := d.server
|
||
d.mu.Unlock()
|
||
if srv != nil {
|
||
srv.Close()
|
||
}
|
||
}
|
||
|
||
// Status returns a snapshot of the daemon's current state.
|
||
func (d *Daemon) Status() StatusResponse {
|
||
d.mu.Lock()
|
||
running := d.running
|
||
d.mu.Unlock()
|
||
|
||
lastMail, _ := d.stats.LastMailAt.Load().(time.Time)
|
||
var lastMailStr string
|
||
if !lastMail.IsZero() {
|
||
lastMailStr = lastMail.UTC().Format(time.RFC3339)
|
||
}
|
||
|
||
bind := d.cfg.Bind
|
||
if bind == "" {
|
||
bind = ":2525"
|
||
}
|
||
|
||
return StatusResponse{
|
||
Running: running,
|
||
Enabled: d.cfg.Enabled,
|
||
Bind: bind,
|
||
Domain: d.cfg.Domain,
|
||
TLS: d.cfg.TLSCert != "",
|
||
MaxSizeMB: d.cfg.MaxSizeMB,
|
||
AllowedIPs: d.cfg.AllowedIPs,
|
||
Received: d.stats.Received.Load(),
|
||
Rejected: d.stats.Rejected.Load(),
|
||
LastMailAt: lastMailStr,
|
||
}
|
||
}
|
||
|
||
// StatusResponse is the JSON payload served by GET /api/admin/smtp/status.
// It is produced by Daemon.Status.
type StatusResponse struct {
	// Running reports whether the serve goroutine is currently active.
	Running bool `json:"running"`
	// Enabled mirrors the "enabled" switch of the SMTP configuration.
	Enabled bool `json:"enabled"`
	// Bind is the listen address.
	Bind string `json:"bind"`
	// Domain is the configured SMTP domain.
	Domain string `json:"domain"`
	// TLS is true when a TLS certificate is configured.
	TLS bool `json:"tls"`
	// MaxSizeMB is the message size limit in megabytes.
	MaxSizeMB int `json:"max_size_mb"`
	// AllowedIPs lists the configured allowlist entries (single IPs or CIDRs).
	AllowedIPs []string `json:"allowed_ips"`
	// Received is the number of mails stored successfully.
	Received int64 `json:"received"`
	// Rejected is the number of refused connections and failed deliveries.
	Rejected int64 `json:"rejected"`
	// LastMailAt is the RFC 3339 time of the last accepted mail; omitted when empty.
	LastMailAt string `json:"last_mail_at,omitempty"`
}
|
||
|
||
// ── go-smtp Backend / Session ─────────────────────────────────────────────
|
||
|
||
type backend struct {
|
||
daemon *Daemon
|
||
}
|
||
|
||
func (b *backend) NewSession(c *smtp.Conn) (smtp.Session, error) {
|
||
remoteIP := extractIP(c.Conn().RemoteAddr().String())
|
||
|
||
if !b.daemon.isAllowed(remoteIP) {
|
||
b.daemon.stats.Rejected.Add(1)
|
||
b.daemon.logger.Warn("SMTP: rejected connection from unlisted IP", "ip", remoteIP)
|
||
return nil, &smtp.SMTPError{
|
||
Code: 554,
|
||
EnhancedCode: smtp.EnhancedCode{5, 7, 1},
|
||
Message: "IP not in allowlist",
|
||
}
|
||
}
|
||
|
||
b.daemon.logger.Debug("SMTP: new session", "ip", remoteIP)
|
||
return &session{
|
||
daemon: b.daemon,
|
||
remoteIP: remoteIP,
|
||
}, nil
|
||
}
|
||
|
||
type session struct {
|
||
daemon *Daemon
|
||
remoteIP string
|
||
from string
|
||
rcpts []string
|
||
}
|
||
|
||
// AuthPlain – never called because server doesn't advertise AUTH.
|
||
func (s *session) AuthPlain(_, _ string) error {
|
||
return smtp.ErrAuthUnsupported
|
||
}
|
||
|
||
func (s *session) Mail(from string, _ *smtp.MailOptions) error {
|
||
s.from = from
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Rcpt(to string, _ *smtp.RcptOptions) error {
|
||
s.rcpts = append(s.rcpts, to)
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Data(r io.Reader) error {
|
||
var buf bytes.Buffer
|
||
if _, err := io.Copy(&buf, r); err != nil {
|
||
s.daemon.stats.Rejected.Add(1)
|
||
return fmt.Errorf("smtpd: read data: %w", err)
|
||
}
|
||
raw := buf.Bytes()
|
||
|
||
id, err := s.daemon.store.Save(raw, time.Now())
|
||
if err != nil {
|
||
s.daemon.stats.Rejected.Add(1)
|
||
s.daemon.logger.Error("SMTP: storage failed", "from", s.from, "err", err)
|
||
return &smtp.SMTPError{
|
||
Code: 554,
|
||
EnhancedCode: smtp.EnhancedCode{4, 6, 0},
|
||
Message: "Storage failure, please retry",
|
||
}
|
||
}
|
||
|
||
s.daemon.stats.Received.Add(1)
|
||
s.daemon.stats.LastMailAt.Store(time.Now())
|
||
s.daemon.logger.Info("SMTP: mail stored", "id", id, "from", s.from,
|
||
"rcpts", strings.Join(s.rcpts, ","), "bytes", len(raw), "ip", s.remoteIP)
|
||
|
||
// Submit to async index worker if callback is set
|
||
if s.daemon.indexCallback != nil {
|
||
s.daemon.indexCallback(raw, id)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Reset() {
|
||
s.from = ""
|
||
s.rcpts = nil
|
||
}
|
||
|
||
func (s *session) Logout() error {
|
||
return nil
|
||
}
|
||
|
||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||
|
||
// isAllowed returns true if the IP is in the allowlist, or if the allowlist
|
||
// is empty (allow-all mode for development).
|
||
func (d *Daemon) isAllowed(ip string) bool {
|
||
if len(d.cfg.AllowedIPs) == 0 {
|
||
return true // no restriction configured
|
||
}
|
||
for _, allowed := range d.cfg.AllowedIPs {
|
||
// Support CIDR notation (e.g. 192.168.1.0/24)
|
||
if strings.Contains(allowed, "/") {
|
||
_, network, err := net.ParseCIDR(allowed)
|
||
if err == nil && network.Contains(net.ParseIP(ip)) {
|
||
return true
|
||
}
|
||
continue
|
||
}
|
||
if allowed == ip {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// extractIP returns the host part of an "ip:port" or "[::1]:port" address.
// If addr cannot be split into host and port, it is returned unchanged.
func extractIP(addr string) string {
	if host, _, err := net.SplitHostPort(addr); err == nil {
		return host
	}
	return addr
}
|