Source code for ldap_sync.record_diff
# Copyright (c) 2022. The Pycroft Authors. See the AUTHORS file.
# This file is part of the Pycroft project and licensed under the terms of
# the Apache License, Version 2.0. See the LICENSE file for details
"""
ldap_sync.record_diff
~~~~~~~~~~~~~~~~~~~~~
"""
import typing
from .concepts import record, action, types
[docs]
def diff_attributes(
    current_attrs: types.NormalizedAttributes,
    desired_attrs: types.NormalizedAttributes,
) -> types.NormalizedAttributes:
    """Compute the attribute modifications leading from current to desired.

    Only the keys of :paramref:`desired_attrs` are inspected: an attribute
    that exists in :paramref:`current_attrs` but is not mentioned in
    :paramref:`desired_attrs` is left alone and never shows up in the
    result.  To remove an attribute, it has to be listed explicitly in
    :paramref:`desired_attrs` with an empty string as its value.

    :param current_attrs: the attributes of the entity currently in the system.
    :param desired_attrs: the attributes we want the entity to have.
    :returns: a new dict mapping each attribute that is new or whose value
        differs to its desired value.
    """
    modifications: types.NormalizedAttributes = {}
    for name, wanted in desired_attrs.items():
        # Keep the attribute if it is new, or if its value has changed.
        if name not in current_attrs or current_attrs[name] != wanted:
            modifications[name] = wanted
    return modifications
[docs]
def diff_user_attributes(
    current_attrs: types.NormalizedAttributes,
    desired_attrs: types.NormalizedAttributes,
) -> types.NormalizedAttributes:
    """Variant of :func:`diff_attributes` which accounts for the
    `ppolicy <https://linux.die.net/man/5/slapo-ppolicy>`_ overlay.

    :param current_attrs: the attributes of the user currently in the system.
    :param desired_attrs: the attributes we want the user to have.
    :returns: the modifications computed by :func:`diff_attributes`, except
        that a ``pwdAccountLockedTime`` entry with an empty (falsy) value is
        dropped whenever ``userPassword`` is modified as well.
    """
    changes = diff_attributes(current_attrs, desired_attrs)
    if "userPassword" not in changes:
        return changes

    # The password is being changed.  In that case the ppolicy overlay
    # already takes care of pwdAccountLockedTime, so we must not attempt to
    # delete it ourselves.  A non-empty value (i.e. actually setting a lock
    # time) is kept.
    if not changes.get("pwdAccountLockedTime"):
        changes.pop("pwdAccountLockedTime", None)
    return changes
[docs]
def diff_records[T: record.Record](current: T | None, desired: T | None) -> action.Action:
"""Determines an action to take, given a desired and a current record.
:param current: the present state
:param desired: the future state
"""
match (current, desired):
case (None, None):
raise ValueError("cannot diff two nonexistent records")
case (None, desired):
return action.AddAction(record=desired)
case (current, None):
return action.DeleteAction(record_dn=current.dn)
case (c, d) if c == d:
return action.IdleAction(record_dn=d.dn)
case (record.Record(dn=dn1), record.Record(dn=dn2)) if dn1 != dn2:
raise TypeError("Cannot compute difference between records of different dn")
case (record.UserRecord() as c, record.UserRecord() as d):
return action.ModifyAction(
record_dn=d.dn, modifications=diff_user_attributes(c.attrs, d.attrs)
)
case (record.GroupRecord() as c, record.GroupRecord() as d):
return action.ModifyAction(
record_dn=d.dn, modifications=diff_attributes(c.attrs, d.attrs)
)
case (c, d):
raise TypeError(f"Cannot diff {type(c).__name__} and {type(d).__name__}")
# see https://github.com/python/mypy/issues/12534
raise AssertionError # pragma: no cover
[docs]
def iter_zip_dicts[
TKey, TVal1, TVal2
](
d1: dict[TKey, TVal1],
d2: dict[TKey, TVal2],
) -> typing.Iterator[tuple[TKey, tuple[TVal1 | None, TVal2 | None]]]:
for k in d1.keys() | d2.keys():
yield k, (d1.get(k), d2.get(k))
[docs]
def bulk_diff_records(
    current: typing.Iterable[record.Record], desired: typing.Iterable[record.Record]
) -> dict[types.DN, action.Action]:
    """Diff two collections of records, matching them up by their DN.

    Each collection is first indexed by DN; if several records of one
    collection share a DN, the last one wins.  Every DN occurring in at
    least one of the collections is then passed to :func:`diff_records`,
    with ``None`` standing in for a record missing on either side.

    :param current: the records currently in the system.
    :param desired: the records we want the system to have.
    :returns: a dict mapping each DN to the action determined for it.
    """
    current_by_dn = {rec.dn: rec for rec in current}
    desired_by_dn = {rec.dn: rec for rec in desired}

    actions: dict[types.DN, action.Action] = {}
    for dn, (cur, des) in iter_zip_dicts(current_by_dn, desired_by_dn):
        actions[dn] = diff_records(cur, des)
    return actions