diff --git a/pii-detection/README.md b/pii-detection/README.md
new file mode 100644
index 00000000..70c7580b
--- /dev/null
+++ b/pii-detection/README.md
@@ -0,0 +1,75 @@
+# Run a PII Detection streaming app using Microsoft Presidio and Natural Language Processing
+
+## Intro
+
+This PII Detection demo application uses Python, Faust, Microsoft Presidio and other libraries to automatically detect and redact PII in Kafka messages.
+
+![](img/app-topology.png)
+
+A common topology is to go through a raw data topic that is accessible to privileged consumers and, at the same time, use a PII detection application to send the data, after analysis and redaction, into an anonymized data topic.
+
+The app also sends alerts into a topic with the details of where data has been redacted.
+
+Unprivileged consumers, which don't have sufficient rights to read the full data, can consume the anonymized messages.
+
+For reference:
+- PII Detection slides: https://docs.google.com/presentation/d/1qgJ-_1_ifXjG9sgM__d7s5Z-LOfMqNDZwCrsjItI3xw/
+- Python tools slides: https://docs.google.com/presentation/d/1cdfVa2A5FQXwo5Oxi9FUEscQp2z78SAo5xfvjNNZhIg/
+
+## Running PII Detection
+
+To send data to the source topics, we use a sample producer, but any data source will work.
+The PII detection app is a Faust Python app that uses libraries we have developed with Microsoft Presidio and other tools to read from the source topics, inspect the messages and redact them when PII is detected.
+
+### Prepare the Python environment
+
+With Python 3.6 or later installed/available on your system:
+
+- Open a terminal and go to the tutorial folder
+- Run the `create_py_env.sh` script with `source`: `source ./create_py_env.sh`
+
+This will create a Python virtual environment and install the required libraries into it. The environment is created in a folder named `pii-detection-tutorial-venv` in the current directory. There is a variable (`VENV_PATH`) in the script that you can change if you want to put it somewhere else. The environment can be safely deleted and recreated at will.
+
+### Connecting to a Confluent Cloud cluster
+
+Credentials for the Confluent Cloud (CC) cluster can be entered in a `.ini` or `.env` file or passed as environment variables. Environment variables take precedence. We have provided a `settings.ini` that you can fill in easily. The first six settings have to be provided (cluster URL, key and secret, plus Schema Registry URL, key and secret).
+
+- Move to the `code` folder: `cd code`
+- Open `settings.ini`
+- Enter your CC credentials
+- That's it!
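+
+For illustration, here is a sketch of what those first six settings might look like once filled in. The host names, keys and the `kafka://` scheme below are placeholders and assumptions, not real values; use the bootstrap server and Schema Registry endpoints shown for your own cluster:
+
+```ini
+[settings]
+KAFKA_BROKER_URL=kafka://pkc-xxxxx.us-east-2.aws.confluent.cloud:9092
+CLUSTER_API_KEY=<your cluster API key>
+CLUSTER_API_SECRET=<your cluster API secret>
+
+SCHEMA_REGISTRY_URL=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
+SR_API_KEY=<your Schema Registry API key>
+SR_API_SECRET=<your Schema Registry API secret>
+```
+
+Note that `SCHEMA_REGISTRY_URL` must start with `https://` when `USE_AUTH` is left at its default of `True`.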
+
+### Run the producer
+
+(You should already be in the `code` folder.)
+
+`python message_producer.py`
+
+This will produce fake data into a topic named `demo-raw` in your Confluent Cloud cluster. You should see messages arriving in the topic, and some of them will contain PII.
+
+### Run the streaming app
+
+- Open a new terminal and go to the tutorial folder.
+- Move to the `code` folder: `cd code`
+- Activate the environment: `source ../pii-detection-tutorial-venv/bin/activate`
+- Run the app: `python app.py worker -l info`
+
+As the app runs, you should see messages going to the `demo-anon` topic (with the PII redacted), and for each redaction there will be an alert message in the `demo-alerts` topic.
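+
+To give an idea of the output, here is a sketch of a redacted record and one of its alerts. Records are shown decoded, and all values are illustrative rather than actual captured output; the exact alert content comes from the Presidio anonymizer results:
+
+```
+# demo-raw (decoded record)
+{"event_id": 42, "log": "Please call Maria Cruz, her number is 555-0134."}
+
+# demo-anon (PII replaced with entity-type placeholders)
+{"event_id": 42, "log": "Please call <PERSON>, her number is <PHONE_NUMBER>."}
+
+# demo-alerts (one alert per redacted entity; "operation" holds the details of the replace operation)
+{"entity_type": "PERSON", "operation": "...", "text": "Please call Maria Cruz, her number is 555-0134.", "confidence_score": 0.85, "language": "en", "topic": "demo-raw"}
+```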
+
+## Customization points and improving the detection rate
+
+The `settings.ini` file contains a few customization options:
+- `LANGUAGES`: it's important to provide an accurate list of the languages used in the messages, as it helps with the syntactic analysis of the sentences they contain.
+- `MODELS`: the NLP models to load are derived from the expected languages, but different sizes are available for each language. If you can, use the largest ones. See https://spacy.io/usage/models
+- `ENTITY_TYPES`: these are the types of data to detect (credit card, postal code, name, etc.). You can provide a list to supplement or replace the default list. The supported entities are described here: https://microsoft.github.io/presidio/supported_entities/
+- `IGNORE_FIELD_TAGS`: a list of field tags to skip. If the list is empty, the app looks at all the text fields.
+
+The settings file also shows how to declare an external `yaml` file with extra recognizers (see `reco.yaml`). These can be used to improve recognition of specific entity types or words.
+
+Factors impacting PII detection:
+- The longer and the better formed the sentences are, the better the syntactic analysis done by the NLP models will be. Once a sentence is properly analyzed, distinguishing words that could be part of PII from plain common words works better.
+- Short, malformed sentences such as chat exchanges will have lower detection performance. Loading larger models in this situation will improve results.
+- Language detection can be done automatically (`LANGUAGES` left empty or set to `auto`) but works better when more words from the language are available. If the sentences are very short and few, it is better (when practical) to provide the list of possible languages.
+
diff --git a/pii-detection/code/__main__.py b/pii-detection/code/__main__.py
new file mode 100644
index 00000000..4b9cc03b
--- /dev/null
+++ b/pii-detection/code/__main__.py
@@ -0,0 +1,3 @@
+from app import main
+
+main()
diff --git a/pii-detection/code/app.py b/pii-detection/code/app.py
new file mode 100644
index 00000000..968bfea8
--- /dev/null
+++ b/pii-detection/code/app.py
@@ -0,0 +1,191 @@
+from asyncio import get_event_loop
+import json
+import logging
+from typing import NamedTuple
+
+from aiokafka.helpers import create_ssl_context
+from decouple import config, AutoConfig, RepositoryEnv, Csv
+from faust import App, SASLCredentials
+from schema_registry.client import SchemaRegistryClient
+
+from schema_registry.client.utils import AVRO_SCHEMA_TYPE, JSON_SCHEMA_TYPE
+from schema_registry.serializers import AvroMessageSerializer, JsonMessageSerializer
+from schema_registry.serializers.errors import SerializerError
+
+from pii.analyzer import PresidioAnalyzer, anonymizer
+from pii.language_detection import LanguageDetector
+from pii.record_processing import AsyncAvroProcessor, AsyncJsonProcessor
+from pii.schema_explorer import SchemaExplorer, is_json
+from pii.schema_registry_serder import decode_schema_id
+from pii.settings_utils import list_or_string_to_list
+from pii.spacy_model import init_models
+from pii.stream_catalog import StreamCatalogTagQuery, StreamCatalogError
+
+
+class Auth(NamedTuple):
+    username: str
+    password: str
+
+
+env_file = config('PII_ENV_FILE', default='')
+if env_file:
+    AutoConfig.SUPPORTED = {env_file: RepositoryEnv}
+    config = AutoConfig()
+
+auth_env_var = 'USE_AUTH'
+use_authorisation = config(auth_env_var, default=True, cast=bool)
+
+faust_creds = SASLCredentials(username=config('CLUSTER_API_KEY'), password=config('CLUSTER_API_SECRET'),
+                              ssl_context=create_ssl_context()) if use_authorisation else None
+num_topic_partitions = config('TOPIC_PARTITIONS', default=1, cast=int)
+
+offset = config('OFFSET', default='earliest')
+offset_reset_policy = offset if not offset.isdigit() else None
+
+app = App('pii-detector',
+          broker=config('KAFKA_BROKER_URL'),
+          broker_credentials=faust_creds,
+          topic_replication_factor=config('TOPIC_REPLICATION_FACTOR'),
+          topic_partitions=num_topic_partitions,
+          consumer_auto_offset_reset=offset_reset_policy)
+
+sr_auth = Auth(config('SR_API_KEY'), config('SR_API_SECRET')) if use_authorisation else None
+sr_url_env_var = 'SCHEMA_REGISTRY_URL'
+sr_url = config(sr_url_env_var)
+if use_authorisation:
+    assert sr_url.startswith('https://'), \
+        f'Authorisation requires use of https protocol. Set {auth_env_var} to False or change {sr_url_env_var}'
+sr_client = SchemaRegistryClient(url=sr_url, auth=sr_auth)
+
+field_tags = config('IGNORE_FIELD_TAGS', cast=Csv(), default='')
+catalog_tag_query = StreamCatalogTagQuery(sr_client.url_manager.base_url, sr_auth, field_tags) if field_tags else None
+
+in_topic = app.topic(config('IN_TOPIC'), internal=True, value_serializer='raw', key_serializer='raw')
+anon_topic = app.topic(config('OUT_TOPIC'))
+entity_alert_topic = app.topic(config('ALERT_TOPIC'))
+
+# init models + analyzer
+
+pii_detection_config = init_models(config("MODELS"))
+analyzer = PresidioAnalyzer(pii_detection_config,
+                            entity_types=config('ENTITY_TYPES', cast=Csv(), default=''),
+                            extend_default_entity_types=config('EXTEND_DEFAULT_ENTITIES', default=True),
+                            custom_recognizers_yaml_path=config('CUSTOM_RECOGNIZER_YAML', default=None))
+
+# Language detection init (warning: setting it to "auto" costs 800 MB in memory)
+# empty or "auto" = default --> all supported languages are pre-loaded and lang is detected on each string
+# or a list of language codes separated by commas
+# if only one language code is provided, no language detection happens
+lang_settings = config("LANGUAGES", "auto")
+if lang_settings == "auto":
+    lang_codes = []
+else:
+    lang_codes = list_or_string_to_list(lang_settings) if lang_settings else []
+
+if len(lang_codes) == 1:
+    lang_code = lang_codes[0]
+    lang_detector = None
+else:
+    lang_code = None
+    lang_detector = LanguageDetector(lang_codes)
+
+
+class PartitionOffsetter:
+    def __init__(self, index: int, offset: int):
+        self._index = index
+        self._offset = offset
+
+    async def set_offset(self):
+        from faust.types import TP
+        await app.consumer.seek(TP(in_topic.get_topic_name(), self._index), self._offset)
+        logging.info(f'Moved partition {self._index} offset to: {self._offset}')
+
+
+if offset.isdigit():
+    offset = int(offset)
+    for partition in range(num_topic_partitions):
+        app.task()(PartitionOffsetter(partition, offset).set_offset)
+
+
+async def anonymize(raw_text: str) -> str:
+    language = lang_code if lang_code else lang_detector.detect_lang(raw_text)
+
+    analysis_results = analyzer.analyze(raw_text, language=language)
+    print(f'Analysis results: {analysis_results}')
+    anonymized_result = anonymizer.anonymize(text=raw_text, analyzer_results=analysis_results)
+    print(f'Anonymised Text: {anonymized_result.text}')
+
+    for (detected_entity, anonymized_entity) in zip(analysis_results, anonymized_result.items):
+        alert = dict(
+            entity_type=anonymized_entity.entity_type,
+            operation=str(anonymized_entity),
+            text=raw_text,
+            confidence_score=detected_entity.score,
+            language=language,
+            topic=in_topic.get_topic_name(),
+        )
+        await entity_alert_topic.send(value=alert)
+    return anonymized_result.text
+
+
+async def anonymize_record(record):
+    schema_explorer = SchemaExplorer(record)
+
+    for field_name, field_value in schema_explorer.all_text_fields():
+        anonymized_value = await anonymize(field_value)
+        res = 
schema_explorer.update_field_from_dotted(field_name, anonymized_value) + if not res: + logging.error(f"{field_name} was not found in the record. The field will not be anonymized.") + return record + + +@app.agent(in_topic, sink=[anon_topic]) +async def pii_detection(stream): + async for message_bytes in stream: + try: + in_schema_id = decode_schema_id(message_bytes) + except SerializerError: + in_schema_id = None + + if in_schema_id: + tagged_field_names = {} + if catalog_tag_query: + try: + tagged_field_names = catalog_tag_query(in_schema_id) + except StreamCatalogError: + logging.exception(f'Contacting Stream Catalog failed, cannot retrieve fields with tags {field_tags}' + 'Defaulting to no ignored fields') + out_message_subject = f'{anon_topic.get_topic_name()}-value' + in_schema = sr_client.get_by_id(in_schema_id) + if in_schema.schema_type == AVRO_SCHEMA_TYPE: + serdes = AvroMessageSerializer(sr_client) + format_anonymizer = AsyncAvroProcessor(anonymize) + elif in_schema.schema_type == JSON_SCHEMA_TYPE: + serdes = JsonMessageSerializer(sr_client) + format_anonymizer = AsyncJsonProcessor(anonymize) + else: + raise ValueError(f'Schema type {in_schema.schema_type} is not supported') + in_message = serdes.decode_message(message_bytes) + out_message = await format_anonymizer.process_record(in_schema.schema, in_message, tagged_field_names) + encoded_out_message = serdes.encode_record_with_schema(out_message_subject, in_schema, out_message) + else: + msg_str = message_bytes.decode("utf-8") + if is_json(msg_str): + json_obj = json.loads(msg_str) + encoded_out_message = await anonymize_record(json_obj) + else: + encoded_out_message = await anonymize(msg_str) + yield encoded_out_message + + +def main(): + event_loop = get_event_loop() + for topic in (in_topic, anon_topic, entity_alert_topic): + event_loop.run_until_complete(topic.maybe_declare()) + app.main() + + +if __name__ == '__main__': + main() diff --git a/pii-detection/code/context.py b/pii-detection/code/context.py new file mode 100644 index 00000000..a4b74f33 --- /dev/null +++ b/pii-detection/code/context.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass + +from pii.analyzer import PresidioAnalyzer +from pii.language_detection import LanguageDetector + + +@dataclass +class PiiSmtContext: + lang_detector: LanguageDetector = None + lang_code: str = "auto" + analyzer: PresidioAnalyzer = None + settings: dict = None diff --git a/pii-detection/code/message_producer.py b/pii-detection/code/message_producer.py new file mode 100644 index 00000000..e6b948ca --- /dev/null +++ b/pii-detection/code/message_producer.py @@ -0,0 +1,70 @@ +import random +from asyncio import get_event_loop +from dataclasses import dataclass +from typing import NamedTuple + +from aiokafka.helpers import create_ssl_context +from dataclasses_avroschema import AvroModel +from decouple import config, AutoConfig, RepositoryEnv +from faust import App, SASLCredentials +from presidio_evaluator.data_generator import PresidioSentenceFaker +from schema_registry.client import SchemaRegistryClient +from schema_registry.client.schema import AvroSchema +from schema_registry.serializers import AvroMessageSerializer + + +class Auth(NamedTuple): + username: str + password: str + + +@dataclass +class RawData(AvroModel): + event_id: int + log: str + + +faker = PresidioSentenceFaker(locale='en', lower_case_ratio=0.05) +fake_records = faker.generate_new_fake_sentences(num_samples=1000) + +env_file = config('PII_ENV_FILE', default='') +if env_file: + AutoConfig.SUPPORTED = {env_file: 
RepositoryEnv} + config = AutoConfig() + +auth_env_var = 'USE_AUTH' +use_authorisation = config(auth_env_var, default=True, cast=bool) + +faust_creds = SASLCredentials(username=config('CLUSTER_API_KEY'), password=config('CLUSTER_API_SECRET'), + ssl_context=create_ssl_context()) if use_authorisation else None +app = App('message-producer', + broker=config('KAFKA_BROKER_URL'), + broker_credentials=faust_creds, + topic_replication_factor=config('TOPIC_REPLICATION_FACTOR'), + topic_partitions=1) + +sr_auth = Auth(config('SR_API_KEY'), config('SR_API_SECRET')) if use_authorisation else None +sr_client = SchemaRegistryClient(url=config('SCHEMA_REGISTRY_URL'), auth=sr_auth) +avro_serdes = AvroMessageSerializer(sr_client) + +topic_name = config('IN_TOPIC') +raw_data_topic = app.topic(topic_name, internal=True) +event_loop = get_event_loop() +event_loop.run_until_complete(raw_data_topic.maybe_declare()) + + +@app.timer(1.0) +async def produce_raw_data(): + fake_record = random.choice(fake_records) + snippet = fake_record.fake + data = RawData(random.randint(0, 1000), snippet).asdict() + schema = AvroSchema(RawData.avro_schema()) + data = avro_serdes.encode_record_with_schema(f'{topic_name}-value', schema, data) + await raw_data_topic.send(value=data) + + +if __name__ == '__main__': + import sys + + sys.argv = [sys.argv[0], 'worker', '-l', 'info', '--web-port', '7001'] + app.main() diff --git a/pii-detection/code/pii/__init__.py b/pii-detection/code/pii/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pii-detection/code/pii/analyzer.py b/pii-detection/code/pii/analyzer.py new file mode 100644 index 00000000..68905a5b --- /dev/null +++ b/pii-detection/code/pii/analyzer.py @@ -0,0 +1,76 @@ +import logging +from pathlib import Path +from typing import List, Union + +import yaml +from presidio_analyzer import AnalyzerEngine, RecognizerRegistry +from presidio_analyzer import PatternRecognizer +from presidio_analyzer.nlp_engine import NlpEngineProvider +from presidio_anonymizer import AnonymizerEngine +from spacy.util import fix_random_seed as spacy_fix_random_seed + + +class PresidioAnalyzer: + def __init__(self, nlp_configuration, + entity_types: List, + extend_default_entity_types: bool = True, + custom_recognizers_yaml_path = None): + + random_seed = 456 + spacy_fix_random_seed(random_seed) + + default_entity_types = [ + 'CREDIT_CARD', + 'EMAIL_ADDRESS', + 'IP_ADDRESS', + 'NRP', + 'LOCATION', + 'PERSON', + 'PHONE_NUMBER', + ] + if not entity_types: + entity_types = default_entity_types.copy() + elif extend_default_entity_types: + entity_types.extend(default_entity_types) + + self.entities = entity_types + + self.supported_languages = [m["lang_code"] for m in nlp_configuration["models"]] + + self.nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine() + + self.registry = RecognizerRegistry() + self.registry.load_predefined_recognizers(nlp_engine=self.nlp_engine, languages=self.supported_languages) + + if custom_recognizers_yaml_path: + custom_recognizers = self.load_custom_recognizers_from_yaml(custom_recognizers_yaml_path) + + for recognizer in custom_recognizers: + if recognizer.supported_language in self.supported_languages: + self.registry.add_recognizer(recognizer) + self.entities.extend(recognizer.supported_entities) + else: + logging.warning(f'Recognizer {recognizer.name} will not be used as it is built for language ' + f'{recognizer.supported_language}, while this app is supporting languages: ' + f'{self.supported_languages}') + + self.analyzer 
= AnalyzerEngine(registry=self.registry, + nlp_engine=self.nlp_engine, + supported_languages=self.supported_languages) + + def analyze(self, raw_text, language): + print(f"analysing in {language}: {raw_text}") + return self.analyzer.analyze(raw_text, entities=self.entities, language=language) + + @staticmethod + def load_custom_recognizers_from_yaml(recognizers_yaml_path: Union[str, Path]): + if isinstance(recognizers_yaml_path, str): + recognizers_yaml_path = Path(recognizers_yaml_path) + assert recognizers_yaml_path.exists(), 'Path to YAML file containing custom recognizers does not exist: ' \ + f'{recognizers_yaml_path}' + with recognizers_yaml_path.open() as yaml_file: + serialized_recognizers = yaml.load(yaml_file, Loader=yaml.FullLoader)["recognizers"] + return [PatternRecognizer.from_dict(serialised_recognizer) for serialised_recognizer in serialized_recognizers] + + +anonymizer = AnonymizerEngine() diff --git a/pii-detection/code/pii/auth_utils.py b/pii-detection/code/pii/auth_utils.py new file mode 100644 index 00000000..7fb48741 --- /dev/null +++ b/pii-detection/code/pii/auth_utils.py @@ -0,0 +1,6 @@ +from typing import NamedTuple + + +class Auth(NamedTuple): + username: str + password: str diff --git a/pii-detection/code/pii/dotenv.example b/pii-detection/code/pii/dotenv.example new file mode 100644 index 00000000..83a761d5 --- /dev/null +++ b/pii-detection/code/pii/dotenv.example @@ -0,0 +1,32 @@ +USE_AUTH=True + +KAFKA_BROKER_URL= +CLUSTER_API_KEY= +CLUSTER_API_SECRET= + +#TOPIC_PARTITIONS +#OFFSET +TOPIC_REPLICATION_FACTOR=3 + +SCHEMA_REGISTRY_URL= +SR_API_KEY= +SR_API_SECRET= + +#IGNORE_FIELD_TAGS + +IN_TOPIC= +OUT_TOPIC= +ALERT_TOPIC= + +CUSTOM_RECOGNIZER_YAML=./reco.yaml + +LANGUAGES=en, fr +MODELS=en_core_web_lg, fr_core_news_lg + +# list of entity types: see https://microsoft.github.io/presidio/supported_entities/ +#ENTITY_TYPES +# if providing entity types, set to true if they're extending the default +# and false if they're replacing them (this option defaults to True, meaning that if omitted the entity types +# provided will supplement the default set). 
+#EXTEND_DEFAULT_ENTITIES + diff --git a/pii-detection/code/pii/language_detection.py b/pii-detection/code/pii/language_detection.py new file mode 100644 index 00000000..86a8e0a6 --- /dev/null +++ b/pii-detection/code/pii/language_detection.py @@ -0,0 +1,32 @@ +from typing import List + +from lingua import Language, LanguageDetectorBuilder, IsoCode639_1 + + +class LanguageDetector: + DEFAULT_LANG_CODE = 'en' + def __init__(self, lang_codes: List[str]): + self.languages = [self.iso639_1_to_language(lang_code) for lang_code in lang_codes] + if len(self.languages) == 0: + self.detector = LanguageDetectorBuilder\ + .from_all_languages_without(Language.ESPERANTO, Language.LATIN)\ + .with_preloaded_language_models().build() + else: + self.detector = LanguageDetectorBuilder.from_languages(*self.languages).build() + + @staticmethod + def iso639_1_to_language(code: str) -> Language: + iso = IsoCode639_1[code.upper()] + return Language.from_iso_code_639_1(iso) + + @staticmethod + def language_to_iso639_1(lang: Language) -> str: + iso = lang.iso_code_639_1 + return iso.name.lower() + + + def detect_lang(self, text: str): + lang = self.detector.detect_language_of(text) + if not lang: + return self.DEFAULT_LANG_CODE + return self.language_to_iso639_1(lang) diff --git a/pii-detection/code/pii/record_processing.py b/pii-detection/code/pii/record_processing.py new file mode 100644 index 00000000..5af701c5 --- /dev/null +++ b/pii-detection/code/pii/record_processing.py @@ -0,0 +1,159 @@ +from abc import ABC as ABSTRACT_BASE_CLASS, abstractmethod +from copy import copy, deepcopy +from typing import Set, Awaitable, Union, Any + + +class AsyncRecordProcessor(ABSTRACT_BASE_CLASS): + def __init__(self, process_fn: Awaitable): + self._process_fn = process_fn + + @abstractmethod + async def process_record(self, schema: dict, record: dict, fields_to_ignore: Set[str] = {}) -> dict: + raise NotImplementedError + + +class AsyncAvroProcessor(AsyncRecordProcessor): + async def process_record(self, schema: dict, record: dict, fields_to_ignore: Set[str] = {}) -> dict: + processed_record = copy(record) + for field_schema in schema['fields']: + field_name = field_schema['name'] + if field_name in fields_to_ignore: + continue + processed_record[field_name] = await self.process_field(field_schema, record[field_name], fields_to_ignore) + return processed_record + + async def process_field(self, field_schema: dict, field_value: Any, *args) -> Any: + if isinstance(field_value, str): + return await self._process_fn(field_value) + if field_schema['type'] in ('array', 'map', 'record'): + return await self.process_complex_type(field_schema['name'], field_schema, field_value, *args) + if isinstance(field_schema['type'], dict): + return await self.process_complex_type(field_schema['name'], field_schema['type'], field_value, *args) + return field_value + + async def process_complex_type(self, field_name: str, type_schema: dict, field_value: Any, *args) -> Any: + field_type = type_schema['type'] + if field_type == 'record': + return await self.process_record(type_schema, field_value) + + if field_type == 'map': + item_schema = type_schema['values'] + if isinstance(item_schema, dict): + item_schema['name'] = field_name + else: + item_schema = dict(name=field_name, type=item_schema) + for key in field_value: + field_value[key] = await self.process_field(item_schema, field_value[key], *args) + return field_value + + if field_type == 'array': + item_schema = type_schema['items'] + else: + raise NotImplementedError + + if item_schema == 
'string': + return [await self._process_fn(value) for value in field_value] + if isinstance(item_schema, list): + return [await self._process_fn(value) if isinstance(value, str) else value for value in field_value] + if isinstance(item_schema, dict): + item_schema['name'] = field_name + field_values = [await self.process_field(item_schema, value, *args) for value in field_value] + return field_values + return field_value + + +def _maybe_to_number(value: str) -> Union[int, float]: + try: + return int(value) + except ValueError: + try: + return float(value) + except ValueError: + return value + + +class AsyncJsonProcessor(AsyncRecordProcessor): + _complex_types = ('array', 'object',) + _ref_key = '$ref' + + async def process_record(self, schema: dict, record: dict, fields_to_ignore: Set[str] = {}) -> dict: + processed_record = deepcopy(record) + ref_schemas = schema.get('definitions', None) + for field_name, field_schema in schema['properties'].items(): + if field_name in fields_to_ignore: + continue + processed_record[field_name] = await self.process_field(field_name, field_schema, record[field_name], + ref_schemas, fields_to_ignore) + return processed_record + + async def process_field(self, field_name: str, field_schema: dict, field_value: Any, + ref_schemas: dict, *args) -> Any: + if 'type' in field_schema: + if field_schema['type'] == 'string': + return await self._process_fn(field_value) + if field_schema['type'] in self._complex_types: + return await self.process_complex_type(field_name, field_schema, field_value, ref_schemas, *args) + elif 'anyOf' in field_schema and dict(type='string') in field_schema['anyOf']: + # All numbers are deserialized as strings + # Assume numeric strings were originally numbers, favouring int over float + field_value = _maybe_to_number(field_value) + if isinstance(field_value, str): + field_value = await self._process_fn(field_value) + return field_value + elif self._ref_key in field_schema: + field_record_name = self._parse_ref_schema_name(field_schema) + field_schema = self._get_referenced_schema(field_record_name, ref_schemas) + return await self.process_record(field_schema, field_value, *args) + return field_value + + async def process_complex_type(self, field_name: str, type_schema: dict, field_value: Any, ref_schemas: dict, + *args) -> Any: + field_type = type_schema['type'] + + if field_type == 'object': + item_schema = type_schema['additionalProperties'] + for key in field_value: + field_value[key] = await self.process_field(field_name, item_schema, field_value[key], ref_schemas, + *args) + return field_value + + if field_type == 'array': + item_schema = type_schema['items'] + else: + raise NotImplementedError + + if 'type' in item_schema: + if item_schema['type'] == 'string': + return [await self._process_fn(value) for value in field_value] + if item_schema['type'] in self._complex_types: + field_values = [] + for value in field_value: + sub_field_values = await self.process_field(field_name, item_schema, value, ref_schemas, *args) + field_values.append(sub_field_values) + return field_values + elif 'anyOf' in item_schema and dict(type='string') in item_schema['anyOf']: + # All numbers are deserialized as strings + # Assume numeric strings were originally numbers, favouring int over float + field_values = [] + for value in field_value: + value = _maybe_to_number(value) + if isinstance(value, str): + value = await self._process_fn(value) + field_values.append(value) + return field_values + elif self._ref_key in item_schema: + field_record_name = 
self._parse_ref_schema_name(item_schema) + field_schema = self._get_referenced_schema(field_record_name, ref_schemas) + return [await self.process_record(field_schema, record, *args) for record in field_value] + return field_value + + def _parse_ref_schema_name(self, full_schema: dict) -> str: + return full_schema[self._ref_key].split('/')[-1] + + @staticmethod + def _get_referenced_schema(schema_name: str, ref_schemas: dict) -> dict: + assert ref_schemas + field_schema = ref_schemas[schema_name] + field_schema['definitions'] = deepcopy(ref_schemas) + assert field_schema['type'] == 'object' + return field_schema diff --git a/pii-detection/code/pii/schema_explorer.py b/pii-detection/code/pii/schema_explorer.py new file mode 100644 index 00000000..d06cd6b2 --- /dev/null +++ b/pii-detection/code/pii/schema_explorer.py @@ -0,0 +1,55 @@ +import json +from typing import Dict, List, Tuple, Union + + +def is_json(some_string): + try: + json.loads(some_string) + except ValueError: + return False + return True + + +class SchemaExplorer: + @staticmethod + def iterate_fields(record_dict: Dict, results: List, name_prefix: str = ""): + for field_name, field_value in record_dict.items(): + if isinstance(field_value, dict): + SchemaExplorer.iterate_fields(field_value, results, f"{field_name}.") + elif isinstance(field_value, str): + results.append((f"{name_prefix}{field_name}", field_value)) + + def __init__(self, source_object): + self.text_fields = [] + self.source_object = source_object + self.iterate_fields(self.source_object, self.text_fields) + + def all_text_values(self) -> List[str]: + return [i[1] for i in self.text_fields] + + def all_text_fields(self) -> List[Tuple[str, str]]: + return [(i[0], i[1]) for i in self.text_fields] + + def get_parent_from_field_path_list(self, field_items: List[str]): + field_items = field_items[:-1] + result = self.source_object + try: + for item in field_items: + result = result[item] + return result + except KeyError: + return None + + def update_field_from_dotted(self, dotted_field_name: str, new_value): + field_items = dotted_field_name.split('.') + + if not field_items or len(field_items) == 0 or not field_items[0]: + return False + + field_name = field_items[-1] + parent = self.get_parent_from_field_path_list(field_items) + # the 2nd test is to ensure we don't create a new entry (on the next code line) if it doesn't already exist + if parent and parent.get(field_name): + parent[field_name] = new_value + return True + return False diff --git a/pii-detection/code/pii/schema_registry_serder.py b/pii-detection/code/pii/schema_registry_serder.py new file mode 100644 index 00000000..89853c6a --- /dev/null +++ b/pii-detection/code/pii/schema_registry_serder.py @@ -0,0 +1,22 @@ +import struct + +from schema_registry.serializers.errors import SerializerError +from schema_registry.serializers.message_serializer import ContextStringIO, MAGIC_BYTE + + +def decode_schema_id(message: bytes) -> int: + """ + Decode the schema ID from a message from kafka that has been encoded for use with the schema registry. + This function is an extension to the python-schema-registry-client, which only provides the deserialised message. 
+ Args: + message: message to be decoded + Returns: + dict: The ID of the schema the message was encoded with + """ + if len(message) <= 5: + raise SerializerError("message is too small to decode") + with ContextStringIO(message) as payload: + magic, schema_id = struct.unpack(">bI", payload.read(5)) + if magic != MAGIC_BYTE: + raise SerializerError("message does not start with magic byte") + return schema_id diff --git a/pii-detection/code/pii/settings_utils.py b/pii-detection/code/pii/settings_utils.py new file mode 100644 index 00000000..07be21b1 --- /dev/null +++ b/pii-detection/code/pii/settings_utils.py @@ -0,0 +1,26 @@ +from typing import List + + +# takes a string with a single element or several separated by commas +# and returns a List of strings +def string_to_list(text: str) -> List[str]: + result = [] + if text.find(','): + items = text.split(",") + for i in items: + result.append(i.strip()) + else: + result.append(text) + return result + + +# settings can contain either a single name in a string, +# a List of names +# or a list of names separated by commas +def list_or_string_to_list(settings): + if isinstance(settings, str): + return string_to_list(settings) + elif isinstance(settings, list): + return settings + + raise RuntimeError(f"Invalid format for settings: {settings}") \ No newline at end of file diff --git a/pii-detection/code/pii/spacy_model.py b/pii-detection/code/pii/spacy_model.py new file mode 100644 index 00000000..9d34cb7b --- /dev/null +++ b/pii-detection/code/pii/spacy_model.py @@ -0,0 +1,51 @@ +import pkgutil +import subprocess +from typing import Dict, List + +from pii.settings_utils import list_or_string_to_list + + +class SpacyModel: + def __init__(self, lang_code, model_name): + self.lang_code = lang_code + self.model_name = model_name + + @classmethod + def from_name(cls, model_name): + lang_code = model_name[:2] + return cls(lang_code=lang_code, model_name=model_name) + + def load(self): + if pkgutil.find_loader(self.model_name): + print(f"spacy model: {self.model_name} already present") + return + + print(f"(down)loading spacy model: {self.model_name}") + + commands = ["python", "-m", "spacy", "download", self.model_name] + process = subprocess.run([*commands], capture_output=True) + if process.returncode != 0: + raise EnvironmentError(f"Model not found (or unable to install) {self.model_name} - " + f"{process.returncode} {process.stderr.decode()} {process.stdout.decode()}") + + +def load_models(model_names: List[str]) -> Dict: + configuration = { + "nlp_engine_name": "spacy", + "models": [], + } + for model_name in model_names: + model = SpacyModel.from_name(model_name) + model.load() + configuration["models"].append({"lang_code": model.lang_code, "model_name": model.model_name}) + + return configuration + + +def init_models(models): + # models is optional and can contain either a single model name in a string, + # a List of model names + # or a list of model names separated by commas + models_to_load = list_or_string_to_list(models) if models else ['en_core_web_lg'] + return load_models(models_to_load) + diff --git a/pii-detection/code/pii/stream_catalog.py b/pii-detection/code/pii/stream_catalog.py new file mode 100644 index 00000000..36772074 --- /dev/null +++ b/pii-detection/code/pii/stream_catalog.py @@ -0,0 +1,37 @@ +import json +from string import Template +from typing import List + +from gql import Client, gql +from gql.transport.exceptions import TransportServerError +from gql.transport.requests import RequestsHTTPTransport + +from typing 
import Set + + +class StreamCatalogError(TransportServerError): + pass + + +class StreamCatalogTagQuery: + def __init__(self, schema_registry_url: str, schema_registry_auth, field_tags: List[str]): + catalog_query_url = f'{schema_registry_url}/catalog/graphql' + transport = RequestsHTTPTransport(url=catalog_query_url, verify=True, retries=3, auth=schema_registry_auth) + self._catalog_client = Client(transport=transport) + self._tagged_fields_query_template = Template(""" + query { + sr_field(tags: $field_tags, where: {id: {_eq: $schema_id}}) { + name + } + } + """) + self._tagged_fields_query_template = Template( + self._tagged_fields_query_template.safe_substitute(field_tags=json.dumps(field_tags))) + + def __call__(self, schema_id: int) -> Set[str]: + query = self._tagged_fields_query_template.substitute(schema_id=schema_id) + try: + catalog_response = self._catalog_client.execute(gql(query)) + except TransportServerError as e: + raise StreamCatalogError('GraphQL query to Stream Catalog failed') from e + return {field['name'] for field in catalog_response['sr_field']} if catalog_response['sr_field'] else set() diff --git a/pii-detection/code/reco.yaml b/pii-detection/code/reco.yaml new file mode 100644 index 00000000..4fe6344b --- /dev/null +++ b/pii-detection/code/reco.yaml @@ -0,0 +1,33 @@ +recognizers: + - name: "Zip code Recognizer" + supported_language: "de" + patterns: + - name: "zip code (weak)" + regex: "(\\b\\d{5}(?:\\-\\d{4})?\\b)" + score: 0.01 + context: + - zip + - code + supported_entity: "ZIP" + + - name: "Extended Credit Card Recognizer" + supported_language: "en" + patterns: + - name: "Extended Credit Card (weak)" + regex: "(\\b\\d{16,19}\\b)" + score: 0.01 + context: + - zip + - code + supported_entity: "CREDIT_CARD" + + - name: "Titles recognizer" + supported_language: "en" + supported_entity: "TITLE" + deny_list: + - Mr. + - Mrs. + - Ms. + - Miss + - Dr. + - Prof. diff --git a/pii-detection/code/requirements.txt b/pii-detection/code/requirements.txt new file mode 100644 index 00000000..0b835987 --- /dev/null +++ b/pii-detection/code/requirements.txt @@ -0,0 +1,14 @@ +pip +setuptools +wheel +spacy +aiokafka +python-schema-registry-client +python-decouple +faust-streaming +presidio-analyzer==2.2.* +presidio-anonymizer==2.2.* +lingua-language-detector==1.3.2 +gql[requests] +dataclasses-avroschema==0.34.4 +./presidio-research-fake-record-generator.zip diff --git a/pii-detection/code/settings.ini b/pii-detection/code/settings.ini new file mode 100644 index 00000000..50046517 --- /dev/null +++ b/pii-detection/code/settings.ini @@ -0,0 +1,33 @@ +[settings] +KAFKA_BROKER_URL= +CLUSTER_API_KEY= +CLUSTER_API_SECRET= + +SCHEMA_REGISTRY_URL= +SR_API_KEY= +SR_API_SECRET= + +USE_AUTH=True + +#TOPIC_PARTITIONS +#OFFSET +TOPIC_REPLICATION_FACTOR=3 + +#IGNORE_FIELD_TAGS + +IN_TOPIC=demo-raw +OUT_TOPIC=demo-anon +ALERT_TOPIC=demo-alerts + +CUSTOM_RECOGNIZER_YAML=./reco.yaml + +LANGUAGES=en, fr +MODELS=en_core_web_lg, fr_core_news_lg + +# list of entity types: see https://microsoft.github.io/presidio/supported_entities/ +#ENTITY_TYPES +# if providing entity types, set to true if they're extending the default +# and false if they're replacing them (this option defaults to True, meaning that if omitted the entity types +# provided will supplement the default set). 
+#EXTEND_DEFAULT_ENTITIES
+
diff --git a/pii-detection/create_py_env.sh b/pii-detection/create_py_env.sh
new file mode 100755
index 00000000..1927cf3e
--- /dev/null
+++ b/pii-detection/create_py_env.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+echo "creating a virtual environment"
+VENV_PATH='./pii-detection-tutorial-venv'
+python3 -m venv $VENV_PATH
+source $VENV_PATH/bin/activate
+
+echo "loading libraries"
+pip install -r ./code/requirements.txt
+
+echo "done"
\ No newline at end of file
diff --git a/pii-detection/img/app-topology.png b/pii-detection/img/app-topology.png
new file mode 100644
index 00000000..db8695ad
Binary files /dev/null and b/pii-detection/img/app-topology.png differ