Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion azure/functions/decorators/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from azure.functions.decorators.http import HttpTrigger, HttpOutput, \
HttpMethod
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod
BrokerAuthenticationMode, BrokerProtocol, OAuthBearerMethod, \
KafkaMessageKeyType
from azure.functions.decorators.queue import QueueTrigger, QueueOutput
from azure.functions.decorators.servicebus import ServiceBusQueueTrigger, \
ServiceBusQueueOutput, ServiceBusTopicTrigger, \
Expand Down Expand Up @@ -1244,12 +1245,18 @@ def kafka_trigger(self,
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you set the default value for key_data_type to KafkaMessageKeyType.STRING here since it's given a default value in kafka.py?

username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
Expand Down Expand Up @@ -1287,6 +1294,10 @@ def kafka_trigger(self,
Azure Event Hubs).
:param consumer_group: Kafka consumer group used by the trigger.
:param avro_schema: Used only if a generic Avro record should be generated.
:param key_avro_schema: Avro schema for the message key. Used only if a
generic Avro record should be generated for the key.
:param key_data_type: Data type of the message key. Valid values: Int, Long,
String, Binary. Default is String. Ignored if key_avro_schema is set.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please also add pydocs for the new ssl_ arguments as well here?

:param username: SASL username for use with the PLAIN or SASL-SCRAM mechanisms.
Equivalent to 'sasl.username' in librdkafka. Default is empty string.
:param password: SASL password for use with the PLAIN or SASL-SCRAM mechanisms.
Expand Down Expand Up @@ -1338,12 +1349,19 @@ def decorator():
event_hub_connection_string=event_hub_connection_string, # noqa: E501
consumer_group=consumer_group,
avro_schema=avro_schema,
key_avro_schema=key_avro_schema,
key_data_type=parse_singular_param_to_enum(
key_data_type, KafkaMessageKeyType),
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
ssl_certificate_pem=ssl_certificate_pem,
ssl_key_pem=ssl_key_pem,
ssl_ca_pem=ssl_ca_pem,
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
Expand Down Expand Up @@ -2588,12 +2606,18 @@ def kafka_output(self,
topic: str,
broker_list: str,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[Union[KafkaMessageKeyType, str]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as trigger - please set default value here to match kafka.py

username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
Expand Down Expand Up @@ -2630,6 +2654,10 @@ def kafka_output(self,
:param topic: The Kafka topic to which messages are published.
:param broker_list: The list of Kafka brokers to which the producer connects.
:param avro_schema: Optional. Avro schema to generate a generic record.
:param key_avro_schema: Avro schema for the message key. Used only if a
generic Avro record should be generated for the key.
:param key_data_type: Data type of the message key. Valid values: Int, Long,
String, Binary. Default is String. Ignored if key_avro_schema is set.
:param username: SASL username for use with the PLAIN and SASL-SCRAM
mechanisms. Equivalent to `'sasl.username'` in librdkafka.
:param password: SASL password for use with the PLAIN and SASL-SCRAM
Expand All @@ -2642,6 +2670,14 @@ def kafka_output(self,
Equivalent to `'ssl.certificate.location'` in librdkafka.
:param ssl_key_password: Password for the client's SSL key.
Equivalent to `'ssl.key.password'` in librdkafka.
:param ssl_certificate_pem: Client certificate in PEM format.
Equivalent to 'ssl.certificate.pem' in librdkafka.
:param ssl_key_pem: Client private key in PEM format.
Equivalent to 'ssl.key.pem' in librdkafka.
:param ssl_ca_pem: CA certificate for verifying the broker's certificate in PEM format.
Equivalent to 'ssl.ca.pem' in librdkafka.
:param ssl_certificate_and_key_pem: Client certificate and key in PEM format.
Additional configuration for KeyVault support (certificate with private key).
:param schema_registry_url: URL of the Avro Schema Registry.
:param schema_registry_username: Username for accessing the Schema Registry.
:param schema_registry_password: Password for accessing the Schema Registry.
Expand Down Expand Up @@ -2695,12 +2731,19 @@ def decorator():
topic=topic,
broker_list=broker_list,
avro_schema=avro_schema,
key_avro_schema=key_avro_schema,
key_data_type=parse_singular_param_to_enum(
key_data_type, KafkaMessageKeyType),
username=username,
password=password,
ssl_key_location=ssl_key_location,
ssl_ca_location=ssl_ca_location,
ssl_certificate_location=ssl_certificate_location,
ssl_key_password=ssl_key_password,
ssl_certificate_pem=ssl_certificate_pem,
ssl_key_pem=ssl_key_pem,
ssl_ca_pem=ssl_ca_pem,
ssl_certificate_and_key_pem=ssl_certificate_and_key_pem,
schema_registry_url=schema_registry_url,
schema_registry_username=schema_registry_username,
schema_registry_password=schema_registry_password,
Expand Down
73 changes: 52 additions & 21 deletions azure/functions/decorators/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class OAuthBearerMethod(StringifyEnum):
OIDC = 1


class KafkaMessageKeyType(StringifyEnum):
INT = 0
LONG = 1
STRING = 2
BINARY = 3


class KafkaOutput(OutputBinding):
@staticmethod
def get_binding_name() -> str:
Expand All @@ -39,15 +46,21 @@ def __init__(self,
topic: str,
broker_list: str,
avro_schema: Optional[str],
username: Optional[str],
password: Optional[str],
ssl_key_location: Optional[str],
ssl_ca_location: Optional[str],
ssl_certificate_location: Optional[str],
ssl_key_password: Optional[str],
schema_registry_url: Optional[str],
schema_registry_username: Optional[str],
schema_registry_password: Optional[str],
key_avro_schema: Optional[str] = None,
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
Expand All @@ -68,12 +81,18 @@ def __init__(self,
self.topic = topic
self.broker_list = broker_list
self.avro_schema = avro_schema
self.key_avro_schema = key_avro_schema
self.key_data_type = key_data_type
self.username = username
self.password = password
self.ssl_key_location = ssl_key_location
self.ssl_ca_location = ssl_ca_location
self.ssl_certificate_location = ssl_certificate_location
self.ssl_key_password = ssl_key_password
self.ssl_certificate_pem = ssl_certificate_pem
self.ssl_key_pem = ssl_key_pem
self.ssl_ca_pem = ssl_ca_pem
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
self.schema_registry_url = schema_registry_url
self.schema_registry_username = schema_registry_username
self.schema_registry_password = schema_registry_password
Expand Down Expand Up @@ -104,18 +123,24 @@ def __init__(self,
name: str,
topic: str,
broker_list: str,
event_hub_connection_string: Optional[str],
consumer_group: Optional[str],
avro_schema: Optional[str],
username: Optional[str],
password: Optional[str],
ssl_key_location: Optional[str],
ssl_ca_location: Optional[str],
ssl_certificate_location: Optional[str],
ssl_key_password: Optional[str],
schema_registry_url: Optional[str],
schema_registry_username: Optional[str],
schema_registry_password: Optional[str],
event_hub_connection_string: Optional[str] = None,
consumer_group: Optional[str] = None,
avro_schema: Optional[str] = None,
key_avro_schema: Optional[str] = None,
key_data_type: Optional[KafkaMessageKeyType] = KafkaMessageKeyType.STRING,
username: Optional[str] = None,
password: Optional[str] = None,
ssl_key_location: Optional[str] = None,
ssl_ca_location: Optional[str] = None,
ssl_certificate_location: Optional[str] = None,
ssl_key_password: Optional[str] = None,
ssl_certificate_pem: Optional[str] = None,
ssl_key_pem: Optional[str] = None,
ssl_ca_pem: Optional[str] = None,
ssl_certificate_and_key_pem: Optional[str] = None,
schema_registry_url: Optional[str] = None,
schema_registry_username: Optional[str] = None,
schema_registry_password: Optional[str] = None,
o_auth_bearer_method: Optional[OAuthBearerMethod] = None,
o_auth_bearer_client_id: Optional[str] = None,
o_auth_bearer_client_secret: Optional[str] = None,
Expand All @@ -133,12 +158,18 @@ def __init__(self,
self.event_hub_connection_string = event_hub_connection_string
self.consumer_group = consumer_group
self.avro_schema = avro_schema
self.key_avro_schema = key_avro_schema
self.key_data_type = key_data_type
self.username = username
self.password = password
self.ssl_key_location = ssl_key_location
self.ssl_ca_location = ssl_ca_location
self.ssl_certificate_location = ssl_certificate_location
self.ssl_key_password = ssl_key_password
self.ssl_certificate_pem = ssl_certificate_pem
self.ssl_key_pem = ssl_key_pem
self.ssl_ca_pem = ssl_ca_pem
self.ssl_certificate_and_key_pem = ssl_certificate_and_key_pem
self.schema_registry_url = schema_registry_url
self.schema_registry_username = schema_registry_username
self.schema_registry_password = schema_registry_password
Expand Down
70 changes: 69 additions & 1 deletion tests/decorators/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azure.functions.decorators.core import BindingDirection, Cardinality, \
DataType
from azure.functions.decorators.kafka import KafkaTrigger, KafkaOutput, \
BrokerAuthenticationMode, BrokerProtocol
BrokerAuthenticationMode, BrokerProtocol, KafkaMessageKeyType


class TestKafka(unittest.TestCase):
Expand Down Expand Up @@ -102,3 +102,71 @@ def test_kafka_output_valid_creation(self):
'topic': 'topic',
'type': KAFKA,
'username': 'username'})

def test_kafka_trigger_with_key_data_type_and_pem(self):
trigger = KafkaTrigger(name="arg_name",
topic="topic",
broker_list="broker_list",
key_avro_schema="key_avro_schema",
key_data_type=KafkaMessageKeyType.LONG,
ssl_certificate_pem="cert_pem",
ssl_key_pem="key_pem",
ssl_ca_pem="ca_pem",
ssl_certificate_and_key_pem="cert_and_key_pem",
data_type=DataType.UNDEFINED)

self.assertEqual(trigger.get_binding_name(), "kafkaTrigger")
dict_repr = trigger.get_dict_repr()
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.LONG)
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")

def test_kafka_output_with_key_data_type_and_pem(self):
output = KafkaOutput(name="arg_name",
topic="topic",
broker_list="broker_list",
key_avro_schema="key_avro_schema",
key_data_type=KafkaMessageKeyType.BINARY,
ssl_certificate_pem="cert_pem",
ssl_key_pem="key_pem",
ssl_ca_pem="ca_pem",
ssl_certificate_and_key_pem="cert_and_key_pem",
data_type=DataType.UNDEFINED)

self.assertEqual(output.get_binding_name(), "kafka")
dict_repr = output.get_dict_repr()
self.assertEqual(dict_repr["keyAvroSchema"], "key_avro_schema")
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.BINARY)
self.assertEqual(dict_repr["sslCertificatePem"], "cert_pem")
self.assertEqual(dict_repr["sslKeyPem"], "key_pem")
self.assertEqual(dict_repr["sslCaPem"], "ca_pem")
self.assertEqual(dict_repr["sslCertificateAndKeyPem"], "cert_and_key_pem")

def test_kafka_message_key_type_enum(self):
"""Test that KafkaMessageKeyType enum has the correct values"""
self.assertEqual(KafkaMessageKeyType.INT, 0)
self.assertEqual(KafkaMessageKeyType.LONG, 1)
self.assertEqual(KafkaMessageKeyType.STRING, 2)
self.assertEqual(KafkaMessageKeyType.BINARY, 3)

def test_kafka_trigger_key_data_type_default(self):
"""Test that key_data_type defaults to STRING"""
trigger = KafkaTrigger(name="arg_name",
topic="topic",
broker_list="broker_list")

dict_repr = trigger.get_dict_repr()
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)

def test_kafka_output_key_data_type_default(self):
"""Test that key_data_type defaults to STRING"""
output = KafkaOutput(name="arg_name",
topic="topic",
broker_list="broker_list",
avro_schema="schema")

dict_repr = output.get_dict_repr()
self.assertEqual(dict_repr["keyDataType"], KafkaMessageKeyType.STRING)
Loading