Source code for ldap_sync.concepts.record

"""
ldap_sync.concepts.record
~~~~~~~~~~~~~~~~~~~~~~~~~
"""
#  Copyright (c) 2022. The Pycroft Authors. See the AUTHORS file.
#  This file is part of the Pycroft project and licensed under the terms of
#  the Apache License, Version 2.0. See the LICENSE file for details

from __future__ import annotations

import dataclasses
import typing
import typing as t

from ldap3.utils.conv import escape_filter_chars

from .types import (
    Attributes,
    NormalizedAttributes,
    DN,
    AttributeValues,
)

def _canonicalize_to_list(
    value: AttributeValues,
) -> list[str] | list[bytes] | list[int]:
    """Canonicalize a value to a list.

    If value is a list, return it.  If it is None or an empty string,
    return an empty list.  Else, return value.
    """
    if isinstance(value, list):
        return list(value)
    if value == "" or value == b"" or value is None:
        return []
    # str, byte, int – or unknown. But good fallback.
    return [value]  # type: ignore


# the “true” type is not expressible with mypy: it's the overload
# bytes | str -> str
# T -> T
# …but mypy rejects this because we have an argument overlap with incompatible return types.
def _maybe_escape_filter_chars[T](value: T) -> T | str:
    """Escape and return according to :rfc:`04515` if type is string-like.

    Else, return the unchanged object.
    """
    if isinstance(value, bytes) or isinstance(value, str):
        return typing.cast(str, escape_filter_chars(value))
    return value


[docs] def escape_and_normalize_attrs(attrs: Attributes) -> NormalizedAttributes: return { key: [ _maybe_escape_filter_chars(x) for x in typing.cast(list[str], _canonicalize_to_list(val)) ] for key, val in attrs.items() }
[docs] @dataclasses.dataclass(frozen=True) class Record: """Create a new record with a dn and certain attributes. A record represents an entry which is to be synced to the LDAP, and consists of a dn and relevant attributes. Constructors are provided for SQLAlchemy ORM objects as well as entries of an ldap search response. :param dn: The DN of the record :param attrs: The attributes of the record. Every value will be canonicalized to a list to allow for a senseful comparison between two records, as well as escaped according to :rfc:`04515`. Additionally, the keys are fixed to what's given by :meth:`get_synced_attributes`. """ dn: DN attrs: NormalizedAttributes SYNCED_ATTRIBUTES: typing.ClassVar[typing.AbstractSet[str]] def __init__(self, dn: DN, attrs: Attributes) -> None: object.__setattr__(self, "dn", dn) attrs = {k: v for k, v in attrs.items() if k in self.SYNCED_ATTRIBUTES} for key in self.SYNCED_ATTRIBUTES: attrs.setdefault(key, []) # escape_filter_chars is idempotent ⇒ no double escaping object.__setattr__(self, "attrs", escape_and_normalize_attrs(attrs)) def __getitem__(self, item: str) -> typing.Any: return self.attrs.__getitem__(item) @t.override def __init_subclass__(cls, **kwargs: dict[str, typing.Any]) -> None: if "SYNCED_ATTRIBUTES" not in cls.__dict__: raise TypeError("Subclasses of Record must implement the SYNCED_ATTRIBUTES field") super().__init_subclass__(**kwargs) # `__eq__` must be total, hence no type restrictions/hints @t.override def __eq__(self, other: object) -> bool: try: return self.dn == other.dn and self.attrs == other.attrs # type: ignore except AttributeError: return False @t.override def __repr__(self) -> str: return f"<{type(self).__name__} dn={self.dn}>" @classmethod # we don't care about the values, hence not typing as `Attributes` def _validate_attributes(cls, attributes: dict[str, typing.Any]) -> None: # sanity check: did we forget something in `cls.SYNCED_ATTRIBUTES` that # we support migrating anyway? _missing_attrs = cls.SYNCED_ATTRIBUTES - set(attributes.keys()) assert not _missing_attrs, f"Missing attributes: {_missing_attrs}" _superfluous_attrs = set(attributes.keys()) - cls.SYNCED_ATTRIBUTES assert not _superfluous_attrs, f"Superfluous attributes: {_superfluous_attrs}"
[docs] class UserRecord(Record): """Create a new user record with a dn and certain attributes.""" SYNCED_ATTRIBUTES = frozenset( [ "objectClass", "mail", "sn", "cn", "loginShell", "gecos", "userPassword", "homeDirectory", "gidNumber", "uidNumber", "uid", "pwdAccountLockedTime", "shadowExpire", ] ) LDAP_OBJECTCLASSES = ["top", "inetOrgPerson", "posixAccount", "shadowAccount"] LDAP_LOGIN_ENABLED_PROPERTY = "ldap_login_enabled" PWD_POLICY_BLOCKED = "login_disabled"
[docs] @classmethod def get_synced_attributes(cls) -> typing.AbstractSet[str]: return cls.SYNCED_ATTRIBUTES
[docs] class GroupRecord(Record): """Create a new groupOfMembers record with a dn and certain attributes. Used to represent groups and properties. """ SYNCED_ATTRIBUTES = frozenset(["objectClass", "cn", "member"]) LDAP_OBJECTCLASSES = ["groupOfMembers"]
[docs] @dataclasses.dataclass class RecordState: """A Class representing the state (current, desired) of a record. This class is essentially a duple consisting of a current and desired record to represent the difference. """ current: Record | None = None desired: Record | None = None