# Source code for invenio_saml.ext

# -*- coding: utf-8 -*-
#
# Copyright (C) 2019, 2022 Esteban J. Garcia Gabancho.
#
# Invenio-SAML is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Invenio module that provides SAML integration."""

import json
from collections.abc import Mapping
from functools import wraps

from flask import url_for
from onelogin.saml2.idp_metadata_parser import OneLogin_Saml2_IdPMetadataParser
from werkzeug.utils import cached_property, import_string

from . import config
from .errors import IdentityProviderNotFound
from .utils import SAMLAuth, prepare_flask_request
from .views import create_blueprint


def _default_config(idp):
    """Build the default configuration for an Identity Provider.

    The Service Provider (``sp``) endpoints are generated with ``url_for``
    using ``_external=True``, so this has to run wherever Flask is able to
    build absolute URLs for the ``sso_saml`` endpoints.

    :param idp: Identity provider name, passed as the ``idp`` URL value to
        the ``sso_saml.metadata``, ``sso_saml.acs`` and ``sso_saml.sls``
        endpoints.
    :returns: A new ``dict`` holding the default ``settings`` (``strict``,
        ``debug``, ``sp``, ``idp`` and ``security`` sections) plus the
        ``settings_file_path``, ``settings_url``, ``sp_cert_file``,
        ``sp_key_file`` and ``*_handler`` entries, all defaulting to
        ``None``.
    """
    return dict(
        settings={
            "strict": True,
            # NOTE(review): debug is enabled by default -- confirm this is
            # intended for production deployments.
            "debug": True,
            "sp": {
                "entityId": url_for("sso_saml.metadata", idp=idp, _external=True),
                "assertionConsumerService": {
                    "url": url_for("sso_saml.acs", idp=idp, _external=True),
                    "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST",
                },
                "singleLogoutService": {
                    "url": url_for("sso_saml.sls", idp=idp, _external=True),
                    "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
                },
                # A NameID *format* identifier is required here; a binding
                # URI (as used for the services above) is not a valid value.
                "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified",
                "x509cert": "",
                "privateKey": "",
            },
            # IdP values are placeholders; they are filled in from the
            # application configuration or from external metadata.
            "idp": {
                "entityId": None,
                "singleSignOnService": {
                    "url": None,
                    "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
                },
                "singleLogoutService": {
                    "url": None,
                    "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
                },
                "x509cert": None,
            },
            "security": {
                "authnRequestsSigned": False,
                "failOnAuthnContextMismatch": False,
                "logoutRequestSigned": False,
                "logoutResponseSigned": False,
                "metadataCacheDuration": None,
                "metadataValidUntil": None,
                "nameIdEncrypted": False,
                "requestedAuthnContext": True,
                "requestedAuthnContextComparison": "exact",
                "signMetadata": False,
                "signatureAlgorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
                "wantAssertionsEncrypted": False,
                "wantAssertionsSigned": False,
                "wantMessagesSigned": False,
                "wantNameId": True,
                "wantNameIdEncrypted": False,
                "digestAlgorithm": "http://www.w3.org/2001/04/xmlenc#sha256",
            },
        },
        settings_file_path=None,
        settings_url=None,
        sp_cert_file=None,
        sp_key_file=None,
        settings_handler=None,
        login_handler=None,
        acs_handler=None,
        logout_handler=None,
        sls_handler=None,
    )


def _cached_configuration(f):
    """Decorate a method so the IdP configuration is built before it runs.

    The wrapped method must take ``idp`` as its first argument after
    ``self``. If ``idp`` has no entry in ``self._saml_config`` yet, the entry
    is created from ``self._build_configuration(idp)``. A ``KeyError`` raised
    while building is re-raised as ``IdentityProviderNotFound``.
    """

    @wraps(f)
    def wrapper(self, idp, *args, **kwargs):
        # Fast path: configuration already built for this IdP.
        if idp in self._saml_config:
            return f(self, idp, *args, **kwargs)

        try:
            self._saml_config[idp] = self._build_configuration(idp)
        except KeyError as err:
            raise IdentityProviderNotFound() from err

        return f(self, idp, *args, **kwargs)

    return wrapper


class _InvenioSSOSAMLState(object):
    """Invenio SSO SAML state object.

    Keeps a reference to the Flask application and a per-IdP cache of built
    configurations, which is filled lazily the first time an IdP is used.
    """

    def __init__(self, app):
        """Initialize state.

        :param app: Flask application whose ``config`` is read.
        """
        self.app = app
        # Maps IdP name -> configuration returned by ``_build_configuration``.
        self._saml_config = {}

    @property
    def url_prefix(self):
        """URL prefix from config."""
        return self.app.config["SSO_SAML_DEFAULT_BLUEPRINT_PREFIX"]

    @property
    def metadata_url(self):
        """SSO metadata URL from config."""
        return self.app.config["SSO_SAML_DEFAULT_METADATA_ROUTE"]

    @property
    def sso_url(self):
        """SSO SSO URL from config."""
        return self.app.config["SSO_SAML_DEFAULT_SSO_ROUTE"]

    @property
    def acs_url(self):
        """SSO ACS URL from config."""
        return self.app.config["SSO_SAML_DEFAULT_ACS_ROUTE"]

    @property
    def slo_url(self):
        """SSO SLO URL from config."""
        return self.app.config["SSO_SAML_DEFAULT_SLO_ROUTE"]

    @property
    def sls_url(self):
        """SSO SLS URL from config."""
        return self.app.config["SSO_SAML_DEFAULT_SLS_ROUTE"]

    @cached_property
    def prepare_flask_request(self):
        """Function to prepare flask request for OneLogin.

        Read from ``SSO_SAML_PREPARE_FLASK_REQUEST_FUNCTION``; a string value
        is resolved with ``import_string``, anything else is returned as is.
        """
        prep_func = self.app.config["SSO_SAML_PREPARE_FLASK_REQUEST_FUNCTION"]
        if isinstance(prep_func, str):
            prep_func = import_string(prep_func)
        return prep_func

    @_cached_configuration
    def get_settings(self, idp):
        """Find settings for a particular Identity Provider.

        :param idp: Identity provider name.
        :returns: The ``settings`` entry of the cached IdP configuration.
        """
        return self._saml_config[idp]["settings"]

    @_cached_configuration
    def get_handler(self, idp, handler):
        """Get handler for idp.

        :param idp: Identity provider name.
        :param handler: Key of the handler in the IdP configuration, e.g.
            ``"acs_handler"``.
        :returns: The value stored under ``handler``.
        """
        return self._saml_config[idp][handler]

    def get_auth(self, idp):
        """Instantiate the IdP.

        :param idp: Identity provider name.
        :returns: ``SAMLAuth`` built from the IdP name and its settings.
        """
        return SAMLAuth(idp, self.get_settings(idp))

    def _build_configuration(self, idp):
        """Update default config with the ones read from configuration.

        Steps, in order:

        1. start from ``_default_config(idp)``;
        2. deep-merge ``SSO_SAML_IDPS[idp]`` from the application config;
        3. merge the ``idp`` section of the metadata found at
           ``settings_url`` and/or ``settings_file_path`` (``.xml`` or
           ``.json``), if set;
        4. load the SP certificate and private key from ``sp_cert_file`` and
           ``sp_key_file``, if set;
        5. resolve each ``*_handler``, falling back to the matching
           ``SSO_SAML_DEFAULT_*_HANDLER`` application setting.

        :param idp: Identity provider name, a key of ``SSO_SAML_IDPS``.
        :returns: The complete configuration ``dict`` for the IdP.
        :raises KeyError: If ``idp`` is not a key of ``SSO_SAML_IDPS``.
        :raises ValueError: If ``settings_file_path`` does not end in
            ``.xml`` or ``.json``.
        """

        def update(d, u):
            # Recursive merge of ``u`` into ``d``: nested mappings are merged
            # key by key, every other value replaces the existing one.
            for k, v in u.items():
                if isinstance(v, Mapping):
                    d[k] = update(d.get(k, {}), v)
                else:
                    d[k] = v
            return d

        def make_handler(handler, default=None):
            # Use ``default`` when no handler is configured; strings are
            # treated as import paths, other values are returned untouched.
            handler = handler if handler else default
            return (
                import_string(handler)
                if handler and isinstance(handler, str)
                else handler
            )

        # Named ``idp_config`` so it does not shadow the module-level
        # ``config`` import.
        idp_config = _default_config(idp)
        update(idp_config, self.app.config["SSO_SAML_IDPS"][idp])

        # Read IdP config from file or URL if any
        if idp_config["settings_url"]:
            external_conf = OneLogin_Saml2_IdPMetadataParser.parse_remote(
                idp_config["settings_url"]
            )
            idp_config["settings"]["idp"].update(external_conf.get("idp"))

        settings_file = idp_config["settings_file_path"]
        if settings_file:
            with open(settings_file, "r") as metadata_file:
                content = metadata_file.read()
            if settings_file.endswith(".xml"):
                # xml format
                external_conf = OneLogin_Saml2_IdPMetadataParser.parse(content)
            elif settings_file.endswith(".json"):
                # json format
                external_conf = json.loads(content)
            else:
                # Fail explicitly rather than hitting an unbound variable or
                # silently reusing the data parsed from ``settings_url``.
                raise ValueError(
                    "Unsupported IdP settings file {!r}: expected a '.xml' "
                    "or '.json' file.".format(settings_file)
                )
            idp_config["settings"]["idp"].update(external_conf.get("idp"))

        # Load certificate and key
        if idp_config["sp_cert_file"]:
            with open(idp_config["sp_cert_file"], "r") as cf:
                cert = cf.read()
            idp_config["settings"]["sp"]["x509cert"] = cert

        if idp_config["sp_key_file"]:
            with open(idp_config["sp_key_file"], "r") as kf:
                key = kf.read()
            idp_config["settings"]["sp"]["privateKey"] = key

        # Import handlers if present
        idp_config["settings_handler"] = make_handler(
            idp_config["settings_handler"],
            self.app.config.get("SSO_SAML_DEFAULT_SETTINGS_HANDLER"),
        )
        idp_config["login_handler"] = make_handler(
            idp_config["login_handler"],
            self.app.config.get("SSO_SAML_DEFAULT_LOGIN_HANDLER"),
        )
        idp_config["logout_handler"] = make_handler(
            idp_config["logout_handler"],
            self.app.config.get("SSO_SAML_DEFAULT_LOGOUT_HANDLER"),
        )
        idp_config["acs_handler"] = make_handler(
            idp_config["acs_handler"],
            self.app.config.get("SSO_SAML_DEFAULT_ACS_HANDLER"),
        )
        idp_config["sls_handler"] = make_handler(
            idp_config["sls_handler"],
            self.app.config.get("SSO_SAML_DEFAULT_SLS_HANDLER"),
        )

        return idp_config


class InvenioSSOSAML(object):
    """Invenio-SSO-SAML extension."""

    def __init__(self, app=None):
        """Extension initialization.

        :param app: Optional Flask application; when given, ``init_app`` is
            called right away.
        """
        if app:
            self.init_app(app)

    def init_app(self, app):
        """Flask application initialization.

        Loads the default configuration, creates the state object, registers
        the blueprint and stores the state under
        ``app.extensions["invenio-sso-saml"]``.

        :param app: Flask application.
        :returns: The ``_InvenioSSOSAMLState`` created for ``app``.
        """
        self.init_config(app)
        state = _InvenioSSOSAMLState(app)
        # Register blueprint and routes
        app.register_blueprint(create_blueprint(state, __name__))
        app.extensions["invenio-sso-saml"] = state
        return state

    def init_config(self, app):
        """Initialize configuration.

        Every ``SSO_SAML_*`` name of the ``config`` module is copied into
        ``app.config`` unless the application already defines it.

        :param app: Flask application.
        """
        # Only load the configuration, we'll build it lazily because we can
        for k in dir(config):
            if k.startswith("SSO_SAML_"):
                app.config.setdefault(k, getattr(config, k))
        # Set default when needed
        app.config.setdefault(
            "SSO_SAML_PREPARE_FLASK_REQUEST_FUNCTION", prepare_flask_request
        )