Files
sysops 1fedd683e0 Initial commit – TimeMaster Zeiterfassung & HR-Tool
Stand: agent-06 (Audit-Log), agent-05 (Krankmeldung), agent-07 Phase 1 (Personalnummer),
Busylight-Pull-Integration, TOTP/2FA, Abwesenheiten, Zeiterfassung, Kiosk-Grundgerüst.
Migrations 0001–0023 deployed auf 192.168.1.137 + .164.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 20:03:27 +02:00

333 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""LDAP integration service.
Supports Active Directory and OpenLDAP via ldap3 (pure Python).
Bind passwords are stored Fernet-encrypted with a key derived from the app SECRET_KEY.
"""
import base64
import hashlib
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.ldap_config import LdapConfig
from app.models.user import AuthProvider, User, UserRole
logger = logging.getLogger(__name__)
def _fernet():
    """Build the Fernet cipher used for stored LDAP bind passwords.

    The Fernet key is the SHA-256 digest of ``settings.secret_key``,
    URL-safe base64 encoded. ``cryptography`` is imported lazily, on first use.
    """
    from cryptography.fernet import Fernet

    digest = hashlib.sha256(settings.secret_key.encode()).digest()
    fernet_key = base64.urlsafe_b64encode(digest)
    return Fernet(fernet_key)
def encrypt_password(plain: str) -> str:
    """Encrypt *plain* with the app-derived Fernet key and return the token as text."""
    token = _fernet().encrypt(plain.encode())
    return token.decode()
def decrypt_password(encrypted: str) -> str:
    """Decrypt a token produced by ``encrypt_password`` and return the plain text."""
    plain_bytes = _fernet().decrypt(encrypted.encode())
    return plain_bytes.decode()
@dataclass
class ConnectionResult:
    """Outcome of an LDAP connection test."""

    # True when the bind succeeded.
    success: bool
    # Human-readable status or error text.
    message: str
@dataclass
class SyncResult:
    """Counters and collected error messages of one LDAP user sync run."""

    # Number of users newly added to the database.
    created: int
    # Number of existing users refreshed from the directory.
    updated: int
    # Number of LDAP users switched to inactive.
    deactivated: int
    # Messages for entries or values that were skipped or failed.
    errors: list[str]
def _get_attr(entry_attrs: dict, attr_name: str) -> str:
"""Safely extract a string attribute value from ldap3 entry attributes."""
val = entry_attrs.get(attr_name)
if not val:
return ""
if isinstance(val, list):
return str(val[0]) if val else ""
return str(val)
class LdapService:
    """LDAP operations driven by a company's ``LdapConfig``.

    Covers connection tests, raw user listing, synchronising directory users
    into the local ``User`` table and password checks via LDAP bind.

    NOTE(review): all ldap3 calls use the SYNC strategy and are not awaited,
    so they block while running, also inside the ``async`` methods.
    """

    @staticmethod
    def _close(conn) -> None:
        """Unbind *conn*; a cleanup failure must not mask the real outcome."""
        try:
            conn.unbind()
        except Exception:
            logger.debug("LDAP unbind failed", exc_info=True)

    @staticmethod
    def _entry_values(entry, names: list[str]) -> dict[str, list]:
        """Map each requested attribute present on *entry* to its list of values."""
        return {name: entry[name].values for name in names if name in entry}

    def _build_server(self, config: LdapConfig):
        """Create the ldap3 ``Server`` for *config*.

        A ``Tls`` object is attached only when ``config.use_tls`` is set; its
        certificate validation follows ``config.tls_verify``.
        """
        import ssl

        from ldap3 import Server, Tls

        tls = None
        if config.use_tls:
            if config.tls_verify:
                tls = Tls(validate=ssl.CERT_REQUIRED)
            else:
                logger.warning(
                    "LDAP TLS certificate validation is DISABLED for host %s (company_id=%s). "
                    "Set tls_verify=True in production to prevent MITM attacks.",
                    config.host,
                    config.company_id,
                )
                tls = Tls(validate=ssl.CERT_NONE)
        return Server(
            config.host,
            port=config.port,
            use_ssl=config.use_ssl,
            tls=tls,
            get_info="ALL",
            connect_timeout=5,
        )

    def _bind_connection(self, config: LdapConfig, dn: str | None = None, password: str | None = None):
        """Return a bound ldap3 ``Connection`` (simple bind), or ``None``.

        Args:
            config: Connection settings and the service-account credentials.
            dn: DN to bind as; the configured service DN when ``None``.
            password: Password to bind with; the decrypted service password
                when ``None``.

        Returns:
            The bound connection, or ``None`` when the DN or password is empty
            or the server rejects the bind.

        Raises:
            Exceptions raised by ldap3 while opening the connection,
            negotiating StartTLS or binding are propagated.
        """
        from ldap3 import AUTO_BIND_NONE, SIMPLE, SYNC, Connection
        from ldap3.core.exceptions import LDAPStartTLSError

        # Fall back to the service account only when the argument was omitted.
        # With ``or`` an empty user password would silently be replaced by the
        # service-account password.
        bind_dn = config.bind_dn if dn is None else dn
        bind_pw = decrypt_password(config.bind_password_encrypted) if password is None else password
        if not bind_dn or not bind_pw:
            # Never attempt an anonymous/unauthenticated simple bind.
            return None

        conn = Connection(
            self._build_server(config),
            user=bind_dn,
            password=bind_pw,
            authentication=SIMPLE,
            client_strategy=SYNC,
            # Bind explicitly below: auto-binding raises on rejected
            # credentials instead of letting us return None.
            auto_bind=AUTO_BIND_NONE,
            raise_exceptions=False,
        )
        try:
            # TLS requested on a non-SSL connection means StartTLS; it has to
            # happen before the bind so the credentials are not sent in clear.
            if config.use_tls and not config.use_ssl and not conn.start_tls():
                raise LDAPStartTLSError("StartTLS negotiation failed")
            bound = conn.bind()
        except Exception:
            self._close(conn)
            raise
        if not bound:
            self._close(conn)
            return None
        return conn

    # ── Public API ────────────────────────────────────────────────────────────

    async def get_config(self, company_id: uuid.UUID, db: AsyncSession) -> LdapConfig | None:
        """Return the LDAP configuration of *company_id*, or ``None`` if there is none."""
        return await db.scalar(
            select(LdapConfig).where(LdapConfig.company_id == company_id)
        )

    async def get_config_or_404(self, company_id: uuid.UUID, db: AsyncSession) -> LdapConfig:
        """Return the LDAP configuration of *company_id*.

        Raises:
            HTTPException: 404 when no configuration exists.
        """
        cfg = await self.get_config(company_id, db)
        if not cfg:
            raise HTTPException(status_code=404, detail="LDAP configuration not found")
        return cfg

    def test_connection(self, config: LdapConfig) -> ConnectionResult:
        """Try a service-account bind and report the outcome.

        Never raises: any exception is turned into a failed result whose
        message is the exception text.
        """
        try:
            conn = self._bind_connection(config)
            if conn is None:
                return ConnectionResult(success=False, message="Bind fehlgeschlagen  DN oder Passwort falsch")
            conn.unbind()
            return ConnectionResult(success=True, message="Verbindung erfolgreich")
        except Exception as exc:
            return ConnectionResult(success=False, message=str(exc))

    def search_users(self, config: LdapConfig) -> list[dict[str, Any]]:
        """Return raw user dicts from the LDAP directory.

        Each dict holds the configured email, first name, last name, username
        and (if configured) department attributes found on the entry, plus the
        entry DN under ``"dn"``.

        Raises:
            HTTPException: 502 when the service-account bind fails.
        """
        from ldap3 import SUBTREE

        conn = self._bind_connection(config)
        if conn is None:
            raise HTTPException(status_code=502, detail="LDAP bind fehlgeschlagen")
        attrs = [
            config.attr_email,
            config.attr_firstname,
            config.attr_lastname,
            config.attr_username,
        ]
        if config.attr_department:
            attrs.append(config.attr_department)
        try:
            conn.search(
                search_base=config.base_dn,
                search_filter=config.user_search_filter,
                search_scope=SUBTREE,
                attributes=attrs,
            )
            results = []
            for entry in conn.entries:
                raw = {a: entry[a].value for a in attrs if a in entry}
                raw["dn"] = entry.entry_dn
                results.append(raw)
            return results
        finally:
            # Release the connection even when the search raises.
            self._close(conn)

    async def sync_users(
        self,
        config: LdapConfig,
        db: AsyncSession,
        default_role: UserRole = UserRole.EMPLOYEE,
    ) -> SyncResult:
        """Sync LDAP users into the local database.

        Entries without an email are ignored. Users already known by email are
        updated and reactivated, unknown ones are created with *default_role*.
        Active LDAP users of the company that are missing from the directory
        are deactivated, but only when the directory search completed
        successfully. ``config.last_sync_at`` is set and the session is
        committed at the end.

        Returns:
            Counters plus messages for entries or values that were skipped.

        Raises:
            HTTPException: 502 when the service-account bind fails.
        """
        from ldap3 import SUBTREE

        conn = self._bind_connection(config)
        if conn is None:
            raise HTTPException(status_code=502, detail="LDAP bind fehlgeschlagen")

        attrs = [
            config.attr_email,
            config.attr_firstname,
            config.attr_lastname,
        ]
        if config.attr_department:
            attrs.append(config.attr_department)
        if config.attr_personnel_number:
            attrs.append(config.attr_personnel_number)

        try:
            conn.search(
                search_base=config.base_dn,
                search_filter=config.user_search_filter,
                search_scope=SUBTREE,
                attributes=attrs,
            )
            outcome = conn.result or {}
            # Convert entries to plain dicts: _get_attr expects a mapping with
            # .get(), which ldap3 Entry objects do not provide.
            records = [
                (entry.entry_dn, self._entry_values(entry, attrs))
                for entry in conn.entries
            ]
        finally:
            self._close(conn)

        result = SyncResult(created=0, updated=0, deactivated=0, errors=[])

        # With raise_exceptions=False a failed or truncated search only shows
        # in the result code. Such a result must not be read as "these users
        # left the directory".
        directory_complete = outcome.get("result") == 0
        if not directory_complete:
            msg = (
                f"LDAP-Suche nicht erfolgreich ({outcome.get('description')}); "
                f"Deaktivierung übersprungen."
            )
            logger.warning(msg)
            result.errors.append(msg)

        ldap_emails: set[str] = set()
        for dn, values in records:
            try:
                email = _get_attr(values, config.attr_email).lower().strip()
                if not email:
                    continue
                ldap_emails.add(email)
                first = _get_attr(values, config.attr_firstname) or "?"
                last = _get_attr(values, config.attr_lastname) or "?"
                ldap_personnel = (
                    _get_attr(values, config.attr_personnel_number).strip()
                    if config.attr_personnel_number else ""
                )
                # Accept digits only (format requirement).
                if ldap_personnel and not ldap_personnel.isdigit():
                    logger.warning(
                        "LDAP personnel_number for %s contains non-digits (%r), skipping mapping.",
                        email, ldap_personnel,
                    )
                    ldap_personnel = ""

                existing = await db.scalar(select(User).where(User.email == email))
                if existing is not None and existing.company_id != config.company_id:
                    # The lookup is by email only; never modify a user that
                    # belongs to a different company.
                    msg = (
                        f"LDAP-Benutzer {email} gehört bereits zu einer anderen Firma; "
                        f"Eintrag übersprungen."
                    )
                    logger.warning(msg)
                    result.errors.append(msg)
                    continue

                if existing is not None:
                    existing.first_name = first
                    existing.last_name = last
                    existing.ldap_dn = dn
                    existing.auth_provider = AuthProvider.LDAP
                    existing.is_active = True
                    if ldap_personnel and existing.personnel_number != ldap_personnel:
                        await self._apply_personnel_from_ldap(
                            existing, ldap_personnel, db, result,
                        )
                    result.updated += 1
                else:
                    user = User(
                        company_id=config.company_id,
                        email=email,
                        first_name=first,
                        last_name=last,
                        password_hash=None,
                        auth_provider=AuthProvider.LDAP,
                        ldap_dn=dn,
                        role=default_role,
                        is_active=True,
                    )
                    if ldap_personnel:
                        await self._apply_personnel_from_ldap(
                            user, ldap_personnel, db, result, company_id=config.company_id,
                        )
                    db.add(user)
                    result.created += 1
            except Exception as exc:
                # One bad entry must not abort the run; keep the DN so the
                # reported error can be traced to its entry.
                logger.warning("LDAP sync failed for entry %s", dn, exc_info=True)
                result.errors.append(f"{dn}: {exc}")

        # Deactivate LDAP users no longer in the directory.
        if directory_complete:
            existing_ldap = await db.scalars(
                select(User).where(
                    User.company_id == config.company_id,
                    User.auth_provider == AuthProvider.LDAP,
                    User.is_active.is_(True),
                )
            )
            for user in existing_ldap:
                if user.email not in ldap_emails:
                    user.is_active = False
                    result.deactivated += 1

        config.last_sync_at = datetime.now(timezone.utc)
        await db.commit()
        return result

    async def _apply_personnel_from_ldap(
        self,
        user: User,
        ldap_value: str,
        db: AsyncSession,
        result: SyncResult,
        company_id: uuid.UUID | None = None,
    ) -> None:
        """Set ``user.personnel_number`` from LDAP unless the number is taken.

        When another user of the same company already has *ldap_value*, the
        value is discarded and a message is logged and appended to
        ``result.errors``. *company_id* overrides ``user.company_id`` for the
        conflict check.
        """
        cid = company_id or user.company_id
        # Conflict with another user in the same company?
        conflict = await db.scalar(
            select(User.id).where(
                User.company_id == cid,
                User.personnel_number == ldap_value,
                User.id != user.id,
            )
        )
        if conflict is not None:
            msg = (
                f"LDAP-Personalnummer {ldap_value!r} für {user.email} kollidiert mit "
                f"vergebener Nummer  Wert verworfen."
            )
            logger.warning(msg)
            result.errors.append(msg)
            return
        user.personnel_number = ldap_value

    def authenticate_ldap(self, config: LdapConfig, email: str, password: str) -> bool:
        """Authenticate a user by finding their DN and attempting a bind.

        Returns:
            ``True`` only when an entry with the given email exists and a bind
            as that entry with *password* succeeds. Empty email or password,
            a failed service bind and an unknown email all yield ``False``.

        NOTE(review): when several entries share the email, the first one
        returned by the server is used.
        """
        from ldap3 import SUBTREE
        from ldap3.utils.conv import escape_filter_chars

        # Reject empty credentials up front so they can never reach a bind.
        if not email or not password:
            return False

        conn = self._bind_connection(config)
        if conn is None:
            return False
        try:
            safe_email = escape_filter_chars(email)
            conn.search(
                search_base=config.base_dn,
                search_filter=f"({config.attr_email}={safe_email})",
                search_scope=SUBTREE,
                attributes=[config.attr_email],
            )
            if not conn.entries:
                return False
            user_dn = conn.entries[0].entry_dn
        finally:
            self._close(conn)

        # Try binding as the found user.
        user_conn = self._bind_connection(config, dn=user_dn, password=password)
        if user_conn is None:
            return False
        self._close(user_conn)
        return True
# Shared module-level instance; LdapService defines no per-instance state.
ldap_service = LdapService()