Files
archivmail/internal/smtpd/smtpd.go
T
sysops 2bab61209c chore: Modulname github.com/archivmail → archivmail
Go-Modul in go.mod und allen 45 Go-Dateien umbenannt.
2026-04-05 20:37:35 +02:00

354 lines
9.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package smtpd implements an embedded receive-only SMTP daemon for archivmail.
// It accepts incoming emails (e.g. from Postfix via always_bcc) and hands them
// off to the storage coordinator. No AUTH, no relay, no outbound mail.
package smtpd
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log/slog"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/emersion/go-smtp"
"archivmail/config"
"archivmail/internal/storage"
)
// DomainToTenantFunc resolves an e-mail domain to a tenant ID.
// Returns nil if no tenant matches the domain.
type DomainToTenantFunc func(ctx context.Context, domain string) (*int64, error)
// Stats holds runtime statistics for the SMTP daemon.
type Stats struct {
Received atomic.Int64 // total emails successfully stored
Rejected atomic.Int64 // rejected (IP, size, etc.)
LastMailAt atomic.Value // time.Time of last accepted mail
}
// IndexCallback is called after a mail is successfully stored, with the raw
// bytes and the storage ID. Used to submit to the async index worker.
type IndexCallback func(raw []byte, id string)
// Daemon is the embedded receive-only SMTP server.
type Daemon struct {
cfg config.SMTPConfig
store *storage.Store
logger *slog.Logger
stats Stats
server *smtp.Server
mu sync.Mutex
running bool
indexCallback IndexCallback
domainToTenant DomainToTenantFunc // optional domain→tenant routing
defaultTenantID *int64 // fallback tenant if no domain matches
}
// New creates a new SMTP Daemon. Call Start() to begin accepting connections.
func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daemon {
d := &Daemon{
cfg: cfg,
store: store,
logger: logger,
}
d.stats.LastMailAt.Store(time.Time{})
return d
}
// SetDomainToTenant wires in the domain→tenant resolution function.
func (d *Daemon) SetDomainToTenant(fn DomainToTenantFunc, defaultTenantID *int64) {
d.domainToTenant = fn
d.defaultTenantID = defaultTenantID
}
// resolveTenantFromRcpts extracts the domain from RCPT TO addresses and
// resolves it to a tenant ID via the configured DomainToTenantFunc.
func (d *Daemon) resolveTenantFromRcpts(rcpts []string) *int64 {
if d.domainToTenant == nil {
return d.defaultTenantID
}
ctx := context.Background()
for _, rcpt := range rcpts {
// Strip angle brackets if present
addr := strings.Trim(rcpt, "<>")
at := strings.LastIndex(addr, "@")
if at < 0 {
continue
}
domain := strings.ToLower(addr[at+1:])
tenantID, err := d.domainToTenant(ctx, domain)
if err != nil {
d.logger.Warn("SMTP: tenant lookup failed", "domain", domain, "err", err)
continue
}
if tenantID != nil {
return tenantID
}
}
return d.defaultTenantID
}
// SetIndexCallback sets the function called after each successfully stored mail.
func (d *Daemon) SetIndexCallback(cb IndexCallback) {
d.indexCallback = cb
}
// Start launches the SMTP daemon in a background goroutine.
// It returns immediately; use Stop() for graceful shutdown.
func (d *Daemon) Start() error {
if !d.cfg.Enabled {
d.logger.Info("SMTP daemon disabled via config")
return nil
}
bind := d.cfg.Bind
if bind == "" {
bind = ":2525"
}
domain := d.cfg.Domain
if domain == "" {
domain = "archivmail"
}
maxBytes := int64(d.cfg.MaxSizeMB) * 1024 * 1024
if maxBytes <= 0 {
maxBytes = 50 * 1024 * 1024 // 50 MB default
}
backend := &backend{daemon: d}
srv := smtp.NewServer(backend)
srv.Addr = bind
srv.Domain = domain
srv.MaxMessageBytes = maxBytes
srv.ReadTimeout = 5 * time.Minute
srv.WriteTimeout = 30 * time.Second
srv.AllowInsecureAuth = false // no AUTH offered at all
// TLS / STARTTLS
if d.cfg.TLSCert != "" && d.cfg.TLSKey != "" {
cert, err := tls.LoadX509KeyPair(d.cfg.TLSCert, d.cfg.TLSKey)
if err != nil {
return fmt.Errorf("smtpd: load TLS cert: %w", err)
}
srv.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
}
d.mu.Lock()
d.server = srv
d.running = true
d.mu.Unlock()
go func() {
d.logger.Info("SMTP daemon starting", "addr", bind, "domain", domain,
"max_size_mb", d.cfg.MaxSizeMB, "tls", d.cfg.TLSCert != "")
if err := srv.ListenAndServe(); err != nil {
if !errors.Is(err, smtp.ErrServerClosed) {
d.logger.Error("SMTP daemon error", "err", err)
}
}
d.mu.Lock()
d.running = false
d.mu.Unlock()
}()
return nil
}
// Stop shuts down the SMTP daemon gracefully.
func (d *Daemon) Stop() {
d.mu.Lock()
srv := d.server
d.mu.Unlock()
if srv != nil {
srv.Close()
}
}
// Status returns a snapshot of the daemon's current state.
func (d *Daemon) Status() StatusResponse {
d.mu.Lock()
running := d.running
d.mu.Unlock()
lastMail, _ := d.stats.LastMailAt.Load().(time.Time)
var lastMailStr string
if !lastMail.IsZero() {
lastMailStr = lastMail.UTC().Format(time.RFC3339)
}
bind := d.cfg.Bind
if bind == "" {
bind = ":2525"
}
return StatusResponse{
Running: running,
Enabled: d.cfg.Enabled,
Bind: bind,
Domain: d.cfg.Domain,
TLS: d.cfg.TLSCert != "",
MaxSizeMB: d.cfg.MaxSizeMB,
AllowedIPs: d.cfg.AllowedIPs,
Received: d.stats.Received.Load(),
Rejected: d.stats.Rejected.Load(),
LastMailAt: lastMailStr,
}
}
// StatusResponse is returned by GET /api/admin/smtp/status.
type StatusResponse struct {
Running bool `json:"running"`
Enabled bool `json:"enabled"`
Bind string `json:"bind"`
Domain string `json:"domain"`
TLS bool `json:"tls"`
MaxSizeMB int `json:"max_size_mb"`
AllowedIPs []string `json:"allowed_ips"`
Received int64 `json:"received"`
Rejected int64 `json:"rejected"`
LastMailAt string `json:"last_mail_at,omitempty"`
}
// ── go-smtp Backend / Session ─────────────────────────────────────────────
type backend struct {
daemon *Daemon
}
func (b *backend) NewSession(c *smtp.Conn) (smtp.Session, error) {
remoteIP := extractIP(c.Conn().RemoteAddr().String())
if !b.daemon.isAllowed(remoteIP) {
b.daemon.stats.Rejected.Add(1)
b.daemon.logger.Warn("SMTP: rejected connection from unlisted IP", "ip", remoteIP)
return nil, &smtp.SMTPError{
Code: 554,
EnhancedCode: smtp.EnhancedCode{5, 7, 1},
Message: "IP not in allowlist",
}
}
b.daemon.logger.Debug("SMTP: new session", "ip", remoteIP)
return &session{
daemon: b.daemon,
remoteIP: remoteIP,
}, nil
}
type session struct {
daemon *Daemon
remoteIP string
from string
rcpts []string
}
// AuthPlain never called because server doesn't advertise AUTH.
func (s *session) AuthPlain(_, _ string) error {
return smtp.ErrAuthUnsupported
}
func (s *session) Mail(from string, _ *smtp.MailOptions) error {
s.from = from
return nil
}
func (s *session) Rcpt(to string, _ *smtp.RcptOptions) error {
s.rcpts = append(s.rcpts, to)
return nil
}
func (s *session) Data(r io.Reader) error {
var buf bytes.Buffer
if _, err := io.Copy(&buf, r); err != nil {
s.daemon.stats.Rejected.Add(1)
return fmt.Errorf("smtpd: read data: %w", err)
}
raw := buf.Bytes()
// Determine tenant from RCPT TO domain routing
tenantID := s.daemon.resolveTenantFromRcpts(s.rcpts)
id, err := s.daemon.store.Save(context.Background(), raw, time.Now(), tenantID)
if err != nil {
s.daemon.stats.Rejected.Add(1)
// PROJ-29: Quota exceeded — return 452 (Insufficient system storage) so
// the sending MTA retries later. All other storage errors get 554.
if errors.Is(err, storage.ErrQuotaExceeded) {
s.daemon.logger.Warn("SMTP: quota exceeded", "from", s.from, "tenant", tenantID)
return &smtp.SMTPError{
Code: 452,
EnhancedCode: smtp.EnhancedCode{4, 2, 2},
Message: "Insufficient system storage — quota exceeded",
}
}
s.daemon.logger.Error("SMTP: storage failed", "from", s.from, "err", err)
return &smtp.SMTPError{
Code: 554,
EnhancedCode: smtp.EnhancedCode{4, 6, 0},
Message: "Storage failure, please retry",
}
}
s.daemon.stats.Received.Add(1)
s.daemon.stats.LastMailAt.Store(time.Now())
s.daemon.logger.Info("SMTP: mail stored", "id", id, "from", s.from,
"rcpts", strings.Join(s.rcpts, ","), "bytes", len(raw), "ip", s.remoteIP)
// Submit to async index worker if callback is set
if s.daemon.indexCallback != nil {
s.daemon.indexCallback(raw, id)
}
return nil
}
func (s *session) Reset() {
s.from = ""
s.rcpts = nil
}
func (s *session) Logout() error {
return nil
}
// ── Helpers ───────────────────────────────────────────────────────────────
// isAllowed returns true if the IP is in the allowlist.
// SEC-26: Fail-closed — empty allowlist means NO IP is allowed (was fail-open before).
// To allow all IPs, set allowed_ips: ["0.0.0.0/0", "::/0"] explicitly in config.
func (d *Daemon) isAllowed(ip string) bool {
if len(d.cfg.AllowedIPs) == 0 {
return false // fail-closed: no IPs configured = block everything
}
for _, allowed := range d.cfg.AllowedIPs {
// Support CIDR notation (e.g. 192.168.1.0/24)
if strings.Contains(allowed, "/") {
_, network, err := net.ParseCIDR(allowed)
if err == nil && network.Contains(net.ParseIP(ip)) {
return true
}
continue
}
if allowed == ip {
return true
}
}
return false
}
// extractIP strips port from "ip:port" or "[::1]:port" strings.
func extractIP(addr string) string {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
return host
}