"""
Deputy daemon that provides a service via DBus for performing privileged
operations.
Some operations, such as generating configuration files, sending signals to
other processes etc. need certain privileges. The Deputy service runs as *root*
and provides a very simple service over DBus.
"""
import contextlib
import importlib.resources
import io
import logging
import os
import pathlib
import pwd
import re
import signal
import stat
import string
import subprocess
import tempfile
import textwrap
import typing
from functools import partial
from typing import Iterable, Optional, Tuple, Union, overload, Iterator, List
import netaddr
from gi.repository import GLib
from netaddr import EUI, IPAddress
from pydbus import SystemBus
from pydbus.bus import Bus
from sqlalchemy import null
from sqlalchemy.pool import StaticPool
from hades import constants
from hades.common import db
from hades.common.db import (
auth_dhcp_lease,
get_dhcp_lease_of_ip,
unauth_dhcp_lease,
ObjectsDiff,
LeaseInfo,
eui_as_unix,
)
from hades.common.glib import typed_glib_error
from hades.common.privileges import dropped_privileges
from hades.common.signals import install_handler
from hades.config import Config, get_config
from hades.deputy.dhcp import release_dhcp_lease
# Module-level logger; it is named after this module via ``__name__``.
logger = logging.getLogger(__name__)
def reload_systemd_unit(bus: Bus, unit: str, timeout: int = 100) -> None:
    """Ask systemd, via DBus, to reload a single unit.

    The ``org.freedesktop.systemd1`` object is fetched from the given bus and
    ``ReloadUnit`` is called on its ``Manager`` interface with the job mode
    ``fail``. Lookup and call both run inside :func:`typed_glib_error`.

    :param bus: The DBus bus used to reach systemd
    :param unit: Name of the systemd unit to reload
    :param timeout: Timeout handed to both DBus operations. Historically
        documented as milliseconds; NOTE(review): confirm which unit the
        DBus binding actually expects.
    """
    logger.debug("Instructing systemd to reload unit %s", unit)
    with typed_glib_error():
        manager = bus.get("org.freedesktop.systemd1", timeout=timeout)[
            "org.freedesktop.systemd1.Manager"
        ]
        manager.ReloadUnit(unit, "fail", timeout=timeout)
def restart_systemd_unit(bus: Bus, unit: str, timeout: int = 100) -> None:
    """Ask systemd, via DBus, to restart a single unit.

    The ``org.freedesktop.systemd1`` object is fetched from the given bus and
    ``RestartUnit`` is called on its ``Manager`` interface with the job mode
    ``fail``. Lookup and call both run inside :func:`typed_glib_error`.

    :param bus: The DBus bus used to reach systemd
    :param unit: Name of the systemd unit to restart
    :param timeout: Timeout handed to both DBus operations. Historically
        documented as milliseconds; NOTE(review): confirm which unit the
        DBus binding actually expects.
    """
    logger.debug("Instructing systemd to restart unit %s", unit)
    with typed_glib_error():
        manager = bus.get("org.freedesktop.systemd1", timeout=timeout)[
            "org.freedesktop.systemd1.Manager"
        ]
        manager.RestartUnit(unit, "fail", timeout=timeout)
def generate_dhcp_host_reservations(
    hosts: Iterable[Tuple[netaddr.EUI, netaddr.IPAddress, Optional[str]]],
) -> Iterable[str]:
    """Yield one dnsmasq ``--dhcp-hostsfile=`` line per host.

    Each line has the form ``<mac>,id:*,<ip>[,<hostname>]`` and ends with a
    newline. The MAC address is rendered in netaddr's ``mac_unix_expanded``
    dialect; the hostname field is left out when the hostname is ``None``.

    :param hosts: (MAC address, IP address, optional hostname) triples
    :return: The generated lines, produced lazily
    """
    for mac, ip, hostname in hosts:
        # A new EUI is built from the value before the dialect is set on it.
        unix_mac = netaddr.EUI(mac)
        unix_mac.dialect = netaddr.mac_unix_expanded
        if hostname is None:
            yield "{0},id:*,{1}\n".format(unix_mac, ip)
        else:
            yield "{0},id:*,{1},{2}\n".format(unix_mac, ip, hostname)
@overload
def replace_file(
    path: Union[os.PathLike, str],
    content: Union[Iterable[bytes], bytes],
    *,
    encoding: None = ...,
    owner: Optional[int] = ...,
    group: Optional[int] = ...,
    mode: Optional[int] = ...,
) -> None: ...


@overload
def replace_file(
    path: Union[os.PathLike, str],
    content: Union[Iterable[str], str],
    *,
    encoding: str = ...,
    owner: Optional[int] = ...,
    group: Optional[int] = ...,
    mode: Optional[int] = ...,
) -> None: ...


def replace_file(
    path: Union[os.PathLike, str],
    content: Union[Iterable[Union[bytes, str]], Union[bytes, str]],
    *,
    encoding: Optional[str] = None,
    owner: Optional[int] = None,
    group: Optional[int] = None,
    mode: Optional[int] = None,
) -> None:
    """
    Atomically replace a file with the given content.

    The content is written to a temporary file in the same directory as
    ``path``, flushed and synced, and then renamed over ``path``; afterwards
    the directory itself is synced. If anything fails, the temporary file is
    removed again and the exception is re-raised unchanged.

    The directory of the file must exist and must be writeable. The content may
    either be a str or bytes object or an Iterable of such objects, in which
    case the content will be written via :meth:`io.IOBase.writelines`.

    :param path: Path to the file
    :param content: The new content of the file
    :param encoding: The encoding for :class:`str` content. If ``None`` the
        file is opened in binary mode and the content must be bytes.
    :param owner: File owner (uid). If only one of ``owner`` and ``group`` is
        given, the other one is taken from the temporary file.
    :param group: File group (gid)
    :param mode: File mode. If ``None`` the mode of the temporary file is
        left untouched.
    :raises ValueError: if a :class:`str` is passed without an encoding
    :raises OSError: if file system operations fail
    """
    open_mode = "wb" if encoding is None else "w"
    path = pathlib.Path(path)
    parent = path.parent
    if encoding is None and isinstance(content, str):
        raise ValueError("encoding required for writing str content")
    with tempfile.NamedTemporaryFile(
        open_mode, encoding=encoding, dir=parent, delete=False
    ) as file:
        dir_fd = None
        try:
            dir_fd = os.open(
                parent, os.O_DIRECTORY | os.O_CLOEXEC | os.O_RDONLY, 0
            )
            fd = file.fileno()
            # fchown needs both ids: fill the missing one from the temp file
            if (owner is None) ^ (group is None):
                stat_result = os.fstat(fd)
                if owner is None:
                    owner = stat_result.st_uid
                if group is None:
                    group = stat_result.st_gid
            if owner is not None:
                os.fchown(fd, owner, group)
            if mode is not None:
                os.fchmod(fd, mode)
            if isinstance(content, (bytes, str)):
                file.write(content)
            else:
                file.writelines(content)
            # Get the data onto disk before the rename makes it visible
            file.flush()
            os.fsync(fd)
            os.rename(file.name, path)
            # Sync the directory so that the rename itself is persisted
            os.fsync(dir_fd)
        except BaseException:
            # Remove the temporary file by *name*; passing the file object
            # to os.unlink would raise TypeError and hide the real error.
            try:
                os.unlink(file.name)
            except FileNotFoundError:
                # The rename already happened, nothing is left to clean up
                pass
            raise
        finally:
            if dir_fd is not None:
                os.close(dir_fd)
def generate_auth_dhcp_hosts_file(
    hosts: Iterable[Tuple[netaddr.EUI, netaddr.IPAddress, Optional[str]]],
) -> None:
    """Write the dnsmasq hosts file for authenticated users.

    This file is passed to dnsmasq via the ``--dhcp-hostsfile`` option.
    Its lines come from :func:`generate_dhcp_host_reservations` and the file
    is put in place with :func:`replace_file`, readable by owner and group.
    An :class:`OSError` raised while replacing the file is logged and not
    propagated.

    :param hosts: (MAC address, IP address, optional hostname) triples
    """
    target = pathlib.Path(constants.AUTH_DHCP_HOSTS_FILE)
    logger.info("Generating DHCP hosts file %s", target)
    # The file belongs to the auth DHCP user, not to root
    account = pwd.getpwnam(constants.AUTH_DHCP_USER)
    read_only = stat.S_IRUSR | stat.S_IRGRP
    lines = generate_dhcp_host_reservations(hosts)
    try:
        replace_file(
            target,
            lines,
            encoding="ascii",
            owner=account.pw_uid,
            group=account.pw_gid,
            mode=read_only,
        )
    except OSError as e:
        logger.error(
            "Failed to replace DHCP hosts file %s: %s", target, e.strerror
        )
def generate_ipset_swap(ipset_name: str, tmp_ipset_name: str,
                        ips: Iterable[netaddr.IPAddress]) -> Iterable[str]:
    """Yield the lines of an ``ipset`` script that swaps in new contents.

    The script creates (if necessary) and flushes a temporary ``hash:ip``
    set, adds every given address to it, swaps it with the target set and
    finally destroys the temporary set.

    :param ipset_name: The ipset to replace
    :param tmp_ipset_name: Name of the temporary ipset
    :param ips: The new contents of the ipset
    :return: The script lines, each terminated by a newline
    """
    yield 'create {} hash:ip -exist\n'.format(tmp_ipset_name)
    yield 'flush {}\n'.format(tmp_ipset_name)
    for ip in ips:
        yield 'add {} {}\n'.format(tmp_ipset_name, ip)
    yield 'swap {} {}\n'.format(ipset_name, tmp_ipset_name)
    yield 'destroy {}\n'.format(tmp_ipset_name)
def update_alternative_dns_ipset(ips: Iterable[netaddr.IPAddress]) -> None:
    """Update the *alternative DNS ipset* with the new IP addresses

    The name of the ipset is read from the ``HADES_AUTH_DNS_ALTERNATIVE_IPSET``
    configuration option. A script generated by :func:`generate_ipset_swap`
    is fed on standard input to ``ipset restore``, which is run inside the
    ``auth`` network namespace. A non-zero exit status is logged as an error
    and is not raised.

    :param ips: The new IP addresses
    """
    conf = get_config()
    ipset_name = conf['HADES_AUTH_DNS_ALTERNATIVE_IPSET']
    tmp_ipset_name = 'tmp_' + ipset_name
    logger.info("Updating alternative_dns ipset (%s)", ipset_name)
    script = ''.join(
        generate_ipset_swap(ipset_name, tmp_ipset_name, ips)
    ).encode('ascii')
    command = [
        constants.IP, 'netns', 'exec', 'auth', constants.IPSET, 'restore',
    ]
    result = subprocess.run(command, input=script)
    # Do not let a failed restore pass unnoticed
    if result.returncode != 0:
        logger.error(
            "Updating alternative_dns ipset (%s) failed: "
            "ipset restore exited with status %d",
            ipset_name, result.returncode,
        )
def generate_radius_clients(
    clients: Iterable[Tuple[str, str, str, int, str, str, str, str]]
) -> Iterable[str]:
    """Yield FreeRADIUS ``clients.conf`` snippets for the given NAS clients.

    For every client one ``client`` section and one ``home_server`` section
    (of type ``coa``) is rendered from a fixed template. Double quotes and
    backslashes in the secret are escaped with a backslash; the other values
    are inserted as they are.

    :param clients: An iterable of (Shortname, NAS-Name, NAS-Type, Port, Secret,
     Server, Community, Description)-tuples. The template only refers to the
     Shortname, NAS-Name, NAS-Type and Secret elements.
    :return: configuration snippets for the given NAS clients
    """
    needs_escape = re.compile(r'(["\\])')
    escaped = r'\\\1'
    template = string.Template(textwrap.dedent("""
client $shortname {
shortname = "$shortname"
ipaddr = "$nasname"
secret = "$secret"
require_message_authenticator = no
nastype = $type
coa_server = "$shortname"
}
home_server $shortname {
type = coa
ipaddr = "$nasname"
port = 3799
secret = "$secret"
coa {
irt = 2
mrt = 16
mrc = 5
mrd = 30
}
}
"""))
    for client in clients:
        (shortname, nasname, nas_type, ports, secret,
         _server, community, description) = client
        values = {
            "shortname": shortname,
            "nasname": nasname,
            "type": nas_type,
            "ports": ports,
            "secret": needs_escape.sub(escaped, secret),
            "community": community,
            "description": description,
        }
        yield template.substitute(values)
def generate_radius_clients_file(
    clients: Iterable[Tuple[str, str, str, int, str, str, str, str]]
) -> None:
    """Write the FreeRADIUS ``clients.conf`` file.

    The file named by ``constants.RADIUS_CLIENTS_FILE`` is opened for
    writing, handed over to the RADIUS user and group, restricted to read
    access for owner and group, and then filled with the output of
    :func:`generate_radius_clients`. An :class:`OSError` is logged together
    with its traceback and is not propagated.

    :param clients: See :func:`generate_radius_clients` for a description
    """
    logger.info("Generating freeRADIUS clients configuration")
    target = constants.RADIUS_CLIENTS_FILE
    account = pwd.getpwnam(constants.RADIUS_USER)
    read_only = stat.S_IRUSR | stat.S_IRGRP
    try:
        with open(target, mode='w', encoding='ascii') as out:
            handle = out.fileno()
            # Ownership and mode are set on the open descriptor before any
            # content is written.
            os.fchown(handle, account.pw_uid, account.pw_gid)
            os.fchmod(handle, read_only)
            out.writelines(generate_radius_clients(clients))
    except OSError as e:
        logger.exception("Error writing %s: %s", target, e.strerror)
# noinspection PyPep8Naming
class HadesDeputyService:
    """Deputy DBus service

    This class implements a DBus service that exposes some privileged operations
    for use by the :mod:`hades.agent` or the periodic systemd timer services.

    For security reasons, the service doesn't accept data from the DBus clients
    and always queries the database itself, so that this service can't be
    misused. The only client-supplied value is the IP address given to the
    lease release methods, which is validated before use.
    """

    dbus = importlib.resources.read_text(__package__, "interface.xml")
    """DBus object introspection specification

    :meta hide-value:
    """

    def __init__(self, bus: Bus, config: Config):
        """
        :param bus: The bus (typically the system bus)
        :param config: The configuration object
        """
        self.bus = bus
        self.config = config
        self.engine = db.create_engine(config, poolclass=StaticPool)
        database_pwd = pwd.getpwnam(constants.DATABASE_USER)
        original_creator = self.engine.pool._creator

        def creator(connection_record=None):
            """Create a connection as the database user"""
            with dropped_privileges(database_pwd):
                connection = original_creator(connection_record)
            return connection

        # Replace the pool's connection factory with the wrapper above, so
        # that connections are opened with dropped privileges.
        self.engine.pool._creator = creator

    def Refresh(self, force: bool) -> str:
        """Refresh the materialized views.

        If necessary, dependent config files are regenerated and the
        corresponding services are reloaded.

        The forced refresh is a little more aggressive in what it consolidates
        to achieve eventual consistency:

        * The host reservation file is regenerated regardless of whether the content
          of the ``auth_dhcp_host`` table has changed.
        * The radius config is regenerated regardless of whether the content
          of the ``nas`` table has changed.
        * The alternative DNS ipset is regenerated regardless of whether the content
          of the ``alternative_dns`` table has changed.
        * Instead of invalidating leases which were modified in the `auth_dhcp_hosts`
          reservation table, we invalidate every lease in `auth_dhcp_leases`
          which does not belong to a host reservation.

        :param force: Whether to use the forced refresh.
        :return: The string ``"OK"``
        """
        reload_auth_dhcp_host: bool  # if set, we want `hosts: List`
        hosts: Optional[Iterator[
            Tuple[netaddr.EUI, netaddr.IPAddress, Optional[str]]
        ]] = None  # non-None iff `reload_auth_dhcp_host`
        auth_leases_to_invalidate: List[LeaseInfo] = []
        reload_nas: bool  # if set, we want `clients: List`
        clients: Optional[Iterator[
            Tuple[str, str, str, int, str, str, str, str]
        ]] = None  # non-None iff `reload_nas`
        reload_alternative_dns: bool  # if set, we want `ips: List`
        ips: Optional[Iterator[netaddr.IPAddress]] = None  # non-None iff `reload_alternative_dns`

        logger.info("Refreshing materialized views")
        with contextlib.closing(self.engine.connect()) as connection:
            # These views are always refreshed, without any diffing
            with connection.begin():
                db.refresh_materialized_view(connection, db.radcheck)
                db.refresh_materialized_view(connection, db.radreply)
                db.refresh_materialized_view(connection, db.radgroupcheck)
                db.refresh_materialized_view(connection, db.radgroupreply)
                db.refresh_materialized_view(connection, db.radusergroup)

            if force:
                with connection.begin():
                    db.refresh_materialized_view(connection, db.auth_dhcp_host)
                    db.refresh_materialized_view(connection, db.nas)
                    db.refresh_materialized_view(connection, db.alternative_dns)
                logger.info("Forcing reload of DHCP hosts, NAS clients and "
                            "alternative DNS clients")
                reload_auth_dhcp_host = True
                reload_nas = True
                reload_alternative_dns = True
                auth_leases_to_invalidate = list(
                    db.get_all_invalid_auth_dhcp_leases(connection)
                )
                hosts = db.get_all_auth_dhcp_hosts(connection)
                clients = db.get_all_nas_clients(connection)
                ips = db.get_all_alternative_dns_ips(connection)
            else:
                HostDiff = ObjectsDiff[Tuple[IPAddress, EUI, IPAddress, EUI]]
                auth_dhcp_host_diff: HostDiff = typing.cast(
                    HostDiff,
                    db.refresh_and_diff_materialized_view(
                        connection,
                        db.auth_dhcp_host,
                        db.temp_auth_dhcp_host,
                        [
                            db.temp_auth_dhcp_host.c.IPAddress,  # old ip
                            db.temp_auth_dhcp_host.c.MAC,  # old mac
                            db.auth_dhcp_host.c.IPAddress,  # new ip
                            db.auth_dhcp_host.c.MAC,  # new mac
                        ],
                        unique_columns=(db.auth_dhcp_host.c.MAC, db.auth_dhcp_host.c.IPAddress),
                    )
                )
                if auth_dhcp_host_diff:
                    logger.info(
                        "Auth DHCP host reservations changed (%s).",
                        auth_dhcp_host_diff,
                    )
                    logger.debug(
                        "Full host reservations diff:\n%s",
                        f"{auth_dhcp_host_diff:l}",
                    )
                    hosts = db.get_all_auth_dhcp_hosts(connection)
                    # Only leases of deleted or modified reservations are
                    # released, identified by their *old* IP and MAC.
                    auth_leases_to_invalidate = [
                        LeaseInfo(old_ip, old_mac)
                        for old_ip, old_mac, _, _
                        in auth_dhcp_host_diff.deleted + auth_dhcp_host_diff.modified
                    ]
                    reload_auth_dhcp_host = True
                else:
                    reload_auth_dhcp_host = False

                nas_diff = db.refresh_and_diff_materialized_view(
                    connection, db.nas, db.temp_nas, [null()])
                if nas_diff:
                    logger.info(
                        "RADIUS clients changed (%s).",
                        nas_diff,
                    )
                    clients = db.get_all_nas_clients(connection)
                    reload_nas = True
                else:
                    reload_nas = False

                alternative_dns_diff = db.refresh_and_diff_materialized_view(
                    connection, db.alternative_dns, db.temp_alternative_dns,
                    [null()])
                if alternative_dns_diff:
                    logger.info(
                        "Alternative auth DNS clients changed (%s).",
                        alternative_dns_diff,
                    )
                    ips = db.get_all_alternative_dns_ips(connection)
                    reload_alternative_dns = True
                else:
                    reload_alternative_dns = False

            # The query results above are consumed while the connection is
            # still open.
            if auth_leases_to_invalidate:
                logger.info(
                    "Releasing %d invalid leases",
                    len(auth_leases_to_invalidate),
                )
                server_ip = self.config.HADES_AUTH_LISTEN[0].ip
                for lease in auth_leases_to_invalidate:
                    logger.debug("Releasing lease %s", lease)
                    # potential optimization: batched packet sending
                    release_dhcp_lease(server_ip, lease.ip, lease.mac)

            if reload_auth_dhcp_host:
                assert hosts is not None
                generate_auth_dhcp_hosts_file(hosts)
                reload_systemd_unit(self.bus, "hades-auth-dhcp.service")

            if reload_nas:
                assert clients is not None
                generate_radius_clients_file(clients)
                restart_systemd_unit(self.bus, 'hades-radius.service')

            if reload_alternative_dns:
                assert ips is not None
                update_alternative_dns_ipset(ips)
        return "OK"

    def Cleanup(self) -> str:
        """Clean up old records in the ``radacct`` and ``radpostauth`` tables.

        Sessions and auth attempts older than ``HADES_RETENTION_INTERVAL`` are
        deleted. In addition a warning is logged for every auth and unauth
        DHCP lease returned by the database for the configured lease
        lifetimes; these leases are only reported, not removed.

        :return: The string ``"OK"``
        """
        logger.info("Cleaning up old records")
        interval = self.config.HADES_RETENTION_INTERVAL
        with contextlib.closing(self.engine.connect()) as connection:
            db.delete_old_sessions(connection, interval)
            db.delete_old_auth_attempts(connection, interval)
            old_auth_leases = db.get_all_auth_dhcp_leases(
                connection,
                interval=self.config.HADES_AUTH_DHCP_LEASE_LIFETIME
            )
            old_unauth_leases = db.get_all_unauth_dhcp_leases(
                connection,
                interval=self.config.HADES_UNAUTH_DHCP_LEASE_TIME
            )
            for expires_at, mac, ip, hostname, client_id in old_auth_leases:
                logger.warning(
                    "Found expired auth_dhcp_lease: (%s, %s, %s) (expired at %s)",
                    eui_as_unix(mac), ip, hostname or "<no hostname>", expires_at,
                )
            for expires_at, mac, ip, hostname, client_id in old_unauth_leases:
                logger.warning(
                    "Found expired unauth_dhcp_lease: (%s, %s, %s) (expired at %s)",
                    eui_as_unix(mac), ip, hostname or "<no hostname>", expires_at,
                )
        return "OK"

    def _release_dhcp_lease(
        self, table, server_ip: netaddr.IPAddress, client_ip: str
    ) -> str:
        """
        Release an auth or unauth DHCP lease

        The lease of the client is looked up in the given table; if there is
        one, a release is sent to the DHCP server on behalf of the client.

        :param table: The lease table to look the client up in
        :param server_ip: IP address of the DHCP server
        :param client_ip: IP address of the client, as received via DBus
        :return: ``"OK"``, also if no lease was found, or a string starting
            with ``"ERROR:"`` if ``client_ip`` is not a valid IP address
        """
        try:
            ip = netaddr.IPAddress(client_ip)
        except (ValueError, netaddr.AddrFormatError):
            # netaddr reports malformed addresses with AddrFormatError,
            # which is not a ValueError.
            logger.warning("Illegal IP address %s", client_ip)
            return "ERROR: Illegal IP address %s" % client_ip
        with contextlib.closing(self.engine.connect()) as connection:
            lease_info = get_dhcp_lease_of_ip(table, connection, ip)
            if lease_info is None:
                logger.warning("No lease for %s found", ip)
                return "OK"
            expiry_time, mac, hostname, client_id = lease_info
            release_dhcp_lease(server_ip, ip, mac, client_id)
        return "OK"

    def ReleaseAuthDhcpLease(self, client_ip: str) -> str:
        """
        Release an auth DHCP lease

        :param client_ip: IP address of the client whose lease is released
        :return: See :meth:`_release_dhcp_lease`
        """
        logger.info("Releasing auth DHCP lease for client %s", client_ip)
        return self._release_dhcp_lease(
            auth_dhcp_lease, self.config.HADES_AUTH_LISTEN[0].ip, client_ip
        )

    def ReleaseUnauthDhcpLease(self, client_ip: str) -> str:
        """
        Release an unauth DHCP lease

        :param client_ip: IP address of the client whose lease is released
        :return: See :meth:`_release_dhcp_lease`
        """
        logger.info("Releasing unauth DHCP lease for client %s", client_ip)
        return self._release_dhcp_lease(
            unauth_dhcp_lease, self.config.HADES_UNAUTH_LISTEN[0].ip, client_ip
        )
def run_event_loop():
    """Run the DBus :class:`HadesDeputyService` on the GLib event loop.

    The service is published on the system bus under
    ``constants.DEPUTY_DBUS_NAME``. The loop is left when one of the signals
    SIGHUP, SIGINT or SIGTERM is handled; the bus, the publication and the
    signal handler are all managed by one exit stack.
    """
    with contextlib.ExitStack() as stack:
        bus: Bus = stack.enter_context(SystemBus())
        logger.debug(
            "Publishing interface %s on DBus", constants.DEPUTY_DBUS_NAME
        )
        config = get_config()
        service = HadesDeputyService(bus, config)
        stack.enter_context(bus.publish(constants.DEPUTY_DBUS_NAME, service))
        loop = GLib.MainLoop()

        def stop(_sig, _frame):
            # Any of the handled signals simply ends the main loop
            return loop.quit()

        handled = (signal.SIGHUP, signal.SIGINT, signal.SIGTERM)
        stack.enter_context(install_handler(handled, stop))
        loop.run()