PII Detection #7

75 changes: 75 additions & 0 deletions pii-detection/README.md
@@ -0,0 +1,75 @@
# Run a PII Detection streaming app using Microsoft Presidio and Natural Language Processing

## Intro

This PII detection demo application uses Python, Faust, Microsoft Presidio, and other libraries to automatically detect and redact PII in Kafka messages.

![](img/app-topology.png)

A common topology is to route data through a raw data topic that is accessible only to privileged consumers, while a PII detection application analyzes and redacts the data and sends it into an anonymized data topic.

The app also sends alerts into a topic with the details of where data has been redacted.

Unprivileged consumers, which don't have the rights to read the full data, can consume the anonymized messages.

For reference:
- PII Detection slides : https://docs.google.com/presentation/d/1qgJ-_1_ifXjG9sgM__d7s5Z-LOfMqNDZwCrsjItI3xw/
- Python tools slides: https://docs.google.com/presentation/d/1cdfVa2A5FQXwo5Oxi9FUEscQp2z78SAo5xfvjNNZhIg/

## Running PII Detection

To send data to the source topics, we use a sample producer, but any data source will work.
The PII detection app is a Faust Python app that uses libraries we have developed with Microsoft Presidio and other tools to read from the source topics, inspect the messages, and redact them when PII is detected.

### Prepare the python environment

With Python 3.6 or later installed on your system:

- Open a terminal and go to the tutorial folder
- run the `create_py_env.sh` script with `source`: `source ./create_py_env.sh`

This will create a Python virtual environment and install the required libraries into it. The environment is created in a folder named `venv` in the current directory; a variable in the script lets you place it somewhere else. The environment can be safely deleted and recreated at will.

### Connecting to a Confluent Cloud cluster

Credentials for the Confluent Cloud cluster can be entered in a `.ini` or `.env` file or passed as environment variables; environment variables take precedence. We have provided a `settings.ini` that you can fill in easily. The first six entries have to be provided (cluster URL, API key and secret, plus Schema Registry URL, API key and secret); a sample is shown after the steps below.

- Move to the `code` folder: `cd code`
- Open `settings.ini`
- Enter your CC credentials
- That's it!
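
For reference, a minimal set of credential entries might look like the sketch below. The values are placeholders, and the key names are the ones read by `code/app.py` and `code/message_producer.py`; the layout of the provided `settings.ini` may differ slightly.

```ini
# Confluent Cloud cluster (placeholders)
KAFKA_BROKER_URL=<your bootstrap server URL>
CLUSTER_API_KEY=<your cluster API key>
CLUSTER_API_SECRET=<your cluster API secret>
# Schema Registry (placeholders)
SCHEMA_REGISTRY_URL=<your Schema Registry URL, https://...>
SR_API_KEY=<your Schema Registry API key>
SR_API_SECRET=<your Schema Registry API secret>
```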

### Run the producer

(You should already be in the `code` folder.)

`python message_producer.py`

This will produce fake data into a topic named `demo-raw` in your Confluent Cloud cluster. You should see messages arriving in the topic, and some of them will contain PII.
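
Each record is an Avro message with an `event_id` and a `log` field holding a generated sentence (see `code/message_producer.py`); the value below is purely illustrative:

```json
{"event_id": 311, "log": "Please send the invoice to maria.garcia@example.com before Friday."}
```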

### Run the streaming app

- Open a new terminal and go to the tutorial folder.
- Move to the `code` folder: `cd code`
- Activate the environment: `source ../pii-detection-tutorial-venv/bin/activate`
- Run the app: `python app.py worker -l info`

As the app runs, you should see messages going to the `demo-anon` topic (with the PII redacted) and for each redaction, there will be an alert message in the `demo-alerts` topic.
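
For illustration only (the placeholder format and scores depend on the analyzer and anonymizer configuration), a redacted message could look like:

```json
{"event_id": 311, "log": "Please send the invoice to <EMAIL_ADDRESS> before Friday."}
```

and the matching alert in `demo-alerts` carries the fields built in `code/app.py` (`entity_type`, `operation`, `text`, `confidence_score`, `language` and `topic`), for example:

```json
{
  "entity_type": "EMAIL_ADDRESS",
  "operation": "<details of the anonymization operation>",
  "text": "Please send the invoice to maria.garcia@example.com before Friday.",
  "confidence_score": 1.0,
  "language": "en",
  "topic": "demo-raw"
}
```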

## Customization points and improving the detection rate

The `settings.ini` file contains a few customization options (illustrative example values are shown after this list):
- `LANGUAGES`: it's important to provide an accurate list of the languages used in the messages; it helps with the syntactic analysis of the sentences they contain.
- `MODELS`: the NLP models loaded are derived from the expected languages, but different sizes are available for each language. If you can, use the largest ones. See https://spacy.io/usage/models
- `ENTITY_TYPES`: the types of data that are detected (credit card, postal code, name, etc.). You can provide a list to supplement or replace the default list. Entities are described here: https://microsoft.github.io/presidio/supported_entities/
- `IGNORE_FIELD_TAGS`: a list of field tags for fields that should be skipped. If the list is empty, all text fields are inspected.
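
For example (illustrative values only; `MODELS` is assumed here to take spaCy model names, and the `IGNORE_FIELD_TAGS` value is a made-up Stream Catalog tag; check the provided `settings.ini` for the exact format):

```ini
LANGUAGES=en,fr
MODELS=en_core_web_lg,fr_core_news_lg
ENTITY_TYPES=CREDIT_CARD,EMAIL_ADDRESS,PERSON,PHONE_NUMBER
IGNORE_FIELD_TAGS=NON_SENSITIVE
```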

The settings file also shows how to declare an external `yaml` file with extra recognizers. These can be used to improve recognition of specific entity types or words.
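
The exact schema expected by the `CUSTOM_RECOGNIZER_YAML` loader is defined by the tutorial's `pii` library, so treat the following as a sketch only. It follows the pattern-recognizer YAML layout documented by Presidio, with a made-up `EMPLOYEE_ID` entity for illustration:

```yaml
recognizers:
  - name: "Employee ID recognizer"
    supported_entity: "EMPLOYEE_ID"
    supported_language: "en"
    patterns:
      - name: "employee id pattern"
        regex: "\\bEMP-[0-9]{6}\\b"
        score: 0.7
    context:
      - employee
      - id
```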

Factors impacting PII detection:
- The longer and more well-formed the sentences are, the better the syntactic analysis performed by the NLP models. Once a sentence is properly analyzed, it is easier to tell words that could be part of PII apart from common words.
- Short, malformed sentences such as chat exchanges will have lower detection performance. Loading larger models in this situation will improve results.
- Language detection can be done automatically (`LANGUAGES` left empty or set to `auto`) but works better when more words from the language are available. If the sentences are very short and few, it is better (when practical) to provide the list of possible languages.



3 changes: 3 additions & 0 deletions pii-detection/code/__main__.py
@@ -0,0 +1,3 @@
from app import main

main()
191 changes: 191 additions & 0 deletions pii-detection/code/app.py
@@ -0,0 +1,191 @@
from asyncio import get_event_loop
import json
import logging
from typing import NamedTuple

from aiokafka.helpers import create_ssl_context
from decouple import config, AutoConfig, RepositoryEnv, Csv
from faust import App, SASLCredentials
from schema_registry.client import SchemaRegistryClient

from schema_registry.client.utils import AVRO_SCHEMA_TYPE, JSON_SCHEMA_TYPE
from schema_registry.serializers import AvroMessageSerializer, JsonMessageSerializer
from schema_registry.serializers.errors import SerializerError

from pii.analyzer import PresidioAnalyzer, anonymizer
from pii.language_detection import LanguageDetector
from pii.record_processing import AsyncAvroProcessor, AsyncJsonProcessor
from pii.schema_explorer import SchemaExplorer, is_json
from pii.schema_registry_serder import decode_schema_id
from pii.settings_utils import list_or_string_to_list
from pii.spacy_model import init_models
from pii.stream_catalog import StreamCatalogTagQuery, StreamCatalogError


class Auth(NamedTuple):
username: str
password: str


env_file = config('PII_ENV_FILE', default='')
if env_file:
AutoConfig.SUPPORTED = {env_file: RepositoryEnv}
config = AutoConfig()

auth_env_var = 'USE_AUTH'
use_authorisation = config(auth_env_var, default=True, cast=bool)

faust_creds = SASLCredentials(username=config('CLUSTER_API_KEY'), password=config('CLUSTER_API_SECRET'),
ssl_context=create_ssl_context()) if use_authorisation else None
num_topic_partitions = config('TOPIC_PARTITIONS', default=1, cast=int)

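# OFFSET may be 'earliest'/'latest' (used as the consumer auto offset reset policy) or a number (an explicit offset to seek to at startup, see PartitionOffsetter below)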
offset = config('OFFSET', default='earliest')
offset_reset_policy = offset if not offset.isdigit() else None

app = App('pii-detector',
broker=config('KAFKA_BROKER_URL'),
broker_credentials=faust_creds,
topic_replication_factor=config('TOPIC_REPLICATION_FACTOR'),
topic_partitions=num_topic_partitions,
consumer_auto_offset_reset=offset_reset_policy)

sr_auth = Auth(config('SR_API_KEY'), config('SR_API_SECRET')) if use_authorisation else None
sr_url_env_var = 'SCHEMA_REGISTRY_URL'
sr_url = config(sr_url_env_var)
if use_authorisation:
assert sr_url.startswith('https://'), \
f'Authorisation requires use of https protocol. Set {auth_env_var} to False or change {sr_url_env_var}'
sr_client = SchemaRegistryClient(url=sr_url, auth=sr_auth)

field_tags = config('IGNORE_FIELD_TAGS', cast=Csv(), default='')
catalog_tag_query = StreamCatalogTagQuery(sr_client.url_manager.base_url, sr_auth, field_tags) if field_tags else None

in_topic = app.topic(config('IN_TOPIC'), internal=True, value_serializer='raw', key_serializer='raw')
anon_topic = app.topic(config('OUT_TOPIC'))
entity_alert_topic = app.topic(config('ALERT_TOPIC'))

# init models + analyzer

pii_detection_config = init_models(config("MODELS"))
analyzer = PresidioAnalyzer(pii_detection_config,
entity_types=config('ENTITY_TYPES', cast=Csv(), default=''),
extend_default_entity_types=config('EXTEND_DEFAULT_ENTITIES', default=True),
custom_recognizers_yaml_path=config('CUSTOM_RECOGNIZER_YAML', default=None))

# Language detection init (warning: setting it to "auto" costs 800 MB in memory)
# empty or "auto" = default --> all supported languages are pre-loaded and lang is detected on each string
# or a list of language codes separated by commas
# if only language code is provided, no language detection happens
lang_settings = config("LANGUAGES", "auto")
if lang_settings == "auto":
lang_codes = []
else:
lang_codes = list_or_string_to_list(lang_settings) if lang_settings else []

if len(lang_codes) == 1:
lang_code = lang_codes[0]
lang_detector = None
else:
lang_code = None
lang_detector = LanguageDetector(lang_codes)


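# Seeks one partition of the input topic to an explicit numeric offset when the app starts (used when OFFSET is a number)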
class PartitionOffsetter:
def __init__(self, index: int, offset: int):
self._index = index
self._offset = offset

async def set_offset(self):
from faust.types import TP
await app.consumer.seek(TP(in_topic.get_topic_name(), self._index), self._offset)
logging.info(f'Moved partition {self._index} offset to: {self._offset}')


if offset.isdigit():
offset = int(offset)
for partition in range(num_topic_partitions):
app.task()(PartitionOffsetter(partition, offset).set_offset)


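# Analyze a piece of text with Presidio, send one alert per detected entity and return the redacted text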
async def anonymize(raw_text: str) -> str:
language = lang_code if lang_code else lang_detector.detect_lang(raw_text)

analysis_results = analyzer.analyze(raw_text, language=language)
print(f'Analysis results: {analysis_results}')
anonymized_result = anonymizer.anonymize(text=raw_text, analyzer_results=analysis_results)
print(f'Anonymised Text: {anonymized_result.text}')

for (detected_entity, anonymized_entity) in zip(analysis_results, anonymized_result.items):
alert = dict(
entity_type=anonymized_entity.entity_type,
operation=str(anonymized_entity),
text=raw_text,
confidence_score=detected_entity.score,
language=language,
topic=in_topic.get_topic_name(),
)
await entity_alert_topic.send(value=alert)
return anonymized_result.text


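# Anonymize every text field of a schema-less JSON record in place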
async def anonymize_record(record):
schema_explorer = SchemaExplorer(record)

for field_name, field_value in schema_explorer.all_text_fields():
anonymized_value = await anonymize(field_value)
res = schema_explorer.update_field_from_dotted(field_name, anonymized_value)
if not res:
logging.error(f"{field_name} was not found in the record. The field will not be anonymized.")
return record


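# Main agent: decode each incoming message (Schema Registry Avro/JSON Schema, plain JSON or plain text), anonymize its text and sink the result to the anonymized topic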
@app.agent(in_topic, sink=[anon_topic])
async def pii_detection(stream):
async for message_bytes in stream:
try:
in_schema_id = decode_schema_id(message_bytes)
except SerializerError:
in_schema_id = None

if in_schema_id:
tagged_field_names = {}
if catalog_tag_query:
try:
tagged_field_names = catalog_tag_query(in_schema_id)
except StreamCatalogError:
logging.exception(f'Contacting Stream Catalog failed, cannot retrieve fields with tags {field_tags}. '
'Defaulting to no ignored fields')
out_message_subject = f'{anon_topic.get_topic_name()}-value'
in_schema = sr_client.get_by_id(in_schema_id)
if in_schema.schema_type == AVRO_SCHEMA_TYPE:
serdes = AvroMessageSerializer(sr_client)
format_anonymizer = AsyncAvroProcessor(anonymize)
elif in_schema.schema_type == JSON_SCHEMA_TYPE:
serdes = JsonMessageSerializer(sr_client)
format_anonymizer = AsyncJsonProcessor(anonymize)
else:
raise ValueError(f'Schema type {in_schema.schema_type} is not supported')
in_message = serdes.decode_message(message_bytes)
out_message = await format_anonymizer.process_record(in_schema.schema, in_message, tagged_field_names)
encoded_out_message = serdes.encode_record_with_schema(out_message_subject, in_schema, out_message)
else:
msg_str = message_bytes.decode("utf-8")
if is_json(msg_str):
json_obj = json.loads(msg_str)
encoded_out_message = await anonymize_record(json_obj)
else:
encoded_out_message = await anonymize(msg_str)
yield encoded_out_message


def main():
event_loop = get_event_loop()
for topic in (in_topic, anon_topic, entity_alert_topic):
event_loop.run_until_complete(topic.maybe_declare())
app.main()


if __name__ == '__main__':
main()
12 changes: 12 additions & 0 deletions pii-detection/code/context.py
@@ -0,0 +1,12 @@
from dataclasses import dataclass

from pii.analyzer import PresidioAnalyzer
from pii.language_detection import LanguageDetector


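# Groups the language detection, analyzer and settings objects used by the PII detection logic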
@dataclass
class PiiSmtContext:
lang_detector: LanguageDetector = None
lang_code: str = "auto"
analyzer: PresidioAnalyzer = None
settings: dict = None
70 changes: 70 additions & 0 deletions pii-detection/code/message_producer.py
@@ -0,0 +1,70 @@
import random
from asyncio import get_event_loop
from dataclasses import dataclass
from typing import NamedTuple

from aiokafka.helpers import create_ssl_context
from dataclasses_avroschema import AvroModel
from decouple import config, AutoConfig, RepositoryEnv
from faust import App, SASLCredentials
from presidio_evaluator.data_generator import PresidioSentenceFaker
from schema_registry.client import SchemaRegistryClient
from schema_registry.client.schema import AvroSchema
from schema_registry.serializers import AvroMessageSerializer


class Auth(NamedTuple):
username: str
password: str


@dataclass
class RawData(AvroModel):
event_id: int
log: str


faker = PresidioSentenceFaker(locale='en', lower_case_ratio=0.05)
fake_records = faker.generate_new_fake_sentences(num_samples=1000)

env_file = config('PII_ENV_FILE', default='')
if env_file:
AutoConfig.SUPPORTED = {env_file: RepositoryEnv}
config = AutoConfig()

auth_env_var = 'USE_AUTH'
use_authorisation = config(auth_env_var, default=True, cast=bool)

faust_creds = SASLCredentials(username=config('CLUSTER_API_KEY'), password=config('CLUSTER_API_SECRET'),
ssl_context=create_ssl_context()) if use_authorisation else None
app = App('message-producer',
broker=config('KAFKA_BROKER_URL'),
broker_credentials=faust_creds,
topic_replication_factor=config('TOPIC_REPLICATION_FACTOR'),
topic_partitions=1)

sr_auth = Auth(config('SR_API_KEY'), config('SR_API_SECRET')) if use_authorisation else None
sr_client = SchemaRegistryClient(url=config('SCHEMA_REGISTRY_URL'), auth=sr_auth)
avro_serdes = AvroMessageSerializer(sr_client)

topic_name = config('IN_TOPIC')
raw_data_topic = app.topic(topic_name, internal=True)
event_loop = get_event_loop()
event_loop.run_until_complete(raw_data_topic.maybe_declare())


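# Every second, pick a random fake sentence, wrap it in a RawData record, Avro-encode it and send it to the raw data topic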
@app.timer(1.0)
async def produce_raw_data():
fake_record = random.choice(fake_records)
snippet = fake_record.fake
data = RawData(random.randint(0, 1000), snippet).asdict()
schema = AvroSchema(RawData.avro_schema())
data = avro_serdes.encode_record_with_schema(f'{topic_name}-value', schema, data)
await raw_data_topic.send(value=data)


if __name__ == '__main__':
import sys

sys.argv = [sys.argv[0], 'worker', '-l', 'info', '--web-port', '7001']
app.main()