diff --git a/doc/server/contents/conf.rst b/doc/server/contents/conf.rst index d34503a3..cdf6d81c 100644 --- a/doc/server/contents/conf.rst +++ b/doc/server/contents/conf.rst @@ -411,6 +411,19 @@ An example:: ] } }, + "revocation": { + "path": "revoke", + "class": "idpyoidc.server.oauth2.revocation.Revocation", + "kwargs": { + "client_authn_method": [ + "client_secret_post", + "client_secret_basic", + "client_secret_jwt", + "private_key_jwt", + "bearer_header" + ] + } + }, "end_session": { "path": "session", "class": "idpyoidc.server.oidc.session.Session", @@ -874,6 +887,12 @@ For example:: return request +================== +Client Credentials +================== +There are two possible ways to enable Client Credentials in OIDC-OP, globally and per-client. +For both cases the the Client Credentials handler is enabled throught the `client_credentials` +dictionary in token's `grant_types_supported`. ================================== idpyoidc\.server\.configure module @@ -948,3 +967,102 @@ For example:: return request +============== +Token revocation +============== + +In order to enable the token revocation endpoint a dictionary with key `token_revocation` should be placed +under the `endpoint` key of the configuration. + +If present, the token revocation configuration should contain a `policy` dictionary +that defines the behaviour for each token type. Each token type +is mapped to a dictionary with the keys `callable` (mandatory), which must be a +python callable or a string that represents the path to a python callable, and +`kwargs` (optional), which must be a dict of key-value arguments that will be +passed to the callable. + +The key `""` represents a fallback policy that will be used if the token +type can't be found. If a token type is defined in the `policy` but is +not in the `token_types_supported` list then it is ignored. + +"token_revocation": { + "path": "revoke", + "class": "idpyoidc.server.oauth2.token_revocation.TokenRevocation", + "kwargs": { + "token_types_supported": ["access_token"], + "client_authn_method": [ + "client_secret_post", + "client_secret_basic", + "client_secret_jwt", + "private_key_jwt", + "bearer_header" + ], + "policy": { + "urn:ietf:params:oauth:token-type:access_token": { + "callable": "/path/to/callable", + "kwargs": { + "audience": ["https://example.com"], + "scopes": ["openid"] + } + }, + "urn:ietf:params:oauth:token-type:refresh_token": { + "callable": "/path/to/callable", + "kwargs": { + "resource": ["https://example.com"], + "scopes": ["openid"] + } + }, + "": { + "callable": "/path/to/callable", + "kwargs": { + "scopes": ["openid"] + } + } + } + } +} + +For the per-client configuration a similar configuration scheme should be present in the client's +metadata under the `token_revocation` key. + +For example:: + + "token_revocation":{ + "token_types_supported": ["access_token"], + "policy": { + "urn:ietf:params:oauth:token-type:access_token": { + "callable": "/path/to/callable", + "kwargs": { + "audience": ["https://example.com"], + "scopes": ["openid"] + } + }, + "urn:ietf:params:oauth:token-type:refresh_token": { + "callable": "/path/to/callable", + "kwargs": { + "resource": ["https://example.com"], + "scopes": ["openid"] + } + }, + "": { + "callable": "/path/to/callable", + "kwargs": { + "scopes": ["openid"] + } + } + } + } + } + +The policy callable accepts a specific argument list and handles the revocation appropriately and returns +an :py:class:`idpyoidc.message.oauth2..TokenRevocationResponse` or raises an exception. + +For example:: + + def custom_token_revocation_policy(token, session_info, **kwargs): + if some_condition: + return TokenErrorResponse( + error="invalid_request", error_description="Some error occured" + ) + response_args = {"response_args": {}} + return oauth2.TokenRevocationResponse(**response_args) diff --git a/doc/server/contents/examples.rst b/doc/server/contents/examples.rst index 57ee0b70..6130da26 100644 --- a/doc/server/contents/examples.rst +++ b/doc/server/contents/examples.rst @@ -159,3 +159,60 @@ The [RFC-8693](https://datatracker.ietf.org/doc/html/rfc8693) describes the `aud defines the authorized targets of a token exchange request. If `subject_token = urn:ietf:params:oauth:token-type:refresh_token` then `audience` should not be included in the token exchange request. + +Revocation endpoint +---------------------- + +Here an example about how to use a idpyoidc OP revocation endpoint. +This example uses a client with an HTTP Basic Authentication:: + + import base64 + import requests + + TOKEN = "eyJhbGciOiJFUzI1NiIsImtpZCI6IlQwZGZTM1ZVYUcxS1ZubG9VVTQwUXpJMlMyMHpjSHBRYlMxdGIzZ3hZVWhCYzNGaFZWTlpTbWhMTUEifQ.eyJzY29wZSI6IFsib3BlbmlkIiwgInByb2ZpbGUiLCAiZW1haWwiLCAiYWRkcmVzcyIsICJwaG9uZSJdLCAiYXVkIjogWyJvTHlSajdzSkozWHZBWWplRENlOHJRIl0sICJqdGkiOiAiOWQzMjkzYjZiYmNjMTFlYmEzMmU5ODU0MWIwNzE1ZWQiLCAiY2xpZW50X2lkIjogIm9MeVJqN3NKSjNYdkFZamVEQ2U4clEiLCAic3ViIjogIm9MeVJqN3NKSjNYdkFZamVEQ2U4clEiLCAic2lkIjogIlowRkJRVUZCUW1keGJIVlpkRVJKYkZaUFkxQldaa0pQVUVGc1pHOUtWWFZ3VFdkZmVEY3diMVprYmpSamRrNXRMVzB4YTNnelExOHlRbHBHYTNRNVRHZEdUUzF1UW1sMlkzVnhjRE5sUm01dFRFSmxabGRXYVhJeFpFdHVSV2xtUzBKcExWTmFaRzV3VjJodU0yNXlSbTU0U1ZWVWRrWTRRM2x2UWs1TlpVUk9SazlGVlVsRWRteGhjWGx2UWxWRFdubG9WbTFvZGpORlVUSnBkaTFaUTFCcFptZFRabWRDVWt0YVNuaGtOalZCWVhkcGJFNXpaV2xOTTFCMk0yaE1jMDV0ZGxsUlRFc3dObWxsYUcxa1lrTkhkemhuU25OaWFWZE1kVUZzZDBwWFdWbzFiRWhEZFhGTFFXWTBPVzl5VjJOUk4zaGtPRDA9IiwgInR0eXBlIjogIlQiLCAiaXNzIjogImh0dHBzOi8vMTI3LjAuMC4xOjgwMDAiLCAiaWF0IjogMTYyMTc3NzMwNSwgImV4cCI6IDE2MjE3ODA5MDV9.pVqxUNznsoZu9ND18IEMJIHDOT6_HxzoFiTLsniNdbAdXTuOoiaKeRTqtDyjT9WuUPszdHkVjt5xxeFX8gQMuA" + + data = { + 'token': TOKEN, + 'token_type_hint': 'access_token' + } + + _basic_secret = base64.b64encode( + f'{"oLyRj7sJJ3XvAYjeDCe8rQ"}:{"53fb49f2a6501ec775355c89750dc416744a3253138d5a04e409b313"}'.encode() + ) + headers = { + 'Authorization': f"Basic {_basic_secret.decode()}" + } + + requests.post('https://127.0.0.1:8000/revoke', verify=False, data=data, headers=headers) + + +The idpyoidc OP will return a HTTP 200 response containing an empty json. + +Client Credentials +-------------- + +Here an example about how a client can request a new access token using only its credentials:: + + import requests + + CLIENT_ID="" + CLIENT_SECRET="" + + data = { + "grant_type" : "client_credentials", + "client_id" : f"{CLIENT_ID}", + "client_secret" : f"{CLIENT_SECRET}" + } + headers = {'Content-Type': "application/x-www-form-urlencoded" } + response = requests.post( + 'https://example.com/OIDC/token', verify=False, data=data, headers=headers + ) + +The idpyoidc OP will return a json response like this:: + + { + "access_token": "eyJhbGciOiJFUzI1NiIsI...Bo6aQcOKEN-1U88jjKxLb-9Q", + "token_type": "example", + "expires_in": 86400 + } + diff --git a/src/idpyoidc/message/oauth2/__init__.py b/src/idpyoidc/message/oauth2/__init__.py index 86494e68..f11d7f75 100644 --- a/src/idpyoidc/message/oauth2/__init__.py +++ b/src/idpyoidc/message/oauth2/__init__.py @@ -314,6 +314,30 @@ class TokenIntrospectionResponse(Message): "jti": SINGLE_OPTIONAL_STRING, } +# RFC 7009 +class TokenRevocationRequest(Message): + c_param = { + "token": SINGLE_REQUIRED_STRING, + "token_type_hint": SINGLE_OPTIONAL_STRING, + # The ones below are part of authentication information + "client_id": SINGLE_OPTIONAL_STRING, + "client_secret": SINGLE_OPTIONAL_STRING, + } + + +class TokenRevocationResponse(Message): + pass + +class TokenRevocationErrorResponse(ResponseMessage): + """ + Error response from the revocation endpoint + """ + c_allowed_values = ResponseMessage.c_allowed_values.copy() + c_allowed_values = { + "error": [ + "unsupported_token_type" + ] + } # RFC 8693 class TokenExchangeRequest(Message): diff --git a/src/idpyoidc/server/client_authn.py b/src/idpyoidc/server/client_authn.py index cd3e2c2d..fd5d8129 100755 --- a/src/idpyoidc/server/client_authn.py +++ b/src/idpyoidc/server/client_authn.py @@ -539,7 +539,6 @@ def verify_client( auth_info = {} continue break - # store what authn method was used if "method" in auth_info and client_id: _request_type = request.__class__.__name__ @@ -548,10 +547,10 @@ def verify_client( endpoint_context.cdb[client_id]["auth_method"][_request_type] = auth_info["method"] else: endpoint_context.cdb[client_id]["auth_method"] = {_request_type: auth_info["method"]} - return auth_info + def client_auth_setup(server_get, auth_set=None): if auth_set is None: auth_set = CLIENT_AUTHN_METHOD diff --git a/src/idpyoidc/server/endpoint.py b/src/idpyoidc/server/endpoint.py index 2167285f..964e396f 100755 --- a/src/idpyoidc/server/endpoint.py +++ b/src/idpyoidc/server/endpoint.py @@ -254,6 +254,8 @@ def client_authentication(self, request: Message, http_info: Optional[dict] = No raise UnAuthorizedClient("Authorization failed") if "client_id" not in authn_info and authn_info.get("method") != "none": raise UnAuthorizedClient("Authorization failed") + if "client_id" not in authn_info and authn_info.get("method") != "none": + raise UnAuthorizedClient("Authorization failed") return authn_info def do_post_parse_request( diff --git a/src/idpyoidc/server/oauth2/introspection.py b/src/idpyoidc/server/oauth2/introspection.py index 11b29cca..1adf8632 100644 --- a/src/idpyoidc/server/oauth2/introspection.py +++ b/src/idpyoidc/server/oauth2/introspection.py @@ -39,7 +39,6 @@ def _introspect(self, token, client_id, grant): return None if not token.is_active(): - # return None scope = token.scope diff --git a/src/idpyoidc/server/oauth2/token.py b/src/idpyoidc/server/oauth2/token.py index b23fa03c..c44c85c0 100755 --- a/src/idpyoidc/server/oauth2/token.py +++ b/src/idpyoidc/server/oauth2/token.py @@ -5,7 +5,7 @@ from cryptojwt.jwe.exception import JWEException from idpyoidc.message import Message -from idpyoidc.message.oauth2 import AccessTokenResponse +from idpyoidc.message.oauth2 import AccessTokenResponse, CCAccessTokenRequest from idpyoidc.message.oauth2 import ResponseMessage from idpyoidc.message.oauth2 import TokenExchangeRequest from idpyoidc.message.oidc import TokenErrorResponse @@ -13,6 +13,7 @@ from idpyoidc.server.endpoint import Endpoint from idpyoidc.server.exception import ProcessError from idpyoidc.server.oauth2.token_helper import AccessTokenHelper +from idpyoidc.server.oauth2.token_helper import CCAccessTokenHelper from idpyoidc.server.oauth2.token_helper import RefreshTokenHelper from idpyoidc.server.oauth2.token_helper import TokenExchangeHelper from idpyoidc.server.session import MintingNotAllowed @@ -35,6 +36,7 @@ class Token(Endpoint): default_capabilities = {"token_endpoint_auth_signing_alg_values_supported": None} helper_by_grant_type = { "authorization_code": AccessTokenHelper, + "client_credentials": CCAccessTokenHelper, "refresh_token": RefreshTokenHelper, } diff --git a/src/idpyoidc/server/oauth2/token_helper.py b/src/idpyoidc/server/oauth2/token_helper.py index 6a8e05aa..f621aede 100755 --- a/src/idpyoidc/server/oauth2/token_helper.py +++ b/src/idpyoidc/server/oauth2/token_helper.py @@ -9,12 +9,13 @@ from idpyoidc.exception import MissingRequiredAttribute from idpyoidc.exception import MissingRequiredValue from idpyoidc.message import Message -from idpyoidc.message.oauth2 import TokenExchangeRequest +from idpyoidc.message.oauth2 import CCAccessTokenRequest, TokenExchangeRequest from idpyoidc.message.oauth2 import TokenExchangeResponse from idpyoidc.message.oidc import RefreshAccessTokenRequest from idpyoidc.message.oidc import TokenErrorResponse from idpyoidc.server.constant import DEFAULT_REQUESTED_TOKEN_TYPE from idpyoidc.server.constant import DEFAULT_TOKEN_LIFETIME +from idpyoidc.server.authn_event import create_authn_event from idpyoidc.server.exception import ToOld from idpyoidc.server.exception import UnAuthorizedClientScope from idpyoidc.server.oauth2.authorization import check_unknown_scopes_policy @@ -337,6 +338,103 @@ def post_parse_request( return request +class CCAccessTokenHelper(TokenEndpointHelper): + def process_request(self, req: Union[Message, dict], **kwargs): + """ + :param req: + :param kwargs: + :return: + """ + _context = self.endpoint.server_get("endpoint_context") + _mngr = _context.session_manager + logger.debug("Access Token") + + _token_usage_rules = _context.authz.usage_rules(req["client_id"]) + _authn_event = create_authn_event( + req["client_id"] + ) + sid = _mngr.create_session( + authn_event=_authn_event, + auth_req=req, + user_id=req["client_id"], + client_id=req["client_id"], + token_usage_rules=_token_usage_rules, + scopes=req.get("scope", []), + ) + + try: + _session_info = _mngr.get_session_info(session_id=sid, grant=True) + except Exception: + logger.error("Error retrieving token exchange session information") + return self.error_cls( + error="server_error", error_description="Internal server error" + ) + + client_id = _session_info["client_id"] + if client_id != req["client_id"]: + logger.debug("{} owner of token".format(client_id)) + logger.warning("Client using token it was not given") + return self.error_cls(error="invalid_grant", error_description="Wrong client") + + grant = _session_info["grant"] + _authn_req = grant.authorization_request + + # If redirect_uri was in the initial authorization request + # verify that the one given here is the correct one. + if "redirect_uri" in _authn_req: + if req["redirect_uri"] != _authn_req["redirect_uri"]: + return self.error_cls( + error="invalid_request", error_description="redirect_uri mismatch" + ) + + logger.debug("All checks OK") + + token_type = "Bearer" + # Is DPOP supported + if "dpop_signing_alg_values_supported" in _context.provider_info: + _dpop_jkt = req.get("dpop_jkt") + if _dpop_jkt: + grant.extra["dpop_jkt"] = _dpop_jkt + token_type = "DPoP" + + _response = { + "token_type": token_type, + "scope": grant.scope, + } + + try: + token = self._mint_token( + token_class="access_token", + grant=grant, + session_id=_session_info["session_id"], + client_id=_session_info["client_id"], + token_type=token_type + ) + except MintingNotAllowed as err: + logger.warning(err) + else: + _response["access_token"] = token.value + if token.expires_at: + _response["expires_in"] = token.expires_at - utc_time_sans_frac() + + return _response + + def post_parse_request( + self, request: Union[Message, dict], client_id: Optional[str] = "", **kwargs + ): + """ + This is where clients come to get their access tokens + + :param request: The request + :param client_id: Client identifier + :returns: + """ + request = CCAccessTokenRequest(**request.to_dict()) + logger.debug("%s: %s" % (request.__class__.__name__, sanitize(request))) + + return request + + class RefreshTokenHelper(TokenEndpointHelper): def process_request(self, req: Union[Message, dict], **kwargs): _context = self.endpoint.server_get("endpoint_context") @@ -788,8 +886,7 @@ def validate_token_exchange_policy(request, context, subject_token, **kwargs): if "offline_access" not in subject_token.scope: return TokenErrorResponse( error="invalid_request", - error_description=f"Exchange {request['subject_token_type']} to refresh token " - f"forbidden", + error_description=f"Exchange {request['subject_token_type']} to refresh token forbidden", ) scopes = request.get("scope", subject_token.scope) diff --git a/src/idpyoidc/server/oauth2/token_revocation.py b/src/idpyoidc/server/oauth2/token_revocation.py new file mode 100644 index 00000000..076927b5 --- /dev/null +++ b/src/idpyoidc/server/oauth2/token_revocation.py @@ -0,0 +1,133 @@ +"""Implements RFC7009""" +import logging +from typing import Optional + +from idpyoidc.exception import ImproperlyConfigured +from idpyoidc.message import oauth2 +from idpyoidc.server.endpoint import Endpoint +from idpyoidc.server.token.exception import UnknownToken +from idpyoidc.server.token.exception import WrongTokenClass + +from idpyoidc.util import importer + +logger = logging.getLogger(__name__) + + +class TokenRevocation(Endpoint): + """Implements RFC7009""" + + request_cls = oauth2.TokenRevocationRequest + response_cls = oauth2.TokenRevocationResponse + error_cls = oauth2.TokenRevocationErrorResponse + request_format = "urlencoded" + response_format = "json" + endpoint_name = "revocation_endpoint" + name = "token_revocation" + default_capabilities = { + "client_authn_method": [ + "client_secret_basic", + "client_secret_post", + "client_secret_jwt", + "bearer_header", + "private_key_jwt", + ] + } + + token_types_supported = ["authorization_code", "access_token", "refresh_token"] + + def __init__(self, server_get, **kwargs): + Endpoint.__init__(self, server_get, **kwargs) + self.token_revocation_kwargs = kwargs + + def get_client_id_from_token(self, endpoint_context, token, request=None): + _info = endpoint_context.session_manager.get_session_info_by_token( + token, handler_key="access_token" + ) + return _info["client_id"] + + def process_request(self, request=None, **kwargs): + """ + :param request: The revocation request as a dictionary + :param kwargs: + :return: + """ + _revoke_request = self.request_cls(**request) + if "error" in _revoke_request: + return _revoke_request + + request_token = _revoke_request["token"] + _resp = self.response_cls() + _context = self.server_get("endpoint_context") + logger.debug("Token Revocation") + + try: + _session_info = _context.session_manager.get_session_info_by_token( + request_token, grant=True + ) + except (UnknownToken, WrongTokenClass): + return {"response_args": _resp} + + client_id = _session_info["client_id"] + if client_id != _revoke_request["client_id"]: + logger.debug("{} owner of token".format(client_id)) + logger.warning("Client using token it was not given") + return self.error_cls(error="invalid_grant", error_description="Wrong client") + + grant = _session_info["grant"] + _token = grant.get_token(request_token) + + try: + self.token_types_supported = _context.cdb[client_id]["token_revocation"]["token_types_supported"] + except: + self.token_types_supported = self.token_revocation_kwargs.get("token_types_supported", self.token_types_supported) + + try: + self.policy = _context.cdb[client_id]["token_revocation"]["policy"] + except: + self.policy = self.token_revocation_kwargs.get("policy", {"": {"callable": validate_token_revocation_policy}}) + + if _token.token_class not in self.token_types_supported: + desc = ( + "The authorization server does not support the revocation of " + "the presented token type. That is, the client tried to revoke an access " + "token on a server not supporting this feature." + ) + return self.error_cls(error="unsupported_token_type", error_description=desc) + + return self._revoke(_revoke_request, _session_info) + + def _revoke(self, request, session_info): + _context = self.server_get("endpoint_context") + _mngr = _context.session_manager + _token = _mngr.find_token(session_info["branch_id"], request["token"]) + + _cls = _token.token_class + if _cls not in self.policy: + _cls = "" + + temp_policy = self.policy[_cls] + callable = temp_policy["callable"] + kwargs = temp_policy.get("kwargs", {}) + + if isinstance(callable, str): + try: + fn = importer(callable) + except Exception: + raise ImproperlyConfigured(f"Error importing {callable} policy callable") + else: + fn = callable + + try: + return fn(_token, session_info=session_info, **kwargs) + except Exception as e: + logger.error(f"Error while executing the {fn} policy callable: {e}") + return self.error_cls(error="server_error", error_description="Internal server error") + + +def validate_token_revocation_policy(token, session_info, **kwargs): + _token = token + _token.revoke() + + response_args = {"response_args": {}} + return oauth2.TokenRevocationResponse(**response_args) + diff --git a/src/idpyoidc/server/oidc/add_on/pkce.py b/src/idpyoidc/server/oidc/add_on/pkce.py index 298b0ac7..495363b1 100644 --- a/src/idpyoidc/server/oidc/add_on/pkce.py +++ b/src/idpyoidc/server/oidc/add_on/pkce.py @@ -4,7 +4,7 @@ from cryptojwt.utils import b64e -from idpyoidc.message.oauth2 import AuthorizationErrorResponse +from idpyoidc.message.oauth2 import AuthorizationErrorResponse, CCAccessTokenRequest from idpyoidc.message.oauth2 import RefreshAccessTokenRequest from idpyoidc.message.oauth2 import TokenExchangeRequest from idpyoidc.message.oidc import TokenErrorResponse @@ -93,7 +93,7 @@ def post_token_parse(request, client_id, endpoint_context, **kwargs): """ if isinstance( request, - (AuthorizationErrorResponse, RefreshAccessTokenRequest, TokenExchangeRequest), + (AuthorizationErrorResponse, RefreshAccessTokenRequest, TokenExchangeRequest, CCAccessTokenRequest), ): return request diff --git a/src/idpyoidc/server/oidc/token_helper.py b/src/idpyoidc/server/oidc/token_helper.py index d319b9e2..34603036 100755 --- a/src/idpyoidc/server/oidc/token_helper.py +++ b/src/idpyoidc/server/oidc/token_helper.py @@ -122,7 +122,6 @@ def process_request(self, req: Union[Message, dict], **kwargs): if ( issue_refresh and "refresh_token" in _supports_minting - and "refresh_token" in grant_types_supported ): try: refresh_token = self._mint_token( @@ -237,7 +236,7 @@ def process_request(self, req: Union[Message, dict], **kwargs): token_type = "DPoP" token = _grant.get_token(token_value) - scope = _grant.find_scope(token.based_on) + scope = _grant.find_scope(token) if "scope" in req: scope = req["scope"] access_token = self._mint_token( diff --git a/src/idpyoidc/server/oidc/userinfo.py b/src/idpyoidc/server/oidc/userinfo.py index ae6e87b5..ccbaf8ec 100755 --- a/src/idpyoidc/server/oidc/userinfo.py +++ b/src/idpyoidc/server/oidc/userinfo.py @@ -156,7 +156,10 @@ def process_request(self, request=None, **kwargs): _session_info["user_id"], claims_restriction=_claims_restriction ) info["sub"] = _grant.sub - if _grant.add_acr_value("userinfo"): + if ( + "authn_info" in _grant.authentication_event + and _grant.add_acr_value("userinfo") + ): info["acr"] = _grant.authentication_event["authn_info"] else: info = { diff --git a/src/idpyoidc/server/session/grant.py b/src/idpyoidc/server/session/grant.py index 761991fe..064b3265 100644 --- a/src/idpyoidc/server/session/grant.py +++ b/src/idpyoidc/server/session/grant.py @@ -211,7 +211,7 @@ def payload_arguments( payload[_out] = _val payload["jti"] = uuid1().hex - + if scope is None: scope = self.scope payload["scope"] = scope @@ -228,6 +228,9 @@ def payload_arguments( if client_id: payload.update({"client_id": client_id, "sub": self.sub}) + if "scope" not in payload: + payload["scope"] = self.scope + if item.claims: _claims_restriction = item.claims else: @@ -599,4 +602,5 @@ def payload_arguments( elif self.add_acr_value(secondary_identifier): payload["acr"] = self.authentication_event["authn_info"] + return payload return payload \ No newline at end of file diff --git a/src/idpyoidc/server/session/manager.py b/src/idpyoidc/server/session/manager.py index 42bf5a5d..8c3f90f8 100644 --- a/src/idpyoidc/server/session/manager.py +++ b/src/idpyoidc/server/session/manager.py @@ -231,7 +231,7 @@ def create_exchange_grant( scopes: Optional[list] = None, ) -> str: """ - + :param scopes: Scopes :param exchange_req: :param user_id: @@ -239,9 +239,9 @@ def create_exchange_grant( :param sub_type: :return: """ - return self.add_exchange_grant( authentication_event=original_grant.authentication_event, + authorization_request=original_grant.authorization_request, exchange_request=exchange_request, original_branch_id=original_session_id, diff --git a/tests/test_server_16_endpoint.py b/tests/test_server_16_endpoint.py index 74002c0f..04a0cb32 100755 --- a/tests/test_server_16_endpoint.py +++ b/tests/test_server_16_endpoint.py @@ -20,7 +20,8 @@ {"type": "EC", "crv": "P-256", "use": ["sig"]}, ] -REQ = Message(foo="bar", hej="hopp", client_id="client_id") +CLIENT_ID = "client_id" +REQ = Message(foo="bar", hej="hopp", client_id=CLIENT_ID) EXAMPLE_MSG = { "name": "Jane Doe", @@ -78,6 +79,7 @@ def create_endpoint(self): server.endpoint_context.cdb["client_id"] = {} self.endpoint_context = server.endpoint_context + self.endpoint_context.cdb[CLIENT_ID] = {} _endpoints = do_endpoints(conf, server.server_get) self.endpoint = _endpoints[""] diff --git a/tests/test_server_17_client_authn.py b/tests/test_server_17_client_authn.py index d42a2325..0bdea61b 100644 --- a/tests/test_server_17_client_authn.py +++ b/tests/test_server_17_client_authn.py @@ -564,14 +564,14 @@ def test_verify_client_bearer_body(self): assert set(res.keys()) == {"token", "method", "client_id"} assert res["method"] == "bearer_body" - # def test_verify_client_client_secret_post(self): - # request = {"client_id": client_id, "client_secret": client_secret} - # res = verify_client( - # self.endpoint_context, request, endpoint=self.server.server_get("endpoint", - # "endpoint_1"), - # ) - # assert set(res.keys()) == {"method", "client_id"} - # assert res["method"] == "client_secret_post" + def test_verify_client_client_secret_post(self): + request = {"client_id": client_id, "client_secret": client_secret} + res = verify_client( + self.endpoint_context, request, endpoint=self.server.server_get("endpoint", + "endpoint_1"), + ) + assert set(res.keys()) == {"method", "client_id"} + assert res["method"] == "client_secret_post" def test_verify_client_client_secret_basic(self): _token = "{}:{}".format(client_id, client_secret) diff --git a/tests/test_server_20d_client_authn.py b/tests/test_server_20d_client_authn.py index 55ab886c..a63ff1d8 100755 --- a/tests/test_server_20d_client_authn.py +++ b/tests/test_server_20d_client_authn.py @@ -463,7 +463,7 @@ def test_verify_per_client_per_endpoint(self): endpoint=self.server.server_get("endpoint", "token"), ) assert res == {} - + request = {"client_id": client_id, "client_secret": client_secret} res = verify_client( self.endpoint_context, diff --git a/tests/test_server_23_oidc_registration_endpoint.py b/tests/test_server_23_oidc_registration_endpoint.py index 64cb2a1b..6a2934d4 100755 --- a/tests/test_server_23_oidc_registration_endpoint.py +++ b/tests/test_server_23_oidc_registration_endpoint.py @@ -49,6 +49,7 @@ MSG = { "client_id": "client_id", "application_type": "web", + "client_id": "client_id", "redirect_uris": [ "https://client.example.org/callback", "https://client.example.org/callback2", diff --git a/tests/test_server_31_oauth2_introspection.py b/tests/test_server_31_oauth2_introspection.py index 04748917..366d417f 100644 --- a/tests/test_server_31_oauth2_introspection.py +++ b/tests/test_server_31_oauth2_introspection.py @@ -467,7 +467,7 @@ def mock(): monkeypatch.setattr("idpyoidc.server.token.utc_time_sans_frac", mock) _context = self.introspection_endpoint.server_get("endpoint_context") - + _req = self.introspection_endpoint.parse_request( { "token": access_token.value, diff --git a/tests/test_server_37_oauth2_client_credentials.py b/tests/test_server_37_oauth2_client_credentials.py new file mode 100644 index 00000000..dbe4f572 --- /dev/null +++ b/tests/test_server_37_oauth2_client_credentials.py @@ -0,0 +1,324 @@ +import json +import os + +import pytest +from cryptojwt import JWT +from cryptojwt.key_jar import build_keyjar + +from idpyoidc.defaults import JWT_BEARER +from idpyoidc.message.oauth2 import CCAccessTokenRequest +from idpyoidc.message.oidc import AuthorizationRequest +from idpyoidc.message.oidc import RefreshAccessTokenRequest +from idpyoidc.message.oidc import TokenErrorResponse +from idpyoidc.server import Server +from idpyoidc.server.authn_event import create_authn_event +from idpyoidc.server.authz import AuthzHandling +from idpyoidc.server.client_authn import verify_client +from idpyoidc.server.configure import ASConfiguration +from idpyoidc.server.exception import InvalidToken +from idpyoidc.server.oauth2.authorization import Authorization +from idpyoidc.server.oauth2.introspection import Introspection +from idpyoidc.server.oauth2.token import Token +from idpyoidc.server.session import MintingNotAllowed +from idpyoidc.server.user_authn.authn_context import INTERNETPROTOCOLPASSWORD +from idpyoidc.server.user_info import UserInfo +from idpyoidc.time_util import utc_time_sans_frac +from tests import CRYPT_CONFIG +from tests import SESSION_PARAMS + +KEYDEFS = [ + {"type": "RSA", "key": "", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + +CLIENT_KEYJAR = build_keyjar(KEYDEFS) + +RESPONSE_TYPES_SUPPORTED = [ + ["code"], + ["token"], +] + +CAPABILITIES = { + "grant_types_supported": [ + "authorization_code", + "implicit", + "urn:ietf:params:oauth:grant-type:jwt-bearer", + "refresh_token", + "client_credentials" + ], +} + +TOKEN_REQ = CCAccessTokenRequest( + client_id="client_1", + grant_type="client_credentials", + client_secret="hemligt", + scope="" +) + +TOKEN_REQ_DICT = TOKEN_REQ.to_dict() + +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def full_path(local_file): + return os.path.join(BASEDIR, local_file) + + +@pytest.fixture +def conf(): + return { + "issuer": "https://example.com/", + "httpc_params": {"verify": False}, + "capabilities": CAPABILITIES, + "keys": {"uri_path": "jwks.json", "key_defs": KEYDEFS}, + "token_handler_args": { + "jwks_file": "private/token_jwks.json", + "code": {"lifetime": 600, "kwargs": {"crypt_conf": CRYPT_CONFIG}}, + "token": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "add_claims_by_scope": True, + "aud": ["https://example.org/appl"], + }, + }, + "refresh": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "aud": ["https://example.org/appl"], + }, + }, + }, + "endpoint": { + "authorization": { + "path": "authorization", + "class": Authorization, + "kwargs": {}, + }, + "introspection": { + "path": "{}/intro", + "class": Introspection, + "kwargs": { + "client_authn_method": ["client_secret_post"], + "enable_claims_per_client": False, + }, + }, + "token": { + "path": "token", + "class": Token, + "kwargs": { + "client_authn_method": [ + "client_secret_basic", + "client_secret_post", + "client_secret_jwt", + "private_key_jwt", + ] + }, + }, + }, + "authentication": { + "anon": { + "acr": INTERNETPROTOCOLPASSWORD, + "class": "idpyoidc.server.user_authn.user.NoAuthn", + "kwargs": {"user": "diana"}, + } + }, + "userinfo": {"class": UserInfo, "kwargs": {"db": {}}}, + "client_authn": verify_client, + "template_dir": "template", + "claims_interface": { + "class": "idpyoidc.server.session.claims.OAuth2ClaimsInterface", + "kwargs": {}, + }, + "authz": { + "class": AuthzHandling, + "kwargs": { + "grant_config": { + "usage_rules": { + "authorization_code": { + "expires_in": 300, + "supports_minting": ["access_token", "refresh_token"], + "max_usage": 1, + }, + "access_token": {"expires_in": 600}, + "refresh_token": { + "expires_in": 86400, + "supports_minting": ["access_token", "refresh_token"], + }, + }, + "expires_in": 43200, + } + }, + }, + "session_params": {"encrypter": SESSION_PARAMS}, + } + + +class TestEndpoint(object): + @pytest.fixture(autouse=True) + def create_endpoint(self, conf): + server = Server(ASConfiguration(conf=conf, base_path=BASEDIR), cwd=BASEDIR) + endpoint_context = server.endpoint_context + endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token"], + } + endpoint_context.keyjar.import_jwks(CLIENT_KEYJAR.export_jwks(), "client_1") + self.session_manager = endpoint_context.session_manager + self.token_endpoint = server.server_get("endpoint", "token") + self.introspection_endpoint = server.server_get("endpoint", "introspection") + self.endpoint_context = endpoint_context + + def test_client_credentials(self): + """ + Test client credentials + """ + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token"], + } + + _token_request = TOKEN_REQ_DICT.copy() + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + + assert "access_token" in _resp["response_args"] + assert "token_type" in _resp["response_args"] + assert "expires_in" in _resp["response_args"] + + def test_client_credentials_with_scopes(self): + """ + Test client credentials grant with scopes requested. + """ + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token"], + } + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["scope"] = "profile email" + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + + assert "access_token" in _resp["response_args"] + assert "token_type" in _resp["response_args"] + assert "expires_in" in _resp["response_args"] + assert set(_resp["response_args"]["scope"]) == set(["profile", "email"]) + + def test_client_credentials_offline_access(self): + """ + Test client credentials grant with offline scope requested. + """ + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token"], + } + + _token_request = TOKEN_REQ_DICT.copy() + _token_request["scope"] = "offline_access" + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + + assert "access_token" in _resp["response_args"] + assert "token_type" in _resp["response_args"] + assert "expires_in" in _resp["response_args"] + assert "refresh_token" not in _resp["response_args"] + + + def test_client_credentials_fails_if_disabled(self): + """ + Test that Client Credentials fails if it's not included in Token's + grant_types_supported (that are set in its helper attribute). + """ + del self.token_endpoint.helper["client_credentials"] + + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token", "id_token"], + } + + _token_request = TOKEN_REQ_DICT.copy() + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + + assert _resp["error"] == "invalid_request" + assert ( + _resp["error_description"] + == "Unsupported grant_type: client_credentials" + ) + + def test_client_credentials_per_client(self): + """ + Test that per-client client credentials configuration works correctly + """ + self.token_endpoint.grant_types_supported.remove("client_credentials") + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token"], + "grant_types_supported": ["client_credentials"] + } + + _token_request = TOKEN_REQ_DICT.copy() + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + + assert "access_token" in _resp["response_args"] + assert "token_type" in _resp["response_args"] + assert "expires_in" in _resp["response_args"] + + def test_introspection(self): + self.token_endpoint.grant_types_supported.remove("client_credentials") + self.endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token"], + "grant_types_supported": ["client_credentials"] + } + + _token_request = TOKEN_REQ_DICT.copy() + + _req = self.token_endpoint.parse_request(_token_request) + _resp = self.token_endpoint.process_request(request=_req) + access_token = _resp["response_args"]["access_token"] + + self.introspection_endpoint.kwargs["enable_claims_per_client"] = True + + _context = self.introspection_endpoint.server_get("endpoint_context") + _req = self.introspection_endpoint.parse_request( + { + "token": access_token, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.introspection_endpoint.process_request(_req) + _resp_args = _resp["response_args"] + assert "active" in _resp_args + assert _resp_args["active"] == True + diff --git a/tests/test_server_38_oauth2_revocation_endpoint.py b/tests/test_server_38_oauth2_revocation_endpoint.py new file mode 100644 index 00000000..616d877c --- /dev/null +++ b/tests/test_server_38_oauth2_revocation_endpoint.py @@ -0,0 +1,581 @@ +import base64 +import json +import os + +import pytest +from cryptojwt import JWT +from cryptojwt import as_unicode +from cryptojwt.key_jar import build_keyjar +from cryptojwt.utils import as_bytes + +from idpyoidc.message.oauth2 import TokenRevocationRequest +from idpyoidc.message.oauth2 import TokenRevocationResponse +from idpyoidc.message.oidc import AccessTokenRequest +from idpyoidc.message.oidc import AuthorizationRequest +from idpyoidc.server import Server +from idpyoidc.server.authn_event import create_authn_event +from idpyoidc.server.authz import AuthzHandling +from idpyoidc.server.client_authn import verify_client +from idpyoidc.server.configure import ASConfiguration +from idpyoidc.server.exception import ClientAuthenticationError +from idpyoidc.server.oauth2.authorization import Authorization +from idpyoidc.server.oauth2.introspection import Introspection +from idpyoidc.server.oauth2.token_revocation import TokenRevocation +from idpyoidc.server.oauth2.token_revocation import validate_token_revocation_policy +from idpyoidc.server.oidc.token import Token +from idpyoidc.server.user_authn.authn_context import INTERNETPROTOCOLPASSWORD +from idpyoidc.server.user_info import UserInfo +from idpyoidc.time_util import utc_time_sans_frac +from tests import CRYPT_CONFIG +from tests import SESSION_PARAMS + +KEYDEFS = [ + {"type": "RSA", "key": "", "use": ["sig"]}, + {"type": "EC", "crv": "P-256", "use": ["sig"]}, +] + +RESPONSE_TYPES_SUPPORTED = [ + ["code"], + ["token"], + ["id_token"], + ["code", "token"], + ["code", "id_token"], + ["id_token", "token"], + ["code", "token", "id_token"], + ["none"], +] + +CAPABILITIES = { + "response_types_supported": [" ".join(x) for x in RESPONSE_TYPES_SUPPORTED], + "token_endpoint_auth_methods_supported": [ + "client_secret_post", + "client_secret_basic", + "client_secret_jwt", + "private_key_jwt", + ], + "response_modes_supported": ["query", "fragment", "form_post"], + "subject_types_supported": ["public", "pairwise", "ephemeral"], + "grant_types_supported": [ + "authorization_code", + "implicit", + "urn:ietf:params:oauth:grant-type:jwt-bearer", + "refresh_token", + ], + "claim_types_supported": ["normal", "aggregated", "distributed"], + "claims_parameter_supported": True, + "request_parameter_supported": True, + "request_uri_parameter_supported": True, +} + +AUTH_REQ = AuthorizationRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + scope=["openid", "offline_access"], + state="STATE", + response_type="code id_token", +) + +TOKEN_REQ = AccessTokenRequest( + client_id="client_1", + redirect_uri="https://example.com/cb", + state="STATE", + grant_type="authorization_code", + client_secret="hemligt", +) + +TOKEN_REQ_DICT = TOKEN_REQ.to_dict() + +BASEDIR = os.path.abspath(os.path.dirname(__file__)) + + +def full_path(local_file): + return os.path.join(BASEDIR, local_file) + + +@pytest.mark.parametrize("jwt_token", [True, False]) +class TestEndpoint: + @pytest.fixture(autouse=True) + def create_endpoint(self, jwt_token): + conf = { + "issuer": "https://example.com/", + "httpc_params": {"verify": False, "timeout": 1}, + "capabilities": CAPABILITIES, + "keys": {"uri_path": "jwks.json", "key_defs": KEYDEFS}, + "token_handler_args": { + "jwks_file": "private/token_jwks.json", + "code": {"lifetime": 600, "kwargs": {"crypt_conf": CRYPT_CONFIG}}, + "token": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "add_claims_by_scope": True, + "aud": ["https://example.org/appl"], + }, + }, + "refresh": { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": { + "lifetime": 3600, + "aud": ["https://example.org/appl"], + }, + }, + "id_token": { + "class": "idpyoidc.server.token.id_token.IDToken", + }, + }, + "endpoint": { + "authorization": { + "path": "{}/authorization", + "class": Authorization, + "kwargs": {}, + }, + "introspection": { + "path": "{}/intro", + "class": Introspection, + "kwargs": { + "client_authn_method": ["client_secret_post"], + "enable_claims_per_client": False, + }, + }, + "token_revocation": { + "path": "{}/revoke", + "class": TokenRevocation, + "kwargs": { + "client_authn_method": ["client_secret_post"], + }, + }, + "token": { + "path": "token", + "class": Token, + "kwargs": { + "client_authn_method": [ + "client_secret_basic", + "client_secret_post", + "client_secret_jwt", + "private_key_jwt", + ] + }, + }, + }, + "authentication": { + "anon": { + "acr": INTERNETPROTOCOLPASSWORD, + "class": "idpyoidc.server.user_authn.user.NoAuthn", + "kwargs": {"user": "diana"}, + } + }, + "userinfo": { + "path": "{}/userinfo", + "class": UserInfo, + "kwargs": {"db_file": full_path("users.json")}, + }, + "client_authn": verify_client, + "template_dir": "template", + "authz": { + "class": AuthzHandling, + "kwargs": { + "grant_config": { + "usage_rules": { + "authorization_code": { + "supports_minting": [ + "access_token", + "refresh_token", + "id_token", + ], + "max_usage": 1, + }, + "access_token": {}, + "refresh_token": { + "supports_minting": ["access_token", "refresh_token"], + }, + }, + "expires_in": 43200, + } + }, + }, + "session_params": SESSION_PARAMS, + } + if jwt_token: + conf["token_handler_args"]["token"] = { + "class": "idpyoidc.server.token.jwt_token.JWTToken", + "kwargs": {}, + } + server = Server(ASConfiguration(conf=conf, base_path=BASEDIR), cwd=BASEDIR) + endpoint_context = server.endpoint_context + endpoint_context.cdb["client_1"] = { + "client_secret": "hemligt", + "redirect_uris": [("https://example.com/cb", None)], + "client_salt": "salted", + "token_endpoint_auth_method": "client_secret_post", + "response_types": ["code", "token", "code id_token", "id_token"], + "add_claims": { + "always": { + "introspection": ["nickname", "eduperson_scoped_affiliation"], + }, + "by_scope": {}, + }, + "allowed_scopes": ["openid", "profile", "email", "address", "phone", "offline_access", "research_and_scholarship"] + } + endpoint_context.keyjar.import_jwks_as_json( + endpoint_context.keyjar.export_jwks_as_json(private=True), + endpoint_context.issuer, + ) + self.revocation_endpoint = server.server_get("endpoint", "token_revocation") + self.token_endpoint = server.server_get("endpoint", "token") + self.session_manager = endpoint_context.session_manager + self.user_id = "diana" + + def _create_session(self, auth_req, sub_type="public", sector_identifier=""): + if sector_identifier: + authz_req = auth_req.copy() + authz_req["sector_identifier_uri"] = sector_identifier + else: + authz_req = auth_req + client_id = authz_req["client_id"] + ae = create_authn_event(self.user_id) + return self.session_manager.create_session( + ae, authz_req, self.user_id, client_id=client_id, sub_type=sub_type + ) + + def _mint_token(self, token_class, grant, session_id, based_on=None, **kwargs): + # Constructing an authorization code is now done + return grant.mint_token( + session_id=session_id, + endpoint_context=self.token_endpoint.server_get("endpoint_context"), + token_class=token_class, + token_handler=self.session_manager.token_handler.handler[token_class], + expires_at=utc_time_sans_frac() + 300, # 5 minutes from now + based_on=based_on, + **kwargs + ) + + def _get_access_token(self, areq): + session_id = self._create_session(areq) + # Consent handling + grant = self.token_endpoint.server_get("endpoint_context").authz(session_id, areq) + self.session_manager[session_id] = grant + # grant = self.session_manager[session_id] + code = self._mint_token("authorization_code", grant, session_id) + return self._mint_token("access_token", grant, session_id, code) + + def _get_refresh_token(self, areq): + session_id = self._create_session(areq) + # Consent handling + grant = self.token_endpoint.server_get("endpoint_context").authz(session_id, areq) + self.session_manager[session_id] = grant + # grant = self.session_manager[session_id] + code = self._mint_token("authorization_code", grant, session_id) + return self._mint_token("refresh_token", grant, session_id, code) + + def test_parse_no_authn(self): + access_token = self._get_access_token(AUTH_REQ) + with pytest.raises(ClientAuthenticationError): + self.revocation_endpoint.parse_request({"token": access_token.value}) + + def test_parse_with_client_auth_in_req(self): + access_token = self._get_access_token(AUTH_REQ) + + _context = self.revocation_endpoint.server_get("endpoint_context") + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + + assert isinstance(_req, TokenRevocationRequest) + assert set(_req.keys()) == {"token", "client_id", "client_secret"} + + def test_parse_with_wrong_client_authn(self): + access_token = self._get_access_token(AUTH_REQ) + + _basic_token = "{}:{}".format( + "client_1", + self.revocation_endpoint.server_get("endpoint_context").cdb["client_1"][ + "client_secret" + ], + ) + _basic_token = as_unicode(base64.b64encode(as_bytes(_basic_token))) + _basic_authz = "Basic {}".format(_basic_token) + http_info = {"headers": {"authorization": _basic_authz}} + + with pytest.raises(ClientAuthenticationError): + self.revocation_endpoint.parse_request( + {"token": access_token.value}, http_info=http_info + ) + + def test_process_request(self): + access_token = self._get_access_token(AUTH_REQ) + + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": self.revocation_endpoint.server_get("endpoint_context").cdb[ + "client_1" + ]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert _resp + assert set(_resp.keys()) == {"response_args"} + + def test_do_response(self): + access_token = self._get_access_token(AUTH_REQ) + + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": self.revocation_endpoint.server_get("endpoint_context").cdb[ + "client_1" + ]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + msg_info = self.revocation_endpoint.do_response(request=_req, **_resp) + assert isinstance(msg_info, dict) + assert set(msg_info.keys()) == {"response", "http_headers"} + assert msg_info["http_headers"] == [ + ("Content-type", "application/json; charset=utf-8"), + ("Pragma", "no-cache"), + ("Cache-Control", "no-store"), + ] + + def test_do_response_no_token(self): + # access_token = self._get_access_token(AUTH_REQ) + _context = self.revocation_endpoint.server_get("endpoint_context") + _req = self.revocation_endpoint.parse_request( + { + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "error" in _resp + + def test_access_token(self): + access_token = self._get_access_token(AUTH_REQ) + assert access_token.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + assert access_token.revoked + + def test_access_token_per_client(self): + + def custom_token_revocation_policy(token, session_info, **kwargs): + _token = token + _token.revoke() + response_args = {"response_args": {"type": "custom"}} + return TokenRevocationResponse(**response_args) + + access_token = self._get_access_token(AUTH_REQ) + assert access_token.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + _context.cdb["client_1"]["token_revocation"] = { + "token_types_supported": [ + "access_token", + ], + "policy": { + "": { + "callable": validate_token_revocation_policy, + }, + "access_token": { + "callable": custom_token_revocation_policy, + } + }, + } + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + assert "type" in _resp["response_args"] + assert _resp["response_args"]["type"] == "custom" + assert access_token.revoked + + def test_missing_token_policy_per_client(self): + + def custom_token_revocation_policy(token, session_info, **kwargs): + _token = token + _token.revoke() + response_args = {"response_args": {"type": "custom"}} + return TokenRevocationResponse(**response_args) + + access_token = self._get_access_token(AUTH_REQ) + assert access_token.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + _context.cdb["client_1"]["token_revocation"] = { + "token_types_supported": [ + "access_token", + ], + "policy": { + "": { + "callable": validate_token_revocation_policy, + }, + "refresh_token": { + "callable": custom_token_revocation_policy, + } + }, + } + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + assert access_token.revoked + + def test_code(self): + session_id = self._create_session(AUTH_REQ) + + # Apply consent + grant = self.token_endpoint.server_get("endpoint_context").authz(session_id, AUTH_REQ) + self.session_manager[session_id] = grant + + code = self._mint_token("authorization_code", grant, session_id) + assert code.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + + _req = self.revocation_endpoint.parse_request( + { + "token": code.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + assert code.revoked + + def test_refresh_token(self): + refresh_token = self._get_refresh_token(AUTH_REQ) + assert refresh_token.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + _req = self.revocation_endpoint.parse_request( + { + "token": refresh_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + assert refresh_token.revoked + + def test_expired_access_token(self): + access_token = self._get_access_token(AUTH_REQ) + access_token.expires_at = utc_time_sans_frac() - 1000 + + _context = self.revocation_endpoint.server_get("endpoint_context") + + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + + def test_revoked_access_token(self): + access_token = self._get_access_token(AUTH_REQ) + access_token.revoked = True + + _context = self.revocation_endpoint.server_get("endpoint_context") + + _req = self.revocation_endpoint.parse_request( + { + "token": access_token.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + assert "response_args" in _resp + + def test_unsupported_token_type(self): + self.revocation_endpoint.token_types_supported = ["access_token"] + session_id = self._create_session(AUTH_REQ) + + # Apply consent + grant = self.token_endpoint.server_get("endpoint_context").authz(session_id, AUTH_REQ) + self.session_manager[session_id] = grant + + code = self._mint_token("authorization_code", grant, session_id) + assert code.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + + _req = self.revocation_endpoint.parse_request( + { + "token": code.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + err_dscr = ( + "The authorization server does not support the revocation of " + "the presented token type. That is, the client tried to revoke an access " + "token on a server not supporting this feature." + ) + assert "error" in _resp + assert _resp.to_dict() == { + "error": "unsupported_token_type", + "error_description": err_dscr, + } + assert code.revoked is False + + def test_unsupported_token_type_per_client(self): + _context = self.revocation_endpoint.server_get("endpoint_context") + _context.cdb["client_1"]["token_revocation"] = { + "token_types_supported": [ + "refresh_token", + ], + } + session_id = self._create_session(AUTH_REQ) + + # Apply consent + grant = self.token_endpoint.server_get("endpoint_context").authz(session_id, AUTH_REQ) + self.session_manager[session_id] = grant + + code = self._mint_token("authorization_code", grant, session_id) + assert code.revoked is False + _context = self.revocation_endpoint.server_get("endpoint_context") + + _req = self.revocation_endpoint.parse_request( + { + "token": code.value, + "client_id": "client_1", + "client_secret": _context.cdb["client_1"]["client_secret"], + } + ) + _resp = self.revocation_endpoint.process_request(_req) + err_dscr = ( + "The authorization server does not support the revocation of " + "the presented token type. That is, the client tried to revoke an access " + "token on a server not supporting this feature." + ) + assert "error" in _resp + assert _resp.to_dict() == { + "error": "unsupported_token_type", + "error_description": err_dscr, + } + assert code.revoked is False diff --git a/tests/test_tandem_10_token_exchange.py b/tests/test_tandem_10_token_exchange.py index b462b6c4..54b44872 100644 --- a/tests/test_tandem_10_token_exchange.py +++ b/tests/test_tandem_10_token_exchange.py @@ -363,6 +363,7 @@ def test_token_exchange_per_client(self, token): "urn:ietf:params:oauth:token-type:access_token", "urn:ietf:params:oauth:token-type:refresh_token", ], + "default_requested_token_type": "urn:ietf:params:oauth:token-type:access_token", "policy": { "": { "callable": @@ -373,7 +374,6 @@ def test_token_exchange_per_client(self, token): } resp, _state, _scope = self.process_setup(token) - # ****** Token Exchange Request ********** req_args = { @@ -386,7 +386,6 @@ def test_token_exchange_per_client(self, token): _token_exchange_request, _te_resp = self.do_query("token_exchange", "token", req_args, _state) - assert set(_te_resp.keys()) == { "access_token", "token_type",