mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-26 18:48:02 +01:00
Add user API test case (#19638)
Fix #19280 Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
parent
f26b9f52e9
commit
7cef4217b0
@ -51,9 +51,17 @@ class User(base.Base, object):
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return data
|
||||
|
||||
def get_user_by_id(self, user_id, **kwargs):
|
||||
data, status_code, _ = self._get_client(**kwargs).get_user_with_http_info(user_id)
|
||||
base._assert_status_code(200, status_code)
|
||||
def get_user_by_id(self, user_id, expect_status_code=200, expect_response_body=None, **kwargs):
|
||||
data = None
|
||||
status_code = None
|
||||
try:
|
||||
data, status_code, _ = self._get_client(**kwargs).get_user_with_http_info(user_id)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return data
|
||||
|
||||
def get_user_by_name(self, name, expect_status_code=200, **kwargs):
|
||||
@ -92,3 +100,29 @@ class User(base.Base, object):
|
||||
_, status_code, _ = self._get_client(**kwargs).set_user_sys_admin_with_http_info(user_id, sysadmin_flag)
|
||||
base._assert_status_code(200, status_code)
|
||||
return user_id
|
||||
|
||||
def search_user_by_username(self, user_name, expect_status_code=200, expect_response_body=None, **kwargs):
|
||||
return_data = None
|
||||
status_code = None
|
||||
try:
|
||||
return_data, status_code, _ = self._get_client(**kwargs).search_users_with_http_info(user_name)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return return_data
|
||||
|
||||
def get_current_user_permissions(self, scope, relative, expect_status_code=200, expect_response_body=None, **kwargs):
|
||||
return_data = None
|
||||
status_code = None
|
||||
try:
|
||||
return_data, status_code, _ = self._get_client(**kwargs).get_current_user_permissions_with_http_info(scope=scope, relative=relative)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return return_data
|
||||
|
106
tests/apitests/python/test_user_crud.py
Normal file
106
tests/apitests/python/test_user_crud.py
Normal file
@ -0,0 +1,106 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
||||
import unittest
|
||||
import time
|
||||
from testutils import ADMIN_CLIENT, suppress_urllib3_warning
|
||||
from library.user import User
|
||||
|
||||
|
||||
class TestUser(unittest.TestCase):
|
||||
|
||||
|
||||
@suppress_urllib3_warning
|
||||
def setUp(self):
|
||||
self.user = User()
|
||||
|
||||
|
||||
def testUser(self):
|
||||
"""
|
||||
Test case:
|
||||
User CRUD
|
||||
Test step and expected result:
|
||||
1. Create a new user(UA);
|
||||
2. List all users, there should be one user(UA);
|
||||
3. Get current user by user(UA), it should be user(UA;
|
||||
4. Search user(UA) by name, it should be user(UA);
|
||||
5. Get user profile by user(UA), it should be user(UA);
|
||||
6. Update user profile by user(UA);
|
||||
7. Update user to admin;
|
||||
8. Update user password by user(UA);
|
||||
9. Update user password by admin;
|
||||
10. Get current user permissions by user(UA);
|
||||
11. Delete user(UA);
|
||||
"""
|
||||
url = ADMIN_CLIENT["endpoint"]
|
||||
user_password = "Aa123456"
|
||||
|
||||
# 1. Create a new user(UA);
|
||||
user_id, user_name = self.user.create_user(user_password=user_password, **ADMIN_CLIENT)
|
||||
timestamp = user_name.split("-")[1]
|
||||
USER_CLIENT=dict(endpoint=url, username=user_name, password=user_password)
|
||||
|
||||
# 2. List all users, there should be one user(UA);
|
||||
users = self.user.get_users(**ADMIN_CLIENT)
|
||||
self.assertIsNotNone(users)
|
||||
|
||||
# 3. Get current user by user(UA), it should be user(UA;
|
||||
current_user = self.user.get_user_current(**USER_CLIENT)
|
||||
self.check_user(current_user, user_name, user_id, timestamp)
|
||||
|
||||
# 4. Search user(UA) by name, it should be user(UA);
|
||||
users = self.user.search_user_by_username(user_name, **USER_CLIENT)
|
||||
user = users[0]
|
||||
self.assertEqual(len(users), 1)
|
||||
self.assertEqual(user.username, user_name)
|
||||
self.assertEqual(user.user_id, user_id)
|
||||
|
||||
# 5. Get user profile by user(UA), it should be user(UA);
|
||||
user = self.user.get_user_by_id(user_id, **USER_CLIENT)
|
||||
self.check_user(user, user_name, user_id, timestamp)
|
||||
|
||||
# 6. Update user profile by user(UA);
|
||||
timestamp = int(round(time.time() * 1000))
|
||||
comment = "For testing"
|
||||
self.user.update_user_profile(user_id, email="realname-{}@harbortest.com".format(timestamp), realname="realname-{}".format(timestamp), comment=comment, **USER_CLIENT)
|
||||
user = self.user.get_user_by_id(user_id, **USER_CLIENT)
|
||||
self.check_user(user, user_name, user_id, timestamp, comment)
|
||||
|
||||
# 7. Update user to admin;
|
||||
self.user.update_user_role_as_sysadmin(user_id, True, **ADMIN_CLIENT)
|
||||
user = self.user.get_user_by_id(user_id, **USER_CLIENT)
|
||||
self.check_user(user, user_name, user_id, timestamp, comment, True)
|
||||
|
||||
# 8. Update user password by user(UA);
|
||||
new_password = "Aa1234567-New"
|
||||
self.user.update_user_pwd(user_id, new_password=new_password, old_password=user_password, **USER_CLIENT)
|
||||
self.user.search_user_by_username(user_name, expect_status_code=401, expect_response_body="unauthorized", **USER_CLIENT)
|
||||
USER_CLIENT["password"] = new_password
|
||||
|
||||
# 9. Update user password by admin;
|
||||
new_password = "Aa1234567-New-Edit"
|
||||
self.user.update_user_pwd(user_id, new_password=new_password, old_password=USER_CLIENT["password"], **ADMIN_CLIENT)
|
||||
self.user.search_user_by_username(user_name, expect_status_code=401, expect_response_body="unauthorized", **USER_CLIENT)
|
||||
USER_CLIENT["password"] = new_password
|
||||
|
||||
# 10. Get current user permissions by user(UA);
|
||||
permissions = self.user.get_current_user_permissions(scope="/project/1/repository", relative=True, **USER_CLIENT)
|
||||
self.assertTrue(len(permissions) > 0)
|
||||
|
||||
# 11. Delete user(UA);
|
||||
self.user.delete_user(user_id, **ADMIN_CLIENT)
|
||||
self.user.get_user_by_id(user_id, expect_status_code=404, expect_response_body="user {} not found".format(user_id), **ADMIN_CLIENT)
|
||||
|
||||
|
||||
def check_user(self, user, user_name, user_id, timestamp, comment=None, sysadmin_flag=False):
|
||||
self.assertEqual(user.username, user_name)
|
||||
self.assertEqual(user.user_id, user_id)
|
||||
self.assertEqual(user.email, "realname-{}@harbortest.com".format(timestamp))
|
||||
self.assertEqual(user.comment, comment)
|
||||
self.assertEqual(user.realname, "realname-{}".format(timestamp))
|
||||
self.assertEqual(user.sysadmin_flag, sysadmin_flag)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -202,3 +202,7 @@ Test Case - Security Hub
|
||||
Test Case - Banner Message
|
||||
[Tags] banner_message
|
||||
Harbor API Test ./tests/apitests/python/test_banner_message.py
|
||||
|
||||
Test Case - User CRUD
|
||||
[Tags] user_crud
|
||||
Harbor API Test ./tests/apitests/python/test_user_crud.py
|
||||
|
Loading…
Reference in New Issue
Block a user