|
| 1 | +from functools import partial, wraps |
| 2 | +from html import escape |
| 3 | +from typing import List # Needed in Python 3.7 & 3.8 |
| 4 | + |
| 5 | +from django.shortcuts import redirect, render |
| 6 | +from django.urls import path, reverse |
| 7 | + |
| 8 | +from .web import Auth as _Auth |
| 9 | + |
| 10 | + |
| 11 | +class Auth(object): |
| 12 | + _name_of_auth_response_view = f"{__name__}.auth_response" # Presumably unique |
| 13 | + |
| 14 | + def __init__( |
| 15 | + self, |
| 16 | + client_id: str, |
| 17 | + *, |
| 18 | + client_credential=None, |
| 19 | + redirect_view: str=None, |
| 20 | + scopes: List[str]=None, |
| 21 | + authority: str=None, |
| 22 | + |
| 23 | + # We end up accepting Microsoft Entra ID B2C parameters rather than generic urls |
| 24 | + # because it is troublesome to build those urls in settings.py or templates |
| 25 | + b2c_tenant_name: str=None, |
| 26 | + b2c_signup_signin_user_flow: str=None, |
| 27 | + b2c_edit_profile_user_flow: str=None, |
| 28 | + b2c_reset_password_user_flow: str=None, |
| 29 | + ): |
| 30 | + """Create an identity helper for a Django web project. |
| 31 | +
|
| 32 | + This instance is expected to be long-lived with the web project. |
| 33 | +
|
| 34 | + :param str client_id: |
| 35 | + The client_id of your web application, issued by its authority. |
| 36 | +
|
| 37 | + :param str client_credential: |
| 38 | + It is somtimes a string. |
| 39 | + The actual format is decided by the underlying auth library. TBD. |
| 40 | +
|
| 41 | + :param str redirect_view: |
| 42 | + This will be used as the last segment to form your project's redirect_uri. |
| 43 | +
|
| 44 | + For example, if you provide an input here as "auth_response", |
| 45 | + and your Django project mounts this ``Auth`` object's ``urlpatterns`` |
| 46 | + by ``path("prefix/", include(auth.urlpatterns))``, |
| 47 | + then the actual redirect_uri will become ``.../prefix/auth_response`` |
| 48 | + which MUST match what you have registered for your web application. |
| 49 | +
|
| 50 | + Typically, if your application uses a flat redirect_uri as |
| 51 | + ``https://example.com/auth_response``, |
| 52 | + your shall use an redirect_view value as ``auth_response``, |
| 53 | + and then mount it by ``path("", include(auth.urlpatterns))``. |
| 54 | +
|
| 55 | +
|
| 56 | + :param list[str] scopes: |
| 57 | + A list of strings representing the scopes used during login. |
| 58 | +
|
| 59 | + :param str authority: |
| 60 | + The authority which your application registers with. |
| 61 | + For example, ``https://example.com/foo``. |
| 62 | + This is a required parameter unless you the following B2C parameters. |
| 63 | +
|
| 64 | + :param str b2c_tenant_name: |
| 65 | + The tenant name of your Microsoft Entra ID tenant, such as "contoso". |
| 66 | + Required if your project is using Microsoft Entra ID B2C. |
| 67 | +
|
| 68 | + :param str b2c_signup_signin_user_flow: |
| 69 | + The name of your Microsoft Entra ID tenant's sign-in flow, |
| 70 | + such as "B2C_1_signupsignin1". |
| 71 | + Required if your project is using Microsoft Entra ID B2C. |
| 72 | +
|
| 73 | + :param str b2c_edit_profile_user_flow: |
| 74 | + The name of your Microsoft Entra ID tenant's edit-profile flow, |
| 75 | + such as "B2C_1_profile_editing". |
| 76 | + Optional. |
| 77 | +
|
| 78 | + :param str b2c_edit_profile_user_flow: |
| 79 | + The name of your Microsoft Entra ID tenant's reset-password flow, |
| 80 | + such as "B2C_1_reset_password". |
| 81 | + Optional. |
| 82 | +
|
| 83 | + """ |
| 84 | + self._client_id = client_id |
| 85 | + self._client_credential = client_credential |
| 86 | + if redirect_view and "/" in redirect_view: |
| 87 | + raise ValueError("redirect_view shall not contain slash") |
| 88 | + self._redirect_view = redirect_view |
| 89 | + self._scopes = scopes |
| 90 | + self.urlpatterns = [ # Note: path(..., view, ...) does not accept classmethod |
| 91 | + path('login', self.login), |
| 92 | + path('logout', self.logout), |
| 93 | + path( |
| 94 | + redirect_view or 'auth_response', # The latter is used by device code flow |
| 95 | + self.auth_response, |
| 96 | + name=self._name_of_auth_response_view, |
| 97 | + ), |
| 98 | + ] |
| 99 | + self._http_cache = {} # All subsequent _Auth instances will share this |
| 100 | + |
| 101 | + # Note: We do not use overload, because we want to allow the caller to |
| 102 | + # have only one code path that relay in all the optional parameters. |
| 103 | + if b2c_tenant_name and b2c_signup_signin_user_flow: |
| 104 | + b2c_authority_template = ( # TODO: Support custom domain |
| 105 | + "https://{tenant}.b2clogin.com/{tenant}.onmicrosoft.com/{user_flow}") |
| 106 | + self._authority = b2c_authority_template.format( |
| 107 | + tenant=b2c_tenant_name, |
| 108 | + user_flow=b2c_signup_signin_user_flow, |
| 109 | + ) |
| 110 | + self._edit_profile_auth = _Auth( |
| 111 | + session={}, |
| 112 | + authority=b2c_authority_template.format( |
| 113 | + tenant=b2c_tenant_name, |
| 114 | + user_flow=b2c_edit_profile_user_flow, |
| 115 | + ), |
| 116 | + client_id=client_id, |
| 117 | + ) if b2c_edit_profile_user_flow else None |
| 118 | + self._reset_password_auth = _Auth( |
| 119 | + session={}, |
| 120 | + authority=b2c_authority_template.format( |
| 121 | + tenant=b2c_tenant_name, |
| 122 | + user_flow=b2c_reset_password_user_flow, |
| 123 | + ), |
| 124 | + client_id=client_id, |
| 125 | + ) if b2c_reset_password_user_flow else None |
| 126 | + else: |
| 127 | + self._authority = authority |
| 128 | + self._edit_profile_auth = None |
| 129 | + self._reset_password_auth = None |
| 130 | + if not self._authority: |
| 131 | + raise ValueError( |
| 132 | + "Either authority or b2c_tenant_name and b2c_signup_signin_user_flow " |
| 133 | + "must be provided") |
| 134 | + |
| 135 | + def _build_auth(self, request): |
| 136 | + return _Auth( |
| 137 | + session=request.session, |
| 138 | + authority=self._authority, |
| 139 | + client_id=self._client_id, |
| 140 | + client_credential=self._client_credential, |
| 141 | + http_cache=self._http_cache, |
| 142 | + ) |
| 143 | + |
| 144 | + def _get_reset_password_url(self, request): |
| 145 | + return self._reset_password_auth.log_in( |
| 146 | + redirect_uri=request.build_absolute_uri(self._redirect_view) |
| 147 | + )["auth_uri"] if self._reset_password_auth and self._redirect_view else None |
| 148 | + |
| 149 | + def get_edit_profile_url(self, request): |
| 150 | + return self._edit_profile_auth.log_in( |
| 151 | + redirect_uri=request.build_absolute_uri(self._redirect_view) |
| 152 | + )["auth_uri"] if self._edit_profile_auth and self._redirect_view else None |
| 153 | + |
| 154 | + def login(self, request): |
| 155 | + """The login view""" |
| 156 | + if not self._client_id: |
| 157 | + return self._render_auth_error( |
| 158 | + request, |
| 159 | + error="configuration_error", |
| 160 | + error_description="Did you forget to setup CLIENT_ID (and other configuration)?", |
| 161 | + ) |
| 162 | + redirect_uri = request.build_absolute_uri( |
| 163 | + self._redirect_view) if self._redirect_view else None |
| 164 | + log_in_result = self._build_auth(request).log_in( |
| 165 | + scopes=self._scopes, # Have user consent to scopes during log-in |
| 166 | + redirect_uri=redirect_uri, # Optional. If present, this absolute URL must match your app's redirect_uri registered in Azure Portal |
| 167 | + prompt="select_account", # Optional. More values defined in https://openid.net/specs/openid-connect-core-1_0.html#AuthRequest |
| 168 | + ) |
| 169 | + if "error" in log_in_result: |
| 170 | + return self._render_auth_error( |
| 171 | + request, |
| 172 | + error=log_in_result["error"], |
| 173 | + error_description=log_in_result.get("error_description"), |
| 174 | + ) |
| 175 | + return render(request, "identity/login.html", dict( |
| 176 | + log_in_result, |
| 177 | + reset_password_url=self._get_reset_password_url(request), |
| 178 | + auth_response_url=reverse(self._name_of_auth_response_view), |
| 179 | + )) |
| 180 | + |
| 181 | + def _render_auth_error(self, request, error, error_description=None): |
| 182 | + return render(request, "identity/auth_error.html", dict( |
| 183 | + # Use flat data types so that the template can be as simple as possible |
| 184 | + error=escape(error), |
| 185 | + error_description=escape(error_description or ""), |
| 186 | + reset_password_url=self._get_reset_password_url(request), |
| 187 | + )) |
| 188 | + |
| 189 | + def auth_response(self, request): |
| 190 | + """The auth_response view""" |
| 191 | + result = self._build_auth(request).complete_log_in(request.GET) |
| 192 | + if "error" in result: |
| 193 | + return self._render_auth_error( |
| 194 | + request, |
| 195 | + error=result["error"], |
| 196 | + error_description=result.get("error_description"), |
| 197 | + ) |
| 198 | + return redirect("index") # TODO: Go back to a customizable url |
| 199 | + |
| 200 | + def logout(self, request): |
| 201 | + """The logout view""" |
| 202 | + return redirect( |
| 203 | + self._build_auth(request).log_out(request.build_absolute_uri("/"))) |
| 204 | + |
| 205 | + def get_user(self, request): |
| 206 | + return self._build_auth(request).get_user() |
| 207 | + |
| 208 | + def get_token_for_user(self, request, scopes: List[str]): |
| 209 | + return self._build_auth(request).get_token_for_user(scopes) |
| 210 | + |
| 211 | + def login_required( |
| 212 | + self, |
| 213 | + function=None, # TODO: /, *, redirect_field_name=None, login_url=None, |
| 214 | + ): |
| 215 | + # With or without parameter. Inspired by https://stackoverflow.com/a/39335652 |
| 216 | + |
| 217 | + # With parameter |
| 218 | + if function is None: |
| 219 | + return partial( |
| 220 | + self.login_required, |
| 221 | + #redirect_field_name=redirect_field_name, |
| 222 | + #login_url=login_url, |
| 223 | + ) |
| 224 | + |
| 225 | + # Without parameter |
| 226 | + @wraps(function) |
| 227 | + def wrapper(request, *args, **kwargs): |
| 228 | + auth = self._build_auth(request) |
| 229 | + if not auth.get_user(): |
| 230 | + return redirect(self.login) |
| 231 | + return function(request, *args, **kwargs) |
| 232 | + return wrapper |
| 233 | + |
0 commit comments