Upgrade python and robot framework

Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
danfengliu 2020-08-09 07:10:00 +00:00
parent 417e12ecaf
commit ed7029d0cc
34 changed files with 258 additions and 334 deletions

View File

@ -451,10 +451,10 @@ down:
swagger_client: swagger_client:
@echo "Generate swagger client" @echo "Generate swagger client"
wget -q https://repo1.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O swagger-codegen-cli.jar wget https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli/4.3.1/openapi-generator-cli-4.3.1.jar -O openapi-generator-cli.jar
rm -rf harborclient rm -rf harborclient
mkdir harborclient mkdir harborclient
java -jar swagger-codegen-cli.jar generate -i api/harbor/swagger.yaml -l python -o harborclient java -jar openapi-generator-cli.jar generate -i api/harbor/swagger.yaml -g python -o harborclient
cd harborclient; python ./setup.py install cd harborclient; python ./setup.py install
pip install docker -q pip install docker -q
pip freeze pip freeze

View File

@ -1,6 +1,4 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import site
reload(site)
import project import project
import label import label
import registry import registry

View File

@ -14,7 +14,6 @@ class Chart(base.Base):
def chart_should_exist(self, repository, chart_name, **kwargs): def chart_should_exist(self, repository, chart_name, **kwargs):
charts_data = self.get_charts(repository, **kwargs) charts_data = self.get_charts(repository, **kwargs)
print "charts_data:", charts_data
for chart in charts_data: for chart in charts_data:
if chart.name == chart_name: if chart.name == chart_name:
return True return True

View File

@ -11,6 +11,16 @@ def set_configurations(client, expect_status_code = 200, expect_response_body =
conf.project_creation_restriction = config.get("project_creation_restriction") conf.project_creation_restriction = config.get("project_creation_restriction")
if "token_expiration" in config: if "token_expiration" in config:
conf.token_expiration = config.get("token_expiration") conf.token_expiration = config.get("token_expiration")
if "ldap_filter" in config:
conf.ldap_filter = config.get("ldap_filter")
if "ldap_group_attribute_name" in config:
conf.ldap_group_attribute_name = config.get("ldap_group_attribute_name")
if "ldap_group_base_dn" in config:
conf.ldap_group_base_dn = config.get("ldap_group_base_dn")
if "ldap_group_search_filter" in config:
conf.ldap_group_search_filter = config.get("ldap_group_search_filter")
if "ldap_group_search_scope" in config:
conf.ldap_group_search_scope = config.get("ldap_group_search_scope")
try: try:
_, status_code, _ = client.configurations_put_with_http_info(conf) _, status_code, _ = client.configurations_put_with_http_info(conf)
@ -56,3 +66,11 @@ class Configurations(base.Base):
config=dict(token_expiration=token_expiration) config=dict(token_expiration=token_expiration)
set_configurations(client, expect_status_code = expect_status_code, **config) set_configurations(client, expect_status_code = expect_status_code, **config)
def set_configurations_of_ldap(self, ldap_filter=None, ldap_group_attribute_name=None,
ldap_group_base_dn=None, ldap_group_search_filter=None, ldap_group_search_scope=None, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
config=dict(ldap_filter=ldap_filter, ldap_group_attribute_name=ldap_group_attribute_name,
ldap_group_base_dn=ldap_group_base_dn, ldap_group_search_filter=ldap_group_search_filter, ldap_group_search_scope=ldap_group_search_scope)
set_configurations(client, expect_status_code = expect_status_code, **config)

View File

@ -19,9 +19,9 @@ class DockerAPI(object):
expected_error_message = None expected_error_message = None
try: try:
self.DCLIENT.login(registry = registry, username=username, password=password) self.DCLIENT.login(registry = registry, username=username, password=password)
except docker.errors.APIError, err: except docker.errors.APIError as err:
if expected_error_message is not None: if expected_error_message is not None:
print "docker login error:", str(err) print( "docker login error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0: if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Docker login: Return message {} is not as expected {}".format(str(err), expected_error_message)) raise Exception(r"Docker login: Return message {} is not as expected {}".format(str(err), expected_error_message))
else: else:
@ -37,11 +37,12 @@ class DockerAPI(object):
caught_err = False caught_err = False
ret = "" ret = ""
try: try:
ret = base._get_string_from_unicode(self.DCLIENT.pull(r'{}:{}'.format(image, _tag))) self.DCLIENT.pull(r'{}:{}'.format(image, _tag))
except Exception, err: return ret
except Exception as err:
caught_err = True caught_err = True
if expected_error_message is not None: if expected_error_message is not None:
print "docker image pull error:", str(err) print( "docker image pull error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0: if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Pull image: Return message {} is not as expected {}".format(str(err), expected_error_message)) raise Exception(r"Pull image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else: else:
@ -49,7 +50,7 @@ class DockerAPI(object):
if caught_err == False: if caught_err == False:
if expected_error_message is not None: if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0: if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when pull image {}".format (expected_error_message, image)) raise Exception(r" Failed to catch error [{}] when pull image {}, return message: {}".format (expected_error_message, image, str(ret)))
else: else:
if str(ret).lower().find("error".lower()) >= 0: if str(ret).lower().find("error".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when pull image {}, return message is [{}]".format (image, ret)) raise Exception(r" It's was not suppose to catch error when pull image {}, return message is [{}]".format (image, ret))
@ -61,7 +62,7 @@ class DockerAPI(object):
try: try:
self.DCLIENT.tag(image, harbor_registry, _tag, force=True) self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
return harbor_registry, _tag return harbor_registry, _tag
except docker.errors.APIError, e: except docker.errors.APIError as e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message)) raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))
def docker_image_push(self, harbor_registry, tag, expected_error_message = None): def docker_image_push(self, harbor_registry, tag, expected_error_message = None):
@ -70,11 +71,12 @@ class DockerAPI(object):
if expected_error_message is "": if expected_error_message is "":
expected_error_message = None expected_error_message = None
try: try:
ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True)) self.DCLIENT.push(harbor_registry, tag)
except Exception, err: return ret
except Exception as err:
caught_err = True caught_err = True
if expected_error_message is not None: if expected_error_message is not None:
print "docker image push error:", str(err) print( "docker image push error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0: if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message)) raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else: else:
@ -82,10 +84,12 @@ class DockerAPI(object):
if caught_err == False: if caught_err == False:
if expected_error_message is not None: if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0: if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when push image {}".format (expected_error_message, harbor_registry)) raise Exception(r" Failed to catch error [{}] when push image {}, return message: {}".
format (expected_error_message, harbor_registry, str(ret)))
else: else:
if str(ret).lower().find("errorDetail".lower()) >= 0: if str(ret).lower().find("errorDetail".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret)) raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".
format (harbor_registry, ret))
def docker_image_build(self, harbor_registry, tags=None, size=1, expected_error_message = None): def docker_image_build(self, harbor_registry, tags=None, size=1, expected_error_message = None):
caught_err = False caught_err = False
@ -111,10 +115,13 @@ class DockerAPI(object):
print("build image %s with size %d" % (repo, size)) print("build image %s with size %d" % (repo, size))
self.DCLIENT.remove_image(repo) self.DCLIENT.remove_image(repo)
self.DCLIENT.remove_container(c) self.DCLIENT.remove_container(c)
except Exception, err: self.DCLIENT.pull(repo)
image = self.DCLIENT2.images.get(repo)
return repo, image.id
except Exception as err:
caught_err = True caught_err = True
if expected_error_message is not None: if expected_error_message is not None:
print "docker image build error:", str(err) print( "docker image build error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0: if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message)) raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else: else:
@ -122,7 +129,8 @@ class DockerAPI(object):
if caught_err == False: if caught_err == False:
if expected_error_message is not None: if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0: if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when push image {}".format (expected_error_message, harbor_registry)) raise Exception(r" Failed to catch error [{}] when build image {}, return message: {}".
format (expected_error_message, harbor_registry, str(ret)))
else: else:
if str(ret).lower().find("errorDetail".lower()) >= 0: if str(ret).lower().find("errorDetail".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret)) raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret))

View File

@ -57,11 +57,14 @@ class Project(base.Base):
def check_project_name_exist(self, name=None, **kwargs): def check_project_name_exist(self, name=None, **kwargs):
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
try:
_, status_code, _ = client.projects_head_with_http_info(name) _, status_code, _ = client.projects_head_with_http_info(name)
except ApiException as e:
status_code = -1
return { return {
200: True, 200: True,
404: False, 404: False,
}.get(status_code,'error') }.get(status_code,False)
def get_project(self, project_id, expect_status_code = 200, expect_response_body = None, **kwargs): def get_project(self, project_id, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
@ -94,10 +97,14 @@ class Project(base.Base):
def get_project_log(self, project_id, expect_status_code = 200, **kwargs): def get_project_log(self, project_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
try:
body, status_code, _ = client.projects_project_id_logs_get_with_http_info(project_id) body, status_code, _ = client.projects_project_id_logs_get_with_http_info(project_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
body=[]
else:
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
return body return body
def filter_project_logs(self, project_id, operator, repository, tag, operation_type, **kwargs): def filter_project_logs(self, project_id, operator, repository, tag, operation_type, **kwargs):
access_logs = self.get_project_log(project_id, **kwargs) access_logs = self.get_project_log(project_id, **kwargs)
count = 0 count = 0
@ -162,11 +169,18 @@ class Project(base.Base):
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code) base._assert_status_code(200, status_code)
def add_project_members(self, project_id, user_id, member_role_id = None, expect_status_code = 201, **kwargs): def add_project_members(self, project_id, user_id = None, member_role_id = None, _ldap_group_dn=None,expect_status_code = 201, **kwargs):
kwargs['api_type'] = 'products'
projectMember = swagger_client.ProjectMember()
if user_id is not None:
projectMember.member_user = {"user_id": int(user_id)}
if member_role_id is None: if member_role_id is None:
member_role_id = 1 projectMember.role_id = 1
_member_user = {"user_id": int(user_id)} else:
projectMember = swagger_client.ProjectMember(member_role_id, member_user = _member_user) projectMember.role_id = member_role_id
if _ldap_group_dn is not None:
projectMember.member_group = swagger_client.UserGroup(ldap_group_dn=_ldap_group_dn)
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
data = [] data = []
data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember) data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember)
@ -215,3 +229,14 @@ class Project(base.Base):
_, status_code, _ = client.projects_project_id_robots_robot_id_delete_with_http_info(project_id, robot_id) _, status_code, _ = client.projects_project_id_robots_robot_id_delete_with_http_info(project_id, robot_id)
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code) base._assert_status_code(200, status_code)
def query_user_logs(self, project_id, status_code=200, **kwargs):
try:
logs = self.get_project_log(project_id, expect_status_code=status_code, **kwargs)
count = 0
for log in list(logs):
count = count + 1
return count
except ApiException as e:
_assert_status_code(status_code, e.status)
return 0

View File

@ -39,7 +39,7 @@ class Replication(base.Base):
if str(rule_data.name) != str(expect_rule_name): if str(rule_data.name) != str(expect_rule_name):
raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, str(rule_data.name))) raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, str(rule_data.name)))
else: else:
print r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name)) print(r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name)))
#get_trigger = str(rule_data.trigger.kind) #get_trigger = str(rule_data.trigger.kind)
#if expect_trigger is not None and get_trigger == str(expect_trigger): #if expect_trigger is not None and get_trigger == str(expect_trigger):
# print r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger) # print r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger)

View File

@ -12,7 +12,7 @@ def pull_harbor_image(registry, username, password, image, tag, expected_login_e
if expected_login_error_message != None: if expected_login_error_message != None:
return return
time.sleep(2) time.sleep(2)
_docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message) ret = _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None): def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI() _docker_api = DockerAPI()
@ -129,7 +129,7 @@ class Repository(base.Base):
signatures = self.get_repo_signatures(repo_name, **kwargs) signatures = self.get_repo_signatures(repo_name, **kwargs)
for each_sign in signatures: for each_sign in signatures:
if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44: if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44:
print "sha256:", len(each_sign.hashes["sha256"]) print("sha256:", len(each_sign.hashes["sha256"]))
return return
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag)) raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))

View File

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import subprocess import subprocess
from testutils import notary_url
def sign_image(registry_ip, project_name, image, tag): def sign_image(registry_ip, project_name, image, tag):
try: try:
ret = subprocess.check_output(["./tests/apitests/python/sign_image.sh", registry_ip, project_name, image, tag], shell=False) ret = subprocess.check_output(["./tests/apitests/python/sign_image.sh", registry_ip, project_name, image, tag, notary_url], shell=False)
print "sign_image return: ", ret print("sign_image return: ", ret)
except subprocess.CalledProcessError, exc: except subprocess.CalledProcessError as exc:
raise Exception("Failed to sign image error is {} {}.".format(exc.returncode, exc.output)) raise Exception("Failed to sign image error is {} {}.".format(exc.returncode, exc.output))

View File

@ -45,7 +45,6 @@ class User(base.Base):
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
data, status_code, _ = client.users_user_id_get_with_http_info(user_id) data, status_code, _ = client.users_user_id_get_with_http_info(user_id)
base._assert_status_code(200, status_code) base._assert_status_code(200, status_code)
print "data in lib:", data
return data return data
@ -80,7 +79,6 @@ class User(base.Base):
def update_user_role_as_sysadmin(self, user_id, IsAdmin, **kwargs): def update_user_role_as_sysadmin(self, user_id, IsAdmin, **kwargs):
client = self._get_client(**kwargs) client = self._get_client(**kwargs)
has_admin_role = swagger_client.HasAdminRole(IsAdmin) has_admin_role = swagger_client.HasAdminRole(IsAdmin)
print "has_admin_role:", has_admin_role
_, status_code, _ = client.users_user_id_sysadmin_put_with_http_info(user_id, has_admin_role) _, status_code, _ = client.users_user_id_sysadmin_put_with_http_info(user_id, has_admin_role)
base._assert_status_code(200, status_code) base._assert_status_code(200, status_code)
return user_id return user_id

View File

@ -1,11 +1,12 @@
#!/bin/sh #!/bin/sh
IP=$1 IP=$1
NOTARY_URL=$5
PASSHRASE='Harbor12345' PASSHRASE='Harbor12345'
echo $IP echo $IP
export DOCKER_CONTENT_TRUST=1 export DOCKER_CONTENT_TRUST=1
export DOCKER_CONTENT_TRUST_SERVER=https://$IP:4443 export DOCKER_CONTENT_TRUST_SERVER=$NOTARY_URL
export NOTARY_ROOT_PASSPHRASE=$PASSHRASE export NOTARY_ROOT_PASSPHRASE=$PASSHRASE
export NOTARY_TARGETS_PASSPHRASE=$PASSHRASE export NOTARY_TARGETS_PASSPHRASE=$PASSHRASE

View File

@ -10,12 +10,8 @@ from library.user import User
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
"""UserGroup unit test stubs""" """UserGroup unit test stubs"""
def setUp(self): def setUp(self):
project = Project() self.project = Project()
self.project= project self.user= User()
user = User()
self.user= user
def tearDown(self): def tearDown(self):
pass pass

View File

@ -13,21 +13,14 @@ import swagger_client
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
project = Project() self.project = Project()
self.project= project self.user = User()
self.replication = Replication()
user = User() self.registry = Registry()
self.user= user
replication = Replication()
self.replication= replication
registry = Registry()
self.registry= registry
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):
@ -76,7 +69,6 @@ class TestProjects(unittest.TestCase):
#3. Create a new registry #3. Create a new registry
TestProjects.registry_id, _ = self.registry.create_registry("https://" + harbor_server,**ADMIN_CLIENT) TestProjects.registry_id, _ = self.registry.create_registry("https://" + harbor_server,**ADMIN_CLIENT)
print "TestProjects.registry_id:", TestProjects.registry_id
#4. Create a new rule for this registry; #4. Create a new rule for this registry;
TestProjects.rule_id, rule_name = self.replication.create_replication_policy(dest_registry=swagger_client.Registry(id=int(TestProjects.registry_id)), **ADMIN_CLIENT) TestProjects.rule_id, rule_name = self.replication.create_replication_policy(dest_registry=swagger_client.Registry(id=int(TestProjects.registry_id)), **ADMIN_CLIENT)

View File

@ -14,21 +14,14 @@ from library.label import Label
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
project = Project() self.project = Project()
self.project= project self.user = User()
self.repo = Repository()
user = User() self.label = Label()
self.user= user
repo = Repository()
self.repo= repo
label = Label()
self.label= label
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -1,169 +1,74 @@
# coding: utf-8
"""
Harbor API
These APIs provide services for manipulating Harbor project.
OpenAPI spec version: 1.4.0
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
from __future__ import absolute_import from __future__ import absolute_import
import os
import sys
sys.path.append(os.environ["SWAGGER_CLIENT_PATH"])
import unittest import unittest
import testutils
import docker
import swagger_client from testutils import harbor_server
from swagger_client.models.project import Project from testutils import TEARDOWN
from swagger_client.models.project_req import ProjectReq from testutils import ADMIN_CLIENT
from swagger_client.models.project_metadata import ProjectMetadata from testutils import created_user, created_project
from swagger_client.models.project_member import ProjectMember from library.project import Project
from swagger_client.models.user_group import UserGroup from library.user import User
from swagger_client.models.configurations import Configurations from library.repository import Repository
from library.repository import push_image_to_project
from library.configurations import Configurations
from swagger_client.rest import ApiException
from pprint import pprint
#Testcase
#3-07-LDAP usergroup manage project group members
class TestAssignRoleToLdapGroup(unittest.TestCase): class TestAssignRoleToLdapGroup(unittest.TestCase):
harbor_host = os.environ["HARBOR_HOST"] @classmethod
"""AssignRoleToLdapGroup unit test stubs"""
product_api = testutils.GetProductApi("admin", "Harbor12345")
project_id = 0
docker_client = docker.from_env()
def setUp(self): def setUp(self):
#login with admin, create a project and assign role to ldap group self.conf= Configurations()
result = self.product_api.configurations_put(configurations=Configurations(ldap_filter="", ldap_group_attribute_name="cn", ldap_group_base_dn="ou=groups,dc=example,dc=com", ldap_group_search_filter="objectclass=groupOfNames", ldap_group_search_scope=2)) self.project = Project()
pprint(result) self.repo = Repository()
cfgs = self.product_api.configurations_get()
pprint(cfgs)
req = ProjectReq()
req.project_name = "ldap_group_test_prj"
req.metadata = ProjectMetadata(public="false")
result = self.product_api.projects_post(req)
pprint(result)
projs = self.product_api.projects_get(name="ldap_group_test_prj")
if projs.count>0 :
project = projs[0]
self.project_id = project.project_id
# asign role to project with dn
group_dn = "cn=harbor_admin,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 1
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
group_dn = "cn=harbor_dev,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 2
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
group_dn = "cn=harbor_guest,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 3
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
pass
@classmethod
def tearDown(self): def tearDown(self):
#delete images in project print("Case completed")
result = self.product_api.repositories_repo_name_delete(repo_name="ldap_group_test_prj/busybox")
pprint(result)
result = self.product_api.repositories_repo_name_delete(repo_name="ldap_group_test_prj/busyboxdev")
pprint(result)
if self.project_id > 0 :
self.product_api.projects_project_id_delete(self.project_id)
pass
def testAssignRoleToLdapGroup(self): def testAssignRoleToLdapGroup(self):
"""Test AssignRoleToLdapGroup""" """
admin_product_api = testutils.GetProductApi(username="admin_user", password="zhu88jie") Test case:
projects = admin_product_api.projects_get(name="ldap_group_test_prj") Assign Role To Ldap Group
self.assertTrue(projects.count > 1) Test step and expected result:
1. Set LDAP Auth configurations;
2. Create a new public project(PA) by Admin;
3. Add 3 member groups to project(PA);
4. Push image by each member role;
5. Verfify that admin_user and dev_user can push image, guest_user can not push image;
6. Verfify that admin_user, dev_user and guest_user can view logs, test user can not view logs.
7. Delete repository(RA) by user(UA);
8. Delete project(PA);
"""
url = ADMIN_CLIENT["endpoint"]
USER_ADMIN=dict(endpoint = url, username = "admin_user", password = "zhu88jie", repo = "hello-world")
USER_DEV=dict(endpoint = url, username = "dev_user", password = "zhu88jie", repo = "alpine")
USER_GUEST=dict(endpoint = url, username = "guest_user", password = "zhu88jie", repo = "busybox")
USER_TEST=dict(endpoint = url, username = "test", password = "123456")
self.conf.set_configurations_of_ldap(ldap_filter="", ldap_group_attribute_name="cn", ldap_group_base_dn="ou=groups,dc=example,dc=com",
ldap_group_search_filter="objectclass=groupOfNames", ldap_group_search_scope=2, **ADMIN_CLIENT)
with created_project(metadata={"public": "false"}) as (project_id, project_name):
self.project.add_project_members(project_id, member_role_id = 1, _ldap_group_dn = "cn=harbor_admin,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 2, _ldap_group_dn = "cn=harbor_dev,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 3, _ldap_group_dn = "cn=harbor_guest,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
projects = self.project.get_projects(dict(name=project_name), **USER_ADMIN)
self.assertTrue(len(projects) == 1)
self.assertEqual(1, projects[0].current_user_role_id) self.assertEqual(1, projects[0].current_user_role_id)
repo_name_admin, tag_name_admin = push_image_to_project(project_name, harbor_server, USER_ADMIN["username"], USER_ADMIN["password"], USER_ADMIN["repo"], "latest")
self.repo.image_should_exist(repo_name_admin, tag_name_admin, **USER_ADMIN)
repo_name_dev, tag_name_dev = push_image_to_project(project_name, harbor_server, USER_DEV["username"], USER_DEV["password"], USER_DEV["repo"], "latest")
self.repo.image_should_exist(repo_name_dev, tag_name_dev, **USER_DEV)
repo_name_guest, tag_name_guest = push_image_to_project(project_name, harbor_server, USER_GUEST["username"], USER_GUEST["password"], USER_GUEST["repo"], "latest")
self.repo.image_should_not_exist(repo_name_guest, tag_name_guest, **USER_GUEST)
dev_product_api = testutils.GetProductApi("dev_user", "zhu88jie")
projects = dev_product_api.projects_get(name="ldap_group_test_prj")
self.assertTrue(projects.count > 1)
self.assertEqual(2, projects[0].current_user_role_id)
guest_product_api = testutils.GetProductApi("guest_user", "zhu88jie") self.assertTrue(self.project.query_user_logs(project_id, **USER_ADMIN)>0, "admin user can see logs")
projects = guest_product_api.projects_get(name="ldap_group_test_prj") self.assertTrue(self.project.query_user_logs(project_id, **USER_DEV)>0, "dev user can see logs")
self.assertTrue(projects.count > 1) self.assertTrue(self.project.query_user_logs(project_id, **USER_GUEST)>0, "guest user can see logs")
self.assertEqual(3, projects[0].current_user_role_id) self.assertTrue(self.project.query_user_logs(project_id, status_code=403, **USER_TEST)==0, "test user can not see any logs")
self.dockerCmdLoginAdmin(username="admin_user", password="zhu88jie") self.repo.delete_repoitory(repo_name_admin, **USER_ADMIN)
self.dockerCmdLoginDev(username="dev_user", password="zhu88jie") self.repo.delete_repoitory(repo_name_dev, **USER_ADMIN)
self.dockerCmdLoginGuest(username="guest_user", password="zhu88jie")
self.assertTrue(self.queryUserLogs(username="admin_user", password="zhu88jie")>0, "admin user can see logs")
self.assertTrue(self.queryUserLogs(username="dev_user", password="zhu88jie")>0, "dev user can see logs")
self.assertTrue(self.queryUserLogs(username="guest_user", password="zhu88jie")>0, "guest user can see logs")
self.assertTrue(self.queryUserLogs(username="test", password="123456")==0, "test user can not see any logs")
pass
# admin user can push, pull images
def dockerCmdLoginAdmin(self, username, password):
pprint(self.docker_client.info())
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
if output.find("error")>0 :
self.fail("Should not fail to push image for admin_user")
self.docker_client.images.pull(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
pass
# dev user can push, pull images
def dockerCmdLoginDev(self, username, password, harbor_server=harbor_host):
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busyboxdev", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"/ldap_group_test_prj/busyboxdev", tag="latest")
if output.find("error") >0 :
self.fail("Should not fail to push images for dev_user")
pass
# guest user can pull images
def dockerCmdLoginGuest(self, username, password, harbor_server=harbor_host):
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busyboxguest", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"1/ldap_group_test_prj/busyboxguest", tag="latest")
if output.find("error")<0 :
self.fail("Should failed to push image for guest user")
self.docker_client.images.pull(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
pass
# check can see his log in current project
def queryUserLogs(self, username, password, harbor_host=harbor_host):
client_product_api = testutils.GetProductApi(username=username, password=password)
logs = client_product_api.logs_get(repository="ldap_group_test_prj", username=username)
if logs == None:
return 0
else:
return logs.count
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -10,15 +10,12 @@ from library.configurations import Configurations
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
conf = Configurations() self.conf= Configurations()
self.conf= conf self.user = User()
user = User()
self.user= user
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -3,11 +3,10 @@ from __future__ import absolute_import
import unittest import unittest
from library.base import _assert_status_code
from testutils import ADMIN_CLIENT from testutils import ADMIN_CLIENT
from testutils import harbor_server from testutils import harbor_server
from testutils import TEARDOWN from testutils import TEARDOWN
from library.base import _assert_status_code
from library.project import Project from library.project import Project
from library.user import User from library.user import User
from library.repository import Repository from library.repository import Repository
@ -16,19 +15,14 @@ from library.repository import push_image_to_project
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUpClass(self): def setUpClass(self):
project = Project() self.project= Project()
self.project= project self.user= User()
self.repo= Repository()
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod @classmethod
def tearDownClass(self): def tearDownClass(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -10,22 +10,16 @@ from library.configurations import Configurations
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
conf = Configurations() self.conf= Configurations()
self.conf= conf self.project= Project()
self.user= User()
project = Project()
self.project= project
user = User()
self.user= user
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):
print "Clear trace"
#1. Delete project(PA); #1. Delete project(PA);
self.project.delete_project(TestProjects.project_edit_project_creation_id, **TestProjects.USER_edit_project_creation_CLIENT) self.project.delete_project(TestProjects.project_edit_project_creation_id, **TestProjects.USER_edit_project_creation_CLIENT)

View File

@ -29,7 +29,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -11,18 +11,13 @@ from library.chart import Chart
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
chart = Chart() self.chart= Chart()
self.chart= chart self.project= Project()
self.user= User()
project = Project()
self.project= project
user = User()
self.user= user
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -13,18 +13,13 @@ from library.repository import Repository
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
project = Project() self.project = Project()
self.project= project self.user = User()
self.repo = Repository()
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -15,18 +15,13 @@ from library.repository import pull_harbor_image
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
project = Project() self.project= Project()
self.project= project self.user= User()
self.repo= Repository()
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -25,16 +25,8 @@ class TestProjects(unittest.TestCase):
self.system = System() self.system = System()
@classmethod @classmethod
def tearDown(self): def tearDown(cls):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
#1. Delete project(PA);
self.project.delete_project(TestProjects.project_test_quota_id, **ADMIN_CLIENT)
#2. Delete user(UA);
self.user.delete_user(TestProjects.user_test_quota_id, **ADMIN_CLIENT)
def testProjectQuota(self): def testProjectQuota(self):
""" """

View File

@ -28,7 +28,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDownClass(self): def tearDownClass(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -95,7 +95,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDownClass(self): def tearDownClass(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -15,18 +15,13 @@ from library.base import _assert_status_code
class TestProjects(unittest.TestCase): class TestProjects(unittest.TestCase):
@classmethod @classmethod
def setUp(self): def setUp(self):
project = Project() self.project = Project()
self.project= project self.user = User()
self.repo = Repository()
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):
@ -79,57 +74,55 @@ class TestProjects(unittest.TestCase):
image_robot_account = "mariadb" image_robot_account = "mariadb"
tag = "latest" tag = "latest"
print "#1. Create user(UA);" # "#1. Create user(UA);"
TestProjects.user_ra_id, user_ra_name = self.user.create_user(user_password = user_ra_password, **ADMIN_CLIENT) TestProjects.user_ra_id, user_ra_name = self.user.create_user(user_password = user_ra_password, **ADMIN_CLIENT)
TestProjects.USER_RA_CLIENT=dict(endpoint = url, username = user_ra_name, password = user_ra_password) TestProjects.USER_RA_CLIENT=dict(endpoint = url, username = user_ra_name, password = user_ra_password)
print "#2. Create private project(PA), private project(PB) and public project(PC) by user(UA);" # "#2. Create private project(PA), private project(PB) and public project(PC) by user(UA);"
TestProjects.project_ra_id_a, project_ra_name_a = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT) TestProjects.project_ra_id_a, project_ra_name_a = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT)
TestProjects.project_ra_id_b, project_ra_name_b = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT) TestProjects.project_ra_id_b, project_ra_name_b = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT)
TestProjects.project_ra_id_c, project_ra_name_c = self.project.create_project(metadata = {"public": "true"}, **TestProjects.USER_RA_CLIENT) TestProjects.project_ra_id_c, project_ra_name_c = self.project.create_project(metadata = {"public": "true"}, **TestProjects.USER_RA_CLIENT)
print "#3. Push image(ImagePA) to project(PA), image(ImagePB) to project(PB) and image(ImagePC) to project(PC) by user(UA);" # "#3. Push image(ImagePA) to project(PA), image(ImagePB) to project(PB) and image(ImagePC) to project(PC) by user(UA);"
TestProjects.repo_name_in_project_a, tag_a = push_image_to_project(project_ra_name_a, harbor_server, user_ra_name, user_ra_password, image_project_a, tag) TestProjects.repo_name_in_project_a, tag_a = push_image_to_project(project_ra_name_a, harbor_server, user_ra_name, user_ra_password, image_project_a, tag)
TestProjects.repo_name_in_project_b, tag_b = push_image_to_project(project_ra_name_b, harbor_server, user_ra_name, user_ra_password, image_project_b, tag) TestProjects.repo_name_in_project_b, tag_b = push_image_to_project(project_ra_name_b, harbor_server, user_ra_name, user_ra_password, image_project_b, tag)
TestProjects.repo_name_in_project_c, tag_c = push_image_to_project(project_ra_name_c, harbor_server, user_ra_name, user_ra_password, image_project_c, tag) TestProjects.repo_name_in_project_c, tag_c = push_image_to_project(project_ra_name_c, harbor_server, user_ra_name, user_ra_password, image_project_c, tag)
print "#4. Create a new robot account(RA) with pull and push privilige in project(PA) by user(UA);" # "#4. Create a new robot account(RA) with pull and push privilige in project(PA) by user(UA);"
robot_id, robot_account = self.project.add_project_robot_account(TestProjects.project_ra_id_a, project_ra_name_a, **TestProjects.USER_RA_CLIENT) robot_id, robot_account = self.project.add_project_robot_account(TestProjects.project_ra_id_a, project_ra_name_a, **TestProjects.USER_RA_CLIENT)
print robot_account.name
print robot_account.token
print "#5. Check robot account info, it should has both pull and push priviliges;" # "#5. Check robot account info, it should has both pull and push priviliges;"
data = self.project.get_project_robot_account_by_id(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT) data = self.project.get_project_robot_account_by_id(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT)
_assert_status_code(robot_account.name, data.name) _assert_status_code(robot_account.name, data.name)
print "#6. Pull image(ImagePA) from project(PA) by robot account(RA), it must be successful;" # "#6. Pull image(ImagePA) from project(PA) by robot account(RA), it must be successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a) pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a)
print "#7. Push image(ImageRA) to project(PA) by robot account(RA), it must be successful;" # "#7. Push image(ImageRA) to project(PA) by robot account(RA), it must be successful;"
TestProjects.repo_name_pa, _ = push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag) TestProjects.repo_name_pa, _ = push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag)
print "#8. Push image(ImageRA) to project(PB) by robot account(RA), it must be not successful;" # "#8. Push image(ImageRA) to project(PB) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_b, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied") push_image_to_project(project_ra_name_b, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied")
print "#9. Pull image(ImagePB) from project(PB) by robot account(RA), it must be not successful;" # "#9. Pull image(ImagePB) from project(PB) by robot account(RA), it must be not successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_b, tag_b, expected_error_message = r"pull access denied for " + harbor_server + "/" + TestProjects.repo_name_in_project_b) pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_b, tag_b, expected_error_message = r"pull access denied for " + harbor_server + "/" + TestProjects.repo_name_in_project_b)
print "#10. Pull image from project(PC), it must be successful;" # "#10. Pull image from project(PC), it must be successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_c, tag_c) pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_c, tag_c)
print "#11. Push image(ImageRA) to project(PC) by robot account(RA), it must be not successful;" # "#11. Push image(ImageRA) to project(PC) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_c, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied") push_image_to_project(project_ra_name_c, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied")
print "#12. Update action property of robot account(RA);" # "#12. Update action property of robot account(RA);"
self.project.disable_project_robot_account(TestProjects.project_ra_id_a, robot_id, True, **TestProjects.USER_RA_CLIENT) self.project.disable_project_robot_account(TestProjects.project_ra_id_a, robot_id, True, **TestProjects.USER_RA_CLIENT)
print "#13. Pull image(ImagePA) from project(PA) by robot account(RA), it must be not successful;" # "#13. Pull image(ImagePA) from project(PA) by robot account(RA), it must be not successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a, expected_login_error_message = "401 Client Error: Unauthorized") pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a, expected_login_error_message = "401 Client Error: Unauthorized")
print "#14. Push image(ImageRA) to project(PA) by robot account(RA), it must be not successful;" # "#14. Push image(ImageRA) to project(PA) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_login_error_message = "401 Client Error: Unauthorized") push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_login_error_message = "401 Client Error: Unauthorized")
print "#15. Delete robot account(RA), it must be not successful." # "#15. Delete robot account(RA), it must be not successful."
self.project.delete_project_robot_account(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT) self.project.delete_project_robot_account(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -27,7 +27,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -23,7 +23,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == True, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == True, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -24,7 +24,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDown(self): def tearDown(self):
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == True, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == True, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -29,7 +29,7 @@ class TestProjects(unittest.TestCase):
@classmethod @classmethod
def tearDown(self): def tearDown(self):
self.test_result.get_final_result() self.test_result.get_final_result()
print "Case completed" print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.") @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self): def test_ClearData(self):

View File

@ -3,6 +3,12 @@ import os
import sys import sys
sys.path.insert(0, os.environ["SWAGGER_CLIENT_PATH"]) sys.path.insert(0, os.environ["SWAGGER_CLIENT_PATH"])
path=os.getcwd() + "/library"
sys.path.insert(0, path)
path=os.getcwd() + "/tests/apitests/python/library"
sys.path.insert(0, path)
from swagger_client.rest import ApiException from swagger_client.rest import ApiException
import swagger_client.models import swagger_client.models
from pprint import pprint from pprint import pprint
@ -14,7 +20,8 @@ harbor_server = os.environ["HARBOR_HOST"]
#CLIENT=dict(endpoint="https://"+harbor_server+"/api") #CLIENT=dict(endpoint="https://"+harbor_server+"/api")
ADMIN_CLIENT=dict(endpoint = os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api", username = admin_user, password = admin_pwd) ADMIN_CLIENT=dict(endpoint = os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api", username = admin_user, password = admin_pwd)
USER_ROLE=dict(admin=0,normal=1) USER_ROLE=dict(admin=0,normal=1)
TEARDOWN = True TEARDOWN = os.environ.get('TEARDOWN', 'true').lower() in ('true', 'yes')
notary_url = os.environ.get('NOTARY_URL', 'https://'+harbor_server+':4443')
def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]): def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]):
@ -37,5 +44,36 @@ class TestResult(object):
def get_final_result(self): def get_final_result(self):
if self.num_errors > 0: if self.num_errors > 0:
for each_err_msg in self.error_message: for each_err_msg in self.error_message:
print "Error message:", each_err_msg print("Error message:", each_err_msg)
raise Exception(r"Test case failed with {} errors.".format(self.num_errors)) raise Exception(r"Test case failed with {} errors.".format(self.num_errors))
from contextlib import contextmanager
@contextmanager
def created_user(password):
from library.user import User
api = User()
user_id, user_name = api.create_user(user_password=password, **ADMIN_CLIENT)
try:
yield (user_id, user_name)
finally:
if TEARDOWN:
api.delete_user(user_id, **ADMIN_CLIENT)
@contextmanager
def created_project(name=None, metadata=None, user_id=None, member_role_id=None):
from library.project import Project
api = Project()
project_id, project_name = api.create_project(name=name, metadata=metadata, **ADMIN_CLIENT)
if user_id:
api.add_project_members(project_id, user_id=user_id, member_role_id=member_role_id, **ADMIN_CLIENT)
try:
yield (project_id, project_name)
finally:
if TEARDOWN:
api.delete_project(project_id, **ADMIN_CLIENT)

View File

@ -2,7 +2,6 @@
Documentation Harbor BATs Documentation Harbor BATs
Resource ../../resources/APITest-Util.robot Resource ../../resources/APITest-Util.robot
Resource ../../resources/Docker-Util.robot Resource ../../resources/Docker-Util.robot
Library ../../apitests/python/library/Harbor.py ${SERVER_CONFIG}
Library OperatingSystem Library OperatingSystem
Library String Library String
Library Collections Library Collections

View File

@ -1,4 +1,4 @@
#!/usr/local/bin/expect #!/usr/bin/env expect
set HOST [lindex $argv 0] set HOST [lindex $argv 0]
set PROJECT [lindex $argv 1] set PROJECT [lindex $argv 1]

View File

@ -14,7 +14,6 @@ args = parser.parse_args()
url = "https://"+args.endpoint+"/api/" url = "https://"+args.endpoint+"/api/"
endpoint_url = "https://"+args.endpoint endpoint_url = "https://"+args.endpoint
print url
with open("feature_map.json") as f: with open("feature_map.json") as f:
feature_map = json.load(f) feature_map = json.load(f)
@ -108,7 +107,7 @@ class HarborAPI:
"url":""+endpointurl+"" "url":""+endpointurl+""
} }
body=dict(body=payload) body=dict(body=payload)
print body print(body)
request(url+"/registries", 'post', **body) request(url+"/registries", 'post', **body)
else: else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch)) raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -131,8 +130,8 @@ class HarborAPI:
else: else:
registry = r'"dest_registry": { "id": '+str(targetid)+r'},' registry = r'"dest_registry": { "id": '+str(targetid)+r'},'
body=dict(body=json.loads(r'{"name":"'+replicationrule["rulename"].encode('utf-8')+r'","dest_namespace":"'+replicationrule["dest_namespace"].encode('utf-8')+r'","deletion": '+str(replicationrule["deletion"]).lower()+r',"enabled": '+str(replicationrule["enabled"]).lower()+r',"override": '+str(replicationrule["override"]).lower()+r',"description": "string",'+ registry + r'"trigger":{"type": "'+replicationrule["trigger_type"]+r'", "trigger_settings":{"cron":"'+replicationrule["cron"]+r'"}},"filters":[ {"type":"name","value":"'+replicationrule["name_filters"]+r'"},{"type":"tag","value":"'+replicationrule["tag_filters"]+r'"}]}')) body=dict(body=json.loads(r'{"name":"'+replicationrule["rulename"]+r'","dest_namespace":"'+replicationrule["dest_namespace"] + r'","deletion": '+str(replicationrule["deletion"]).lower()+r',"enabled": '+str(replicationrule["enabled"]).lower()+r',"override": '+str(replicationrule["override"]).lower()+r',"description": "string",'+ registry + r'"trigger":{"type": "'+replicationrule["trigger_type"]+r'", "trigger_settings":{"cron":"'+replicationrule["cron"]+r'"}},"filters":[ {"type":"name","value":"'+replicationrule["name_filters"]+r'"},{"type":"tag","value":"'+replicationrule["tag_filters"]+r'"}]}'))
print body print(body)
request(url+"replication/policies", 'post', **body) request(url+"replication/policies", 'post', **body)
else: else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch)) raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -151,7 +150,7 @@ class HarborAPI:
} }
} }
body=dict(body=payload) body=dict(body=payload)
print body print(body)
request(url+"projects/"+projectid+"", 'put', **body) request(url+"projects/"+projectid+"", 'put', **body)
@get_feature_branch @get_feature_branch
@ -162,7 +161,7 @@ class HarborAPI:
cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}' cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}'
if index != len(cve_id_list["cve"]) - 1: if index != len(cve_id_list["cve"]) - 1:
cve_id_str = cve_id_str + "," cve_id_str = cve_id_str + ","
body=dict(body=json.loads(r'{"items":['+cve_id_str.encode('utf-8')+r'],"expires_at":'+cve_id_list["expires_at"]+'}')) body=dict(body=json.loads(r'{"items":['+cve_id_str + r'],"expires_at":' + cve_id_list["expires_at"] + '}'))
request(url+"system/CVEWhitelist", 'put', **body) request(url+"system/CVEWhitelist", 'put', **body)
else: else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch)) raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -177,12 +176,11 @@ class HarborAPI:
cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}' cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}'
if index != len(cve_id_list["cve"]) - 1: if index != len(cve_id_list["cve"]) - 1:
cve_id_str = cve_id_str + "," cve_id_str = cve_id_str + ","
print cve_id_str
if reuse_sys_cve_whitelist == "true": if reuse_sys_cve_whitelist == "true":
payload = r'{"metadata":{"reuse_sys_cve_whitelist":"true"}}' payload = r'{"metadata":{"reuse_sys_cve_whitelist":"true"}}'
else: else:
payload = r'{"metadata":{"reuse_sys_cve_whitelist":"false"},"cve_whitelist":{"project_id":'+projectid+',"items":['+cve_id_str.encode('utf-8')+r'],"expires_at":'+cve_id_list["expires_at"]+'}}' payload = r'{"metadata":{"reuse_sys_cve_whitelist":"false"},"cve_whitelist":{"project_id":'+projectid+',"items":['+cve_id_str + r'],"expires_at":'+cve_id_list["expires_at"]+'}}'
print payload print(payload)
body=dict(body=json.loads(payload)) body=dict(body=json.loads(payload))
request(url+"projects/"+projectid+"", 'put', **body) request(url+"projects/"+projectid+"", 'put', **body)
else: else:
@ -248,7 +246,7 @@ class HarborAPI:
raise Exception(r"Error: Robot account count {} is not legal!".format(len(robot_account["access"]))) raise Exception(r"Error: Robot account count {} is not legal!".format(len(robot_account["access"])))
else: else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch)) raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
print payload print(payload)
body=dict(body=payload) body=dict(body=payload)
request(url+"projects/"+projectid+"/robots", 'post', **body) request(url+"projects/"+projectid+"/robots", 'post', **body)
@ -301,10 +299,10 @@ class HarborAPI:
if not os.path.exists(ca_path): if not os.path.exists(ca_path):
try: try:
os.makedirs(ca_path) os.makedirs(ca_path)
except Exception, e: except Exception as e:
print str(e) print(str(e))
pass pass
open(target, 'wb').write(ca_content) open(target, 'wb').write(ca_content.encode('utf-8'))
def request(url, method, user = None, userp = None, **kwargs): def request(url, method, user = None, userp = None, **kwargs):