diff --git a/sdcclient/_scanning.py b/sdcclient/_scanning.py index 0cb906b6..efe18f13 100644 --- a/sdcclient/_scanning.py +++ b/sdcclient/_scanning.py @@ -1,10 +1,10 @@ import base64 -import hashlib import json import re +import time + import requests from requests_toolbelt.multipart.encoder import MultipartEncoder -import time try: from urllib.parse import quote_plus, unquote_plus @@ -164,7 +164,8 @@ def query_image_vuln(self, image, vuln_type="", vendor_only=True): ''' return self._query_image(image, query_group='vuln', query_type=vuln_type, vendor_only=vendor_only) - def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None, vendor_only=True): + def query_images_by_vulnerability(self, vulnerability_id, namespace=None, package=None, severity=None, + vendor_only=True): '''**Description** Search system for images with the given vulnerability ID present @@ -408,17 +409,18 @@ def get_image_scan_result_by_id(self, image_id, full_tag_name, detail): A JSON object containing pass/fail status of image scan policy. ''' url = "{base_url}/api/scanning/v1/anchore/images/by_id/{image_id}/check?tag={full_tag_name}&detail={detail}".format( - base_url=self.url, - image_id=image_id, - full_tag_name=full_tag_name, - detail=detail) + base_url=self.url, + image_id=image_id, + full_tag_name=full_tag_name, + detail=detail) res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify) if not self._checkResponse(res): return [False, self.lasterr] return [True, res.json()] - def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True): + def add_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", + validate=True): '''**Description** Add image registry @@ -437,7 +439,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r if registry_type and registry_type not in registry_types: return [False, "input registry type not supported (supported registry_types: " + str(registry_types)] if self._registry_string_is_valid(registry): - return [False, "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] + return [False, + "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] if not registry_type: registry_type = self._get_registry_type(registry) @@ -458,7 +461,8 @@ def add_registry(self, registry, registry_user, registry_pass, insecure=False, r return [True, res.json()] - def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", validate=True): + def update_registry(self, registry, registry_user, registry_pass, insecure=False, registry_type="docker_v2", + validate=True): '''**Description** Update an existing image registry. @@ -474,7 +478,8 @@ def update_registry(self, registry, registry_user, registry_pass, insecure=False A JSON object representing the registry. ''' if self._registry_string_is_valid(registry): - return [False, "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] + return [False, + "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] payload = { 'registry': registry, @@ -502,7 +507,8 @@ def delete_registry(self, registry): ''' # do some input string checking if re.match(".*\\/.*", registry): - return [False, "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] + return [False, + "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] url = self.url + "/api/scanning/v1/anchore/registries/" + registry res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify) @@ -539,7 +545,8 @@ def get_registry(self, registry): A JSON object representing the registry. ''' if self._registry_string_is_valid(registry): - return [False, "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] + return [False, + "input registry name cannot contain '/' characters - valid registry names are of the form : where : is optional"] url = self.url + "/api/scanning/v1/anchore/registries/" + registry res = requests.get(url, headers=self.hdrs, verify=self.ssl_verify) @@ -1059,4 +1066,117 @@ def get_vulnerability_details(self, id): if "vulnerabilities" not in json_res or not json_res["vulnerabilities"]: return [False, f"Vulnerability {id} was not found"] - return [True, json_res["vulnerabilities"][0]] \ No newline at end of file + return [True, json_res["vulnerabilities"][0]] + + def add_vulnerability_exception_bundle(self, name, comment=""): + if not name: + return [False, "A name is required for the exception bundle"] + + url = self.url + f"/api/scanning/v1/vulnexceptions" + params = { + "version": "1_0", + "name": name, + "comment": comment, + } + + data = json.dumps(params) + res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def delete_vulnerability_exception_bundle(self, id): + + url = self.url + f"/api/scanning/v1/vulnexceptions/{id}" + + res = requests.delete(url, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, None] + + def list_vulnerability_exception_bundles(self): + url = self.url + f"/api/scanning/v1/vulnexceptions" + + params = { + "bundleId": "default", + } + + res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, res.json()] + + def get_vulnerability_exception_bundle(self, bundle): + url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}" + + params = { + "bundleId": "default", + } + + res = requests.get(url, params=params, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + res_json = res.json() + for item in res_json["items"]: + item["trigger_id"] = str(item["trigger_id"]).rstrip("+*") + return [True, res_json] + + def add_vulnerability_exception(self, bundle, cve, note=None, expiration_date=None): + url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities" + + params = { + "gate": "vulnerabilities", + "is_busy": False, + "trigger_id": f"{cve}+*", + "expiration_date": int(expiration_date) if expiration_date else None, + "notes": note, + } + + data = json.dumps(params) + res = requests.post(url, headers=self.hdrs, data=data, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + res_json = res.json() + res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*") + return [True, res_json] + + def delete_vulnerability_exception(self, bundle, id): + url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}" + + params = { + "bundleId": "default", + } + + res = requests.delete(url, params=params, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + return [True, None] + + def update_vulnerability_exception(self, bundle, id, cve, enabled, note, expiration_date): + url = f"{self.url}/api/scanning/v1/vulnexceptions/{bundle}/vulnerabilities/{id}" + + data = { + "id": id, + "gate": "vulnerabilities", + "trigger_id": f"{cve}+*", + "enabled": enabled, + "notes": note, + "expiration_date": int(expiration_date) if expiration_date else None, + } + params = { + "bundleId": "default", + } + + res = requests.put(url, data=json.dumps(data), params=params, headers=self.hdrs, verify=self.ssl_verify) + if not self._checkResponse(res): + return [False, self.lasterr] + + res_json = res.json() + res_json["trigger_id"] = str(res_json["trigger_id"]).rstrip("+*") + return [True, res_json] \ No newline at end of file diff --git a/specs/secure/scanning_vulnerability_exceptions_spec.py b/specs/secure/scanning_vulnerability_exceptions_spec.py new file mode 100644 index 00000000..c2cc4eab --- /dev/null +++ b/specs/secure/scanning_vulnerability_exceptions_spec.py @@ -0,0 +1,178 @@ +import datetime +import os +import uuid + +from expects import equal, expect, contain, be_empty, have_key, be_true, have_keys, not_, be_false, be_above +from mamba import before, context, description, after, it + +from sdcclient import SdScanningClient +from specs import be_successful_api_call + +with description("Scanning vulnerability exceptions") as self: + with before.each: + self.client = SdScanningClient(sdc_url=os.getenv("SDC_SECURE_URL", "https://secure.sysdig.com"), + token=os.getenv("SDC_SECURE_TOKEN")) + + with after.each: + self.clean_bundles() + + + def clean_bundles(self): + _, res = self.client.list_vulnerability_exception_bundles() + for bundle in res: + if str(bundle["name"]).startswith("test_exception_bundle_"): + call = self.client.delete_vulnerability_exception_bundle(id=bundle["id"]) + expect(call).to(be_successful_api_call) + + + with context("when we are creating a new vulnerability exception bundle"): + with it("creates the bundle correctly"): + exception_bundle = f"test_exception_bundle_{uuid.uuid4()}" + exception_comment = "This is an example of an exception bundle" + ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle, comment=exception_comment) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0", + comment=equal(exception_comment), name=equal(exception_bundle)) + ) + + with it("creates the bundle correctly with name only and removes it correctly"): + exception_bundle = f"test_exception_bundle_{uuid.uuid4()}" + ok, res = self.client.add_vulnerability_exception_bundle(name=exception_bundle) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys("id", items=be_empty, policyBundleId=equal("default"), version="1_0", + comment=be_empty, name=equal(exception_bundle)) + ) + + with context("when we are listing the vulnerability exception bundles"): + with before.each: + self.exception_bundle = f"test_exception_bundle_{uuid.uuid4()}" + ok, res = self.client.add_vulnerability_exception_bundle(name=self.exception_bundle) + expect((ok, res)).to(be_successful_api_call) + self.created_exception_bundle = res["id"] + + with it("retrieves the list of bundles"): + ok, res = self.client.list_vulnerability_exception_bundles() + + expect((ok, res)).to(be_successful_api_call) + expect(res).to(contain( + have_keys(id=self.created_exception_bundle, items=None, policyBundleId=equal("default"), + version=equal("1_0"), comment=be_empty, name=equal(self.exception_bundle)) + )) + + with context("when we are working with vulnerability exceptions in a bundle"): + with before.each: + ok, res = self.client.add_vulnerability_exception_bundle(name=f"test_exception_bundle_{uuid.uuid4()}") + expect((ok, res)).to(be_successful_api_call) + self.created_exception_bundle = res["id"] + + with it("is able to add a vulnerability exception to a bundle"): + exception_notes = "Microsoft Vulnerability" + exception_cve = "CVE-2020-1234" + ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle, + cve=exception_cve, + note=exception_notes, + expiration_date=datetime.datetime(2030, 12, 31) + .timestamp()) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys("id", "description", gate=equal("vulnerabilities"), trigger_id=equal(exception_cve), + notes=equal(exception_notes), enabled=be_true) + ) + + with context("and there are existing vulnerability exceptions"): + with before.each: + self.created_exception_cve = "CVE-2020-1234" + ok, res = self.client.add_vulnerability_exception(bundle=self.created_exception_bundle, + cve=self.created_exception_cve) + expect((ok, res)).to(be_successful_api_call) + self.created_exception = res["id"] + + with it("is able to list all the vulnerability exceptions from a bundle"): + ok, res = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle) + + expect((ok, res)).to(be_successful_api_call) + expect(res).to( + have_keys(id=equal(self.created_exception_bundle), + items=contain( + have_keys( + id=equal(self.created_exception), + gate=equal("vulnerabilities"), + trigger_id=equal(self.created_exception_cve), + enabled=be_true, + ) + )) + ) + + with it("is able to remove them"): + _, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle) + ok, res = self.client.delete_vulnerability_exception(bundle=self.created_exception_bundle, + id=self.created_exception) + _, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle) + + expect((ok, res)).to(be_successful_api_call) + expect(ex_before).to( + have_key("items", contain( + have_keys( + id=equal(self.created_exception), + gate=equal("vulnerabilities"), + trigger_id=equal(self.created_exception_cve), + enabled=be_true, + ) + )) + ) + expect(ex_after).to( + have_key("items", not_(contain( + have_keys( + id=equal(self.created_exception), + gate=equal("vulnerabilities"), + trigger_id=equal(self.created_exception_cve), + enabled=be_true, + ) + ))) + ) + + with it("is able to update them"): + _, ex_before = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle) + + ok, res = self.client.update_vulnerability_exception(bundle=self.created_exception_bundle, + id=self.created_exception, + cve="CVE-2020-1235", + enabled=False, + note="Dummy note", + expiration_date=datetime.datetime(2030, 12, 31) + .timestamp()) + + _, ex_after = self.client.get_vulnerability_exception_bundle(bundle=self.created_exception_bundle) + + expect((ok, res)).to(be_successful_api_call) + + expect(ex_before).to( + have_key("items", contain( + have_keys( + id=equal(self.created_exception), + gate=equal("vulnerabilities"), + trigger_id=equal(self.created_exception_cve), + notes=equal(None), + expiration_date=equal(None), + enabled=be_true, + ) + )) + ) + + expect(ex_after).to( + have_key("items", contain( + have_keys( + id=equal(self.created_exception), + gate=equal("vulnerabilities"), + trigger_id=equal("CVE-2020-1235"), + notes=equal("Dummy note"), + expiration_date=be_above(0), + enabled=be_false, + ) + )) + ) diff --git a/specs/secure/scanning_vulnerability_spec.py b/specs/secure/scanning_vulnerability_spec.py index 9a0f2d27..d7a3fe54 100644 --- a/specs/secure/scanning_vulnerability_spec.py +++ b/specs/secure/scanning_vulnerability_spec.py @@ -1,8 +1,7 @@ import os -from expects import * -from expects import equal -from mamba import * +from expects import equal, expect, have_keys +from mamba import before, context, description, it from sdcclient import SdScanningClient from specs import be_successful_api_call