2bab61209c
Go-Modul in go.mod und allen 45 Go-Dateien umbenannt.
354 lines
9.5 KiB
Go
354 lines
9.5 KiB
Go
// Package smtpd implements an embedded receive-only SMTP daemon for archivmail.
|
||
// It accepts incoming emails (e.g. from Postfix via always_bcc) and hands them
|
||
// off to the storage coordinator. No AUTH, no relay, no outbound mail.
|
||
package smtpd
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"crypto/tls"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/emersion/go-smtp"
|
||
|
||
"archivmail/config"
|
||
"archivmail/internal/storage"
|
||
)
|
||
|
||
// DomainToTenantFunc resolves an e-mail domain to a tenant ID.
|
||
// Returns nil if no tenant matches the domain.
|
||
type DomainToTenantFunc func(ctx context.Context, domain string) (*int64, error)
|
||
|
||
// Stats holds runtime statistics for the SMTP daemon.
|
||
type Stats struct {
|
||
Received atomic.Int64 // total emails successfully stored
|
||
Rejected atomic.Int64 // rejected (IP, size, etc.)
|
||
LastMailAt atomic.Value // time.Time of last accepted mail
|
||
}
|
||
|
||
// IndexCallback is called after a mail is successfully stored, with the raw
|
||
// bytes and the storage ID. Used to submit to the async index worker.
|
||
type IndexCallback func(raw []byte, id string)
|
||
|
||
// Daemon is the embedded receive-only SMTP server.
|
||
type Daemon struct {
|
||
cfg config.SMTPConfig
|
||
store *storage.Store
|
||
logger *slog.Logger
|
||
stats Stats
|
||
server *smtp.Server
|
||
mu sync.Mutex
|
||
running bool
|
||
indexCallback IndexCallback
|
||
domainToTenant DomainToTenantFunc // optional domain→tenant routing
|
||
defaultTenantID *int64 // fallback tenant if no domain matches
|
||
}
|
||
|
||
// New creates a new SMTP Daemon. Call Start() to begin accepting connections.
|
||
func New(cfg config.SMTPConfig, store *storage.Store, logger *slog.Logger) *Daemon {
|
||
d := &Daemon{
|
||
cfg: cfg,
|
||
store: store,
|
||
logger: logger,
|
||
}
|
||
d.stats.LastMailAt.Store(time.Time{})
|
||
return d
|
||
}
|
||
|
||
// SetDomainToTenant wires in the domain→tenant resolution function.
|
||
func (d *Daemon) SetDomainToTenant(fn DomainToTenantFunc, defaultTenantID *int64) {
|
||
d.domainToTenant = fn
|
||
d.defaultTenantID = defaultTenantID
|
||
}
|
||
|
||
// resolveTenantFromRcpts extracts the domain from RCPT TO addresses and
|
||
// resolves it to a tenant ID via the configured DomainToTenantFunc.
|
||
func (d *Daemon) resolveTenantFromRcpts(rcpts []string) *int64 {
|
||
if d.domainToTenant == nil {
|
||
return d.defaultTenantID
|
||
}
|
||
ctx := context.Background()
|
||
for _, rcpt := range rcpts {
|
||
// Strip angle brackets if present
|
||
addr := strings.Trim(rcpt, "<>")
|
||
at := strings.LastIndex(addr, "@")
|
||
if at < 0 {
|
||
continue
|
||
}
|
||
domain := strings.ToLower(addr[at+1:])
|
||
tenantID, err := d.domainToTenant(ctx, domain)
|
||
if err != nil {
|
||
d.logger.Warn("SMTP: tenant lookup failed", "domain", domain, "err", err)
|
||
continue
|
||
}
|
||
if tenantID != nil {
|
||
return tenantID
|
||
}
|
||
}
|
||
return d.defaultTenantID
|
||
}
|
||
|
||
// SetIndexCallback sets the function called after each successfully stored mail.
|
||
func (d *Daemon) SetIndexCallback(cb IndexCallback) {
|
||
d.indexCallback = cb
|
||
}
|
||
|
||
// Start launches the SMTP daemon in a background goroutine.
|
||
// It returns immediately; use Stop() for graceful shutdown.
|
||
func (d *Daemon) Start() error {
|
||
if !d.cfg.Enabled {
|
||
d.logger.Info("SMTP daemon disabled via config")
|
||
return nil
|
||
}
|
||
|
||
bind := d.cfg.Bind
|
||
if bind == "" {
|
||
bind = ":2525"
|
||
}
|
||
domain := d.cfg.Domain
|
||
if domain == "" {
|
||
domain = "archivmail"
|
||
}
|
||
maxBytes := int64(d.cfg.MaxSizeMB) * 1024 * 1024
|
||
if maxBytes <= 0 {
|
||
maxBytes = 50 * 1024 * 1024 // 50 MB default
|
||
}
|
||
|
||
backend := &backend{daemon: d}
|
||
srv := smtp.NewServer(backend)
|
||
srv.Addr = bind
|
||
srv.Domain = domain
|
||
srv.MaxMessageBytes = maxBytes
|
||
srv.ReadTimeout = 5 * time.Minute
|
||
srv.WriteTimeout = 30 * time.Second
|
||
srv.AllowInsecureAuth = false // no AUTH offered at all
|
||
|
||
// TLS / STARTTLS
|
||
if d.cfg.TLSCert != "" && d.cfg.TLSKey != "" {
|
||
cert, err := tls.LoadX509KeyPair(d.cfg.TLSCert, d.cfg.TLSKey)
|
||
if err != nil {
|
||
return fmt.Errorf("smtpd: load TLS cert: %w", err)
|
||
}
|
||
srv.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}}
|
||
}
|
||
|
||
d.mu.Lock()
|
||
d.server = srv
|
||
d.running = true
|
||
d.mu.Unlock()
|
||
|
||
go func() {
|
||
d.logger.Info("SMTP daemon starting", "addr", bind, "domain", domain,
|
||
"max_size_mb", d.cfg.MaxSizeMB, "tls", d.cfg.TLSCert != "")
|
||
if err := srv.ListenAndServe(); err != nil {
|
||
if !errors.Is(err, smtp.ErrServerClosed) {
|
||
d.logger.Error("SMTP daemon error", "err", err)
|
||
}
|
||
}
|
||
d.mu.Lock()
|
||
d.running = false
|
||
d.mu.Unlock()
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// Stop shuts down the SMTP daemon gracefully.
|
||
func (d *Daemon) Stop() {
|
||
d.mu.Lock()
|
||
srv := d.server
|
||
d.mu.Unlock()
|
||
if srv != nil {
|
||
srv.Close()
|
||
}
|
||
}
|
||
|
||
// Status returns a snapshot of the daemon's current state.
|
||
func (d *Daemon) Status() StatusResponse {
|
||
d.mu.Lock()
|
||
running := d.running
|
||
d.mu.Unlock()
|
||
|
||
lastMail, _ := d.stats.LastMailAt.Load().(time.Time)
|
||
var lastMailStr string
|
||
if !lastMail.IsZero() {
|
||
lastMailStr = lastMail.UTC().Format(time.RFC3339)
|
||
}
|
||
|
||
bind := d.cfg.Bind
|
||
if bind == "" {
|
||
bind = ":2525"
|
||
}
|
||
|
||
return StatusResponse{
|
||
Running: running,
|
||
Enabled: d.cfg.Enabled,
|
||
Bind: bind,
|
||
Domain: d.cfg.Domain,
|
||
TLS: d.cfg.TLSCert != "",
|
||
MaxSizeMB: d.cfg.MaxSizeMB,
|
||
AllowedIPs: d.cfg.AllowedIPs,
|
||
Received: d.stats.Received.Load(),
|
||
Rejected: d.stats.Rejected.Load(),
|
||
LastMailAt: lastMailStr,
|
||
}
|
||
}
|
||
|
||
// StatusResponse is returned by GET /api/admin/smtp/status.
|
||
type StatusResponse struct {
|
||
Running bool `json:"running"`
|
||
Enabled bool `json:"enabled"`
|
||
Bind string `json:"bind"`
|
||
Domain string `json:"domain"`
|
||
TLS bool `json:"tls"`
|
||
MaxSizeMB int `json:"max_size_mb"`
|
||
AllowedIPs []string `json:"allowed_ips"`
|
||
Received int64 `json:"received"`
|
||
Rejected int64 `json:"rejected"`
|
||
LastMailAt string `json:"last_mail_at,omitempty"`
|
||
}
|
||
|
||
// ── go-smtp Backend / Session ─────────────────────────────────────────────
|
||
|
||
type backend struct {
|
||
daemon *Daemon
|
||
}
|
||
|
||
func (b *backend) NewSession(c *smtp.Conn) (smtp.Session, error) {
|
||
remoteIP := extractIP(c.Conn().RemoteAddr().String())
|
||
|
||
if !b.daemon.isAllowed(remoteIP) {
|
||
b.daemon.stats.Rejected.Add(1)
|
||
b.daemon.logger.Warn("SMTP: rejected connection from unlisted IP", "ip", remoteIP)
|
||
return nil, &smtp.SMTPError{
|
||
Code: 554,
|
||
EnhancedCode: smtp.EnhancedCode{5, 7, 1},
|
||
Message: "IP not in allowlist",
|
||
}
|
||
}
|
||
|
||
b.daemon.logger.Debug("SMTP: new session", "ip", remoteIP)
|
||
return &session{
|
||
daemon: b.daemon,
|
||
remoteIP: remoteIP,
|
||
}, nil
|
||
}
|
||
|
||
type session struct {
|
||
daemon *Daemon
|
||
remoteIP string
|
||
from string
|
||
rcpts []string
|
||
}
|
||
|
||
// AuthPlain – never called because server doesn't advertise AUTH.
|
||
func (s *session) AuthPlain(_, _ string) error {
|
||
return smtp.ErrAuthUnsupported
|
||
}
|
||
|
||
func (s *session) Mail(from string, _ *smtp.MailOptions) error {
|
||
s.from = from
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Rcpt(to string, _ *smtp.RcptOptions) error {
|
||
s.rcpts = append(s.rcpts, to)
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Data(r io.Reader) error {
|
||
var buf bytes.Buffer
|
||
if _, err := io.Copy(&buf, r); err != nil {
|
||
s.daemon.stats.Rejected.Add(1)
|
||
return fmt.Errorf("smtpd: read data: %w", err)
|
||
}
|
||
raw := buf.Bytes()
|
||
|
||
// Determine tenant from RCPT TO domain routing
|
||
tenantID := s.daemon.resolveTenantFromRcpts(s.rcpts)
|
||
|
||
id, err := s.daemon.store.Save(context.Background(), raw, time.Now(), tenantID)
|
||
if err != nil {
|
||
s.daemon.stats.Rejected.Add(1)
|
||
// PROJ-29: Quota exceeded — return 452 (Insufficient system storage) so
|
||
// the sending MTA retries later. All other storage errors get 554.
|
||
if errors.Is(err, storage.ErrQuotaExceeded) {
|
||
s.daemon.logger.Warn("SMTP: quota exceeded", "from", s.from, "tenant", tenantID)
|
||
return &smtp.SMTPError{
|
||
Code: 452,
|
||
EnhancedCode: smtp.EnhancedCode{4, 2, 2},
|
||
Message: "Insufficient system storage — quota exceeded",
|
||
}
|
||
}
|
||
s.daemon.logger.Error("SMTP: storage failed", "from", s.from, "err", err)
|
||
return &smtp.SMTPError{
|
||
Code: 554,
|
||
EnhancedCode: smtp.EnhancedCode{4, 6, 0},
|
||
Message: "Storage failure, please retry",
|
||
}
|
||
}
|
||
|
||
s.daemon.stats.Received.Add(1)
|
||
s.daemon.stats.LastMailAt.Store(time.Now())
|
||
s.daemon.logger.Info("SMTP: mail stored", "id", id, "from", s.from,
|
||
"rcpts", strings.Join(s.rcpts, ","), "bytes", len(raw), "ip", s.remoteIP)
|
||
|
||
// Submit to async index worker if callback is set
|
||
if s.daemon.indexCallback != nil {
|
||
s.daemon.indexCallback(raw, id)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *session) Reset() {
|
||
s.from = ""
|
||
s.rcpts = nil
|
||
}
|
||
|
||
func (s *session) Logout() error {
|
||
return nil
|
||
}
|
||
|
||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||
|
||
// isAllowed returns true if the IP is in the allowlist.
|
||
// SEC-26: Fail-closed — empty allowlist means NO IP is allowed (was fail-open before).
|
||
// To allow all IPs, set allowed_ips: ["0.0.0.0/0", "::/0"] explicitly in config.
|
||
func (d *Daemon) isAllowed(ip string) bool {
|
||
if len(d.cfg.AllowedIPs) == 0 {
|
||
return false // fail-closed: no IPs configured = block everything
|
||
}
|
||
for _, allowed := range d.cfg.AllowedIPs {
|
||
// Support CIDR notation (e.g. 192.168.1.0/24)
|
||
if strings.Contains(allowed, "/") {
|
||
_, network, err := net.ParseCIDR(allowed)
|
||
if err == nil && network.Contains(net.ParseIP(ip)) {
|
||
return true
|
||
}
|
||
continue
|
||
}
|
||
if allowed == ip {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// extractIP strips port from "ip:port" or "[::1]:port" strings.
|
||
func extractIP(addr string) string {
|
||
host, _, err := net.SplitHostPort(addr)
|
||
if err != nil {
|
||
return addr
|
||
}
|
||
return host
|
||
}
|