Add scan data export API test case (#17603)

1. Add export scan data API test case
2. Add get scan data export execution list test case
3. Add get scan data export execution test case
4. Add download scan data test case

Signed-off-by: Yang Jiao <jiaoya@vmware.com>

Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
Yang Jiao 2022-10-17 13:25:03 +08:00 committed by GitHub
parent 159169227b
commit 7bbefca8c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 179 additions and 3 deletions

View File

@ -31,7 +31,7 @@ def _create_client(server, credential, debug, api_type="products"):
cfg = None
if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota',
'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist',
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log'):
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export'):
cfg = v2_swagger_client.Configuration()
else:
cfg = swagger_client.Configuration()
@ -78,7 +78,8 @@ def _create_client(server, credential, debug, api_type="products"):
"health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)),
"webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)),
"purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)),
"audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg))
"audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)),
"scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg))
}.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."):

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
import base
import v2_swagger_client
from v2_swagger_client.rest import ApiException
class Scan_data_export(base.Base):
def __init__(self):
super(Scan_data_export, self).__init__(api_type="scan_data_export")
def get_scan_data_export_execution_list(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_scan_data_export_execution_list_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_scan_data_export_execution(self, execution_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_scan_data_export_execution_with_http_info(execution_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def export_scan_data(self, x_scan_data_type, projects, labels=None, repositories=None, cve_ids=None, tags=None, expect_status_code=200, expect_response_body=None, **kwargs):
criteria = v2_swagger_client.ScanDataExportRequest(projects=projects, labels=labels, repositories=repositories, cve_ids=cve_ids, tags=tags)
try:
return_data, status_code, _ = self._get_client(**kwargs).export_scan_data_with_http_info(x_scan_data_type, criteria)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def download_scan_data(self, execution_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).download_scan_data_with_http_info(execution_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import time
import unittest
from testutils import harbor_server, suppress_urllib3_warning
from testutils import ADMIN_CLIENT
from library.scan_data_export import Scan_data_export
from library.project import Project
from library.user import User
from library.artifact import Artifact
from library.scan import Scan
from library.repository import push_self_build_image_to_project
class TestScanDataExport(unittest.TestCase):
@suppress_urllib3_warning
def setUp(self):
self.scan_data_export = Scan_data_export()
self.project = Project()
self.user = User()
self.scan = Scan()
self.artifact = Artifact()
self.image = "alpine"
self.tag = "latest"
self.x_scan_data_type = "application/vnd.security.vulnerability.report; version=1.1"
def testScanDataExportArtifact(self):
"""
Test case:
Scan Data Export API
Test step and expected result:
1. Create a new user(UA);
2. Create a new project(PA) by user(UA);
3. Push a new image(IA) in project(PA) by user(UA);
4. Send scan image command and get tag(TA) information to check scan result, it should be finished;
5. Verify trigger export scan data execution but does not specify Scan-Data-Type status code should be 422;
6. Verify trigger export scan data execution but specifying multiple project status code should be 400;
7. Trigger export scan data execution correctly;
8. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users;
9. User (UA) should be able to query the triggered export scan data execution;
10. Wait for the export scan data execution to succeed;
11. Verify that the export scan data execution triggered by the user (UA) cannot be download by other users;
12. User (UA) should be able to download the triggered export scan data execution
13. Verify that the downloaded export scan data execution cannot be downloaded again
"""
url = ADMIN_CLIENT["endpoint"]
user_password = "Aa123456"
# 1. Create user(UA)
user_id, user_name = self.user.create_user(user_password = user_password, **ADMIN_CLIENT)
user_client = dict(endpoint = url, username = user_name, password = user_password)
# 2.1. Create private project(PA) by user(UA)
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **user_client)
# 2.2. Get private project of uesr-001, uesr-001 can see only one private project which is project-001
self.project.projects_should_exist(dict(public=False), expected_count = 1, expected_project_id = project_id, **user_client)
# 3. Push a new image(IA) in project(PA) by user(UA)
push_self_build_image_to_project(project_name, harbor_server, user_name, user_password, self.image, self.tag)
# 4. Send scan image command and get tag(TA) information to check scan result, it should be finished
self.scan.scan_artifact(project_name, self.image, self.tag, **user_client)
self.artifact.check_image_scan_result(project_name, self.image, self.tag, with_scan_overview = True, **user_client)
# 5. Verify trigger export scan data execution but does not specify Scan-Data-Type status code should be 422
self.scan_data_export.export_scan_data("", projects=[project_id], expect_status_code=422, expect_response_body="X-Scan-Data-Type in header is required")
# 6. Verify trigger export scan data execution but specifying multiple project status code should be 400
self.scan_data_export.export_scan_data(self.x_scan_data_type, projects=[1, project_id], expect_status_code=400, expect_response_body="bad request: only support export single project")
# 7. Trigger export scan data execution correctly
execution_id = self.scan_data_export.export_scan_data(self.x_scan_data_type, projects=[project_id], **user_client).id
print("execution_id:", execution_id)
# 8.1. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users by get scan data export execution list API
execution_list = self.scan_data_export.get_scan_data_export_execution_list()
if not execution_list:
self.assertNotEqual(execution_id, execution_list.items[0].id)
self.assertEqual(ADMIN_CLIENT["username"], execution_list.items[0].user_name)
# 8.2. Verify that the export scan data execution triggered by the user(UA) cannot be queried by other users by get scan_data export execution API
self.scan_data_export.get_scan_data_export_execution(execution_id, expect_status_code=403, expect_response_body="FORBIDDEN")
# 9. User (UA) should be able to query the triggered export scan data execution
execution_list = self.scan_data_export.get_scan_data_export_execution_list(**user_client)
self.assertEqual(execution_id, execution_list.items[0].id)
self.assertEqual(user_name, execution_list.items[0].user_name)
# 10. Wait for the export scan data execution to succeed
executio_status = None
for i in range(5):
print("wait for the job to finish:", i)
execution = self.scan_data_export.get_scan_data_export_execution(execution_id, **user_client)
executio_status = execution.status
if executio_status == "Success":
self.assertEqual(user_name, execution.user_name)
self.assertEqual(user_id, execution.user_id)
break
time.sleep(2)
self.assertEqual(executio_status, "Success")
# 11. Verify that the export scan data execution triggered by the user (UA) cannot be download by other users
self.scan_data_export.download_scan_data(execution_id, expect_status_code=403)
# 12. User (UA) should be able to download the triggered export scan data execution
self.scan_data_export.download_scan_data(execution_id, **user_client)
# 13. Verify that the downloaded export scan data execution cannot be downloaded again
self.scan_data_export.download_scan_data(execution_id, expect_status_code=404, **user_client)
if __name__ == '__main__':
unittest.main()

View File

@ -187,4 +187,8 @@ Test Case - Log Rotation
Test Case - Log Forward
[Tags] log_forward
Harbor API Test ./tests/apitests/python/test_audit_log_forward.py
Harbor API Test ./tests/apitests/python/test_audit_log_forward.py
Test Case - Scan Data Export
[Tags] scan_data_export
Harbor API Test ./tests/apitests/python/test_scan_data_export.py