Skip to content
This repository was archived by the owner on Sep 5, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 64 additions & 50 deletions samples/snippets/snippets_notification_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@
"""Demos for working with notification configs."""


def create_notification_config(organization_id, notification_config_id, pubsub_topic):

# [START securitycenter_create_notification_config]
# [START securitycenter_create_notification_config]
def create_notification_config(parent_id, notification_config_id, pubsub_topic):
"""
Args:
parent_id: must be in one of the following formats:
"organizations/{organization_id}"
"projects/{project_id}"
"folders/{folder_id}"
notification_config_id: "your-config-id"
pubsub_topic: "projects/{your-project-id}/topics/{your-topic-ic}"

Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic.
"""
from google.cloud import securitycenter as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"
# TODO: pubsub_topic = "projects/{your-project-id}/topics/{your-topic-ic}"
# Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic.

org_name = "organizations/{org_id}".format(org_id=organization_id)

created_notification_config = client.create_notification_config(
request={
"parent": org_name,
"parent": parent_id,
"config_id": notification_config_id,
"notification_config": {
"description": "Notification for active findings",
Expand All @@ -47,88 +50,99 @@ def create_notification_config(organization_id, notification_config_id, pubsub_t
return created_notification_config


def delete_notification_config(organization_id, notification_config_id):

# [START securitycenter_delete_notification_config]
# [START securitycenter_delete_notification_config]
def delete_notification_config(parent_id, notification_config_id):
"""
Args:
parent_id: must be in one of the following formats:
"organizations/{organization_id}"
"projects/{project_id}"
"folders/{folder_id}"
notification_config_id: "your-config-id"
"""
from google.cloud import securitycenter as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"

notification_config_name = (
"organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)
f"{parent_id}/notificationConfigs/{notification_config_id}"
)

client.delete_notification_config(request={"name": notification_config_name})
print("Deleted notification config: {}".format(notification_config_name))
print(f"Deleted notification config: {notification_config_name}")
# [END securitycenter_delete_notification_config]
return True


def get_notification_config(organization_id, notification_config_id):

# [START securitycenter_get_notification_config]
# [START securitycenter_get_notification_config]
def get_notification_config(parent_id, notification_config_id):
"""
Args:
parent_id: must be in one of the following formats:
"organizations/{organization_id}"
"projects/{project_id}"
"folders/{folder_id}"
notification_config_id: "your-config-id"
"""
from google.cloud import securitycenter as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
# TODO: notification_config_id = "your-config-id"

notification_config_name = (
"organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)
f"{parent_id}/notificationConfigs/{notification_config_id}"
)

notification_config = client.get_notification_config(
request={"name": notification_config_name}
)
print("Got notification config: {}".format(notification_config))
print(f"Got notification config: {notification_config}")
# [END securitycenter_get_notification_config]
return notification_config


def list_notification_configs(organization_id):

# [START securitycenter_list_notification_configs]
# [START securitycenter_list_notification_configs]
def list_notification_configs(parent_id):
"""
Args:
parent_id: must be in one of the following formats:
"organizations/{organization_id}"
"projects/{project_id}"
"folders/{folder_id}"
"""
from google.cloud import securitycenter as securitycenter

client = securitycenter.SecurityCenterClient()

# TODO: organization_id = "your-org-id"
org_name = "organizations/{org_id}".format(org_id=organization_id)

notification_configs_iterator = client.list_notification_configs(
request={"parent": org_name}
request={"parent": parent_id}
)
for i, config in enumerate(notification_configs_iterator):
print("{}: notification_config: {}".format(i, config))
print(f"{i}: notification_config: {config}")
# [END securitycenter_list_notification_configs]]
return notification_configs_iterator


def update_notification_config(organization_id, notification_config_id, pubsub_topic):
# [START securitycenter_update_notification_config]
# [START securitycenter_update_notification_config]
def update_notification_config(parent_id, notification_config_id, pubsub_topic):
"""
Args:
parent_id: must be in one of the following formats:
"organizations/{organization_id}"
"projects/{project_id}"
"folders/{folder_id}"
notification_config_id: "config-id-to-update"
pubsub_topic: "projects/{new-project}/topics/{new-topic}"

If updating a pubsub_topic, ensure this ServiceAccount has the
"pubsub.topics.setIamPolicy" permission on the new topic.
"""
from google.cloud import securitycenter as securitycenter
from google.protobuf import field_mask_pb2

client = securitycenter.SecurityCenterClient()

# TODO organization_id = "your-org-id"
# TODO notification_config_id = "config-id-to-update"
# TODO pubsub_topic = "projects/{new-project}/topics/{new-topic}"
# If updating a pubsub_topic, ensure this ServiceAccount has the
# "pubsub.topics.setIamPolicy" permission on the new topic.

notification_config_name = (
"organizations/{org_id}/notificationConfigs/{config_id}".format(
org_id=organization_id, config_id=notification_config_id
)
f"{parent_id}/notificationConfigs/{notification_config_id}"
)

updated_description = "New updated description"
Expand Down
12 changes: 7 additions & 5 deletions samples/snippets/snippets_notification_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def deleted_notification_config():
def test_create_notification_config():
created_notification_config = (
snippets_notification_configs.create_notification_config(
ORG_ID, CREATE_CONFIG_ID, PUBSUB_TOPIC
f"organizations/{ORG_ID}", CREATE_CONFIG_ID, PUBSUB_TOPIC
)
)
assert created_notification_config is not None
Expand All @@ -121,25 +121,27 @@ def test_create_notification_config():

def test_delete_notification_config(deleted_notification_config):
assert snippets_notification_configs.delete_notification_config(
ORG_ID, DELETE_CONFIG_ID
f"organizations/{ORG_ID}", DELETE_CONFIG_ID
)


def test_get_notification_config(new_notification_config_for_get):
retrieved_config = snippets_notification_configs.get_notification_config(
ORG_ID, GET_CONFIG_ID
f"organizations/{ORG_ID}", GET_CONFIG_ID
)
assert retrieved_config is not None


def test_list_notification_configs():
iterator = snippets_notification_configs.list_notification_configs(ORG_ID)
iterator = snippets_notification_configs.list_notification_configs(
f"organizations/{ORG_ID}"
)
assert iterator is not None


def test_update_notification_config(new_notification_config_for_update):
updated_config = snippets_notification_configs.update_notification_config(
ORG_ID, UPDATE_CONFIG_ID, PUBSUB_TOPIC
f"organizations/{ORG_ID}", UPDATE_CONFIG_ID, PUBSUB_TOPIC
)
assert updated_config is not None

Expand Down