mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-17 07:45:24 +01:00
Merge pull request #8656 from reasonerjt/cve_whitelist_apitest
API test for system level CVE whitelist
This commit is contained in:
commit
032d57d8b2
@ -168,3 +168,17 @@ class System(base.Base):
|
|||||||
if deleted_files_count == 0:
|
if deleted_files_count == 0:
|
||||||
raise Exception(r"Get blobs eligible for deletion count is {}, while we expect more than 1.".format(deleted_files_count))
|
raise Exception(r"Get blobs eligible for deletion count is {}, while we expect more than 1.".format(deleted_files_count))
|
||||||
|
|
||||||
|
def set_cve_whitelist(self, expires_at=None, expected_status_code=200, *cve_ids, **kwargs):
|
||||||
|
client = self._get_client(**kwargs)
|
||||||
|
cve_list = [swagger_client.CVEWhitelistItem(cve_id=c) for c in cve_ids]
|
||||||
|
whitelist = swagger_client.CVEWhitelist(expires_at=expires_at, items=cve_list)
|
||||||
|
try:
|
||||||
|
r = client.system_cve_whitelist_put_with_http_info(whitelist=whitelist, _preload_content=False)
|
||||||
|
except Exception as e:
|
||||||
|
base._assert_status_code(expected_status_code, e.status)
|
||||||
|
else:
|
||||||
|
base._assert_status_code(expected_status_code, r[1])
|
||||||
|
|
||||||
|
def get_cve_whitelist(self, **kwargs):
|
||||||
|
client = self._get_client(**kwargs)
|
||||||
|
return client.system_cve_whitelist_get()
|
||||||
|
@ -70,14 +70,14 @@ class User(base.Base):
|
|||||||
base._assert_status_code(200, status_code)
|
base._assert_status_code(200, status_code)
|
||||||
return user_id
|
return user_id
|
||||||
|
|
||||||
def update_uesr_profile(self, user_id, email=None, realname=None, comment=None, **kwargs):
|
def update_user_profile(self, user_id, email=None, realname=None, comment=None, **kwargs):
|
||||||
client = self._get_client(**kwargs)
|
client = self._get_client(**kwargs)
|
||||||
user_rofile = swagger_client.UserProfile(email, realname, comment)
|
user_rofile = swagger_client.UserProfile(email, realname, comment)
|
||||||
_, status_code, _ = client.users_user_id_put_with_http_info(user_id, user_rofile)
|
_, status_code, _ = client.users_user_id_put_with_http_info(user_id, user_rofile)
|
||||||
base._assert_status_code(200, status_code)
|
base._assert_status_code(200, status_code)
|
||||||
return user_id
|
return user_id
|
||||||
|
|
||||||
def update_uesr_role_as_sysadmin(self, user_id, IsAdmin, **kwargs):
|
def update_user_role_as_sysadmin(self, user_id, IsAdmin, **kwargs):
|
||||||
client = self._get_client(**kwargs)
|
client = self._get_client(**kwargs)
|
||||||
has_admin_role = swagger_client.HasAdminRole(IsAdmin)
|
has_admin_role = swagger_client.HasAdminRole(IsAdmin)
|
||||||
print "has_admin_role:", has_admin_role
|
print "has_admin_role:", has_admin_role
|
||||||
|
@ -45,15 +45,15 @@ class TestProjects(unittest.TestCase):
|
|||||||
USER_ASSIGN_SYS_ADMIN_CLIENT=dict(endpoint = url, username = user_assign_sys_admin_name, password = user_assign_sys_admin_password)
|
USER_ASSIGN_SYS_ADMIN_CLIENT=dict(endpoint = url, username = user_assign_sys_admin_name, password = user_assign_sys_admin_password)
|
||||||
|
|
||||||
#2. Set user(UA) has sysadmin role by admin, check user(UA) can modify system configuration;
|
#2. Set user(UA) has sysadmin role by admin, check user(UA) can modify system configuration;
|
||||||
self.user.update_uesr_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, True, **ADMIN_CLIENT)
|
self.user.update_user_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, True, **ADMIN_CLIENT)
|
||||||
self.conf.set_configurations_of_token_expiration(60, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
self.conf.set_configurations_of_token_expiration(60, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
||||||
|
|
||||||
#3. Set user(UA) has no sysadmin role by admin, check user(UA) can not modify system configuration;
|
#3. Set user(UA) has no sysadmin role by admin, check user(UA) can not modify system configuration;
|
||||||
self.user.update_uesr_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, False, **ADMIN_CLIENT)
|
self.user.update_user_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, False, **ADMIN_CLIENT)
|
||||||
self.conf.set_configurations_of_token_expiration(70, expect_status_code = 403, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
self.conf.set_configurations_of_token_expiration(70, expect_status_code = 403, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
||||||
|
|
||||||
#4. Set user(UA) has sysadmin role by admin, check user(UA) can modify system configuration.
|
#4. Set user(UA) has sysadmin role by admin, check user(UA) can modify system configuration.
|
||||||
self.user.update_uesr_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, True, **ADMIN_CLIENT)
|
self.user.update_user_role_as_sysadmin(TestProjects.user_assign_sys_admin_id, True, **ADMIN_CLIENT)
|
||||||
self.conf.set_configurations_of_token_expiration(80, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
self.conf.set_configurations_of_token_expiration(80, **USER_ASSIGN_SYS_ADMIN_CLIENT)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
73
tests/apitests/python/test_sys_cve_whitelists.py
Normal file
73
tests/apitests/python/test_sys_cve_whitelists.py
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
from __future__ import absolute_import
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import swagger_client
|
||||||
|
import time
|
||||||
|
|
||||||
|
from testutils import ADMIN_CLIENT
|
||||||
|
from library.user import User
|
||||||
|
from library.system import System
|
||||||
|
|
||||||
|
|
||||||
|
class TestSysCVEWhitelist(unittest.TestCase):
|
||||||
|
"""
|
||||||
|
Test case:
|
||||||
|
System Level CVE Whitelist
|
||||||
|
Setup:
|
||||||
|
Create user(RA)
|
||||||
|
Test Steps:
|
||||||
|
1. User(RA) reads the system level CVE whitelist and it's empty.
|
||||||
|
2. User(RA) updates the system level CVE whitelist, verify it's failed.
|
||||||
|
3. Update user(RA) to system admin
|
||||||
|
4. User(RA) updates the system level CVE whitelist, verify it's successful.
|
||||||
|
5. User(RA) reads the system level CVE whitelist, verify the CVE list is updated.
|
||||||
|
6. User(RA) updates the expiration date of system level CVE whitelist.
|
||||||
|
7. User(RA) reads the system level CVE whitelist, verify the expiration date is updated.
|
||||||
|
Tear Down:
|
||||||
|
1. Clear the system level CVE whitelist.
|
||||||
|
2. Delete User(RA)
|
||||||
|
"""
|
||||||
|
def setUp(self):
|
||||||
|
self.user = User()
|
||||||
|
self.system = System()
|
||||||
|
user_ra_password = "Aa123456"
|
||||||
|
print("Setup: Creating user for test")
|
||||||
|
user_ra_id, user_ra_name = self.user.create_user(user_password=user_ra_password, **ADMIN_CLIENT)
|
||||||
|
print("Created user: %s, id: %s" % (user_ra_name, user_ra_id))
|
||||||
|
self.USER_RA_CLIENT = dict(endpoint=ADMIN_CLIENT["endpoint"],
|
||||||
|
username=user_ra_name,
|
||||||
|
password=user_ra_password)
|
||||||
|
self.user_ra_id = int(user_ra_id)
|
||||||
|
|
||||||
|
def testSysCVEWhitelist(self):
|
||||||
|
# 1. User(RA) reads the system level CVE whitelist and it's empty.
|
||||||
|
wl = self.system.get_cve_whitelist(**self.USER_RA_CLIENT)
|
||||||
|
self.assertEqual(0, len(wl.items), "The initial system level CVE whitelist is not empty: %s" % wl.items)
|
||||||
|
# 2. User(RA) updates the system level CVE whitelist, verify it's failed.
|
||||||
|
cves = ['CVE-2019-12310']
|
||||||
|
self.system.set_cve_whitelist(None, 403, *cves, **self.USER_RA_CLIENT)
|
||||||
|
# 3. Update user(RA) to system admin
|
||||||
|
self.user.update_user_role_as_sysadmin(self.user_ra_id, True, **ADMIN_CLIENT)
|
||||||
|
# 4. User(RA) updates the system level CVE whitelist, verify it's successful.
|
||||||
|
self.system.set_cve_whitelist(None, 200, *cves, **self.USER_RA_CLIENT)
|
||||||
|
# 5. User(RA) reads the system level CVE whitelist, verify the CVE list is updated.
|
||||||
|
expect_wl = [swagger_client.CVEWhitelistItem(cve_id='CVE-2019-12310')]
|
||||||
|
wl = self.system.get_cve_whitelist(**self.USER_RA_CLIENT)
|
||||||
|
self.assertIsNone(wl.expires_at)
|
||||||
|
self.assertEqual(expect_wl, wl.items)
|
||||||
|
# 6. User(RA) updates the expiration date of system level CVE whitelist.
|
||||||
|
exp = int(time.time()) + 3600
|
||||||
|
self.system.set_cve_whitelist(exp, 200, *cves, **self.USER_RA_CLIENT)
|
||||||
|
# 7. User(RA) reads the system level CVE whitelist, verify the expiration date is updated.
|
||||||
|
wl = self.system.get_cve_whitelist(**self.USER_RA_CLIENT)
|
||||||
|
self.assertEqual(exp, wl.expires_at)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
print("TearDown: Clearing the Whitelist")
|
||||||
|
self.system.set_cve_whitelist(**ADMIN_CLIENT)
|
||||||
|
print("TearDown: Deleting user: %d" % self.user_ra_id)
|
||||||
|
self.user.delete_user(self.user_ra_id, **ADMIN_CLIENT)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user