diff --git a/tests/apitests/python/library/base.py b/tests/apitests/python/library/base.py index 0b5496441..847f0e5f7 100644 --- a/tests/apitests/python/library/base.py +++ b/tests/apitests/python/library/base.py @@ -73,6 +73,7 @@ def _create_client(server, credential, debug, api_type): "system_info": v2_swagger_client.SysteminfoApi(v2_swagger_client.ApiClient(cfg)), "jobservice": v2_swagger_client.JobserviceApi(v2_swagger_client.ApiClient(cfg)), "schedule": v2_swagger_client.ScheduleApi(v2_swagger_client.ApiClient(cfg)), + "securityhub": v2_swagger_client.SecurityhubApi(v2_swagger_client.ApiClient(cfg)), }.get(api_type,'Error: Wrong API type') def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): diff --git a/tests/apitests/python/library/repository.py b/tests/apitests/python/library/repository.py index a0c8f081a..21dea33ee 100644 --- a/tests/apitests/python/library/repository.py +++ b/tests/apitests/python/library/repository.py @@ -17,10 +17,8 @@ def pull_harbor_image(registry, username, password, image, tag, expected_login_e _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message) def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None, profix_for_image = None, new_image=None): - print("Start to push image {}/{}/{}:{}".format(registry, project_name, image, tag) ) _docker_api = DockerAPI() - _docker_api.docker_login("docker", DOCKER_USER, DOCKER_PWD) - _docker_api.docker_image_pull(image, tag = tag, is_clean_all_img = False) + _docker_api.docker_image_pull(image, tag, is_clean_all_img = False) _docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message) time.sleep(2) if expected_login_error_message != None: diff --git a/tests/apitests/python/library/securityhub.py b/tests/apitests/python/library/securityhub.py new file mode 100644 index 000000000..649ccb71c --- /dev/null +++ b/tests/apitests/python/library/securityhub.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +import base +from v2_swagger_client.rest import ApiException +from urllib.parse import quote + + +class SecurityHub(base.Base): + + + def __init__(self): + super(SecurityHub, self).__init__(api_type="securityhub") + + + def get_security_summary(self, with_dangerous_cve=True, with_dangerous_artifact=True, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_security_summary_with_http_info(with_dangerous_cve=with_dangerous_cve, with_dangerous_artifact=with_dangerous_artifact) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + + def list_vulnerabilities(self, q=None, tune_count=True, with_tag=True, page=1, page_size=10, expect_status_code=200, expect_response_body=None, **kwargs): + try: + if q is not None: + q = quote(q) + return_data, status_code, _ = self._get_client(**kwargs).list_vulnerabilities_with_http_info(q=q, tune_count=tune_count, with_tag=with_tag, page=page, page_size=page_size) + else: + return_data, status_code, _ = self._get_client(**kwargs).list_vulnerabilities_with_http_info(tune_count=tune_count, with_tag=with_tag, page=page, page_size=page_size) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data diff --git a/tests/apitests/python/test_security_hub.py b/tests/apitests/python/test_security_hub.py new file mode 100644 index 000000000..432d3b4f8 --- /dev/null +++ b/tests/apitests/python/test_security_hub.py @@ -0,0 +1,187 @@ +import unittest + +from testutils import harbor_server, ADMIN_CLIENT, suppress_urllib3_warning +from library.project import Project +from library.user import User +from library.repository import push_image_to_project +from library.scan import Scan +from library.artifact import Artifact +from library.securityhub import SecurityHub + +class TestSecurityHub(unittest.TestCase): + + + @suppress_urllib3_warning + def setUp(self): + self.project = Project() + self.user = User() + self.scan = Scan() + self.artifact = Artifact() + self.securityhub = SecurityHub() + self.image = "ghcr.io/goharbor/notary-server-photon" + self.new_image = "notary-server-photon" + self.tag = "v2.2.0" + self.digest = "sha256:379bf2c7cd55b4214ced7f9a885b46f81992eb01abebfd67693f5cb394611ad1" + + + def testSecurityHub(self): + """ + Test case: + Security Hub + Test step and expected result: + 1. Create a new user(UA); + 2. Create a new project(PA) by user(UA); + 3. Push a new image(IA) in project(PA) by user(UA); + 4. Send scan image command and get tag(TA) information to check scan result, it should be finished; + 5. Get vulnerability system summary; + 6. Check the vulnerability system summary is correct; + 7. Check the Get the vulnerability list API; + 8. Check Get Vulnerability List API Search by severity; + 9. Check Get Vulnerability List API Search by project name; + 10. Check Get Vulnerability List API Search by repository name; + 11. Check Get Vulnerability List API Search by tag; + 12. Check Get Vulnerability List API Search by digest; + 13. Check Get Vulnerability List API Search by package; + 14. Check Get Vulnerability List API Search by cve_id; + 15. Check Get Vulnerability List API Search by cvss3; + 16. Check Get Vulnerability List API Search by all options; + """ + url = ADMIN_CLIENT["endpoint"] + user_password = "Aa123456" + + # 1. Create user(UA) + _, user_name = self.user.create_user(user_password=user_password, **ADMIN_CLIENT) + user_client = dict(endpoint=url, username=user_name, password=user_password, with_scan_overview = True) + + # 2. Create private project(PA) by user(UA) + project_id, project_name = self.project.create_project(metadata={"public": "false"}, **user_client) + + # 3. Push a new image(IA) in project(PA) by user(UA) + repository_name, tag = push_image_to_project(project_name, harbor_server, user_name, user_password, self.image, self.tag, new_image=self.new_image) + + # 4. Send scan image command and get tag(TA) information to check scan result, it should be finished + self.scan.scan_artifact(project_name, self.new_image, tag, **user_client) + self.artifact.check_image_scan_result(project_name, self.new_image, tag, **user_client) + + # 5. Get vulnerability system summary + security_summary = self.securityhub.get_security_summary(**ADMIN_CLIENT) + + # 6. Check the vulnerability system summary is correct + self.check_security_summary(security_summary, repository_name) + + # 7. Check the Get the vulnerability list API + vulnerabilities = self.securityhub.list_vulnerabilities(**ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities) + + # 8. Check Get Vulnerability List API Search by severity + vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Critical", **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, severity="Critical") + vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=High", **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, severity="High") + vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Medium", **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, severity="Medium") + vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=Low", **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, severity="Low") + + # 9. Check Get Vulnerability List API Search by project name + vulnerabilities = self.securityhub.list_vulnerabilities(q="project_id=%s" % project_id, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, project_id=project_id) + vulnerability = vulnerabilities[0] + + # 10. Check Get Vulnerability List API Search by repository name + vulnerabilities = self.securityhub.list_vulnerabilities(q="repository_name=%s" % repository_name, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, repository_name=repository_name) + + # 11. Check Get Vulnerability List API Search by tag + vulnerabilities = self.securityhub.list_vulnerabilities(q="tag=%s" % tag, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, tag=tag) + + # 12. Check Get Vulnerability List API Search by digest + vulnerabilities = self.securityhub.list_vulnerabilities(q="digest=%s" % self.digest, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, digest=self.digest) + + # 13. Check Get Vulnerability List API Search by package + vulnerabilities = self.securityhub.list_vulnerabilities(q="package=%s" % vulnerability.package, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, package=vulnerability.package) + + # 14. Check Get Vulnerability List API Search by cve_id + vulnerabilities = self.securityhub.list_vulnerabilities(q="cve_id=%s" % vulnerability.cve_id, **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, cve_id=vulnerability.cve_id) + + # 15. Check Get Vulnerability List API Search by cvss3 + vulnerabilities = self.securityhub.list_vulnerabilities(q="cvss_score_v3=[%s~%s]" % (vulnerability.cvss_v3_score, vulnerability.cvss_v3_score), **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, cvss3_from=vulnerability.cvss_v3_score, cvss3_to=vulnerability.cvss_v3_score) + + # 16. Check Get Vulnerability List API Search by all options + vulnerabilities = self.securityhub.list_vulnerabilities(q="severity=%s,project_id=%s,repository_name=%s,tag=%s,digest=%s,package=%s,cve_id=%s,cvss_score_v3=[%s~%s]" % + (vulnerability.severity, vulnerability.project_id, vulnerability.repository_name, vulnerability.tags[0], vulnerability.digest, vulnerability.package, vulnerability.cve_id, vulnerability.cvss_v3_score, vulnerability.cvss_v3_score), **ADMIN_CLIENT) + self.check_vulnerabilities(vulnerabilities, severity=vulnerability.severity, project_id=vulnerability.project_id, repository_name=vulnerability.repository_name, tag=vulnerability.tags[0], digest=vulnerability.digest, package=vulnerability.package, cve_id=vulnerability.cve_id, cvss3_from=vulnerability.cvss_v3_score, cvss3_to=vulnerability.cvss_v3_score) + + def check_security_summary(self, security_summary, repository_name): + # Check the summary is correct + self.assertTrue(security_summary.critical_cnt > 0) + self.assertTrue(security_summary.high_cnt > 0) + self.assertTrue(security_summary.medium_cnt > 0) + self.assertTrue(security_summary.low_cnt > 0) + self.assertTrue(security_summary.fixable_cnt > 0) + self.assertTrue(security_summary.scanned_cnt > 0) + self.assertTrue(security_summary.total_artifact > 0) + self.assertTrue(security_summary.total_vuls > 0) + # Check the dangerous artifacts is correct + dangerous_artifacts = security_summary.dangerous_artifacts + self.assertTrue(0 < len(dangerous_artifacts) <= 5) + artifact_sorted_list = sorted(dangerous_artifacts, key=lambda artifact: (artifact.critical_cnt, artifact.high_cnt, artifact.medium_cnt), reverse=True) + self.assertListEqual(dangerous_artifacts, artifact_sorted_list) + dangerous_artifacts_dict = {item.repository_name: item for item in dangerous_artifacts} + self.assertIn(repository_name, dangerous_artifacts_dict.keys()) + for key, value in dangerous_artifacts_dict.items(): + if key == repository_name: + self.assertEqual(value.digest, self.digest) + self.assertTrue(value.critical_cnt > 0) + self.assertTrue(value.high_cnt > 0) + self.assertTrue(value.medium_cnt > 0) + break + # Check the dangerous cves is correct + dangerous_cves = security_summary.dangerous_cves + self.assertTrue(0 < len(dangerous_cves) <= 5) + cve_sorted_list = sorted(dangerous_cves, key=lambda cve: cve.cvss_score_v3) + self.assertListEqual(dangerous_cves, cve_sorted_list) + + + def check_vulnerabilities(self, vulnerabilities, severity=None, cve_id=None, cvss3_from=0.0, cvss3_to=10.0, project_id=None, repository_name=None, package=None, tag=None, digest=None): + for vulnerability in vulnerabilities: + self.assertEqual(vulnerability.severity, severity) if severity is not None else self.assertIsNotNone(vulnerability.severity) + self.assertEqual(vulnerability.cve_id, cve_id) if cve_id is not None else self.assertIsNotNone(vulnerability.cve_id) + self.assertEqual(vulnerability.project_id, project_id) if project_id is not None else self.assertIsNotNone(vulnerability.project_id) + self.assertEqual(vulnerability.repository_name, repository_name) if repository_name is not None else self.assertIsNotNone(vulnerability.repository_name) + self.assertEqual(vulnerability.package, package) if package is not None else self.assertIsNotNone(vulnerability.package) + self.assertEqual(vulnerability.tags[0], tag) if tag is not None else self.assertIsNotNone(vulnerability.tags[0]) + self.assertEqual(vulnerability.digest, digest) if digest is not None else self.assertIsNotNone(vulnerability.digest) + self.assertTrue(cvss3_from <= vulnerability.cvss_v3_score <= cvss3_to) + self.assertIsNotNone(vulnerability.desc) + self.assertIsNotNone(vulnerability.links) + self.assertIsNotNone(vulnerability.version) + + + def testSecurityHubAPIValidate(self): + """ + Test case: + Log Rotaion Permission API + Test step and expected result: + 1. Create a new user(UA); + 2. User(UA) should not have permission to call get vulnerability system summary API; + 3. User(UA) should not have permission to call get Vulnerability List API; + """ + url = ADMIN_CLIENT["endpoint"] + user_password = "Aa123456" + # 1. Create user(UA) + _, user_name = self.user.create_user(user_password=user_password, **ADMIN_CLIENT) + user_client = dict(endpoint=url, username=user_name, password=user_password) + # 2. User(UA) should not have permission to call get vulnerability system summary API + self.securityhub.get_security_summary(expect_status_code=403, expect_response_body="FORBIDDEN", **user_client) + # 3. User(UA) should not have permission to call get Vulnerability List API + self.securityhub.list_vulnerabilities(expect_status_code=403, expect_response_body="FORBIDDEN", **user_client) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot index c94bbfd9b..cf419d66d 100644 --- a/tests/robot-cases/Group0-BAT/API_DB.robot +++ b/tests/robot-cases/Group0-BAT/API_DB.robot @@ -190,3 +190,7 @@ Test Case - Referrers API Test Case - Podman Pull And Push To Harbor [Tags] podman_pull_push Harbor API Test ./tests/apitests/python/test_podman_pull_push.py + +Test Case - Security Hub + [Tags] security_hub + Harbor API Test ./tests/apitests/python/test_security_hub.py