|  | 
| 3 | 3 | # Shared Secret Authenticator module for Matrix Synapse | 
| 4 | 4 | # Copyright (C) 2018 Slavi Pantaleev | 
| 5 | 5 | # | 
| 6 |  | -# http://devture.com/ | 
|  | 6 | +# https://devture.com/ | 
| 7 | 7 | # | 
| 8 | 8 | # This program is free software: you can redistribute it and/or modify | 
| 9 | 9 | # it under the terms of the GNU Affero General Public License as | 
|  | 
| 18 | 18 | # You should have received a copy of the GNU Affero General Public License | 
| 19 | 19 | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | 
| 20 | 20 | # | 
|  | 21 | +from typing import Awaitable, Callable, Optional, Tuple | 
| 21 | 22 | 
 | 
| 22 |  | -import logging | 
| 23 | 23 | import hashlib | 
| 24 | 24 | import hmac | 
| 25 |  | -from twisted.internet import defer | 
|  | 25 | +import logging | 
|  | 26 | + | 
|  | 27 | +import synapse | 
|  | 28 | +from synapse import module_api | 
| 26 | 29 | 
 | 
| 27 | 30 | logger = logging.getLogger(__name__) | 
| 28 | 31 | 
 | 
| 29 |  | -class SharedSecretAuthenticator(object): | 
|  | 32 | +class SharedSecretAuthProvider: | 
|  | 33 | +    def __init__(self, config: dict, api: module_api): | 
|  | 34 | +        for k in ('shared_secret',): | 
|  | 35 | +            if k not in config: | 
|  | 36 | +                raise Error('Required `{0}` configuration key not found'.format(k)) | 
|  | 37 | + | 
|  | 38 | +        m_login_password_support_enabled = bool(config['m_login_password_support_enabled']) if 'm_login_password_support_enabled' in config else False | 
|  | 39 | + | 
|  | 40 | +        self.api = api | 
|  | 41 | +        self.shared_secret = config['shared_secret'] | 
|  | 42 | + | 
|  | 43 | +        auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = {} | 
|  | 44 | +        auth_checkers[("com.devture.shared_secret_auth", ("token",))] = self.check_com_devture_shared_secret_auth | 
|  | 45 | +        if m_login_password_support_enabled: | 
|  | 46 | +            auth_checkers[("m.login.password", ("password",))] = self.check_m_login_password | 
|  | 47 | + | 
|  | 48 | +        enabled_login_types = [k[0] for k in auth_checkers] | 
|  | 49 | +        logger.info('Enabled login types: %s', enabled_login_types) | 
| 30 | 50 | 
 | 
| 31 |  | -    def __init__(self, config, account_handler): | 
| 32 |  | -        self.account_handler = account_handler | 
| 33 |  | -        self.sharedSecret = config['sharedSecret'] | 
|  | 51 | +        api.register_password_auth_provider_callbacks( | 
|  | 52 | +            auth_checkers=auth_checkers, | 
|  | 53 | +        ) | 
| 34 | 54 | 
 | 
| 35 |  | -    @defer.inlineCallbacks | 
| 36 |  | -    def check_password(self, user_id, password): | 
| 37 |  | -        # The password is supposed to be an HMAC of the user id, keyed with the shared secret. | 
| 38 |  | -        # It's not really a password in this case. | 
| 39 |  | -        given_hmac = password.encode('utf-8') | 
|  | 55 | +    async def check_com_devture_shared_secret_auth( | 
|  | 56 | +        self, | 
|  | 57 | +        username: str, | 
|  | 58 | +        login_type: str, | 
|  | 59 | +        login_dict: "synapse.module_api.JsonDict", | 
|  | 60 | +    ) -> Optional[ | 
|  | 61 | +        Tuple[ | 
|  | 62 | +            str, | 
|  | 63 | +            Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], | 
|  | 64 | +        ] | 
|  | 65 | +    ]: | 
|  | 66 | +        if login_type != "com.devture.shared_secret_auth": | 
|  | 67 | +            return None | 
|  | 68 | +        return await self._log_in_username_with_token("com.devture.shared_secret_auth", username, login_dict.get("token")) | 
| 40 | 69 | 
 | 
| 41 |  | -        logger.info('Authenticating user: %s', user_id) | 
|  | 70 | +    async def check_m_login_password( | 
|  | 71 | +        self, | 
|  | 72 | +        username: str, | 
|  | 73 | +        login_type: str, | 
|  | 74 | +        login_dict: "synapse.module_api.JsonDict", | 
|  | 75 | +    ) -> Optional[ | 
|  | 76 | +        Tuple[ | 
|  | 77 | +            str, | 
|  | 78 | +            Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], | 
|  | 79 | +        ] | 
|  | 80 | +    ]: | 
|  | 81 | +        if login_type != "m.login.password": | 
|  | 82 | +            return None | 
|  | 83 | +        return await self._log_in_username_with_token("m.login.password", username, login_dict.get("password")) | 
| 42 | 84 | 
 | 
| 43 |  | -        h = hmac.new(self.sharedSecret.encode('utf-8'), user_id.encode('utf-8'), hashlib.sha512) | 
|  | 85 | +    async def _log_in_username_with_token( | 
|  | 86 | +        self, | 
|  | 87 | +        login_type: str, | 
|  | 88 | +        username: str, | 
|  | 89 | +        token: str, | 
|  | 90 | +    ) -> Optional[ | 
|  | 91 | +        Tuple[ | 
|  | 92 | +            str, | 
|  | 93 | +            Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], | 
|  | 94 | +        ] | 
|  | 95 | +    ]: | 
|  | 96 | +        logger.info('Authenticating user `%s` with login type `%s`', username, login_type) | 
|  | 97 | + | 
|  | 98 | +        full_user_id = self.api.get_qualified_user_id(username) | 
|  | 99 | + | 
|  | 100 | +        # The password (token) is supposed to be an HMAC of the full user id, keyed with the shared secret. | 
|  | 101 | +        given_hmac = token.encode('utf-8') | 
|  | 102 | + | 
|  | 103 | +        h = hmac.new(self.shared_secret.encode('utf-8'), full_user_id.encode('utf-8'), hashlib.sha512) | 
| 44 | 104 |         computed_hmac = h.hexdigest().encode('utf-8') | 
| 45 | 105 | 
 | 
| 46 |  | -        try: | 
| 47 |  | -            is_identical = hmac.compare_digest(computed_hmac, given_hmac) | 
| 48 |  | -        except AttributeError: | 
| 49 |  | -            # `hmac.compare_digest` is only available on Python >= 2.7.7 | 
| 50 |  | -            # Fall back to being somewhat insecure on older versions. | 
| 51 |  | -            is_identical = (computed_hmac == given_hmac) | 
| 52 |  | - | 
| 53 |  | -        if not is_identical: | 
| 54 |  | -            logger.info('Bad hmac value for user: %s', user_id) | 
| 55 |  | -            defer.returnValue(False) | 
| 56 |  | -            return | 
| 57 |  | - | 
| 58 |  | -        if not (yield self.account_handler.check_user_exists(user_id)): | 
| 59 |  | -            logger.info('Refusing to authenticate missing user: %s', user_id) | 
| 60 |  | -            defer.returnValue(False) | 
| 61 |  | -            return | 
| 62 |  | - | 
| 63 |  | -        logger.info('Authenticated user: %s', user_id) | 
| 64 |  | -        defer.returnValue(True) | 
| 65 |  | - | 
| 66 |  | -    @staticmethod | 
| 67 |  | -    def parse_config(config): | 
| 68 |  | -        if 'sharedSecret' not in config: | 
| 69 |  | -            raise Exception('Missing sharedSecret parameter for SharedSecretAuthenticator') | 
| 70 |  | -        return config | 
|  | 106 | +        if not hmac.compare_digest(computed_hmac, given_hmac): | 
|  | 107 | +            logger.info('Bad hmac value for user: %s', full_user_id) | 
|  | 108 | +            return None | 
|  | 109 | + | 
|  | 110 | +        user_info = await self.api.get_userinfo_by_id(full_user_id) | 
|  | 111 | +        if user_info is None: | 
|  | 112 | +            logger.info('Refusing to authenticate missing user: %s', full_user_id) | 
|  | 113 | +            return None | 
|  | 114 | + | 
|  | 115 | +        logger.info('Authenticated user: %s', full_user_id) | 
|  | 116 | + | 
|  | 117 | +        return full_user_id, None | 
0 commit comments