Skip to content

feat: Add vulnerability exception methods #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 134 additions & 14 deletions sdcclient/_scanning.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import base64
import hashlib
import json
import re
import time

import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder
import time

try:
from urllib.parse import quote_plus, unquote_plus
Expand Down Expand Up @@ -164,7 +164,8 @@ def query_image_vuln(self, image, vuln_type="", vendor_only=True):
'''
return self._query_image(image, query_group='vuln', query_type=vuln_type, vendor_only=vendor_only)

def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None, vendor_only=True):
def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None,
vendor_only=True):
'''**Description**
Search system for images with the given vulnerability ID present

Expand Down Expand Up @@ -408,17 +409,18 @@ def get_image_scan_result_by_id(self, image_id, full_tag_name, detail):
A JSON object containing pass/fail status of image scan policy.
'''
url = "{base_url}/api/scanning/v1/anchore/images/by_id/{image_id}/check?tag={full_tag_name}&detail={detail}".format(
base_url=self.url,
image_id=image_id,
full_tag_name=full_tag_name,
detail=detail)
base_url=self.url,
image_id=image_id,
full_tag_name=full_tag_name,
detail=detail)
res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True):
def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2",
validate=True):
'''**Description**
Add image registry

Expand All @@ -437,7 +439,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r
if registry_type and registry_type not in registry_types:
return [False, "input registry type not supported (supported registry_types: " + str(registry_types)]
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

if not registry_type:
registry_type = self._get_registry_type(registry)
Expand All @@ -458,7 +461,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r

return [True, res.json()]

def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True):
def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2",
validate=True):
'''**Description**
Update an existing image registry.

Expand All @@ -474,7 +478,8 @@ def update_registry(self, registry, registry_user, registry_pass, insecure=False
A JSON object representing the registry.
'''
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

payload = {
'registry': registry,
Expand Down Expand Up @@ -502,7 +507,8 @@ def delete_registry(self, registry):
'''
# do some input string checking
if re.match(".*\\/.*", registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

url = self.url + "/api/scanning/v1/anchore/registries/" + registry
res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify)
Expand Down Expand Up @@ -539,7 +545,8 @@ def get_registry(self, registry):
A JSON object representing the registry.
'''
if self._registry_string_is_valid(registry):
return [False, "input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]
return [False,
"input registry name cannot contain '/' characters - valid registry names are of the form <host>:<port> where :<port> is optional"]

url = self.url + "/api/scanning/v1/anchore/registries/" + registry
res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify)
Expand Down Expand Up @@ -1059,4 +1066,117 @@ def get_vulnerability_details(self, id):
if "vulnerabilities" not in json_res or not json_res["vulnerabilities"]:
return [False, f"Vulnerability {id} was not found"]

return [True, json_res["vulnerabilities"][0]]
return [True, json_res["vulnerabilities"][0]]

def add_vulnerability_exception_bundle(self, name, comment=""):
if not name:
return [False, "A name is required for the exception bundle"]

url = self.url + f"/api/scanning/v1/vulnexceptions"
params = {
"version": "1_0",
"name": name,
"comment": comment,
}

data = json.dumps(params)
res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def delete_vulnerability_exception_bundle(self, id):

url = self.url + f"/api/scanning/v1/vulnexceptions/{id}"

res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, None]

def list_vulnerability_exception_bundles(self):
url = self.url + f"/api/scanning/v1/vulnexceptions"

params = {
"bundleId": "default",
}

res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, res.json()]

def get_vulnerability_exception_bundle(self, bundle):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}"

params = {
"bundleId": "default",
}

res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
for item in res_json["items"]:
item["trigger_id"] = str(item["trigger_id"]).rstrip("+*")
return [True, res_json]

def add_vulnerability_exception(self, bundle, cve, note=None, expiration_date=None):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities"

params = {
"gate": "vulnerabilities",
"is_busy": False,
"trigger_id": f"{cve}+*",
"expiration_date": int(expiration_date) if expiration_date else None,
"notes": note,
}

data = json.dumps(params)
res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*")
return [True, res_json]

def delete_vulnerability_exception(self, bundle, id):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}"

params = {
"bundleId": "default",
}

res = requests.delete(url, params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

return [True, None]

def update_vulnerability_exception(self, bundle, id, cve, enabled, note, expiration_date):
url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}"

data = {
"id": id,
"gate": "vulnerabilities",
"trigger_id": f"{cve}+*",
"enabled": enabled,
"notes": note,
"expiration_date": int(expiration_date) if expiration_date else None,
}
params = {
"bundleId": "default",
}

res = requests.put(url, data=json.dumps(data), params=params, headers=self.hdrs, verify=self.ssl_verify)
if not self._checkResponse(res):
return [False, self.lasterr]

res_json = res.json()
res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*")
return [True, res_json]
178 changes: 178 additions & 0 deletions specs/secure/scanning_vulnerability_exceptions_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import datetime
import os
import uuid

from expects import equal, expect, contain, be_empty, have_key, be_true, have_keys, not_, be_false, be_above
from mamba import before, context, description, after, it

from sdcclient import SdScanningClient
from specs import be_successful_api_call

with description("Scanning vulnerability exceptions") as self:
with before.each:
self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"),
token=os.getenv("SDC_SECURE_TOKEN"))

with after.each:
self.clean_bundles()


def clean_bundles(self):
_, res = self.client.list_vulnerability_exception_bundles()
for bundle in res:
if str(bundle["name"]).startswith("test_exception_bundle_"):
call = self.client.delete_vulnerability_exception_bundle(id=bundle["id"])
expect(call).to(be_successful_api_call)


with context("when we are creating a new vulnerability exception bundle"):
with it("creates the bundle correctly"):
exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
exception_comment = "This is an example of an exception bundle"
ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle, comment=exception_comment)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0",
comment=equal(exception_comment), name=equal(exception_bundle))
)

with it("creates the bundle correctly with name only and removes it correctly"):
exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0",
comment=be_empty, name=equal(exception_bundle))
)

with context("when we are listing the vulnerability exception bundles"):
with before.each:
self.exception_bundle = f"test_exception_bundle_{uuid.uuid4()}"
ok, res = self.client.add_vulnerability_exception_bundle(name=self.exception_bundle)
expect((ok, res)).to(be_successful_api_call)
self.created_exception_bundle = res["id"]

with it("retrieves the list of bundles"):
ok, res = self.client.list_vulnerability_exception_bundles()

expect((ok, res)).to(be_successful_api_call)
expect(res).to(contain(
have_keys(id=self.created_exception_bundle, items=None, policyBundleId=equal("default"),
version=equal("1_0"), comment=be_empty, name=equal(self.exception_bundle))
))

with context("when we are working with vulnerability exceptions in a bundle"):
with before.each:
ok, res = self.client.add_vulnerability_exception_bundle(name=f"test_exception_bundle_{uuid.uuid4()}")
expect((ok, res)).to(be_successful_api_call)
self.created_exception_bundle = res["id"]

with it("is able to add a vulnerability exception to a bundle"):
exception_notes = "Microsoft Vulnerability"
exception_cve = "CVE-2020-1234"
ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle,
cve=exception_cve,
note=exception_notes,
expiration_date=datetime.datetime(2030, 12, 31)
.timestamp())

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys("id", "description", gate=equal("vulnerabilities"), trigger_id=equal(exception_cve),
notes=equal(exception_notes), enabled=be_true)
)

with context("and there are existing vulnerability exceptions"):
with before.each:
self.created_exception_cve = "CVE-2020-1234"
ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle,
cve=self.created_exception_cve)
expect((ok, res)).to(be_successful_api_call)
self.created_exception = res["id"]

with it("is able to list all the vulnerability exceptions from a bundle"):
ok, res = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(res).to(
have_keys(id=equal(self.created_exception_bundle),
items=contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
))
)

with it("is able to remove them"):
_, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)
ok, res = self.client.delete_vulnerability_exception(bundle=self.created_exception_bundle,
id=self.created_exception)
_, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)
expect(ex_before).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
))
)
expect(ex_after).to(
have_key("items", not_(contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
enabled=be_true,
)
)))
)

with it("is able to update them"):
_, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

ok, res = self.client.update_vulnerability_exception(bundle=self.created_exception_bundle,
id=self.created_exception,
cve="CVE-2020-1235",
enabled=False,
note="Dummy note",
expiration_date=datetime.datetime(2030, 12, 31)
.timestamp())

_, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle)

expect((ok, res)).to(be_successful_api_call)

expect(ex_before).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal(self.created_exception_cve),
notes=equal(None),
expiration_date=equal(None),
enabled=be_true,
)
))
)

expect(ex_after).to(
have_key("items", contain(
have_keys(
id=equal(self.created_exception),
gate=equal("vulnerabilities"),
trigger_id=equal("CVE-2020-1235"),
notes=equal("Dummy note"),
expiration_date=be_above(0),
enabled=be_false,
)
))
)
Loading