feat(PROJ-26,PROJ-38): IMAP LDAP-Auth + Mail-Threading
This commit is contained in:
+154
-5
@@ -265,6 +265,15 @@ func (s *Store) initSchema(ctx context.Context) error {
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_email_attachments_email ON email_attachments (email_id);
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// PROJ-38: Mail-Threading
|
||||
_, err = s.db.Exec(ctx, `
|
||||
ALTER TABLE emails ADD COLUMN IF NOT EXISTS thread_id TEXT;
|
||||
ALTER TABLE emails ADD COLUMN IF NOT EXISTS in_reply_to TEXT;
|
||||
CREATE INDEX IF NOT EXISTS idx_emails_thread ON emails (thread_id);
|
||||
`)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -367,7 +376,12 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
|
||||
}
|
||||
|
||||
if parseErr == nil {
|
||||
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID, storageID); err != nil {
|
||||
// PROJ-38: resolve thread before inserting
|
||||
if pm.InReplyTo != "" || len(pm.References) > 0 {
|
||||
pm.MessageID = pm.MessageID // no-op; thread resolved inside insertMeta
|
||||
}
|
||||
threadID := s.resolveThreadID(ctx, pm)
|
||||
if err := s.insertMeta(ctx, id, pm, len(raw), tenantID, storageID, threadID); err != nil {
|
||||
// Race: another goroutine inserted via Message-ID UNIQUE conflict.
|
||||
// Resolve to the existing record's ID.
|
||||
if messageID != "" {
|
||||
@@ -421,6 +435,133 @@ func (s *Store) Save(ctx context.Context, raw []byte, _ time.Time, tenantID *int
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// resolveThreadID determines the thread_id for a new mail by checking its
|
||||
// References and In-Reply-To headers against existing mails in the DB.
|
||||
// Returns the inherited thread_id, or the mail's own MessageID if it starts
|
||||
// a new thread, or "" if no MessageID is available.
|
||||
func (s *Store) resolveThreadID(ctx context.Context, pm *mailparser.ParsedMail) string {
|
||||
if s.db == nil {
|
||||
return pm.MessageID
|
||||
}
|
||||
// Check References in order (oldest first per RFC 5322 §3.6.4).
|
||||
// The first reference that has a known thread_id wins.
|
||||
for _, ref := range pm.References {
|
||||
var tid string
|
||||
err := s.db.QueryRow(ctx,
|
||||
`SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, ref,
|
||||
).Scan(&tid)
|
||||
if err == nil && tid != "" {
|
||||
return tid
|
||||
}
|
||||
}
|
||||
// Fall back to In-Reply-To
|
||||
if pm.InReplyTo != "" {
|
||||
var tid string
|
||||
err := s.db.QueryRow(ctx,
|
||||
`SELECT COALESCE(thread_id, message_id) FROM emails WHERE message_id = $1 LIMIT 1`, pm.InReplyTo,
|
||||
).Scan(&tid)
|
||||
if err == nil && tid != "" {
|
||||
return tid
|
||||
}
|
||||
}
|
||||
// New thread — use own Message-ID as root
|
||||
return pm.MessageID
|
||||
}
|
||||
|
||||
// ThreadInfo holds thread metadata for a single mail.
|
||||
type ThreadInfo struct {
|
||||
ThreadID string
|
||||
ThreadSize int
|
||||
}
|
||||
|
||||
// GetThreadInfo returns thread_id and thread size for a batch of email IDs.
|
||||
// Only mails that belong to a thread (non-NULL thread_id) are enriched.
|
||||
func (s *Store) GetThreadInfo(ctx context.Context, ids []string) (map[string]ThreadInfo, error) {
|
||||
if s.db == nil || len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// Step 1: get thread_ids for the given email IDs
|
||||
rows, err := s.db.Query(ctx,
|
||||
`SELECT id, thread_id FROM emails WHERE id = ANY($1) AND thread_id IS NOT NULL`, ids,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
threadByEmail := map[string]string{}
|
||||
var threadIDs []string
|
||||
for rows.Next() {
|
||||
var emailID, threadID string
|
||||
if err := rows.Scan(&emailID, &threadID); err == nil {
|
||||
threadByEmail[emailID] = threadID
|
||||
threadIDs = append(threadIDs, threadID)
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
if len(threadIDs) == 0 {
|
||||
return map[string]ThreadInfo{}, nil
|
||||
}
|
||||
|
||||
// Step 2: count mails per thread
|
||||
rows2, err := s.db.Query(ctx,
|
||||
`SELECT thread_id, COUNT(*) FROM emails WHERE thread_id = ANY($1) GROUP BY thread_id`, threadIDs,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows2.Close()
|
||||
|
||||
threadSize := map[string]int{}
|
||||
for rows2.Next() {
|
||||
var tid string
|
||||
var cnt int
|
||||
if err := rows2.Scan(&tid, &cnt); err == nil {
|
||||
threadSize[tid] = cnt
|
||||
}
|
||||
}
|
||||
|
||||
result := make(map[string]ThreadInfo, len(ids))
|
||||
for emailID, tid := range threadByEmail {
|
||||
result[emailID] = ThreadInfo{ThreadID: tid, ThreadSize: threadSize[tid]}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetMailsByThread returns all email IDs in a thread, ordered by date ascending.
|
||||
func (s *Store) GetMailsByThread(ctx context.Context, threadID string, tenantID *int64) ([]string, error) {
|
||||
if s.db == nil {
|
||||
return nil, nil
|
||||
}
|
||||
var rows pgx.Rows
|
||||
var err error
|
||||
if tenantID != nil {
|
||||
rows, err = s.db.Query(ctx, `
|
||||
SELECT e.id FROM emails e
|
||||
JOIN email_refs r ON r.email_id = e.id AND r.tenant_id = $2
|
||||
WHERE e.thread_id = $1
|
||||
ORDER BY e.received_at ASC
|
||||
`, threadID, *tenantID)
|
||||
} else {
|
||||
rows, err = s.db.Query(ctx, `
|
||||
SELECT id FROM emails WHERE thread_id = $1 ORDER BY received_at ASC
|
||||
`, threadID)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err == nil {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// lookupByMessageID returns the email ID for a given Message-ID header value,
|
||||
// or an empty string if not found. Returns an error only on unexpected DB failures.
|
||||
func (s *Store) lookupByMessageID(ctx context.Context, messageID string) (string, error) {
|
||||
@@ -636,7 +777,7 @@ func (s *Store) firstAndLastFromFS() (first, last *MailRef, err error) {
|
||||
|
||||
// insertMeta inserts parsed email metadata into the emails table.
|
||||
// Returns an error so the caller can detect UNIQUE-constraint conflicts on message_id.
|
||||
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64, storageID *int64) error {
|
||||
func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.ParsedMail, size int, tenantID *int64, storageID *int64, threadID string) error {
|
||||
mailTo := strings.Join(pm.To, ", ")
|
||||
hasAttach := len(pm.Attachments) > 0
|
||||
|
||||
@@ -644,16 +785,24 @@ func (s *Store) insertMeta(ctx context.Context, id string, pm *mailparser.Parsed
|
||||
if pm.MessageID != "" {
|
||||
msgID = &pm.MessageID
|
||||
}
|
||||
var tid *string
|
||||
if threadID != "" {
|
||||
tid = &threadID
|
||||
}
|
||||
var inReplyTo *string
|
||||
if pm.InReplyTo != "" {
|
||||
inReplyTo = &pm.InReplyTo
|
||||
}
|
||||
|
||||
receivedAt := pm.Date
|
||||
if receivedAt.IsZero() {
|
||||
receivedAt = time.Now()
|
||||
}
|
||||
_, err := s.db.Exec(ctx, `
|
||||
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id, storage_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
INSERT INTO emails (id, received_at, mail_from, mail_to, subject, size_bytes, has_attach, tenant_id, message_id, storage_id, thread_id, in_reply_to)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
`, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID, storageID)
|
||||
`, id, receivedAt, pm.From, mailTo, pm.Subject, int64(size), hasAttach, tenantID, msgID, storageID, tid, inReplyTo)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user