Source code for web.api.v0

import typing as t
from decimal import Decimal
from datetime import timedelta, datetime, date
from functools import wraps
from ipaddress import IPv4Address, IPv6Address

from flask import jsonify, current_app, Response
from flask.typing import ResponseReturnValue
from flask_restful import Api, Resource as FlaskRestfulResource, abort
from packaging.utils import InvalidName
from sqlalchemy.exc import IntegrityError
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload, undefer, with_polymorphic
from sqlalchemy.orm.interfaces import ORMOption
from webargs import fields
from webargs.flaskparser import use_kwargs

from pycroft.helpers import utc
from pycroft.helpers.i18n import Message
from pycroft.lib.finance import estimate_balance, get_last_import_date
from pycroft.lib.mpsk_client import mpsk_edit, mpsk_client_create, mpsk_delete
from pycroft.lib.host import change_mac, host_create, interface_create, host_edit
from pycroft.lib.net import SubnetFullException
from pycroft.lib.swdd import get_swdd_person_id, get_relevant_tenancies, \
    get_first_tenancy_with_room
from pycroft.lib.task import cancel_task
from pycroft.lib.user import (
    encode_type2_user_id,
    edit_email,
    change_password,
    status,
    traffic_history as func_traffic_history,
    scheduled_membership_end,
    move_out,
    membership_ending_task,
    reset_wifi_password,
    create_member_request,
    NoTenancyForRoomException,
    UserExistsException,
    UserExistsInRoomException,
    EmailTakenException,
    LoginTakenException,
    MoveInDateInvalidException,
    check_similar_user_in_room,
    get_name_from_first_last,
    confirm_mail_address,
    get_user_by_swdd_person_id,
    scheduled_membership_start,
    send_confirmation_email,
    get_user_by_id_or_login,
    send_password_reset_mail,
    change_password_from_token,
)
from pycroft.model import session
from pycroft.model.facilities import Room
from pycroft.model.finance import Account, Split
from pycroft.model.host import IP, Interface, Host
from pycroft.model.session import current_timestamp
from pycroft.model.task import Task
from pycroft.model.types import IPAddress, InvalidMACAddressException
from pycroft.model.user import User, IllegalEmailError, IllegalLoginError
from web.blueprints.mpskclient import get_mpsk_client_or_404

api = Api()


[docs] def parse_authorization_header(value: str | None) -> str | None: if not value: return None try: auth_type, api_key = value.split(maxsplit=1) return api_key if auth_type.lower() == 'apikey' else None except ValueError: return None
_P = t.ParamSpec("_P") _TF = t.Callable[_P, ResponseReturnValue]
[docs] def authenticate(func: _TF) -> _TF: @t.cast(t.Callable[[_TF], _TF], wraps(func)) @use_kwargs( { "auth": fields.Str(required=True, data_key="authorization"), }, location="headers", ) def wrapper(auth: str, *args: _P.args, **kwargs: _P.kwargs) -> ResponseReturnValue: api_key = parse_authorization_header(auth) if api_key is None: abort(401, message="Missing API key.") if current_app.config['PYCROFT_API_KEY'] != api_key: abort(401, message="Invalid API key.") return func(*args, **kwargs) return wrapper
[docs] class Resource(FlaskRestfulResource): method_decorators = [authenticate]
[docs] def get_user_or_404(user_id: int, options: t.Sequence[ORMOption] | None = None) -> User: user = session.session.get(User, user_id, options=options) if user is None: abort(404, message=f"User {user_id} does not exist") return user
[docs] def get_authenticated_user(user_id: int, password: str) -> User: user = get_user_or_404(user_id) if user is None or not user.check_password(password): abort(401, message=f"Authentication of user {user_id} failed") return user
[docs] def get_interface_or_404(interface_id: int) -> Interface: interface = session.session.get(Interface, interface_id) if interface is None: abort(404, message=f"Interface {interface_id} does not exist") return interface
[docs] def generate_user_data(user: User) -> Response: props = {prop.property_name for prop in user.current_properties} user_status = status(user) interval = timedelta(days=7) step = timedelta(days=1) traffic_history = func_traffic_history( user.id, current_timestamp() - interval + step, current_timestamp(), ) class _Entry(t.TypedDict): valid_on: date amount: int | Decimal description: str finance_history: list[_Entry] = [ { "valid_on": split.transaction.valid_on, # Invert amount, to display it from the user's point of view "amount": -split.amount, "description": Message.from_json(split.transaction.description).localize(), } for split in user.account.splits ] finance_history = sorted(finance_history, key=lambda e: e['valid_on'], reverse=True) last_import_ts = get_last_import_date(session.session) last_finance_update = last_import_ts and last_import_ts.date() or None wifi_password = user.wifi_password med = scheduled_membership_end(user) mbd = scheduled_membership_start(user) interface_info = [{ 'id': i.id, 'mac': str(i.mac), 'ips': [str(ip.address) for ip in i.ips] } for h in user.hosts for i in h.interfaces] return jsonify( id=user.id, user_id=encode_type2_user_id(user.id), name=user.name, login=user.login, status={ 'member': user_status.member, 'traffic_exceeded': user_status.traffic_exceeded, 'network_access': user_status.network_access, 'account_balanced': user_status.account_balanced, 'violation': user_status.violation }, room=user.room.short_name if user.room is not None else None, interfaces=interface_info, mail=user.email, mail_forwarded=user.email_forwarded, mail_confirmed=user.email_confirmed, cache='cache_access' in props, # TODO: make `has_property` use `current_property` properties=list(props), traffic_history=[e.__dict__ for e in traffic_history], # TODO: think about better way for credit finance_balance=-user.account.balance, finance_history=finance_history, last_finance_update=last_finance_update.isoformat() if last_finance_update else None, membership_end_date=med.isoformat() if med else None, membership_begin_date=mbd.isoformat() if mbd else None, wifi_password=wifi_password, birthdate=user.birthdate.isoformat() if user.birthdate else None, )
[docs] class UserResource(Resource):
[docs] def get(self, user_id: int) -> Response: user = get_user_or_404( user_id, options=[ joinedload(User.room).joinedload(Room.building), joinedload(User.hosts) .joinedload(Host.interfaces) .joinedload(Interface.ips), undefer(User.wifi_passwd_hash), joinedload(User.account) .selectinload(Account.splits) .joinedload(Split.transaction), selectinload(User.tasks.of_type(with_polymorphic(Task, "*"))), selectinload(User.current_properties), ], ) return generate_user_data(user)
api.add_resource(UserResource, '/user/<int:user_id>')
[docs] class ChangeEmailResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), "new_email": fields.Str(required=True), "forwarded": fields.Bool(required=False, load_default=True), }, location="form", ) def post( self, user_id: int, password: str, new_email: str, forwarded: bool | None = None, ) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) try: edit_email(user, new_email, forwarded, processor=user) session.session.commit() except IllegalEmailError: abort(400, message='Invalid email address.') return "Email has been changed."
api.add_resource(ChangeEmailResource, '/user/<int:user_id>/change-email')
[docs] class ChangePasswordResource(Resource):
[docs] @use_kwargs( { "old_password": fields.Str(required=True, data_key="password"), "new_password": fields.Str(required=True), }, location="form", ) def post( self, user_id: int, old_password: str, new_password: str ) -> ResponseReturnValue: user = get_authenticated_user(user_id, old_password) change_password(user, new_password) session.session.commit() return "Password has been changed."
api.add_resource(ChangePasswordResource, '/user/<int:user_id>/change-password')
[docs] class FinanceHistoryResource(Resource):
[docs] def get(self, user_id: int) -> ResponseReturnValue: user = get_user_or_404(user_id) return jsonify([ {'valid_on': s.transaction.valid_on.isoformat(), 'amount': s.amount} for s in sorted(user.account.splits, key=lambda s: s.transaction.valid_on) ])
api.add_resource(FinanceHistoryResource, '/user/<int:user_id>/finance-history')
[docs] class AuthenticationResource(Resource):
[docs] @use_kwargs( { "login": fields.Str(required=True), "password": fields.Str(required=True), }, location="form", ) def post(self, login: str, password: str) -> ResponseReturnValue: user = User.verify_and_get(login=login, plaintext_password=password) if user is None: abort(401, message="Authentication failed") return {'id': user.id}
api.add_resource(AuthenticationResource, '/user/authenticate')
[docs] class UserByIPResource(Resource):
[docs] @use_kwargs( { "ipv4": fields.IP(required=True, data_key="ip"), # type: ignore[no-untyped-call] }, location="query", ) def get(self, ipv4: IPv4Address | IPv6Address) -> ResponseReturnValue: user = session.session.scalars( select(User) .join(Host) .join(Interface) .join(IP) .where(IP.address == ipv4) .options( # multi-valued, but effectively one-valued: # a user has only one IP (except in rare cases) → joinedload joinedload(User.hosts, innerjoin=True) .joinedload(Host.interfaces, innerjoin=True) .joinedload(Interface.ips, innerjoin=True), # single-valued → joinedload joinedload(User.room, innerjoin=True) .joinedload(Room.building, innerjoin=True), # 1 account, but many splits joinedload(User.account, innerjoin=True) # many splits → load afterwards with `select/where/in` .selectinload(Account.splits) .joinedload(Split.transaction, innerjoin=True), # many properties selectinload(User.current_properties), ) ).unique().one_or_none() if user is None: abort(404, message=f"IP {ipv4} is not related to a user") return generate_user_data(user)
api.add_resource(UserByIPResource, '/user/from-ip')
[docs] class MPSKSClientsResource(Resource):
[docs] def get(self, user_id: int) -> ResponseReturnValue: user = get_user_or_404(user_id) return jsonify( [ { "name": mpsk_client.name, "id": mpsk_client.id, "mac": mpsk_client.mac, } for mpsk_client in user.mpsk_clients ] )
api.add_resource(MPSKSClientsResource, "/user/<int:user_id>/get-mpsks")
[docs] class MPSKSClientAddResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), "mac": fields.Str(required=True), "name": fields.Str(required=True), }, location="form", ) def post(self, user_id: int, password: str, mac: str, name: str) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) # checks rather the user has all settable mpsks clients created if len(user.mpsk_clients) >= current_app.config.get("MAX_MPSKS", 30): abort(400, message="User has the maximum count of mpsk clients.") if not user.wifi_password: abort(412, message="Please generate a wifi password first") try: mpsk_client = mpsk_client_create( session.session, owner=user, mac=mac, name=name, processor=user ) session.session.commit() except InvalidMACAddressException as e: abort(422, message=f"Invalid MAC address: {e}") except IntegrityError as e: abort(409, message=f"Mac address is already in use: {e}") except InvalidName: abort(400, message="No proper name was provided.") return jsonify( { "name": mpsk_client.name, "id": mpsk_client.id, "mac": mpsk_client.mac, } )
api.add_resource(MPSKSClientAddResource, "/user/<int:user_id>/add-mpsk")
[docs] class MPSKSClientDeleteResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), }, location="form", ) def post(self, user_id: int, mpsk_id: int, password: str) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) mpsk = get_mpsk_client_or_404(mpsk_id) if not user == mpsk.owner: abort(401, message="You are not the owner of the mpsk.") mpsk_delete(session.session, mpsk_client=mpsk, processor=user) session.session.commit() return "mpsk client was deleted"
api.add_resource(MPSKSClientDeleteResource, "/user/<int:user_id>/delete-mpsk/<int:mpsk_id>")
[docs] class MPSKSClientChangeResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), "mac": fields.Str(required=True), "name": fields.Str(required=True), }, location="form", ) def post( self, user_id: int, mpsk_id: int, password: str, mac: str, name: str ) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) mpsk = get_mpsk_client_or_404(mpsk_id) if user != mpsk.owner: abort(404, message=f"User {user_id} does not own the mpsk client with the id {mpsk_id}") try: mpsk_edit(session.session, client=mpsk, owner=user, name=name, mac=mac, processor=user) session.session.commit() except InvalidMACAddressException: abort(422, message="Invalid MAC address.") except IntegrityError: abort(409, message="Mac address is already in use.") except InvalidName: abort(400, message="No proper name was provided.") return "mpsk has been changed."
api.add_resource(MPSKSClientChangeResource, "/user/<int:user_id>/change-mpsk/<int:mpsk_id>")
[docs] class UserInterfaceResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), "mac": fields.Str(required=True), "host_name": fields.Str(required=False), }, location="form", ) def post( self, user_id: int, interface_id: int, password: str, mac: str, host_name: str | None = None, ) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) interface = get_interface_or_404(interface_id) if interface.host.owner != user: abort( 404, message=f"User {user_id} does not have a host with interface {interface_id}" ) try: if host_name: host_edit( interface.host, interface.host.owner, interface.host.room, host_name, user, ) change_mac(interface, mac, user) session.session.add(interface) session.session.commit() except InvalidMACAddressException: abort(400, message='Invalid mac address.') except IntegrityError: abort(400, message='Mac address is already in use.') return "Mac address has been changed."
api.add_resource(UserInterfaceResource, '/user/<int:user_id>/change-mac/<int:interface_id>')
[docs] class ActivateNetworkAccessResource(Resource):
[docs] @use_kwargs( { "password": fields.Str(required=True), "birthdate": fields.Date(required=True), "mac": fields.Str(required=True), "host_name": fields.Str(required=False), }, location="form", ) def post( self, user_id: int, password: str, birthdate: date, mac: str, host_name: str | None = None, ) -> ResponseReturnValue: user = get_authenticated_user(user_id, password) if user.room is None: abort(424, message="User is not living in a dormitory.") if not user.has_property('network_access'): abort(403, message="User has no network access.") interfaces = Interface.q.join(Host).filter(Host.owner_id == user.id).all() if len(interfaces) > 0: abort(412, message="User already has a host with interface.") user.birthdate = birthdate host = Host.q.filter_by(owner_id=user.id).one_or_none() try: if host is None: host = host_create(user, user.room, host_name, user) else: host_edit(host, host.owner, user.room, host_name, user) interface_create(host, None, mac, None, user) session.session.commit() except InvalidMACAddressException: abort(400, message='Invalid mac address.') except IntegrityError: abort(400, message='Mac address is already in use.') except SubnetFullException: abort(422, message='Subnet full.') return jsonify({'success': True})
api.add_resource(ActivateNetworkAccessResource, '/user/<int:user_id>/activate-network-access')
[docs] class TerminateMembershipResource(Resource):
[docs] @use_kwargs( { "end_date": fields.Date(required=True), }, location="query", ) def get(self, user_id: int, end_date: date) -> ResponseReturnValue: """ :param user_id: The ID of the user :return: The estimated balance of the given end_date """ user = get_user_or_404(user_id) estimated_balance = estimate_balance(session.session, user, end_date) return jsonify(estimated_balance=estimated_balance)
[docs] @use_kwargs( { "end_date": fields.Date(required=True), "comment": fields.Str(required=False), }, location="form", ) def post( self, user_id: int, end_date: date, comment: str | None = None ) -> ResponseReturnValue: """ Terminate the membership on the given date :param user_id: The ID of the user :return: """ user = get_user_or_404(user_id) if membership_ending_task(user) is not None: abort(400, message="The termination of the membership has already" " been scheduled.") if not user.has_property('member'): abort(400, message="User is not a member.") move_out( user=user, comment=comment if comment is not None else "Move-out over API", processor=user, when=utc.with_min_time(end_date), end_membership=True, ) session.session.commit() return "Membership termination scheduled."
[docs] def delete(self, user_id: int) -> ResponseReturnValue: """ Cancel termination of a membership :param user_id: The ID of the user :return: """ user = get_user_or_404(user_id) task = membership_ending_task(user) if task is None: abort(400, message="There is no termination scheduled") if not user.has_property('member'): abort(400, message="User is not a member.") cancel_task(task, user) session.session.commit() return "Membership termination cancelled."
api.add_resource(TerminateMembershipResource, '/user/<int:user_id>/terminate-membership')
[docs] class ResetWifiPasswordResource(Resource):
[docs] def patch(self, user_id: int) -> ResponseReturnValue: """ Reset the wifi password :return: new password """ user = get_user_or_404(user_id) new_password = reset_wifi_password(user, user) session.session.commit() return new_password
api.add_resource(ResetWifiPasswordResource, '/user/<int:user_id>/reset-wifi-password')
[docs] class RegistrationResource(Resource):
[docs] @use_kwargs( { "first_name": fields.Str(required=True), "last_name": fields.Str(required=True), "birthdate": fields.Date(required=True), "person_id": fields.Int(required=True), "previous_dorm": fields.Str(required=False), }, location="query", ) def get( self, first_name: str, last_name: str, birthdate: date, person_id: int, previous_dorm: str | None = None, ) -> ResponseReturnValue: """ Get the newest tenancy for the supplied user data, or an error 404 if not found. Error codes no_tenancies No tenancies could be found for the supplied data no_relevant_tenancies active or future tenancies could be found no_room_for_tenancies ere are tenancies but none of them are connected to a pycroft room user_exists user with this person_id already exists similar_user_exists similar user already lives in the room """ swdd_person_id = get_swdd_person_id(first_name, last_name, birthdate) # some tenants have an additional semicolon added to their last names if swdd_person_id is None: swdd_person_id = get_swdd_person_id(first_name, last_name + ";", birthdate) if swdd_person_id is None or swdd_person_id != person_id: abort(404, message="No tenancies found for this data", code="no_tenancies") tenancies = get_relevant_tenancies(swdd_person_id) if not tenancies: abort(404, message="No active or future tenancies found", code="no_relevant_tenancies") newest_tenancy = get_first_tenancy_with_room(tenancies) if newest_tenancy is None: abort(404, message="Cannot associate a room with any tenancy", code="no_room_for_tenancies") if previous_dorm is None: if get_user_by_swdd_person_id(swdd_person_id) is not None: abort(400, message="User already exists", code="user_exists") try: name = get_name_from_first_last(first_name, last_name) check_similar_user_in_room(name, newest_tenancy.room) except UserExistsInRoomException: abort(400, message="A user with a similar name already lives in this room", code="similar_user_exists") return jsonify({ 'id': newest_tenancy.persvv_id, 'vo_suchname': newest_tenancy.vo_suchname, 'begin': newest_tenancy.mietbeginn.isoformat(), 'end': newest_tenancy.mietende.isoformat(), 'room_id': newest_tenancy.room.id, 'building': newest_tenancy.room.building.street_and_number, 'room': newest_tenancy.room.level_and_number })
[docs] @use_kwargs( { "first_name": fields.Str(required=True), "last_name": fields.Str(required=False), "birthdate": fields.Date(required=True), "email": fields.Str(required=True), "password": fields.Str(required=True), "login": fields.Str(required=True), "person_id": fields.Int(required=False), "room_id": fields.Int(required=False), "move_in_date": fields.Date(required=False), "previous_dorm": fields.Str(required=False), }, location="form", ) def post( self, first_name: str, birthdate: date, email: str, password: str, login: str, last_name: str | None = None, person_id: int | None = None, room_id: int | None = None, move_in_date: date | None = None, previous_dorm: str | None = None, ) -> int: """ Create a member request """ room = None swdd_person_id = None if room_id is not None: room = session.session.get(Room, room_id) if room is None: abort(404, message="Invalid room", code="invalid_room") if person_id is not None: swdd_person_id = get_swdd_person_id(first_name, last_name, birthdate) # some tenants have an additional semicolon added to their last names if swdd_person_id is None: swdd_person_id = get_swdd_person_id( first_name, last_name + ";", birthdate ) if swdd_person_id != person_id: abort(400, message="Person id does not match", code="person_id_mismatch") name = get_name_from_first_last(first_name, last_name) try: mr = create_member_request( name, email, password, login, birthdate, swdd_person_id, room, move_in_date, previous_dorm, ) except UserExistsException: abort(400, message="User already exists", code="user_exists") except UserExistsInRoomException: abort(400, message="A user with a similar name already lives in this room", code="similar_user_exists") except EmailTakenException: abort(400, message="E-Mail address already in use", code="email_taken") except LoginTakenException: abort(400, message="Login already in use", code="login_taken") except IllegalEmailError: abort(400, message="Illegal E-Mail address", code="email_illegal") except IllegalLoginError: abort(400, message="Illegal login", code="login_illegal") except NoTenancyForRoomException: abort(400, message="The given person has no tenancy for the room", code="no_tenancy_in_room") except MoveInDateInvalidException: abort(400, message="The move-in date is invalid", code="move_in_date_invalid") else: session.session.commit() return mr.id raise AssertionError( "unreachable" ) # the `abort`s from `flask_restful` don't return `NoReturn`
api.add_resource(RegistrationResource, '/register')
[docs] class EmailConfirmResource(Resource):
[docs] @use_kwargs( { "user_id": fields.Int(required=True), }, location="query", ) def get(self, user_id: int) -> ResponseReturnValue: user = session.session.get(User, user_id) if user is None: abort(404, message='User not found') send_confirmation_email(user) session.session.commit() return jsonify({'success': True})
[docs] @use_kwargs( { "key": fields.Str(required=True), }, location="form", ) def post(self, key: str) -> ResponseReturnValue: try: user_type, reg_result = confirm_mail_address(key) except ValueError: abort(400, message="Bad key", code="bad_key") session.session.commit() return jsonify({'type': user_type, 'reg_result': reg_result})
api.add_resource(EmailConfirmResource, '/register/confirm')
[docs] class ResetPasswordResource(Resource):
[docs] @use_kwargs( { "ident": fields.Str(required=True), "email": fields.Str(required=True), }, location="form", ) def post(self, ident: str, email: str) -> ResponseReturnValue: user = get_user_by_id_or_login(ident, email) if user is None or not user.has_property('sipa_login'): abort(404, message="Not found", code="not_found") if not send_password_reset_mail(user): abort(412, message="No contact email", code="no_contact") session.session.commit() return { 'success': True }
[docs] @use_kwargs( { "token": fields.Str(required=True), "password": fields.Str(required=True), }, location="form", ) def patch(self, token: str, password: str) -> ResponseReturnValue: if not change_password_from_token(token, password): abort(403, message="Invalid token", code="invalid_token") session.session.commit() return { 'success': True }
api.add_resource(ResetPasswordResource, '/user/reset-password')