From 443b812fe398b7e13a348d5cb3f4a9374953344e Mon Sep 17 00:00:00 2001 From: Daniel Mohns Date: Mon, 16 Oct 2023 11:59:35 +0200 Subject: [PATCH 1/4] Black-ify codebase --- openpaygo/metrics_request.py | 84 ++++++++----- openpaygo/metrics_response.py | 190 ++++++++++++++++++----------- openpaygo/metrics_shared.py | 158 +++++++++++++----------- openpaygo/token_decode.py | 139 ++++++++++++++++----- openpaygo/token_encode.py | 102 ++++++++++++---- openpaygo/token_shared.py | 44 ++++--- openpaygo/token_shared_extended.py | 20 +-- 7 files changed, 485 insertions(+), 252 deletions(-) diff --git a/openpaygo/metrics_request.py b/openpaygo/metrics_request.py index 8500d99..9232dbe 100644 --- a/openpaygo/metrics_request.py +++ b/openpaygo/metrics_request.py @@ -3,36 +3,39 @@ class MetricsRequestHandler(object): - - def __init__(self, serial_number, data_format=None, secret_key=None, auth_method=None): + def __init__( + self, serial_number, data_format=None, secret_key=None, auth_method=None + ): self.secret_key = secret_key self.auth_method = auth_method self.request_dict = { - 'serial_number': serial_number, + "serial_number": serial_number, } self.data_format = data_format if self.data_format: - if self.data_format.get('id'): - self.request_dict['data_format_id'] = data_format.get('id') + if self.data_format.get("id"): + self.request_dict["data_format_id"] = data_format.get("id") else: - self.request_dict['data_format'] = data_format + self.request_dict["data_format"] = data_format self.data = {} self.historical_data = {} def set_request_count(self, request_count): - self.request_dict['request_count'] = request_count + self.request_dict["request_count"] = request_count def set_timestamp(self, timestamp): - self.request_dict['timestamp'] = timestamp + self.request_dict["timestamp"] = timestamp def set_data(self, data): self.data = data def set_historical_data(self, historical_data): - if not self.data_format.get('historical_data_interval'): + if not self.data_format.get("historical_data_interval"): for time_step in historical_data: - if not time_step.get('timestamp'): - raise ValueError('Historical Data objects must have a time stamp if no historical_data_interval is defined.') + if not time_step.get("timestamp"): + raise ValueError( + "Historical Data objects must have a time stamp if no historical_data_interval is defined." + ) self.historical_data = historical_data def get_simple_request_payload(self): @@ -41,11 +44,15 @@ def get_simple_request_payload(self): def get_simple_request_dict(self): simple_request = self.request_dict - simple_request['data'] = self.data - simple_request['historical_data'] = self.historical_data + simple_request["data"] = self.data + simple_request["historical_data"] = self.historical_data # We prepare the auth if self.auth_method: - simple_request['auth'] = OpenPAYGOMetricsShared.generate_request_signature_from_data(simple_request, self.auth_method, self.secret_key) + simple_request[ + "auth" + ] = OpenPAYGOMetricsShared.generate_request_signature_from_data( + simple_request, self.auth_method, self.secret_key + ) return simple_request def get_condensed_request_payload(self): @@ -54,23 +61,31 @@ def get_condensed_request_payload(self): def get_condensed_request_dict(self): if not self.data_format: - raise ValueError('No Data Format provided for condensed request') - data_order = self.data_format.get('data_order') + raise ValueError("No Data Format provided for condensed request") + data_order = self.data_format.get("data_order") if self.data and not data_order: - raise ValueError('Data Format does not contain data_order') - historical_data_order = self.data_format.get('historical_data_order') + raise ValueError("Data Format does not contain data_order") + historical_data_order = self.data_format.get("historical_data_order") if self.historical_data and not historical_data_order: - raise ValueError('Data Format does not contain historical_data_order') + raise ValueError("Data Format does not contain historical_data_order") condensed_request = copy.deepcopy(self.request_dict) - condensed_request['data'] = [] - condensed_request['historical_data'] = [] + condensed_request["data"] = [] + condensed_request["historical_data"] = [] # We add the data data_copy = copy.deepcopy(self.data) for var in data_order: - condensed_request['data'].append(data_copy.pop(var) if var in data_copy else None) + condensed_request["data"].append( + data_copy.pop(var) if var in data_copy else None + ) if len(data_copy) > 0: - raise ValueError('Additional variables not present in the data format: '+str(data_copy)) - condensed_request['data'] = OpenPAYGOMetricsShared.remove_trailing_empty_elements(condensed_request['data']) + raise ValueError( + "Additional variables not present in the data format: " + str(data_copy) + ) + condensed_request[ + "data" + ] = OpenPAYGOMetricsShared.remove_trailing_empty_elements( + condensed_request["data"] + ) # We add the historical data historical_data_copy = copy.deepcopy(self.historical_data) for time_step in historical_data_copy: @@ -78,12 +93,23 @@ def get_condensed_request_dict(self): for var in historical_data_order: time_step_data.append(time_step.pop(var) if var in time_step else None) if len(time_step) > 0: - raise ValueError('Additional variables not present in the historical data format: '+str(time_step)) - time_step_data = OpenPAYGOMetricsShared.remove_trailing_empty_elements(time_step_data) - condensed_request['historical_data'].append(time_step_data) + raise ValueError( + "Additional variables not present in the historical data format: " + + str(time_step) + ) + time_step_data = OpenPAYGOMetricsShared.remove_trailing_empty_elements( + time_step_data + ) + condensed_request["historical_data"].append(time_step_data) # We prepare the auth if self.auth_method: - condensed_request['auth'] = OpenPAYGOMetricsShared.generate_request_signature_from_data(condensed_request, self.auth_method, self.secret_key) + condensed_request[ + "auth" + ] = OpenPAYGOMetricsShared.generate_request_signature_from_data( + condensed_request, self.auth_method, self.secret_key + ) # We replace the key names by the condensed ones - condensed_request = OpenPAYGOMetricsShared.convert_dict_keys_to_condensed(condensed_request) + condensed_request = OpenPAYGOMetricsShared.convert_dict_keys_to_condensed( + condensed_request + ) return condensed_request diff --git a/openpaygo/metrics_response.py b/openpaygo/metrics_response.py index 8b6fa33..fd8134d 100644 --- a/openpaygo/metrics_response.py +++ b/openpaygo/metrics_response.py @@ -3,37 +3,52 @@ import copy from datetime import datetime, timedelta -class MetricsResponseHandler(object): - def __init__(self, received_metrics, data_format=None, secret_key=None, last_request_count=None, last_request_timestamp=None): +class MetricsResponseHandler(object): + def __init__( + self, + received_metrics, + data_format=None, + secret_key=None, + last_request_count=None, + last_request_timestamp=None, + ): self.received_metrics = received_metrics self.request_dict = json.loads(received_metrics) # We convert the base variable names to simple - self.request_dict = OpenPAYGOMetricsShared.convert_dict_keys_to_simple(self.request_dict) + self.request_dict = OpenPAYGOMetricsShared.convert_dict_keys_to_simple( + self.request_dict + ) # We add the reception timestamp if not timestamp was provided - self.request_timestamp = self.request_dict.get('timestamp') - if not self.request_dict.get('timestamp'): + self.request_timestamp = self.request_dict.get("timestamp") + if not self.request_dict.get("timestamp"): self.timestamp = int(datetime.now().timestamp()) else: - self.timestamp = self.request_dict.get('timestamp') + self.timestamp = self.request_dict.get("timestamp") self.response_dict = {} self.secret_key = secret_key self.data_format = data_format self.last_request_count = last_request_count self.last_request_timestamp = last_request_timestamp - if not self.data_format and self.request_dict.get('data_format'): - self.data_format = self.request_dict.get('data_format') + if not self.data_format and self.request_dict.get("data_format"): + self.data_format = self.request_dict.get("data_format") def get_device_serial(self): - return self.request_dict.get('serial_number') - + return self.request_dict.get("serial_number") + def get_data_format_id(self): - return self.request_dict.get('data_format_id') - + return self.request_dict.get("data_format_id") + def data_format_available(self): return self.data_format != None - - def set_device_parameters(self, secret_key=None, data_format=None, last_request_count=None, last_request_timestamp=None): + + def set_device_parameters( + self, + secret_key=None, + data_format=None, + last_request_count=None, + last_request_timestamp=None, + ): if secret_key: self.secret_key = secret_key if data_format: @@ -44,19 +59,29 @@ def set_device_parameters(self, secret_key=None, data_format=None, last_request_ self.last_request_timestamp = last_request_timestamp def is_auth_valid(self): - auth_string = self.request_dict.get('auth', None) + auth_string = self.request_dict.get("auth", None) if not auth_string: return False elif not self.secret_key: - raise ValueError('Secret key is required to check the auth.') + raise ValueError("Secret key is required to check the auth.") self.auth_method = auth_string[:2] - new_signature = OpenPAYGOMetricsShared.generate_request_signature_from_data(self.request_dict, self.auth_method, self.secret_key) + new_signature = OpenPAYGOMetricsShared.generate_request_signature_from_data( + self.request_dict, self.auth_method, self.secret_key + ) if auth_string == new_signature: request_count = self.get_request_count() - if request_count and self.last_request_count and request_count <= self.last_request_count: + if ( + request_count + and self.last_request_count + and request_count <= self.last_request_count + ): return False timestamp = self.get_request_timestamp() - if timestamp and self.last_request_timestamp and timestamp <= self.last_request_timestamp: + if ( + timestamp + and self.last_request_timestamp + and timestamp <= self.last_request_timestamp + ): return False # Either the request count or timestamp is required if request_count or timestamp: @@ -66,66 +91,76 @@ def is_auth_valid(self): def get_simple_metrics(self): # We start the process by making a copy of the dict to work with simple_dict = copy.deepcopy(self.request_dict) - simple_dict.pop('auth') if 'auth' in simple_dict else None # We remove the auth + simple_dict.pop("auth") if "auth" in simple_dict else None # We remove the auth # We process the data and replace it - simple_dict['data'] = self._get_simple_data() + simple_dict["data"] = self._get_simple_data() # We process the historical data - simple_dict['historical_data'] = self._get_simple_historical_data() + simple_dict["historical_data"] = self._get_simple_historical_data() # We fill in the timestamps for each time step - simple_dict['historical_data'] = self._fill_timestamp_in_historical_data(simple_dict['historical_data']) + simple_dict["historical_data"] = self._fill_timestamp_in_historical_data( + simple_dict["historical_data"] + ) return simple_dict - + def get_data_timestamp(self): - return self.request_dict.get('data_collection_timestamp', self.timestamp) - + return self.request_dict.get("data_collection_timestamp", self.timestamp) + def get_request_timestamp(self): return self.request_timestamp - + def get_request_count(self): - return self.request_dict.get('request_count') - + return self.request_dict.get("request_count") + def get_token_count(self): data = self._get_simple_data() - return data.get('token_count') - + return data.get("token_count") + def expects_token_answer(self): return self.get_token_count() is not None def add_tokens_to_answer(self, token_list): - self.response_dict['token_list'] = token_list + self.response_dict["token_list"] = token_list def expects_time_answer(self): data = self._get_simple_data() - if data.get('active_until_timestamp_requested', False) or data.get('active_seconds_left_requested', False): + if data.get("active_until_timestamp_requested", False) or data.get( + "active_seconds_left_requested", False + ): return True return False def add_time_to_answer(self, target_datetime): data = self._get_simple_data() - if data.get('active_until_timestamp_requested', False): + if data.get("active_until_timestamp_requested", False): target_timestamp = 0 if target_datetime: if target_datetime.year > 1970: target_timestamp = target_datetime.timestamp() - self.response_dict['active_until_timestamp'] = target_timestamp - elif data.get('active_seconds_left_requested', False): - seconds_left = (datetime.now() - target_datetime).total_seconds() if target_datetime else 0 - self.response_dict['active_seconds_left'] = seconds_left if seconds_left > 0 else 0 + self.response_dict["active_until_timestamp"] = target_timestamp + elif data.get("active_seconds_left_requested", False): + seconds_left = ( + (datetime.now() - target_datetime).total_seconds() + if target_datetime + else 0 + ) + self.response_dict["active_seconds_left"] = ( + seconds_left if seconds_left > 0 else 0 + ) else: - raise ValueError('No time requested') - + raise ValueError("No time requested") + def add_new_base_url_to_answer(self, new_base_url): - self.add_settings_to_answer({'base_url': new_base_url}) + self.add_settings_to_answer({"base_url": new_base_url}) def add_settings_to_answer(self, settings_dict): - if not self.response_dict.get('settings'): - self.response_dict['settings'] = {} - self.response_dict['settings'].update(settings_dict) + if not self.response_dict.get("settings"): + self.response_dict["settings"] = {} + self.response_dict["settings"].update(settings_dict) def add_extra_data_to_answer(self, extra_data_dict): - if not self.response_dict.get('extra_data'): - self.response_dict['extra_data'] = {} - self.response_dict['extra_data'].update(extra_data_dict) + if not self.response_dict.get("extra_data"): + self.response_dict["extra_data"] = {} + self.response_dict["extra_data"].update(extra_data_dict) def get_answer_payload(self): payload = self.get_answer_dict() @@ -135,52 +170,61 @@ def get_answer_dict(self): # If there is not data format, we just return the full response condensed_answer = copy.deepcopy(self.response_dict) if self.secret_key: - condensed_answer['auth'] = OpenPAYGOMetricsShared.generate_response_signature_from_data( - serial_number=self.request_dict.get('serial_number'), - request_count=self.request_dict.get('request_count'), + condensed_answer[ + "auth" + ] = OpenPAYGOMetricsShared.generate_response_signature_from_data( + serial_number=self.request_dict.get("serial_number"), + request_count=self.request_dict.get("request_count"), data=condensed_answer, - timestamp=self.request_dict.get('timestamp'), - secret_key=self.secret_key + timestamp=self.request_dict.get("timestamp"), + secret_key=self.secret_key, ) return OpenPAYGOMetricsShared.convert_dict_keys_to_condensed(condensed_answer) def _get_simple_data(self): - data = copy.deepcopy(self.request_dict.get('data')) + data = copy.deepcopy(self.request_dict.get("data")) # If no data or not condensed in list, we just return it if not data: return {} if not isinstance(data, list): return data - data_order = self.data_format.get('data_order') + data_order = self.data_format.get("data_order") if not data_order: - raise ValueError('Data Format does not contain data_order') + raise ValueError("Data Format does not contain data_order") clean_data = {} data_len = len(data) for idx, var in enumerate(data_order): clean_data[var] = data[idx] if idx < data_len else None data = data[data_len:] if len(data) > 0: - raise ValueError('Additional variables not present in the data format: '+str(data)) + raise ValueError( + "Additional variables not present in the data format: " + str(data) + ) return OpenPAYGOMetricsShared.convert_dict_keys_to_simple(clean_data) - + def _get_simple_historical_data(self): - historical_data = copy.deepcopy(self.request_dict.get('historical_data')) + historical_data = copy.deepcopy(self.request_dict.get("historical_data")) if not historical_data: return [] - historical_data_order = self.data_format.get('historical_data_order') + historical_data_order = self.data_format.get("historical_data_order") clean_historical_data = [] for time_step in historical_data: time_step_data = {} if isinstance(time_step, list): if not historical_data_order: - raise ValueError('Data Format does not contain historical_data_order') + raise ValueError( + "Data Format does not contain historical_data_order" + ) timse_step_len = len(time_step) for idx, var in enumerate(historical_data_order): if idx < timse_step_len: time_step_data[var] = time_step[idx] time_step = time_step[timse_step_len:] if len(time_step) > 0: - raise ValueError('Additional variables not present in the historical data format: '+str(time_step)) + raise ValueError( + "Additional variables not present in the historical data format: " + + str(time_step) + ) elif isinstance(time_step, dict): for key in time_step: if key.isdigit() and int(key) < len(historical_data_order): @@ -188,21 +232,25 @@ def _get_simple_historical_data(self): else: time_step_data[key] = time_step[key] else: - raise ValueError('Invalid historical data step type: '+str(time_step)) + raise ValueError("Invalid historical data step type: " + str(time_step)) clean_historical_data.append(time_step_data) return clean_historical_data - + def _fill_timestamp_in_historical_data(self, historical_data): last_timestamp = datetime.fromtimestamp(self.get_data_timestamp()) for idx, time_step in enumerate(historical_data): - if time_step.get('relative_time') is not None: - last_timestamp = last_timestamp + timedelta(seconds=int(time_step.get('relative_time'))) - historical_data[idx]['timestamp'] = int(last_timestamp.timestamp()) - del historical_data[idx]['relative_time'] - elif time_step.get('timestamp'): - last_timestamp = datetime.fromtimestamp(time_step.get('timestamp')) + if time_step.get("relative_time") is not None: + last_timestamp = last_timestamp + timedelta( + seconds=int(time_step.get("relative_time")) + ) + historical_data[idx]["timestamp"] = int(last_timestamp.timestamp()) + del historical_data[idx]["relative_time"] + elif time_step.get("timestamp"): + last_timestamp = datetime.fromtimestamp(time_step.get("timestamp")) else: if idx != 0: - last_timestamp = last_timestamp + timedelta(seconds=int(self.data_format.get('historical_data_interval'))) - historical_data[idx]['timestamp'] = int(last_timestamp.timestamp()) + last_timestamp = last_timestamp + timedelta( + seconds=int(self.data_format.get("historical_data_interval")) + ) + historical_data[idx]["timestamp"] = int(last_timestamp.timestamp()) return historical_data diff --git a/openpaygo/metrics_shared.py b/openpaygo/metrics_shared.py index 6907c04..4e6174c 100644 --- a/openpaygo/metrics_shared.py +++ b/openpaygo/metrics_shared.py @@ -4,43 +4,42 @@ class AuthMethod(object): - SIMPLE_AUTH = 'sa' - TIMESTAMP_AUTH = 'ta' - COUNTER_AUTH = 'ca' - DATA_AUTH = 'da' - RECURSIVE_DATA_AUTH = 'ra' + SIMPLE_AUTH = "sa" + TIMESTAMP_AUTH = "ta" + COUNTER_AUTH = "ca" + DATA_AUTH = "da" + RECURSIVE_DATA_AUTH = "ra" class OpenPAYGOMetricsShared(object): - CONDENSED_KEY_NAMES = { # Request - 'serial_number': 'sn', - 'timestamp': 'ts', - 'auth': 'a', - 'request_count': 'rc', - 'data_collection_timestamp': 'dtc', - 'data_format_id': 'df', - 'data_format': 'dfo', - 'data': 'd', - 'historical_data': 'hd', - 'accessories': 'acc', + "serial_number": "sn", + "timestamp": "ts", + "auth": "a", + "request_count": "rc", + "data_collection_timestamp": "dtc", + "data_format_id": "df", + "data_format": "dfo", + "data": "d", + "historical_data": "hd", + "accessories": "acc", # Response - 'token_list': 'tkl', - 'active_until_timestamp': 'auts', - 'active_seconds_left': 'asl', - 'settings': 'st', - 'extra_data': 'ed', + "token_list": "tkl", + "active_until_timestamp": "auts", + "active_seconds_left": "asl", + "settings": "st", + "extra_data": "ed", # Data - 'token_count': 'tc', - 'active_until_timestamp_requested': 'autsr', - 'active_seconds_left_requested': 'aslr' + "token_count": "tc", + "active_until_timestamp_requested": "autsr", + "active_seconds_left_requested": "aslr", } @classmethod def convert_dict_keys_to_condensed(cls, simple_dict): return cls.convert_dict_keys(simple_dict, cls.CONDENSED_KEY_NAMES) - + @classmethod def convert_dict_keys_to_simple(cls, condensed_dict): revert_keys = {v: k for k, v in cls.CONDENSED_KEY_NAMES.items()} @@ -55,7 +54,7 @@ def convert_dict_keys(cls, origin_dict, key_map): else: condensed_dict[key] = origin_dict[key] return condensed_dict - + @classmethod def remove_trailing_empty_elements(cls, list_with_empty): while list_with_empty and list_with_empty[-1] is None: @@ -64,76 +63,91 @@ def remove_trailing_empty_elements(cls, list_with_empty): @classmethod def convert_to_metrics_json(cls, data): - return json.dumps(data, separators=(',', ':')) - + return json.dumps(data, separators=(",", ":")) + @classmethod - def generate_response_signature_from_data(cls, data, secret_key, serial_number, timestamp=None, request_count=None): + def generate_response_signature_from_data( + cls, data, secret_key, serial_number, timestamp=None, request_count=None + ): payload = serial_number if timestamp: payload += str(timestamp) if request_count: payload += str(request_count) - if data.get('active_until_timestamp'): - payload += str(data.get('active_until_timestamp')) - if data.get('active_seconds_left'): - payload += str(data.get('active_seconds_left')) - if data.get('token_list'): - payload += cls.convert_to_metrics_json(data.get('token_list')) - if data.get('settings'): - payload += cls.convert_to_metrics_json(data.get('settings')) - if data.get('extra_data'): - payload += cls.convert_to_metrics_json(data.get('extra_data')) - return AuthMethod.DATA_AUTH+cls.generate_hash_string(payload, secret_key) - + if data.get("active_until_timestamp"): + payload += str(data.get("active_until_timestamp")) + if data.get("active_seconds_left"): + payload += str(data.get("active_seconds_left")) + if data.get("token_list"): + payload += cls.convert_to_metrics_json(data.get("token_list")) + if data.get("settings"): + payload += cls.convert_to_metrics_json(data.get("settings")) + if data.get("extra_data"): + payload += cls.convert_to_metrics_json(data.get("extra_data")) + return AuthMethod.DATA_AUTH + cls.generate_hash_string(payload, secret_key) + @classmethod def generate_request_signature_from_data(cls, data, auth_method, secret_key): if auth_method == AuthMethod.SIMPLE_AUTH: - signature = cls.generate_hash_string(data.get('serial_number'), secret_key) + signature = cls.generate_hash_string(data.get("serial_number"), secret_key) elif auth_method == AuthMethod.TIMESTAMP_AUTH: - if not data.get('timestamp', None): - raise ValueError('Timestamp is required for Timestamp Auth') - signature = cls.generate_hash_string(data.get('serial_number') + str(data.get('timestamp')), secret_key) + if not data.get("timestamp", None): + raise ValueError("Timestamp is required for Timestamp Auth") + signature = cls.generate_hash_string( + data.get("serial_number") + str(data.get("timestamp")), secret_key + ) elif auth_method == AuthMethod.COUNTER_AUTH: - if not data.get('request_count', None): - raise ValueError('Request Count is required for Counter Auth') - signature = cls.generate_hash_string(data.get('serial_number') + str(data.get('request_count')), secret_key) + if not data.get("request_count", None): + raise ValueError("Request Count is required for Counter Auth") + signature = cls.generate_hash_string( + data.get("serial_number") + str(data.get("request_count")), secret_key + ) elif auth_method == AuthMethod.DATA_AUTH: - payload = data.get('serial_number') - if data.get('timestamp'): - payload += str(data.get('timestamp')) - if data.get('request_count'): - payload += str(data.get('request_count')) - if data.get('data'): - payload += cls.convert_to_metrics_json(data.get('data', [])) - if data.get('historical_data'): - payload += cls.convert_to_metrics_json(data.get('historical_data', [])) + payload = data.get("serial_number") + if data.get("timestamp"): + payload += str(data.get("timestamp")) + if data.get("request_count"): + payload += str(data.get("request_count")) + if data.get("data"): + payload += cls.convert_to_metrics_json(data.get("data", [])) + if data.get("historical_data"): + payload += cls.convert_to_metrics_json(data.get("historical_data", [])) signature = cls.generate_hash_string(payload, secret_key) elif auth_method == AuthMethod.RECURSIVE_DATA_AUTH: - payload = data.get('serial_number') + payload = data.get("serial_number") payload = cls.generate_hash_string(payload, secret_key) - if data.get('timestamp'): - payload = cls.generate_hash_string(payload+str(data.get('timestamp')), secret_key) - if data.get('request_count'): - payload = cls.generate_hash_string(payload+str(data.get('request_count')), secret_key) - payload = cls.generate_hash_string(payload+cls.convert_to_metrics_json(data.get('data', [])), secret_key) - for time_step_data in data.get('historical_data', []): - payload = cls.generate_hash_string(payload+cls.convert_to_metrics_json(time_step_data), secret_key) + if data.get("timestamp"): + payload = cls.generate_hash_string( + payload + str(data.get("timestamp")), secret_key + ) + if data.get("request_count"): + payload = cls.generate_hash_string( + payload + str(data.get("request_count")), secret_key + ) + payload = cls.generate_hash_string( + payload + cls.convert_to_metrics_json(data.get("data", [])), secret_key + ) + for time_step_data in data.get("historical_data", []): + payload = cls.generate_hash_string( + payload + cls.convert_to_metrics_json(time_step_data), secret_key + ) signature = payload else: - raise ValueError('Invalid Authentication Method') - return auth_method+signature - + raise ValueError("Invalid Authentication Method") + return auth_method + signature @classmethod def generate_hash_string(cls, input_string, secret_key): key = cls.load_secret_key_from_hex(secret_key) - hash = siphash.SipHash_2_4(key, codecs.encode(input_string, 'utf-8')).hash() - hash_string = '{:x}'.format(hash) + hash = siphash.SipHash_2_4(key, codecs.encode(input_string, "utf-8")).hash() + hash_string = "{:x}".format(hash) return hash_string @classmethod def load_secret_key_from_hex(cls, secret_key): try: - return codecs.decode(secret_key, 'hex') + return codecs.decode(secret_key, "hex") except Exception as e: - raise ValueError('The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. ') + raise ValueError( + "The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. " + ) diff --git a/openpaygo/token_decode.py b/openpaygo/token_decode.py index 506196f..b518012 100644 --- a/openpaygo/token_decode.py +++ b/openpaygo/token_decode.py @@ -5,10 +5,19 @@ class OpenPAYGOTokenDecoder(object): MAX_TOKEN_JUMP = 64 MAX_TOKEN_JUMP_COUNTER_SYNC = 100 - MAX_UNUSED_OLDER_TOKENS = 8*2 + MAX_UNUSED_OLDER_TOKENS = 8 * 2 @classmethod - def decode_token(cls, token, secret_key, count, used_counts=None, starting_code=None, value_divider=1, restricted_digit_set=False): + def decode_token( + cls, + token, + secret_key, + count, + used_counts=None, + starting_code=None, + value_divider=1, + restricted_digit_set=False, + ): secret_key = OpenPAYGOTokenShared.load_secret_key_from_hex(secret_key) if not starting_code: # We generate the starting code from the key if not provided @@ -29,23 +38,62 @@ def decode_token(cls, token, secret_key, count, used_counts=None, starting_code= raise ValueError("Token is too long") token = int(token) if not extended_token: - value, token_type, count, updated_counts = cls.get_activation_value_count_and_type_from_token(token, starting_code, secret_key, count, restricted_digit_set, used_counts) + ( + value, + token_type, + count, + updated_counts, + ) = cls.get_activation_value_count_and_type_from_token( + token, + starting_code, + secret_key, + count, + restricted_digit_set, + used_counts, + ) else: - value, token_type, count, updated_counts = cls.get_activation_value_count_from_extended_token(token, starting_code, secret_key, count, restricted_digit_set, used_counts) + ( + value, + token_type, + count, + updated_counts, + ) = cls.get_activation_value_count_from_extended_token( + token, + starting_code, + secret_key, + count, + restricted_digit_set, + used_counts, + ) if value and value_divider: value = value / value_divider return value, token_type, count, updated_counts @classmethod - def get_activation_value_count_and_type_from_token(cls, token, starting_code, key, last_count, - restricted_digit_set=False, used_counts=None): + def get_activation_value_count_and_type_from_token( + cls, + token, + starting_code, + key, + last_count, + restricted_digit_set=False, + used_counts=None, + ): if restricted_digit_set: token = OpenPAYGOTokenShared.convert_from_4_digit_token(token) valid_older_token = False - token_base = OpenPAYGOTokenShared.get_token_base(token) # We get the base of the token - current_code = OpenPAYGOTokenShared.put_base_in_token(starting_code, token_base) # We put it into the starting code - starting_code_base = OpenPAYGOTokenShared.get_token_base(starting_code) # We get the base of the starting code - value = cls._decode_base(starting_code_base, token_base) # If there is a match we get the value from the token + token_base = OpenPAYGOTokenShared.get_token_base( + token + ) # We get the base of the token + current_code = OpenPAYGOTokenShared.put_base_in_token( + starting_code, token_base + ) # We put it into the starting code + starting_code_base = OpenPAYGOTokenShared.get_token_base( + starting_code + ) # We get the base of the starting code + value = cls._decode_base( + starting_code_base, token_base + ) # If there is a match we get the value from the token # We try all combination up until last_count + TOKEN_JUMP, or to the larger jump if syncing counter # We could start directly the loop at the last count if we kept the token value for the last count if value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE: @@ -53,7 +101,9 @@ def get_activation_value_count_and_type_from_token(cls, token, starting_code, ke else: max_count_try = last_count + cls.MAX_TOKEN_JUMP + 1 for count in range(0, max_count_try): - masked_token = OpenPAYGOTokenShared.put_base_in_token(current_code, token_base) + masked_token = OpenPAYGOTokenShared.put_base_in_token( + current_code, token_base + ) if count % 2: if value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE: this_type = TokenType.COUNTER_SYNC @@ -64,12 +114,18 @@ def get_activation_value_count_and_type_from_token(cls, token, starting_code, ke else: this_type = TokenType.ADD_TIME if masked_token == token: - if cls._count_is_valid(count, last_count, value, this_type, used_counts): - updated_counts = cls.update_used_counts(used_counts, value, count, this_type) + if cls._count_is_valid( + count, last_count, value, this_type, used_counts + ): + updated_counts = cls.update_used_counts( + used_counts, value, count, this_type + ) return value, this_type, count, updated_counts else: valid_older_token = True - current_code = OpenPAYGOTokenShared.generate_next_token(current_code, key) # If not we go to the next token + current_code = OpenPAYGOTokenShared.generate_next_token( + current_code, key + ) # If not we go to the next token if valid_older_token: return None, TokenType.ALREADY_USED, None, None return None, TokenType.INVALID, None, None @@ -94,15 +150,19 @@ def update_used_counts(cls, past_used_counts, value, new_count, type): highest_count = max(past_used_counts) if past_used_counts else 0 if new_count > highest_count: highest_count = new_count - bottom_range = highest_count-cls.MAX_UNUSED_OLDER_TOKENS + bottom_range = highest_count - cls.MAX_UNUSED_OLDER_TOKENS used_counts = [] - if type != TokenType.ADD_TIME or value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE or value == OpenPAYGOTokenShared.PAYG_DISABLE_VALUE: + if ( + type != TokenType.ADD_TIME + or value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE + or value == OpenPAYGOTokenShared.PAYG_DISABLE_VALUE + ): # If it is not an Add-Time token, we mark all the past tokens as used in the range - for count in range(bottom_range, highest_count+1): + for count in range(bottom_range, highest_count + 1): used_counts.append(count) else: # If it is an Add-Time token, we just mark the tokens actually used in the range - for count in range(bottom_range, highest_count+1): + for count in range(bottom_range, highest_count + 1): if count == new_count or count in past_used_counts: used_counts.append(count) return used_counts @@ -116,28 +176,51 @@ def _decode_base(cls, starting_code_base, token_base): return decoded_value @classmethod - def get_activation_value_count_from_extended_token(cls, token, starting_code, key, last_count, - restricted_digit_set=False, used_counts=None): + def get_activation_value_count_from_extended_token( + cls, + token, + starting_code, + key, + last_count, + restricted_digit_set=False, + used_counts=None, + ): if restricted_digit_set: token = OpenPAYGOTokenSharedExtended.convert_from_4_digit_token(token) - token_base = OpenPAYGOTokenSharedExtended.get_token_base(token) # We get the base of the token - current_code = OpenPAYGOTokenSharedExtended.put_base_in_token(starting_code, token_base) # We put it into the starting code - starting_code_base = OpenPAYGOTokenSharedExtended.get_token_base(starting_code) # We get the base of the starting code - value = cls._decode_base_extended(starting_code_base, token_base) # If there is a match we get the value from the token + token_base = OpenPAYGOTokenSharedExtended.get_token_base( + token + ) # We get the base of the token + current_code = OpenPAYGOTokenSharedExtended.put_base_in_token( + starting_code, token_base + ) # We put it into the starting code + starting_code_base = OpenPAYGOTokenSharedExtended.get_token_base( + starting_code + ) # We get the base of the starting code + value = cls._decode_base_extended( + starting_code_base, token_base + ) # If there is a match we get the value from the token max_count_try = last_count + cls.MAX_TOKEN_JUMP + 1 for count in range(0, max_count_try): - masked_token = OpenPAYGOTokenSharedExtended.put_base_in_token(current_code, token_base) + masked_token = OpenPAYGOTokenSharedExtended.put_base_in_token( + current_code, token_base + ) if count % 2: this_type = TokenType.SET_TIME else: this_type = TokenType.ADD_TIME if masked_token == token: - if cls._count_is_valid(count, last_count, value, this_type, used_counts): - updated_counts = cls.update_used_counts(used_counts, value, count, this_type) + if cls._count_is_valid( + count, last_count, value, this_type, used_counts + ): + updated_counts = cls.update_used_counts( + used_counts, value, count, this_type + ) return value, this_type, count, updated_counts else: valid_older_token = True - current_code = OpenPAYGOTokenSharedExtended.generate_next_token(current_code, key) # If not we go to the next token + current_code = OpenPAYGOTokenSharedExtended.generate_next_token( + current_code, key + ) # If not we go to the next token if valid_older_token: return None, TokenType.ALREADY_USED, None, None return None, TokenType.INVALID, None, None diff --git a/openpaygo/token_encode.py b/openpaygo/token_encode.py index de5a0c9..448e2d2 100644 --- a/openpaygo/token_encode.py +++ b/openpaygo/token_encode.py @@ -3,9 +3,18 @@ class OpenPAYGOTokenEncoder(object): - @classmethod - def generate_token(cls, secret_key, count, value=None, token_type=TokenType.ADD_TIME, starting_code=None, value_divider=1, restricted_digit_set=False, extended_token=False): + def generate_token( + cls, + secret_key, + count, + value=None, + token_type=TokenType.ADD_TIME, + starting_code=None, + value_divider=1, + restricted_digit_set=False, + extended_token=False, + ): secret_key = OpenPAYGOTokenShared.load_secret_key_from_hex(secret_key) if not starting_code: # We generate the starting code from the key if not provided @@ -17,37 +26,60 @@ def generate_token(cls, secret_key, count, value=None, token_type=TokenType.ADD_ else: max_value = OpenPAYGOTokenSharedExtended.MAX_ACTIVATION_VALUE if value > max_value: - raise ValueError('The value provided is too high.') + raise ValueError("The value provided is too high.") elif value: - raise ValueError('A value is not allowed for this token type.') + raise ValueError("A value is not allowed for this token type.") else: if token_type == TokenType.DISABLE_PAYG: value = OpenPAYGOTokenShared.PAYG_DISABLE_VALUE elif token_type == TokenType.COUNTER_SYNC: value = OpenPAYGOTokenShared.COUNTER_SYNC_VALUE else: - raise ValueError('The token type provided is not supported.') + raise ValueError("The token type provided is not supported.") if extended_token: - return cls.generate_extended_token(starting_code, secret_key, value, count, token_type, restricted_digit_set) + return cls.generate_extended_token( + starting_code, + secret_key, + value, + count, + token_type, + restricted_digit_set, + ) else: - return cls.generate_standard_token(starting_code, secret_key, value, count, token_type, restricted_digit_set) + return cls.generate_standard_token( + starting_code, + secret_key, + value, + count, + token_type, + restricted_digit_set, + ) @classmethod - def generate_standard_token(cls, starting_code=None, key=None, value=None, count=None, - mode=TokenType.ADD_TIME, restricted_digit_set=False): + def generate_standard_token( + cls, + starting_code=None, + key=None, + value=None, + count=None, + mode=TokenType.ADD_TIME, + restricted_digit_set=False, + ): # We get the first 3 digits with encoded value starting_code_base = OpenPAYGOTokenShared.get_token_base(starting_code) token_base = cls._encode_base(starting_code_base, value) - current_token = OpenPAYGOTokenShared.put_base_in_token(starting_code, token_base) + current_token = OpenPAYGOTokenShared.put_base_in_token( + starting_code, token_base + ) new_count = cls._get_new_count(count, mode) for xn in range(0, new_count): current_token = OpenPAYGOTokenShared.generate_next_token(current_token, key) final_token = OpenPAYGOTokenShared.put_base_in_token(current_token, token_base) if restricted_digit_set: final_token = OpenPAYGOTokenShared.convert_to_4_digit_token(final_token) - final_token = '{:015d}'.format(final_token) + final_token = "{:015d}".format(final_token) else: - final_token = '{:09d}'.format(final_token) + final_token = "{:09d}".format(final_token) return new_count, final_token @classmethod @@ -58,19 +90,35 @@ def _encode_base(cls, base, number): return number + base @classmethod - def generate_extended_token(cls, starting_code, key, value, count, mode=TokenType.ADD_TIME, restricted_digit_set=False): + def generate_extended_token( + cls, + starting_code, + key, + value, + count, + mode=TokenType.ADD_TIME, + restricted_digit_set=False, + ): starting_code_base = OpenPAYGOTokenSharedExtended.get_token_base(starting_code) token_base = cls._encode_base_extended(starting_code_base, value) - current_token = OpenPAYGOTokenSharedExtended.put_base_in_token(starting_code, token_base) + current_token = OpenPAYGOTokenSharedExtended.put_base_in_token( + starting_code, token_base + ) new_count = cls._get_new_count(count, mode) for xn in range(0, new_count): - current_token = OpenPAYGOTokenSharedExtended.generate_next_token(current_token, key) - final_token = OpenPAYGOTokenSharedExtended.put_base_in_token(current_token, token_base) + current_token = OpenPAYGOTokenSharedExtended.generate_next_token( + current_token, key + ) + final_token = OpenPAYGOTokenSharedExtended.put_base_in_token( + current_token, token_base + ) if restricted_digit_set: - final_token = OpenPAYGOTokenSharedExtended.convert_to_4_digit_token(final_token) - final_token = '{:020d}'.format(final_token) + final_token = OpenPAYGOTokenSharedExtended.convert_to_4_digit_token( + final_token + ) + final_token = "{:020d}".format(final_token) else: - final_token = '{:012d}'.format(final_token) + final_token = "{:012d}".format(final_token) return new_count, final_token @classmethod @@ -79,18 +127,20 @@ def _encode_base_extended(cls, base, number): return number + base - 1000000 else: return number + base - + @classmethod def _get_new_count(cls, count, mode): current_count_odd = count % 2 if mode in [TokenType.SET_TIME, TokenType.DISABLE_PAYG, TokenType.COUNTER_SYNC]: - if current_count_odd: # Odd numbers are for Set Time, Disable PAYG or Counter Sync - new_count = count+2 + if ( + current_count_odd + ): # Odd numbers are for Set Time, Disable PAYG or Counter Sync + new_count = count + 2 else: - new_count = count+1 + new_count = count + 1 else: - if current_count_odd: # Even numbers are for Add Time - new_count = count+1 + if current_count_odd: # Even numbers are for Add Time + new_count = count + 1 else: - new_count = count+2 + new_count = count + 2 return new_count diff --git a/openpaygo/token_shared.py b/openpaygo/token_shared.py index 7fcabd6..5b8b32c 100644 --- a/openpaygo/token_shared.py +++ b/openpaygo/token_shared.py @@ -26,38 +26,46 @@ def get_token_base(cls, code): @classmethod def put_base_in_token(cls, token, token_base): if token_base > cls.MAX_BASE: - Exception('INVALID_VALUE') + Exception("INVALID_VALUE") return token - cls.get_token_base(token) + token_base @classmethod def generate_next_token(cls, last_code, key): - conformed_token = struct.pack('>L', last_code) # We convert the token to bytes - conformed_token += conformed_token # We duplicate it to fit the minimum length - token_hash = cls.generate_hash(key, conformed_token) # We hash it - new_token = cls.convert_hash_to_token(token_hash) # We convert to token and return + conformed_token = struct.pack(">L", last_code) # We convert the token to bytes + conformed_token += conformed_token # We duplicate it to fit the minimum length + token_hash = cls.generate_hash(key, conformed_token) # We hash it + new_token = cls.convert_hash_to_token( + token_hash + ) # We convert to token and return return new_token @classmethod def convert_hash_to_token(cls, this_hash): - hash_int = struct.pack('>Q', this_hash) # We convert the hash to bytes - hi_hash = struct.unpack('>L', hash_int[0:4])[0] # We split it in two 32bits INT - lo_hash = struct.unpack('>L', hash_int[4:8])[0] - result_hash = hi_hash ^ lo_hash # We XOR the two together to get a single 32bits INT - token = cls._convert_to_29_5_bits(result_hash) # We convert the 32bits value to an INT no greater than 9 digits + hash_int = struct.pack(">Q", this_hash) # We convert the hash to bytes + hi_hash = struct.unpack(">L", hash_int[0:4])[0] # We split it in two 32bits INT + lo_hash = struct.unpack(">L", hash_int[4:8])[0] + result_hash = ( + hi_hash ^ lo_hash + ) # We XOR the two together to get a single 32bits INT + token = cls._convert_to_29_5_bits( + result_hash + ) # We convert the 32bits value to an INT no greater than 9 digits return token - + @classmethod def generate_starting_code(cls, key): # We make a hash of the key starting_hash = OpenPAYGOTokenShared.generate_hash(key, key) return OpenPAYGOTokenShared.convert_hash_to_token(starting_hash) - + @classmethod def load_secret_key_from_hex(cls, secret_key): try: - return codecs.decode(secret_key, 'hex') + return codecs.decode(secret_key, "hex") except Exception as e: - raise ValueError('The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. ') + raise ValueError( + "The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. " + ) @classmethod def _convert_to_29_5_bits(cls, source): @@ -69,11 +77,11 @@ def _convert_to_29_5_bits(cls, source): @classmethod def convert_to_4_digit_token(cls, source): - restricted_digit_token = '' + restricted_digit_token = "" bit_array = cls._bit_array_from_int(source, 30) for i in range(15): - this_array = bit_array[i*2:(i*2)+2] - restricted_digit_token += str(cls._bit_array_to_int(this_array)+1) + this_array = bit_array[i * 2 : (i * 2) + 2] + restricted_digit_token += str(cls._bit_array_to_int(this_array) + 1) return int(restricted_digit_token) @classmethod @@ -84,7 +92,7 @@ def convert_from_4_digit_token(cls, source): this_array = cls._bit_array_from_int(digit, 2) bit_array += this_array return cls._bit_array_to_int(bit_array) - + @classmethod def generate_hash(cls, key, value): return siphash.SipHash_2_4(key, value).hash() diff --git a/openpaygo/token_shared_extended.py b/openpaygo/token_shared_extended.py index ac73b47..7c6bbdc 100644 --- a/openpaygo/token_shared_extended.py +++ b/openpaygo/token_shared_extended.py @@ -14,19 +14,23 @@ def get_token_base(cls, code): @classmethod def put_base_in_token(cls, token, token_base): if token_base > cls.MAX_BASE: - Exception('INVALID_VALUE') + Exception("INVALID_VALUE") return token - cls.get_token_base(token) + token_base @classmethod def generate_next_token(cls, last_code, key): - conformed_token = struct.pack('>Q', last_code) # We convert the token to bytes - token_hash = siphash.SipHash_2_4(key, conformed_token).hash() # We hash it - new_token = cls.convert_hash_to_token(token_hash) # We convert to token and return + conformed_token = struct.pack(">Q", last_code) # We convert the token to bytes + token_hash = siphash.SipHash_2_4(key, conformed_token).hash() # We hash it + new_token = cls.convert_hash_to_token( + token_hash + ) # We convert to token and return return new_token @classmethod def convert_hash_to_token(cls, this_hash): - token = cls._convert_to_40_bits(this_hash) # We convert the 64bits value to an INT no greater than 12 digits + token = cls._convert_to_40_bits( + this_hash + ) # We convert the 64bits value to an INT no greater than 12 digits return token @classmethod @@ -39,11 +43,11 @@ def _convert_to_40_bits(cls, source): @classmethod def convert_to_4_digit_token(cls, source): - restricted_digit_token = '' + restricted_digit_token = "" bit_array = cls._bit_array_from_int(source, 40) for i in range(20): - this_array = bit_array[i*2:(i*2)+2] - restricted_digit_token += str(cls._bit_array_to_int(this_array)+1) + this_array = bit_array[i * 2 : (i * 2) + 2] + restricted_digit_token += str(cls._bit_array_to_int(this_array) + 1) return int(restricted_digit_token) @classmethod From 8c63d8fe8a3aa2ddc254eae88d2ea6ea6c41de73 Mon Sep 17 00:00:00 2001 From: Daniel Mohns Date: Mon, 16 Oct 2023 12:13:28 +0200 Subject: [PATCH 2/4] Apply `ruff` --- openpaygo/__init__.py | 4 ---- openpaygo/metrics_request.py | 3 ++- openpaygo/metrics_response.py | 6 +++--- openpaygo/metrics_shared.py | 5 +++-- openpaygo/token_decode.py | 12 ++++++++---- openpaygo/token_shared.py | 5 +++-- pyproject.toml | 4 ++++ 7 files changed, 23 insertions(+), 16 deletions(-) create mode 100644 pyproject.toml diff --git a/openpaygo/__init__.py b/openpaygo/__init__.py index 1b083aa..bb4dc65 100644 --- a/openpaygo/__init__.py +++ b/openpaygo/__init__.py @@ -1,9 +1,5 @@ from .token_encode import OpenPAYGOTokenEncoder from .token_decode import OpenPAYGOTokenDecoder -from .token_shared import TokenType -from .metrics_request import MetricsRequestHandler -from .metrics_response import MetricsResponseHandler -from .metrics_shared import AuthMethod def generate_token(**kwargs): diff --git a/openpaygo/metrics_request.py b/openpaygo/metrics_request.py index 9232dbe..bc3c8e3 100644 --- a/openpaygo/metrics_request.py +++ b/openpaygo/metrics_request.py @@ -34,7 +34,8 @@ def set_historical_data(self, historical_data): for time_step in historical_data: if not time_step.get("timestamp"): raise ValueError( - "Historical Data objects must have a time stamp if no historical_data_interval is defined." + "Historical Data objects must have a time stamp if no " + "historical_data_interval is defined." ) self.historical_data = historical_data diff --git a/openpaygo/metrics_response.py b/openpaygo/metrics_response.py index fd8134d..6cbb33c 100644 --- a/openpaygo/metrics_response.py +++ b/openpaygo/metrics_response.py @@ -40,7 +40,7 @@ def get_data_format_id(self): return self.request_dict.get("data_format_id") def data_format_available(self): - return self.data_format != None + return self.data_format is not None def set_device_parameters( self, @@ -222,8 +222,8 @@ def _get_simple_historical_data(self): time_step = time_step[timse_step_len:] if len(time_step) > 0: raise ValueError( - "Additional variables not present in the historical data format: " - + str(time_step) + "Additional variables not present in the historical data format" + ": " + str(time_step) ) elif isinstance(time_step, dict): for key in time_step: diff --git a/openpaygo/metrics_shared.py b/openpaygo/metrics_shared.py index 4e6174c..3a8c533 100644 --- a/openpaygo/metrics_shared.py +++ b/openpaygo/metrics_shared.py @@ -147,7 +147,8 @@ def generate_hash_string(cls, input_string, secret_key): def load_secret_key_from_hex(cls, secret_key): try: return codecs.decode(secret_key, "hex") - except Exception as e: + except Exception: raise ValueError( - "The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. " + "The secret key provided is not correctly formatted, it should be 32 " + "hexadecimal characters. " ) diff --git a/openpaygo/token_decode.py b/openpaygo/token_decode.py index b518012..5a389bb 100644 --- a/openpaygo/token_decode.py +++ b/openpaygo/token_decode.py @@ -94,8 +94,10 @@ def get_activation_value_count_and_type_from_token( value = cls._decode_base( starting_code_base, token_base ) # If there is a match we get the value from the token - # We try all combination up until last_count + TOKEN_JUMP, or to the larger jump if syncing counter - # We could start directly the loop at the last count if we kept the token value for the last count + # We try all combination up until last_count + TOKEN_JUMP, or to the larger jump + # if syncing counter. + # We could start directly the loop at the last count if we kept the token value + # for the last count if value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE: max_count_try = last_count + cls.MAX_TOKEN_JUMP_COUNTER_SYNC + 1 else: @@ -157,11 +159,13 @@ def update_used_counts(cls, past_used_counts, value, new_count, type): or value == OpenPAYGOTokenShared.COUNTER_SYNC_VALUE or value == OpenPAYGOTokenShared.PAYG_DISABLE_VALUE ): - # If it is not an Add-Time token, we mark all the past tokens as used in the range + # If it is not an Add-Time token, we mark all the past tokens as used in the + # range for count in range(bottom_range, highest_count + 1): used_counts.append(count) else: - # If it is an Add-Time token, we just mark the tokens actually used in the range + # If it is an Add-Time token, we just mark the tokens actually used in the + # range for count in range(bottom_range, highest_count + 1): if count == new_count or count in past_used_counts: used_counts.append(count) diff --git a/openpaygo/token_shared.py b/openpaygo/token_shared.py index 5b8b32c..ced4093 100644 --- a/openpaygo/token_shared.py +++ b/openpaygo/token_shared.py @@ -62,9 +62,10 @@ def generate_starting_code(cls, key): def load_secret_key_from_hex(cls, secret_key): try: return codecs.decode(secret_key, "hex") - except Exception as e: + except Exception: raise ValueError( - "The secret key provided is not correctly formatted, it should be 32 hexadecimal characters. " + "The secret key provided is not correctly formatted, it should be 32 " + "hexadecimal characters. " ) @classmethod diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..adf2597 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.ruff] + +# Same as Black. +line-length = 88 From 8fe661d7162ba3aba96799eff19d9e2330aabc32 Mon Sep 17 00:00:00 2001 From: Daniel Mohns Date: Tue, 24 Oct 2023 16:45:08 +0200 Subject: [PATCH 3/4] Add more configuration rules to `ruff` --- openpaygo/__init__.py | 2 +- openpaygo/metrics_request.py | 3 ++- openpaygo/metrics_response.py | 5 +++-- openpaygo/metrics_shared.py | 3 ++- openpaygo/token_shared.py | 5 +++-- openpaygo/token_shared_extended.py | 3 ++- pyproject.toml | 4 ++++ 7 files changed, 17 insertions(+), 8 deletions(-) diff --git a/openpaygo/__init__.py b/openpaygo/__init__.py index bb4dc65..00227c3 100644 --- a/openpaygo/__init__.py +++ b/openpaygo/__init__.py @@ -1,5 +1,5 @@ -from .token_encode import OpenPAYGOTokenEncoder from .token_decode import OpenPAYGOTokenDecoder +from .token_encode import OpenPAYGOTokenEncoder def generate_token(**kwargs): diff --git a/openpaygo/metrics_request.py b/openpaygo/metrics_request.py index bc3c8e3..72a05b8 100644 --- a/openpaygo/metrics_request.py +++ b/openpaygo/metrics_request.py @@ -1,6 +1,7 @@ -from .metrics_shared import OpenPAYGOMetricsShared import copy +from .metrics_shared import OpenPAYGOMetricsShared + class MetricsRequestHandler(object): def __init__( diff --git a/openpaygo/metrics_response.py b/openpaygo/metrics_response.py index 6cbb33c..2568d61 100644 --- a/openpaygo/metrics_response.py +++ b/openpaygo/metrics_response.py @@ -1,8 +1,9 @@ -import json -from .metrics_shared import OpenPAYGOMetricsShared import copy +import json from datetime import datetime, timedelta +from .metrics_shared import OpenPAYGOMetricsShared + class MetricsResponseHandler(object): def __init__( diff --git a/openpaygo/metrics_shared.py b/openpaygo/metrics_shared.py index 3a8c533..586ab2e 100644 --- a/openpaygo/metrics_shared.py +++ b/openpaygo/metrics_shared.py @@ -1,7 +1,8 @@ -import siphash import codecs import json +import siphash + class AuthMethod(object): SIMPLE_AUTH = "sa" diff --git a/openpaygo/token_shared.py b/openpaygo/token_shared.py index ced4093..2529b91 100644 --- a/openpaygo/token_shared.py +++ b/openpaygo/token_shared.py @@ -1,6 +1,7 @@ -import siphash -import struct import codecs +import struct + +import siphash class TokenType(object): diff --git a/openpaygo/token_shared_extended.py b/openpaygo/token_shared_extended.py index 7c6bbdc..73040ea 100644 --- a/openpaygo/token_shared_extended.py +++ b/openpaygo/token_shared_extended.py @@ -1,6 +1,7 @@ -import siphash import struct +import siphash + class OpenPAYGOTokenSharedExtended(object): MAX_BASE = 999999 diff --git a/pyproject.toml b/pyproject.toml index adf2597..862fc93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,8 @@ [tool.ruff] +# Add some rules to ruff's default: +# - isort: https://docs.astral.sh/ruff/rules/#isort-i +# - pep8-naming: https://docs.astral.sh/ruff/rules/#pep8-naming-n +select = ["E", "F", "I", "N"] # Same as Black. line-length = 88 From fee6a247736ba32bde3bc0438f0e188e2b6ec1b5 Mon Sep 17 00:00:00 2001 From: Daniel Mohns Date: Thu, 2 Nov 2023 16:36:54 +0100 Subject: [PATCH 4/4] Add back previously removed imports and add them to `__all__` --- openpaygo/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/openpaygo/__init__.py b/openpaygo/__init__.py index 00227c3..2ac6f91 100644 --- a/openpaygo/__init__.py +++ b/openpaygo/__init__.py @@ -1,5 +1,9 @@ +from .metrics_request import MetricsRequestHandler +from .metrics_response import MetricsResponseHandler +from .metrics_shared import AuthMethod from .token_decode import OpenPAYGOTokenDecoder from .token_encode import OpenPAYGOTokenEncoder +from .token_shared import TokenType def generate_token(**kwargs): @@ -8,3 +12,13 @@ def generate_token(**kwargs): def decode_token(**kwargs): return OpenPAYGOTokenDecoder.decode_token(**kwargs) + + +__all__ = [ + MetricsRequestHandler, + MetricsResponseHandler, + AuthMethod, + OpenPAYGOTokenDecoder, + OpenPAYGOTokenEncoder, + TokenType, +]