Merge pull request #12989 from danfengliu/Add-Checkpoint-for-LDAP-group-py-test

Add checkpoint for LDAP group py-test
This commit is contained in:
danfengliu 2020-09-15 14:15:31 +08:00 committed by GitHub
commit d99ea887f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 206 additions and 25 deletions

View File

@ -189,7 +189,7 @@ class Project(base.Base):
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def add_project_members(self, project_id, user_id = None, member_role_id = None, _ldap_group_dn=None,expect_status_code = 201, **kwargs):
def add_project_members(self, project_id, user_id = None, member_role_id = None, _ldap_group_dn=None, expect_status_code = 201, **kwargs):
kwargs['api_type'] = 'products'
projectMember = swagger_client.ProjectMember()
if user_id is not None:
@ -203,9 +203,13 @@ class Project(base.Base):
client = self._get_client(**kwargs)
data = []
data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember)
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header)
try:
data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember)
except swagger_client.rest.ApiException as e:
base._assert_status_code(expect_status_code, e.status)
else:
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header)
def add_project_robot_account(self, project_id, project_name, expires_at, robot_name = None, robot_desc = None, has_pull_right = True, has_push_right = True, has_chart_read_right = True, has_chart_create_right = True, expect_status_code = 201, **kwargs):
kwargs['api_type'] = 'products'

View File

@ -2,11 +2,12 @@
import base
import swagger_client
from swagger_client.rest import ApiException
class User(base.Base):
def create_user(self, name=None,
email = None, user_password=None, realname = None, role_id = None, **kwargs):
email = None, user_password=None, realname = None, role_id = None, expect_status_code=201, **kwargs):
if name is None:
name = base._random_name("user")
if realname is None:
@ -20,13 +21,16 @@ class User(base.Base):
client = self._get_client(**kwargs)
user = swagger_client.User(username = name, email = email, password = user_password, realname = realname, role_id = role_id)
_, status_code, header = client.users_post_with_http_info(user)
base._assert_status_code(201, status_code)
try:
_, status_code, header = client.users_post_with_http_info(user)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
else:
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header), name
return base._get_id_from_header(header), name
def get_users(self, user_name=None, email=None, page=None, page_size=None, **kwargs):
def get_users(self, user_name=None, email=None, page=None, page_size=None, expect_status_code=200, **kwargs):
client = self._get_client(**kwargs)
params={}
if user_name is not None:
@ -37,9 +41,13 @@ class User(base.Base):
params["page"] = page
if page_size is not None:
params["page_size"] = page_size
data, status_code, _ = client.users_get_with_http_info(**params)
base._assert_status_code(200, status_code)
return data
try:
data, status_code, _ = client.users_get_with_http_info(**params)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
else:
base._assert_status_code(expect_status_code, status_code)
return data
def get_user_by_id(self, user_id, **kwargs):
client = self._get_client(**kwargs)
@ -47,8 +55,8 @@ class User(base.Base):
base._assert_status_code(200, status_code)
return data
def get_user_by_name(self, name, **kwargs):
users = self.get_users(user_name=name, **kwargs)
def get_user_by_name(self, name, expect_status_code=200, **kwargs):
users = self.get_users(user_name=name, expect_status_code=expect_status_code , **kwargs)
for user in users:
if user.username == name:
return user

View File

@ -10,7 +10,6 @@ from library.user import User
from library.repository import Repository
from library.repository import push_image_to_project
from library.artifact import Artifact
from library.scan import Scan
from library.scanner import Scanner
from library.configurations import Configurations
from library.projectV2 import ProjectV2
@ -23,7 +22,7 @@ class TestAssignRoleToLdapGroup(unittest.TestCase):
self.project = Project()
self.artifact = Artifact()
self.repo = Repository()
self.scan = Scan()
self.user= User()
@classmethod
def tearDown(self):
@ -38,17 +37,19 @@ class TestAssignRoleToLdapGroup(unittest.TestCase):
2. Create a new public project(PA) by Admin;
3. Add 3 member groups to project(PA);
4. Push image by each member role;
5. Verfify that admin_user and dev_user can push image, guest_user can not push image;
6. Verfify that admin_user, dev_user and guest_user can view logs, test user can not view logs.
7. Delete repository(RA) by user(UA);
8. Delete project(PA);
5. Verfify that admin_user can add project member, dev_user and guest_user can not add project member;
6. Verfify that admin_user and dev_user can push image, guest_user can not push image;
7. Verfify that admin_user, dev_user and guest_user can view logs, test user can not view logs.
8. Delete repository(RA) by user(UA);
9. Delete project(PA);
"""
url = ADMIN_CLIENT["endpoint"]
USER_ADMIN=dict(endpoint = url, username = "admin_user", password = "zhu88jie", repo = "hello-world")
USER_DEV=dict(endpoint = url, username = "dev_user", password = "zhu88jie", repo = "alpine")
USER_GUEST=dict(endpoint = url, username = "guest_user", password = "zhu88jie", repo = "busybox")
USER_TEST=dict(endpoint = url, username = "test", password = "123456")
USER_MIKE=dict(endpoint = url, username = "mike", password = "zhu88jie")
#USER001 is in group harbor_group3
self.conf.set_configurations_of_ldap(ldap_filter="", ldap_group_attribute_name="cn", ldap_group_base_dn="ou=groups,dc=example,dc=com",
ldap_group_search_filter="objectclass=groupOfNames", ldap_group_search_scope=2, **ADMIN_CLIENT)
@ -56,10 +57,19 @@ class TestAssignRoleToLdapGroup(unittest.TestCase):
self.project.add_project_members(project_id, member_role_id = 1, _ldap_group_dn = "cn=harbor_admin,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 2, _ldap_group_dn = "cn=harbor_dev,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 3, _ldap_group_dn = "cn=harbor_guest,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
projects = self.project.get_projects(dict(name=project_name), **USER_ADMIN)
self.assertTrue(len(projects) == 1)
self.assertEqual(1, projects[0].current_user_role_id)
#Mike has logged in harbor in previous test.
mike = self.user.get_user_by_name(USER_MIKE["username"], **ADMIN_CLIENT)
#Verify role difference in add project member feature, to distinguish between admin and dev role
self.project.add_project_members(project_id, user_id=mike.user_id, member_role_id = 3, **USER_ADMIN)
self.project.add_project_members(project_id, user_id=mike.user_id, member_role_id = 3, expect_status_code=403, **USER_DEV)
self.project.add_project_members(project_id, user_id=mike.user_id, member_role_id = 3, expect_status_code=403, **USER_GUEST)
repo_name_admin, _ = push_image_to_project(project_name, harbor_server, USER_ADMIN["username"], USER_ADMIN["password"], USER_ADMIN["repo"], "latest")
artifacts = self.artifact.list_artifacts(project_name, USER_ADMIN["repo"], **USER_ADMIN)
self.assertTrue(len(artifacts) == 1)
@ -70,7 +80,6 @@ class TestAssignRoleToLdapGroup(unittest.TestCase):
artifacts = self.artifact.list_artifacts(project_name, USER_GUEST["repo"], **USER_GUEST)
self.assertTrue(len(artifacts) == 0)
self.assertTrue(self.project.query_user_logs(project_name, **USER_ADMIN)>0, "admin user can see logs")
self.assertTrue(self.project.query_user_logs(project_name, **USER_DEV)>0, "dev user can see logs")
self.assertTrue(self.project.query_user_logs(project_name, **USER_GUEST)>0, "guest user can see logs")

View File

@ -14,7 +14,7 @@ class TestLdapAdminRole(unittest.TestCase):
def setUp(self):
url = ADMIN_CLIENT["endpoint"]
self.conf= Configurations()
self.uesr = User()
self.user = User()
self.project = Project()
self.USER_MIKE=dict(endpoint = url, username = "mike", password = "zhu88jie")
@ -41,7 +41,7 @@ class TestLdapAdminRole(unittest.TestCase):
TestLdapAdminRole.project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_MIKE)
self.project.check_project_name_exist(name=project_name, **self.USER_MIKE)
_user = self.uesr.get_user_by_name(self.USER_MIKE["username"], **ADMIN_CLIENT)
_user = self.user.get_user_by_name(self.USER_MIKE["username"], **ADMIN_CLIENT)
self.assertFalse(_user.sysadmin_flag)

View File

@ -23,6 +23,18 @@
{
"branch":2,
"version":"1.10"
},
{
"branch":2,
"version":"2.0"
},
{
"branch":2,
"version":"2.1"
},
{
"branch":2,
"version":"2.2"
}
],
"add_member":[
@ -49,6 +61,18 @@
{
"branch":2,
"version":"1.10"
},
{
"branch":2,
"version":"2.0"
},
{
"branch":2,
"version":"2.1"
},
{
"branch":2,
"version":"2.2"
}
],
"set_user_admin":[
@ -75,6 +99,18 @@
{
"branch":2,
"version":"1.10"
},
{
"branch":2,
"version":"2.0"
},
{
"branch":2,
"version":"2.1"
},
{
"branch":2,
"version":"2.2"
}
],
"add_endpoint":[
@ -101,6 +137,18 @@
{
"branch":2,
"version":"1.10"
},
{
"branch":2,
"version":"2.0"
},
{
"branch":2,
"version":"2.1"
},
{
"branch":2,
"version":"2.2"
}
],
"add_replication_rule":[
@ -127,6 +175,18 @@
{
"branch":2,
"version":"1.10"
},
{
"branch":2,
"version":"2.0"
},
{
"branch":2,
"version":"2.1"
},
{
"branch":2,
"version":"2.2"
}
],
"add_sys_allowlist":[
@ -137,6 +197,18 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"update_project_setting_allowlist":[
@ -147,6 +219,18 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"add_project_robot_account":[
@ -161,6 +245,18 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"add_tag_retention_rule":[
@ -171,12 +267,36 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"add_tag_immutability_rule":[
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"add_webhook":[
@ -187,6 +307,18 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"update_interrogation_services":[
@ -197,6 +329,32 @@
{
"branch":1,
"version":"1.10"
},
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
],
"push_artifact":[
{
"branch":1,
"version":"2.0"
},
{
"branch":1,
"version":"2.1"
},
{
"branch":1,
"version":"2.2"
}
]
}

View File

@ -403,6 +403,8 @@ class HarborAPI:
pass
open(target, 'wb').write(ca_content.encode('utf-8'))
@get_feature_branch
def push_artifact(self, project, **kwargs):
def request(url, method, user = None, userp = None, **kwargs):
if user is None: