diff --git a/tests/apitests/python/library/base.py b/tests/apitests/python/library/base.py index d1ed51414..b4caf9714 100644 --- a/tests/apitests/python/library/base.py +++ b/tests/apitests/python/library/base.py @@ -31,7 +31,7 @@ def _create_client(server, credential, debug, api_type="products"): cfg = None if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota', 'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist', - 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log'): + 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export'): cfg = v2_swagger_client.Configuration() else: cfg = swagger_client.Configuration() @@ -78,7 +78,8 @@ def _create_client(server, credential, debug, api_type="products"): "health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)), "webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)), "purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)), - "audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)) + "audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)), + "scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg)) }.get(api_type,'Error: Wrong API type') def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): diff --git a/tests/apitests/python/library/scan_data_export.py b/tests/apitests/python/library/scan_data_export.py new file mode 100644 index 000000000..8749c4624 --- /dev/null +++ b/tests/apitests/python/library/scan_data_export.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +import base +import v2_swagger_client +from v2_swagger_client.rest import ApiException + + +class Scan_data_export(base.Base): + + def __init__(self): + super(Scan_data_export, self).__init__(api_type="scan_data_export") + + def get_scan_data_export_execution_list(self, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_scan_data_export_execution_list_with_http_info() + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_scan_data_export_execution(self, execution_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_scan_data_export_execution_with_http_info(execution_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def export_scan_data(self, x_scan_data_type, projects, labels=None, repositories=None, cve_ids=None, tags=None, expect_status_code=200, expect_response_body=None, **kwargs): + criteria = v2_swagger_client.ScanDataExportRequest(projects=projects, labels=labels, repositories=repositories, cve_ids=cve_ids, tags=tags) + try: + return_data, status_code, _ = self._get_client(**kwargs).export_scan_data_with_http_info(x_scan_data_type, criteria) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def download_scan_data(self, execution_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).download_scan_data_with_http_info(execution_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data \ No newline at end of file diff --git a/tests/apitests/python/test_scan_data_export.py b/tests/apitests/python/test_scan_data_export.py new file mode 100644 index 000000000..b0b41a068 --- /dev/null +++ b/tests/apitests/python/test_scan_data_export.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- + +from __future__ import absolute_import +import time + +import unittest + +from testutils import harbor_server, suppress_urllib3_warning +from testutils import ADMIN_CLIENT +from library.scan_data_export import Scan_data_export +from library.project import Project +from library.user import User +from library.artifact import Artifact +from library.scan import Scan +from library.repository import push_self_build_image_to_project + +class TestScanDataExport(unittest.TestCase): + + @suppress_urllib3_warning + def setUp(self): + self.scan_data_export = Scan_data_export() + self.project = Project() + self.user = User() + self.scan = Scan() + self.artifact = Artifact() + self.image = "alpine" + self.tag = "latest" + self.x_scan_data_type = "application/vnd.security.vulnerability.report; version=1.1" + + def testScanDataExportArtifact(self): + """ + Test case: + Scan Data Export API + Test step and expected result: + 1. Create a new user(UA); + 2. Create a new project(PA) by user(UA); + 3. Push a new image(IA) in project(PA) by user(UA); + 4. Send scan image command and get tag(TA) information to check scan result, it should be finished; + 5. Verify trigger export scan data execution but does not specify Scan-Data-Type status code should be 422; + 6. Verify trigger export scan data execution but specifying multiple project status code should be 400; + 7. Trigger export scan data execution correctly; + 8. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users; + 9. User (UA) should be able to query the triggered export scan data execution; + 10. Wait for the export scan data execution to succeed; + 11. Verify that the export scan data execution triggered by the user (UA) cannot be download by other users; + 12. User (UA) should be able to download the triggered export scan data execution + 13. Verify that the downloaded export scan data execution cannot be downloaded again + """ + url = ADMIN_CLIENT["endpoint"] + user_password = "Aa123456" + + # 1. Create user(UA) + user_id, user_name = self.user.create_user(user_password = user_password, **ADMIN_CLIENT) + user_client = dict(endpoint = url, username = user_name, password = user_password) + + # 2.1. Create private project(PA) by user(UA) + project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **user_client) + # 2.2. Get private project of uesr-001, uesr-001 can see only one private project which is project-001 + self.project.projects_should_exist(dict(public=False), expected_count = 1, expected_project_id = project_id, **user_client) + + # 3. Push a new image(IA) in project(PA) by user(UA) + push_self_build_image_to_project(project_name, harbor_server, user_name, user_password, self.image, self.tag) + + # 4. Send scan image command and get tag(TA) information to check scan result, it should be finished + self.scan.scan_artifact(project_name, self.image, self.tag, **user_client) + self.artifact.check_image_scan_result(project_name, self.image, self.tag, with_scan_overview = True, **user_client) + + # 5. Verify trigger export scan data execution but does not specify Scan-Data-Type status code should be 422 + self.scan_data_export.export_scan_data("", projects=[project_id], expect_status_code=422, expect_response_body="X-Scan-Data-Type in header is required") + + # 6. Verify trigger export scan data execution but specifying multiple project status code should be 400 + self.scan_data_export.export_scan_data(self.x_scan_data_type, projects=[1, project_id], expect_status_code=400, expect_response_body="bad request: only support export single project") + + # 7. Trigger export scan data execution correctly + execution_id = self.scan_data_export.export_scan_data(self.x_scan_data_type, projects=[project_id], **user_client).id + print("execution_id:", execution_id) + + # 8.1. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users by get scan data export execution list API + execution_list = self.scan_data_export.get_scan_data_export_execution_list() + if not execution_list: + self.assertNotEqual(execution_id, execution_list.items[0].id) + self.assertEqual(ADMIN_CLIENT["username"], execution_list.items[0].user_name) + + # 8.2. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users by get scan_data export execution API + self.scan_data_export.get_scan_data_export_execution(execution_id, expect_status_code=403, expect_response_body="FORBIDDEN") + + # 9. User (UA) should be able to query the triggered export scan data execution + execution_list = self.scan_data_export.get_scan_data_export_execution_list(**user_client) + self.assertEqual(execution_id, execution_list.items[0].id) + self.assertEqual(user_name, execution_list.items[0].user_name) + + # 10. Wait for the export scan data execution to succeed + executio_status = None + for i in range(5): + print("wait for the job to finish:", i) + execution = self.scan_data_export.get_scan_data_export_execution(execution_id, **user_client) + executio_status = execution.status + if executio_status == "Success": + self.assertEqual(user_name, execution.user_name) + self.assertEqual(user_id, execution.user_id) + break + time.sleep(2) + self.assertEqual(executio_status, "Success") + + # 11. Verify that the export scan data execution triggered by the user (UA) cannot be download by other users + self.scan_data_export.download_scan_data(execution_id, expect_status_code=403) + + # 12. User (UA) should be able to download the triggered export scan data execution + self.scan_data_export.download_scan_data(execution_id, **user_client) + + # 13. Verify that the downloaded export scan data execution cannot be downloaded again + self.scan_data_export.download_scan_data(execution_id, expect_status_code=404, **user_client) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot index cb7db9895..d7a7b1c66 100644 --- a/tests/robot-cases/Group0-BAT/API_DB.robot +++ b/tests/robot-cases/Group0-BAT/API_DB.robot @@ -187,4 +187,8 @@ Test Case - Log Rotation Test Case - Log Forward [Tags] log_forward - Harbor API Test ./tests/apitests/python/test_audit_log_forward.py \ No newline at end of file + Harbor API Test ./tests/apitests/python/test_audit_log_forward.py + +Test Case - Scan Data Export + [Tags] scan_data_export + Harbor API Test ./tests/apitests/python/test_scan_data_export.py \ No newline at end of file