Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/azure-firewall/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Release History
===============
2.0.0
++++++
* `az network azure-firewall packet-capture-operation`: Add packet capture operation support for the azure firewall

1.5.0
++++++
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#
# Code generated by aaz-dev-tools
# --------------------------------------------------------------------------------------------

# pylint: skip-file
# flake8: noqa

from azure.cli.core.aaz import *


@register_command_group(
"network azure-firewall",
)
class __CMDGroup(AAZCommandGroup):
"""Manage Azure Firewall
"""
pass


__all__ = ["__CMDGroup"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#
# Code generated by aaz-dev-tools
# --------------------------------------------------------------------------------------------

# pylint: skip-file
# flake8: noqa

from .__cmd_group import *
from ._packet_capture_operation import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#
# Code generated by aaz-dev-tools
# --------------------------------------------------------------------------------------------

# pylint: skip-file
# flake8: noqa

from azure.cli.core.aaz import *


@register_command(
"network azure-firewall packet-capture-operation",
)
class PacketCaptureOperation(AAZCommand):
"""Runs a packet capture operation on AzureFirewall.

Runs a start/status/stop packet capture operation on the Azure Firewall.

:example: AzureFirewallPacketCaptureOperation
az network azure-firewall packet-capture-operation --resource-group rg1 --azure-firewall-name azureFirewall1 --duration-in-seconds 300 --number-of-packets-to-capture 5000 --sas-url someSASURL --file-name azureFirewallPacketCapture --protocol Any --flags "[{type:syn},{type:fin}]" --filters "[{sources:[20.1.1.0],destinations:[20.1.2.0],destination-ports:[4500]},{sources:[10.1.1.0,10.1.1.1],destinations:[10.1.2.0],destination-ports:[123,80]}]" --operation Start
az network azure-firewall packet-capture-operation --resource-group rg1 --azure-firewall-name azureFirewall1 --operation Status
az network azure-firewall packet-capture-operation --resource-group rg1 --azure-firewall-name azureFirewall1 --operation Stop
"""

_aaz_info = {
"version": "2024-10-01",
"resources": [
["mgmt-plane", "/subscriptions/{}/resourcegroups/{}/providers/microsoft.network/azurefirewalls/{}/packetcaptureoperation", "2024-10-01"],
]
}

AZ_SUPPORT_NO_WAIT = True

def _handler(self, command_args):
super()._handler(command_args)
return self.build_lro_poller(self._execute_operations, self._output)

_args_schema = None

@classmethod
def _build_arguments_schema(cls, *args, **kwargs):
if cls._args_schema is not None:
return cls._args_schema
cls._args_schema = super()._build_arguments_schema(*args, **kwargs)

# define Arg Group ""

_args_schema = cls._args_schema
_args_schema.azure_firewall_name = AAZStrArg(
options=["--azure-firewall-name"],
help="The name of the azure firewall.",
required=True,
id_part="name",
fmt=AAZStrArgFormat(
pattern="^[A-Za-z0-9][\\w\\-._]{0,54}[A-Za-z0-9_]$",
),
)
_args_schema.resource_group = AAZResourceGroupNameArg(
required=True,
)

# define Arg Group "Parameters"

_args_schema = cls._args_schema
_args_schema.duration_in_seconds = AAZIntArg(
options=["--duration-in-seconds"],
arg_group="Parameters",
help="Duration of packet capture in seconds. If the field is not provided, the default value is 60.",
default=60,
fmt=AAZIntArgFormat(
maximum=1800,
minimum=30,
),
)
_args_schema.file_name = AAZStrArg(
options=["--file-name"],
arg_group="Parameters",
help="Name of file to be uploaded to sasURL",
)
_args_schema.filters = AAZListArg(
options=["--filters"],
arg_group="Parameters",
help="Rules to filter packet captures.",
)
_args_schema.flags = AAZListArg(
options=["--flags"],
arg_group="Parameters",
help="The tcp-flag type to be captured. Used with protocol TCP",
)
_args_schema.number_of_packets_to_capture = AAZIntArg(
options=["--num-packets", "--number-of-packets-to-capture"],
arg_group="Parameters",
help="Number of packets to be captured. If the field is not provided, the default value is 1000.",
default=1000,
fmt=AAZIntArgFormat(
maximum=90000,
minimum=100,
),
)
_args_schema.operation = AAZStrArg(
options=["--operation"],
arg_group="Parameters",
help="The Azure Firewall packet capture operation to perform",
default="Start",
enum={"Start": "Start", "Status": "Status", "Stop": "Stop"},
)
_args_schema.protocol = AAZStrArg(
options=["--protocol"],
arg_group="Parameters",
help="The protocol of packets to capture",
enum={"Any": "Any", "ICMP": "ICMP", "TCP": "TCP", "UDP": "UDP"},
)
_args_schema.sas_url = AAZStrArg(
options=["--sas-url"],
arg_group="Parameters",
help="SAS URL of the destination blob container where the packet capture file will be uploaded.",
)

filters = cls._args_schema.filters
filters.Element = AAZObjectArg()

_element = cls._args_schema.filters.Element
_element.destination_ports = AAZListArg(
options=["destination-ports"],
help="List of ports to be captured.",
)
_element.destinations = AAZListArg(
options=["destinations"],
help="List of destination IP addresses/subnets to be captured.",
)
_element.sources = AAZListArg(
options=["sources"],
help="List of source IP addresses/subnets to be captured.",
)

destination_ports = cls._args_schema.filters.Element.destination_ports
destination_ports.Element = AAZStrArg()

destinations = cls._args_schema.filters.Element.destinations
destinations.Element = AAZStrArg()

sources = cls._args_schema.filters.Element.sources
sources.Element = AAZStrArg()

flags = cls._args_schema.flags
flags.Element = AAZObjectArg()

_element = cls._args_schema.flags.Element
_element.type = AAZStrArg(
options=["type"],
help="Flags to capture",
enum={"ack": "ack", "fin": "fin", "push": "push", "rst": "rst", "syn": "syn", "urg": "urg"},
)
return cls._args_schema

def _execute_operations(self):
self.pre_operations()
yield self.AzureFirewallsPacketCaptureOperation(ctx=self.ctx)()
self.post_operations()

@register_callback
def pre_operations(self):
pass

@register_callback
def post_operations(self):
pass

def _output(self, *args, **kwargs):
result = self.deserialize_output(self.ctx.vars.instance, client_flatten=False)
return result

class AzureFirewallsPacketCaptureOperation(AAZHttpOperation):
CLIENT_TYPE = "MgmtClient"

def __call__(self, *args, **kwargs):
request = self.make_request()
session = self.client.send_request(request=request, stream=False, **kwargs)
if session.http_response.status_code in [202]:
return self.client.build_lro_polling(
self.ctx.args.no_wait,
session,
self.on_200,
self.on_error,
lro_options={"final-state-via": "location"},
path_format_arguments=self.url_parameters,
)
if session.http_response.status_code in [200]:
return self.client.build_lro_polling(
self.ctx.args.no_wait,
session,
self.on_200,
self.on_error,
lro_options={"final-state-via": "location"},
path_format_arguments=self.url_parameters,
)

return self.on_error(session.http_response)

@property
def url(self):
return self.client.format_url(
"/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/azureFirewalls/{azureFirewallName}/packetCaptureOperation",
**self.url_parameters
)

@property
def method(self):
return "POST"

@property
def error_format(self):
return "ODataV4Format"

@property
def url_parameters(self):
parameters = {
**self.serialize_url_param(
"azureFirewallName", self.ctx.args.azure_firewall_name,
required=True,
),
**self.serialize_url_param(
"resourceGroupName", self.ctx.args.resource_group,
required=True,
),
**self.serialize_url_param(
"subscriptionId", self.ctx.subscription_id,
required=True,
),
}
return parameters

@property
def query_parameters(self):
parameters = {
**self.serialize_query_param(
"api-version", "2024-10-01",
required=True,
),
}
return parameters

@property
def header_parameters(self):
parameters = {
**self.serialize_header_param(
"Content-Type", "application/json",
),
**self.serialize_header_param(
"Accept", "application/json",
),
}
return parameters

@property
def content(self):
_content_value, _builder = self.new_content_builder(
self.ctx.args,
typ=AAZObjectType,
typ_kwargs={"flags": {"required": True, "client_flatten": True}}
)
_builder.set_prop("durationInSeconds", AAZIntType, ".duration_in_seconds")
_builder.set_prop("fileName", AAZStrType, ".file_name")
_builder.set_prop("filters", AAZListType, ".filters")
_builder.set_prop("flags", AAZListType, ".flags")
_builder.set_prop("numberOfPacketsToCapture", AAZIntType, ".number_of_packets_to_capture")
_builder.set_prop("operation", AAZStrType, ".operation")
_builder.set_prop("protocol", AAZStrType, ".protocol")
_builder.set_prop("sasUrl", AAZStrType, ".sas_url")

filters = _builder.get(".filters")
if filters is not None:
filters.set_elements(AAZObjectType, ".")

_elements = _builder.get(".filters[]")
if _elements is not None:
_elements.set_prop("destinationPorts", AAZListType, ".destination_ports")
_elements.set_prop("destinations", AAZListType, ".destinations")
_elements.set_prop("sources", AAZListType, ".sources")

destination_ports = _builder.get(".filters[].destinationPorts")
if destination_ports is not None:
destination_ports.set_elements(AAZStrType, ".")

destinations = _builder.get(".filters[].destinations")
if destinations is not None:
destinations.set_elements(AAZStrType, ".")

sources = _builder.get(".filters[].sources")
if sources is not None:
sources.set_elements(AAZStrType, ".")

flags = _builder.get(".flags")
if flags is not None:
flags.set_elements(AAZObjectType, ".")

_elements = _builder.get(".flags[]")
if _elements is not None:
_elements.set_prop("type", AAZStrType, ".type")

return self.serialize_content(_content_value)

def on_200(self, session):
data = self.deserialize_http_content(session)
self.ctx.set_var(
"instance",
data,
schema_builder=self._build_schema_on_200
)

_schema_on_200 = None

@classmethod
def _build_schema_on_200(cls):
if cls._schema_on_200 is not None:
return cls._schema_on_200

cls._schema_on_200 = AAZObjectType()

_schema_on_200 = cls._schema_on_200
_schema_on_200.message = AAZStrType()
_schema_on_200.status_code = AAZStrType(
serialized_name="statusCode",
)

return cls._schema_on_200


class _PacketCaptureOperationHelper:
"""Helper class for PacketCaptureOperation"""


__all__ = ["PacketCaptureOperation"]
Loading
Loading