Source code for invenio_saml.handlers
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019-2024 Esteban J. Garcia Gabancho.
# Copyright (C) 2021-2022 Graz University of Technology.
#
# Invenio-SAML is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Default handlers for SSO-SAML."""
from datetime import datetime, timezone
from flask import abort, current_app
from flask_login import current_user
from flask_security import logout_user
from invenio_db import db
from invenio_oauthclient.errors import AlreadyLinkedError
from invenio_oauthclient.utils import create_csrf_disabled_registrationform, fill_form
from .invenio_accounts.utils import (
account_authenticate,
account_get_user,
account_link_external_id,
account_register,
)
from .invenio_app import get_safe_redirect_target
[docs]
def default_account_info(attributes, remote_app):
"""Return account info for remote user.
This function uses the mappings configuration variable inside your IdP
configuration.
:param attributes: (dict) dictionary of data returned by identity provider.
:param remote_app: (str) Identity provider key.
:returns: (dict) A dictionary representing user to create or update.
"""
remote_app_config = current_app.config["SSO_SAML_IDPS"][remote_app]
mappings = remote_app_config["mappings"]
name = attributes[mappings["name"]][0]
surname = attributes[mappings["surname"]][0]
email = attributes[mappings["email"]][0]
external_id = attributes[mappings["external_id"]][0]
username = (
remote_app + "-" + external_id.split("@")[0]
if "@" in external_id
else remote_app + "-" + external_id
)
return dict(
user=dict(
email=email,
profile=dict(username=username, full_name=name + " " + surname),
),
external_id=external_id,
external_method=remote_app,
active=True,
confirmed_at=(
datetime.now(timezone.utc)
if remote_app_config.get("auto_confirm", False)
else None
),
)
[docs]
def default_account_setup(user, account_info):
"""Default account setup which only links ``User`` and ``UserIdentity``."""
try:
account_link_external_id(
user,
dict(
id=account_info["external_id"], method=account_info["external_method"]
),
)
except AlreadyLinkedError:
pass
[docs]
def default_sls_handler(auth, next_url):
"""Default SLS handler which simply logs out the user."""
logout_user()
next_url = (
get_safe_redirect_target(_target=next_url)
or current_app.config["SECURITY_POST_LOGOUT_VIEW"]
)
return next_url
[docs]
def acs_handler_factory(
remote_app, account_info=default_account_info, account_setup=default_account_setup
):
"""Generate ACS handlers with an specific account info and setup functions.
.. note::
In 90% of the cases the ACS handler is going to be the same, only the
way the information is extracted and processed from the IdP will be
different.
:param remote_app: string representing the name of the identity provider.
:param account_info: callable to extract the account information from a
dict like object. ``mappings`` key is required whe using it.
This function is expected to return a dictionary similar to this:
.. code-block:: python
dict(
user=dict(
email='federico@example.com',
profile=dict(username='federico',
full_name='Federico Fernandez'),
),
external_id='12345679abcdf',
external_method='example',
active=True
)
Where ``external_id`` is the ID provided by the IdP and
``external_method`` is the name of the IdP as in the configuration
file (not mandatory but recommended).
:param account_setup: callable to setup the user account with the
corresponding IdP account information. Typically this means creating a
new row under ``UserIdentity`` and maybe extending ``g.identity``.
:return: function to be used as ACS handler
"""
def default_acs_handler(auth, next_url):
"""Default ACS handler.
:para auth: A :class:`invenio_saml.utils.SAMLAuth` instance.
:param next_url: String with the next URL to redirect to.
:return: Next URL
"""
if not current_user.is_authenticated:
current_app.logger.debug(
"Metadata received from IdP %s", auth.get_attributes()
)
_account_info = account_info(auth.get_attributes(), remote_app)
current_app.logger.debug("Metadata extracted from IdP %s", _account_info)
# TODO: signals?
user = account_get_user(_account_info)
if user is None:
form = create_csrf_disabled_registrationform(remote_app)
form = fill_form(form, _account_info["user"])
user = account_register(
form, confirmed_at=_account_info["confirmed_at"]
)
# if registration fails ... TODO: signup?
if user is None or not account_authenticate(user):
abort(401)
account_setup(user, _account_info)
db.session.commit()
next_url = (
get_safe_redirect_target(_target=next_url)
or current_app.config["SECURITY_POST_LOGIN_VIEW"]
)
return next_url
return default_acs_handler