Merge pull request #12686 from danfengliu/upgrade-python-for-1.10

Upgrade python and robot framework
This commit is contained in:
danfengliu 2020-09-10 17:38:52 +08:00 committed by GitHub
commit a55c3ae3e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 260 additions and 336 deletions

View File

@ -451,10 +451,10 @@ down:
swagger_client:
@echo "Generate swagger client"
wget -q https://repo1.maven.org/maven2/io/swagger/swagger-codegen-cli/2.3.1/swagger-codegen-cli-2.3.1.jar -O swagger-codegen-cli.jar
wget https://repo1.maven.org/maven2/org/openapitools/openapi-generator-cli/4.3.1/openapi-generator-cli-4.3.1.jar -O openapi-generator-cli.jar
rm -rf harborclient
mkdir harborclient
java -jar swagger-codegen-cli.jar generate -i api/harbor/swagger.yaml -l python -o harborclient
java -jar openapi-generator-cli.jar generate -i api/harbor/swagger.yaml -g python -o harborclient --package-name swagger_client
cd harborclient; python ./setup.py install
pip install docker -q
pip freeze

View File

@ -1,6 +1,4 @@
# -*- coding: utf-8 -*-
import site
reload(site)
import project
import label
import registry

View File

@ -14,7 +14,6 @@ class Chart(base.Base):
def chart_should_exist(self, repository, chart_name, **kwargs):
charts_data = self.get_charts(repository, **kwargs)
print "charts_data:", charts_data
for chart in charts_data:
if chart.name == chart_name:
return True

View File

@ -11,6 +11,16 @@ def set_configurations(client, expect_status_code = 200, expect_response_body =
conf.project_creation_restriction = config.get("project_creation_restriction")
if "token_expiration" in config:
conf.token_expiration = config.get("token_expiration")
if "ldap_filter" in config:
conf.ldap_filter = config.get("ldap_filter")
if "ldap_group_attribute_name" in config:
conf.ldap_group_attribute_name = config.get("ldap_group_attribute_name")
if "ldap_group_base_dn" in config:
conf.ldap_group_base_dn = config.get("ldap_group_base_dn")
if "ldap_group_search_filter" in config:
conf.ldap_group_search_filter = config.get("ldap_group_search_filter")
if "ldap_group_search_scope" in config:
conf.ldap_group_search_scope = config.get("ldap_group_search_scope")
try:
_, status_code, _ = client.configurations_put_with_http_info(conf)
@ -56,3 +66,11 @@ class Configurations(base.Base):
config=dict(token_expiration=token_expiration)
set_configurations(client, expect_status_code = expect_status_code, **config)
def set_configurations_of_ldap(self, ldap_filter=None, ldap_group_attribute_name=None,
ldap_group_base_dn=None, ldap_group_search_filter=None, ldap_group_search_scope=None, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
config=dict(ldap_filter=ldap_filter, ldap_group_attribute_name=ldap_group_attribute_name,
ldap_group_base_dn=ldap_group_base_dn, ldap_group_search_filter=ldap_group_search_filter, ldap_group_search_scope=ldap_group_search_scope)
set_configurations(client, expect_status_code = expect_status_code, **config)

View File

@ -19,9 +19,9 @@ class DockerAPI(object):
expected_error_message = None
try:
self.DCLIENT.login(registry = registry, username=username, password=password)
except docker.errors.APIError, err:
except docker.errors.APIError as err:
if expected_error_message is not None:
print "docker login error:", str(err)
print( "docker login error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Docker login: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
@ -37,11 +37,12 @@ class DockerAPI(object):
caught_err = False
ret = ""
try:
ret = base._get_string_from_unicode(self.DCLIENT.pull(r'{}:{}'.format(image, _tag)))
except Exception, err:
self.DCLIENT.pull(r'{}:{}'.format(image, _tag))
return ret
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image pull error:", str(err)
print( "docker image pull error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Pull image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
@ -49,7 +50,7 @@ class DockerAPI(object):
if caught_err == False:
if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when pull image {}".format (expected_error_message, image))
raise Exception(r" Failed to catch error [{}] when pull image {}, return message: {}".format (expected_error_message, image, str(ret)))
else:
if str(ret).lower().find("error".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when pull image {}, return message is [{}]".format (image, ret))
@ -61,7 +62,7 @@ class DockerAPI(object):
try:
self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
return harbor_registry, _tag
except docker.errors.APIError, e:
except docker.errors.APIError as e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))
def docker_image_push(self, harbor_registry, tag, expected_error_message = None):
@ -70,11 +71,12 @@ class DockerAPI(object):
if expected_error_message is "":
expected_error_message = None
try:
ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
except Exception, err:
self.DCLIENT.push(harbor_registry, tag)
return ret
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image push error:", str(err)
print( "docker image push error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
@ -82,10 +84,12 @@ class DockerAPI(object):
if caught_err == False:
if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when push image {}".format (expected_error_message, harbor_registry))
raise Exception(r" Failed to catch error [{}] when push image {}, return message: {}".
format (expected_error_message, harbor_registry, str(ret)))
else:
if str(ret).lower().find("errorDetail".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret))
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".
format (harbor_registry, ret))
def docker_image_build(self, harbor_registry, tags=None, size=1, expected_error_message = None):
caught_err = False
@ -111,10 +115,13 @@ class DockerAPI(object):
print("build image %s with size %d" % (repo, size))
self.DCLIENT.remove_image(repo)
self.DCLIENT.remove_container(c)
except Exception, err:
self.DCLIENT.pull(repo)
image = self.DCLIENT2.images.get(repo)
return repo, image.id
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image build error:", str(err)
print( "docker image build error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
@ -122,7 +129,8 @@ class DockerAPI(object):
if caught_err == False:
if expected_error_message is not None:
if str(ret).lower().find(expected_error_message.lower()) < 0:
raise Exception(r" Failed to catch error [{}] when push image {}".format (expected_error_message, harbor_registry))
raise Exception(r" Failed to catch error [{}] when build image {}, return message: {}".
format (expected_error_message, harbor_registry, str(ret)))
else:
if str(ret).lower().find("errorDetail".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret))

View File

@ -57,11 +57,14 @@ class Project(base.Base):
def check_project_name_exist(self, name=None, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.projects_head_with_http_info(name)
try:
_, status_code, _ = client.projects_head_with_http_info(name)
except ApiException as e:
status_code = -1
return {
200: True,
404: False,
}.get(status_code,'error')
}.get(status_code,False)
def get_project(self, project_id, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
@ -94,10 +97,14 @@ class Project(base.Base):
def get_project_log(self, project_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
body, status_code, _ = client.projects_project_id_logs_get_with_http_info(project_id)
base._assert_status_code(expect_status_code, status_code)
try:
body, status_code, _ = client.projects_project_id_logs_get_with_http_info(project_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
body=[]
else:
base._assert_status_code(expect_status_code, status_code)
return body
def filter_project_logs(self, project_id, operator, repository, tag, operation_type, **kwargs):
access_logs = self.get_project_log(project_id, **kwargs)
count = 0
@ -162,11 +169,18 @@ class Project(base.Base):
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def add_project_members(self, project_id, user_id, member_role_id = None, expect_status_code = 201, **kwargs):
def add_project_members(self, project_id, user_id = None, member_role_id = None, _ldap_group_dn=None,expect_status_code = 201, **kwargs):
kwargs['api_type'] = 'products'
projectMember = swagger_client.ProjectMember()
if user_id is not None:
projectMember.member_user = {"user_id": int(user_id)}
if member_role_id is None:
member_role_id = 1
_member_user = {"user_id": int(user_id)}
projectMember = swagger_client.ProjectMember(member_role_id, member_user = _member_user)
projectMember.role_id = 1
else:
projectMember.role_id = member_role_id
if _ldap_group_dn is not None:
projectMember.member_group = swagger_client.UserGroup(ldap_group_dn=_ldap_group_dn)
client = self._get_client(**kwargs)
data = []
data, status_code, header = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember)
@ -214,4 +228,15 @@ class Project(base.Base):
client = self._get_client(**kwargs)
_, status_code, _ = client.projects_project_id_robots_robot_id_delete_with_http_info(project_id, robot_id)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
base._assert_status_code(200, status_code)
def query_user_logs(self, project_id, status_code=200, **kwargs):
try:
logs = self.get_project_log(project_id, expect_status_code=status_code, **kwargs)
count = 0
for log in list(logs):
count = count + 1
return count
except ApiException as e:
_assert_status_code(status_code, e.status)
return 0

View File

@ -39,7 +39,7 @@ class Replication(base.Base):
if str(rule_data.name) != str(expect_rule_name):
raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, str(rule_data.name)))
else:
print r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name))
print(r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name)))
#get_trigger = str(rule_data.trigger.kind)
#if expect_trigger is not None and get_trigger == str(expect_trigger):
# print r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger)

View File

@ -12,7 +12,7 @@ def pull_harbor_image(registry, username, password, image, tag, expected_login_e
if expected_login_error_message != None:
return
time.sleep(2)
_docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
ret = _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI()
@ -110,7 +110,7 @@ class Repository(base.Base):
_tag = self.get_tag(repo_name, tag, **kwargs)
if _tag.name == tag and _tag.scan_overview != None:
for report in _tag.scan_overview.values():
if report.get('scan_status') == expected_scan_status:
if report.scan_status == expected_scan_status:
return
raise Exception("Scan image result is not as expected {}.".format(expected_scan_status))
@ -129,7 +129,7 @@ class Repository(base.Base):
signatures = self.get_repo_signatures(repo_name, **kwargs)
for each_sign in signatures:
if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44:
print "sha256:", len(each_sign.hashes["sha256"])
print("sha256:", len(each_sign.hashes["sha256"]))
return
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))

View File

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
import subprocess
from testutils import notary_url
def sign_image(registry_ip, project_name, image, tag):
try:
ret = subprocess.check_output(["./tests/apitests/python/sign_image.sh", registry_ip, project_name, image, tag], shell=False)
print "sign_image return: ", ret
except subprocess.CalledProcessError, exc:
ret = subprocess.check_output(["./tests/apitests/python/sign_image.sh", registry_ip, project_name, image, tag, notary_url], shell=False)
print("sign_image return: ", ret)
except subprocess.CalledProcessError as exc:
raise Exception("Failed to sign image error is {} {}.".format(exc.returncode, exc.output))

View File

@ -177,7 +177,7 @@ class System(base.Base):
except Exception as e:
base._assert_status_code(expected_status_code, e.status)
else:
base._assert_status_code(expected_status_code, r[1])
base._assert_status_code(expected_status_code, r.status)
def get_cve_whitelist(self, **kwargs):
client = self._get_client(**kwargs)

View File

@ -45,7 +45,6 @@ class User(base.Base):
client = self._get_client(**kwargs)
data, status_code, _ = client.users_user_id_get_with_http_info(user_id)
base._assert_status_code(200, status_code)
print "data in lib:", data
return data
@ -80,7 +79,6 @@ class User(base.Base):
def update_user_role_as_sysadmin(self, user_id, IsAdmin, **kwargs):
client = self._get_client(**kwargs)
has_admin_role = swagger_client.HasAdminRole(IsAdmin)
print "has_admin_role:", has_admin_role
_, status_code, _ = client.users_user_id_sysadmin_put_with_http_info(user_id, has_admin_role)
base._assert_status_code(200, status_code)
return user_id

View File

@ -1,11 +1,12 @@
#!/bin/sh
IP=$1
NOTARY_URL=$5
PASSHRASE='Harbor12345'
echo $IP
export DOCKER_CONTENT_TRUST=1
export DOCKER_CONTENT_TRUST_SERVER=https://$IP:4443
export DOCKER_CONTENT_TRUST_SERVER=$NOTARY_URL
export NOTARY_ROOT_PASSPHRASE=$PASSHRASE
export NOTARY_TARGETS_PASSPHRASE=$PASSHRASE

View File

@ -10,12 +10,8 @@ from library.user import User
class TestProjects(unittest.TestCase):
"""UserGroup unit test stubs"""
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
self.project = Project()
self.user= User()
def tearDown(self):
pass

View File

@ -13,21 +13,14 @@ import swagger_client
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
replication = Replication()
self.replication= replication
registry = Registry()
self.registry= registry
self.project = Project()
self.user = User()
self.replication = Replication()
self.registry = Registry()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
@ -76,7 +69,6 @@ class TestProjects(unittest.TestCase):
#3. Create a new registry
TestProjects.registry_id, _ = self.registry.create_registry("https://" + harbor_server,**ADMIN_CLIENT)
print "TestProjects.registry_id:", TestProjects.registry_id
#4. Create a new rule for this registry;
TestProjects.rule_id, rule_name = self.replication.create_replication_policy(dest_registry=swagger_client.Registry(id=int(TestProjects.registry_id)), **ADMIN_CLIENT)

View File

@ -14,21 +14,14 @@ from library.label import Label
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
label = Label()
self.label= label
self.project = Project()
self.user = User()
self.repo = Repository()
self.label = Label()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -1,169 +1,74 @@
# coding: utf-8
"""
Harbor API
These APIs provide services for manipulating Harbor project.
OpenAPI spec version: 1.4.0
Generated by: https://github.com/swagger-api/swagger-codegen.git
"""
from __future__ import absolute_import
import os
import sys
sys.path.append(os.environ["SWAGGER_CLIENT_PATH"])
import unittest
import testutils
import docker
import swagger_client
from swagger_client.models.project import Project
from swagger_client.models.project_req import ProjectReq
from swagger_client.models.project_metadata import ProjectMetadata
from swagger_client.models.project_member import ProjectMember
from swagger_client.models.user_group import UserGroup
from swagger_client.models.configurations import Configurations
from testutils import harbor_server
from testutils import TEARDOWN
from testutils import ADMIN_CLIENT
from testutils import created_user, created_project
from library.project import Project
from library.user import User
from library.repository import Repository
from library.repository import push_image_to_project
from library.configurations import Configurations
from swagger_client.rest import ApiException
from pprint import pprint
#Testcase
#3-07-LDAP usergroup manage project group members
class TestAssignRoleToLdapGroup(unittest.TestCase):
harbor_host = os.environ["HARBOR_HOST"]
"""AssignRoleToLdapGroup unit test stubs"""
product_api = testutils.GetProductApi("admin", "Harbor12345")
project_id = 0
docker_client = docker.from_env()
@classmethod
def setUp(self):
#login with admin, create a project and assign role to ldap group
result = self.product_api.configurations_put(configurations=Configurations(ldap_filter="", ldap_group_attribute_name="cn", ldap_group_base_dn="ou=groups,dc=example,dc=com", ldap_group_search_filter="objectclass=groupOfNames", ldap_group_search_scope=2))
pprint(result)
cfgs = self.product_api.configurations_get()
pprint(cfgs)
req = ProjectReq()
req.project_name = "ldap_group_test_prj"
req.metadata = ProjectMetadata(public="false")
result = self.product_api.projects_post(req)
pprint(result)
projs = self.product_api.projects_get(name="ldap_group_test_prj")
if projs.count>0 :
project = projs[0]
self.project_id = project.project_id
# asign role to project with dn
group_dn = "cn=harbor_admin,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 1
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
group_dn = "cn=harbor_dev,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 2
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
group_dn = "cn=harbor_guest,ou=groups,dc=example,dc=com"
projectmember = ProjectMember()
projectmember.role_id = 3
projectmember.member_group = UserGroup(ldap_group_dn=group_dn)
result = self.product_api.projects_project_id_members_post( project_id=self.project_id, project_member=projectmember )
pprint(result)
pass
self.conf= Configurations()
self.project = Project()
self.repo = Repository()
@classmethod
def tearDown(self):
#delete images in project
result = self.product_api.repositories_repo_name_delete(repo_name="ldap_group_test_prj/busybox")
pprint(result)
result = self.product_api.repositories_repo_name_delete(repo_name="ldap_group_test_prj/busyboxdev")
pprint(result)
if self.project_id > 0 :
self.product_api.projects_project_id_delete(self.project_id)
pass
print("Case completed")
def testAssignRoleToLdapGroup(self):
"""Test AssignRoleToLdapGroup"""
admin_product_api = testutils.GetProductApi(username="admin_user", password="zhu88jie")
projects = admin_product_api.projects_get(name="ldap_group_test_prj")
self.assertTrue(projects.count > 1)
self.assertEqual(1, projects[0].current_user_role_id)
"""
Test case:
Assign Role To Ldap Group
Test step and expected result:
1. Set LDAP Auth configurations;
2. Create a new public project(PA) by Admin;
3. Add 3 member groups to project(PA);
4. Push image by each member role;
5. Verfify that admin_user and dev_user can push image, guest_user can not push image;
6. Verfify that admin_user, dev_user and guest_user can view logs, test user can not view logs.
7. Delete repository(RA) by user(UA);
8. Delete project(PA);
"""
url = ADMIN_CLIENT["endpoint"]
USER_ADMIN=dict(endpoint = url, username = "admin_user", password = "zhu88jie", repo = "hello-world")
USER_DEV=dict(endpoint = url, username = "dev_user", password = "zhu88jie", repo = "alpine")
USER_GUEST=dict(endpoint = url, username = "guest_user", password = "zhu88jie", repo = "busybox")
USER_TEST=dict(endpoint = url, username = "test", password = "123456")
dev_product_api = testutils.GetProductApi("dev_user", "zhu88jie")
projects = dev_product_api.projects_get(name="ldap_group_test_prj")
self.assertTrue(projects.count > 1)
self.assertEqual(2, projects[0].current_user_role_id)
self.conf.set_configurations_of_ldap(ldap_filter="", ldap_group_attribute_name="cn", ldap_group_base_dn="ou=groups,dc=example,dc=com",
ldap_group_search_filter="objectclass=groupOfNames", ldap_group_search_scope=2, **ADMIN_CLIENT)
guest_product_api = testutils.GetProductApi("guest_user", "zhu88jie")
projects = guest_product_api.projects_get(name="ldap_group_test_prj")
self.assertTrue(projects.count > 1)
self.assertEqual(3, projects[0].current_user_role_id)
with created_project(metadata={"public": "false"}) as (project_id, project_name):
self.project.add_project_members(project_id, member_role_id = 1, _ldap_group_dn = "cn=harbor_admin,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 2, _ldap_group_dn = "cn=harbor_dev,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
self.project.add_project_members(project_id, member_role_id = 3, _ldap_group_dn = "cn=harbor_guest,ou=groups,dc=example,dc=com", **ADMIN_CLIENT)
projects = self.project.get_projects(dict(name=project_name), **USER_ADMIN)
self.assertTrue(len(projects) == 1)
self.assertEqual(1, projects[0].current_user_role_id)
self.dockerCmdLoginAdmin(username="admin_user", password="zhu88jie")
self.dockerCmdLoginDev(username="dev_user", password="zhu88jie")
self.dockerCmdLoginGuest(username="guest_user", password="zhu88jie")
repo_name_admin, tag_name_admin = push_image_to_project(project_name, harbor_server, USER_ADMIN["username"], USER_ADMIN["password"], USER_ADMIN["repo"], "latest")
self.repo.image_should_exist(repo_name_admin, tag_name_admin, **USER_ADMIN)
repo_name_dev, tag_name_dev = push_image_to_project(project_name, harbor_server, USER_DEV["username"], USER_DEV["password"], USER_DEV["repo"], "latest")
self.repo.image_should_exist(repo_name_dev, tag_name_dev, **USER_DEV)
repo_name_guest, tag_name_guest = push_image_to_project(project_name, harbor_server, USER_GUEST["username"], USER_GUEST["password"], USER_GUEST["repo"], "latest")
self.repo.image_should_not_exist(repo_name_guest, tag_name_guest, **USER_GUEST)
self.assertTrue(self.queryUserLogs(username="admin_user", password="zhu88jie")>0, "admin user can see logs")
self.assertTrue(self.queryUserLogs(username="dev_user", password="zhu88jie")>0, "dev user can see logs")
self.assertTrue(self.queryUserLogs(username="guest_user", password="zhu88jie")>0, "guest user can see logs")
self.assertTrue(self.queryUserLogs(username="test", password="123456")==0, "test user can not see any logs")
pass
self.assertTrue(self.project.query_user_logs(project_id, **USER_ADMIN)>0, "admin user can see logs")
self.assertTrue(self.project.query_user_logs(project_id, **USER_DEV)>0, "dev user can see logs")
self.assertTrue(self.project.query_user_logs(project_id, **USER_GUEST)>0, "guest user can see logs")
self.assertTrue(self.project.query_user_logs(project_id, status_code=403, **USER_TEST)==0, "test user can not see any logs")
# admin user can push, pull images
def dockerCmdLoginAdmin(self, username, password):
pprint(self.docker_client.info())
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
if output.find("error")>0 :
self.fail("Should not fail to push image for admin_user")
self.docker_client.images.pull(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
pass
# dev user can push, pull images
def dockerCmdLoginDev(self, username, password, harbor_server=harbor_host):
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busyboxdev", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"/ldap_group_test_prj/busyboxdev", tag="latest")
if output.find("error") >0 :
self.fail("Should not fail to push images for dev_user")
pass
# guest user can pull images
def dockerCmdLoginGuest(self, username, password, harbor_server=harbor_host):
self.docker_client.login(username=username, password=password, registry=self.harbor_host)
self.docker_client.images.pull("busybox:latest")
image = self.docker_client.images.get("busybox:latest")
image.tag(repository=self.harbor_host+"/ldap_group_test_prj/busyboxguest", tag="latest")
output = self.docker_client.images.push(repository=self.harbor_host+"1/ldap_group_test_prj/busyboxguest", tag="latest")
if output.find("error")<0 :
self.fail("Should failed to push image for guest user")
self.docker_client.images.pull(repository=self.harbor_host+"/ldap_group_test_prj/busybox", tag="latest")
pass
# check can see his log in current project
def queryUserLogs(self, username, password, harbor_host=harbor_host):
client_product_api = testutils.GetProductApi(username=username, password=password)
logs = client_product_api.logs_get(repository="ldap_group_test_prj", username=username)
if logs == None:
return 0
else:
return logs.count
self.repo.delete_repoitory(repo_name_admin, **USER_ADMIN)
self.repo.delete_repoitory(repo_name_dev, **USER_ADMIN)
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@ -10,15 +10,12 @@ from library.configurations import Configurations
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
conf = Configurations()
self.conf= conf
user = User()
self.user= user
self.conf= Configurations()
self.user = User()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -3,11 +3,10 @@ from __future__ import absolute_import
import unittest
from library.base import _assert_status_code
from testutils import ADMIN_CLIENT
from testutils import harbor_server
from testutils import TEARDOWN
from library.base import _assert_status_code
from library.project import Project
from library.user import User
from library.repository import Repository
@ -16,19 +15,14 @@ from library.repository import push_image_to_project
class TestProjects(unittest.TestCase):
@classmethod
def setUpClass(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
self.project= Project()
self.user= User()
self.repo= Repository()
@classmethod
def tearDownClass(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -10,22 +10,16 @@ from library.configurations import Configurations
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
conf = Configurations()
self.conf= conf
project = Project()
self.project= project
user = User()
self.user= user
self.conf= Configurations()
self.project= Project()
self.user= User()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
print "Clear trace"
#1. Delete project(PA);
self.project.delete_project(TestProjects.project_edit_project_creation_id, **TestProjects.USER_edit_project_creation_CLIENT)

View File

@ -29,7 +29,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -11,18 +11,13 @@ from library.chart import Chart
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
chart = Chart()
self.chart= chart
project = Project()
self.project= project
user = User()
self.user= user
self.chart= Chart()
self.project= Project()
self.user= User()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -13,18 +13,13 @@ from library.repository import Repository
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
self.project = Project()
self.user = User()
self.repo = Repository()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -15,18 +15,13 @@ from library.repository import pull_harbor_image
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
self.project= Project()
self.user= User()
self.repo= Repository()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -25,16 +25,8 @@ class TestProjects(unittest.TestCase):
self.system = System()
@classmethod
def tearDown(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
#1. Delete project(PA);
self.project.delete_project(TestProjects.project_test_quota_id, **ADMIN_CLIENT)
#2. Delete user(UA);
self.user.delete_user(TestProjects.user_test_quota_id, **ADMIN_CLIENT)
def tearDown(cls):
print("Case completed")
def testProjectQuota(self):
"""

View File

@ -28,7 +28,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDownClass(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -95,7 +95,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDownClass(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -15,18 +15,13 @@ from library.base import _assert_status_code
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
self.project = Project()
self.user = User()
self.repo = Repository()
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
@ -79,57 +74,55 @@ class TestProjects(unittest.TestCase):
image_robot_account = "mariadb"
tag = "latest"
print "#1. Create user(UA);"
# "#1. Create user(UA);"
TestProjects.user_ra_id, user_ra_name = self.user.create_user(user_password = user_ra_password, **ADMIN_CLIENT)
TestProjects.USER_RA_CLIENT=dict(endpoint = url, username = user_ra_name, password = user_ra_password)
print "#2. Create private project(PA), private project(PB) and public project(PC) by user(UA);"
# "#2. Create private project(PA), private project(PB) and public project(PC) by user(UA);"
TestProjects.project_ra_id_a, project_ra_name_a = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT)
TestProjects.project_ra_id_b, project_ra_name_b = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RA_CLIENT)
TestProjects.project_ra_id_c, project_ra_name_c = self.project.create_project(metadata = {"public": "true"}, **TestProjects.USER_RA_CLIENT)
print "#3. Push image(ImagePA) to project(PA), image(ImagePB) to project(PB) and image(ImagePC) to project(PC) by user(UA);"
# "#3. Push image(ImagePA) to project(PA), image(ImagePB) to project(PB) and image(ImagePC) to project(PC) by user(UA);"
TestProjects.repo_name_in_project_a, tag_a = push_image_to_project(project_ra_name_a, harbor_server, user_ra_name, user_ra_password, image_project_a, tag)
TestProjects.repo_name_in_project_b, tag_b = push_image_to_project(project_ra_name_b, harbor_server, user_ra_name, user_ra_password, image_project_b, tag)
TestProjects.repo_name_in_project_c, tag_c = push_image_to_project(project_ra_name_c, harbor_server, user_ra_name, user_ra_password, image_project_c, tag)
print "#4. Create a new robot account(RA) with pull and push privilige in project(PA) by user(UA);"
# "#4. Create a new robot account(RA) with pull and push privilige in project(PA) by user(UA);"
robot_id, robot_account = self.project.add_project_robot_account(TestProjects.project_ra_id_a, project_ra_name_a, **TestProjects.USER_RA_CLIENT)
print robot_account.name
print robot_account.token
print "#5. Check robot account info, it should has both pull and push priviliges;"
# "#5. Check robot account info, it should has both pull and push priviliges;"
data = self.project.get_project_robot_account_by_id(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT)
_assert_status_code(robot_account.name, data.name)
print "#6. Pull image(ImagePA) from project(PA) by robot account(RA), it must be successful;"
# "#6. Pull image(ImagePA) from project(PA) by robot account(RA), it must be successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a)
print "#7. Push image(ImageRA) to project(PA) by robot account(RA), it must be successful;"
# "#7. Push image(ImageRA) to project(PA) by robot account(RA), it must be successful;"
TestProjects.repo_name_pa, _ = push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag)
print "#8. Push image(ImageRA) to project(PB) by robot account(RA), it must be not successful;"
# "#8. Push image(ImageRA) to project(PB) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_b, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied")
print "#9. Pull image(ImagePB) from project(PB) by robot account(RA), it must be not successful;"
# "#9. Pull image(ImagePB) from project(PB) by robot account(RA), it must be not successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_b, tag_b, expected_error_message = r"pull access denied for " + harbor_server + "/" + TestProjects.repo_name_in_project_b)
print "#10. Pull image from project(PC), it must be successful;"
# "#10. Pull image from project(PC), it must be successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_c, tag_c)
print "#11. Push image(ImageRA) to project(PC) by robot account(RA), it must be not successful;"
# "#11. Push image(ImageRA) to project(PC) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_c, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_error_message = "denied: requested access to the resource is denied")
print "#12. Update action property of robot account(RA);"
# "#12. Update action property of robot account(RA);"
self.project.disable_project_robot_account(TestProjects.project_ra_id_a, robot_id, True, **TestProjects.USER_RA_CLIENT)
print "#13. Pull image(ImagePA) from project(PA) by robot account(RA), it must be not successful;"
# "#13. Pull image(ImagePA) from project(PA) by robot account(RA), it must be not successful;"
pull_harbor_image(harbor_server, robot_account.name, robot_account.token, TestProjects.repo_name_in_project_a, tag_a, expected_login_error_message = "401 Client Error: Unauthorized")
print "#14. Push image(ImageRA) to project(PA) by robot account(RA), it must be not successful;"
# "#14. Push image(ImageRA) to project(PA) by robot account(RA), it must be not successful;"
push_image_to_project(project_ra_name_a, harbor_server, robot_account.name, robot_account.token, image_robot_account, tag, expected_login_error_message = "401 Client Error: Unauthorized")
print "#15. Delete robot account(RA), it must be not successful."
# "#15. Delete robot account(RA), it must be not successful."
self.project.delete_project_robot_account(TestProjects.project_ra_id_a, robot_id, **TestProjects.USER_RA_CLIENT)
if __name__ == '__main__':

View File

@ -27,7 +27,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -23,7 +23,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == True, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -24,7 +24,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDown(self):
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == True, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -29,7 +29,7 @@ class TestProjects(unittest.TestCase):
@classmethod
def tearDown(self):
self.test_result.get_final_result()
print "Case completed"
print("Case completed")
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):

View File

@ -3,6 +3,12 @@ import os
import sys
sys.path.insert(0, os.environ["SWAGGER_CLIENT_PATH"])
path=os.getcwd() + "/library"
sys.path.insert(0, path)
path=os.getcwd() + "/tests/apitests/python/library"
sys.path.insert(0, path)
from swagger_client.rest import ApiException
import swagger_client.models
from pprint import pprint
@ -14,7 +20,8 @@ harbor_server = os.environ["HARBOR_HOST"]
#CLIENT=dict(endpoint="https://"+harbor_server+"/api")
ADMIN_CLIENT=dict(endpoint = os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api", username = admin_user, password = admin_pwd)
USER_ROLE=dict(admin=0,normal=1)
TEARDOWN = True
TEARDOWN = os.environ.get('TEARDOWN', 'true').lower() in ('true', 'yes')
notary_url = os.environ.get('NOTARY_URL', 'https://'+harbor_server+':4443')
def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]):
@ -37,5 +44,36 @@ class TestResult(object):
def get_final_result(self):
if self.num_errors > 0:
for each_err_msg in self.error_message:
print "Error message:", each_err_msg
print("Error message:", each_err_msg)
raise Exception(r"Test case failed with {} errors.".format(self.num_errors))
from contextlib import contextmanager
@contextmanager
def created_user(password):
from library.user import User
api = User()
user_id, user_name = api.create_user(user_password=password, **ADMIN_CLIENT)
try:
yield (user_id, user_name)
finally:
if TEARDOWN:
api.delete_user(user_id, **ADMIN_CLIENT)
@contextmanager
def created_project(name=None, metadata=None, user_id=None, member_role_id=None):
from library.project import Project
api = Project()
project_id, project_name = api.create_project(name=name, metadata=metadata, **ADMIN_CLIENT)
if user_id:
api.add_project_members(project_id, user_id=user_id, member_role_id=member_role_id, **ADMIN_CLIENT)
try:
yield (project_id, project_name)
finally:
if TEARDOWN:
api.delete_project(project_id, **ADMIN_CLIENT)

View File

@ -2,7 +2,6 @@
Documentation Harbor BATs
Resource ../../resources/APITest-Util.robot
Resource ../../resources/Docker-Util.robot
Library ../../apitests/python/library/Harbor.py ${SERVER_CONFIG}
Library OperatingSystem
Library String
Library Collections

View File

@ -1,4 +1,4 @@
#!/usr/local/bin/expect
#!/usr/bin/env expect
set HOST [lindex $argv 0]
set PROJECT [lindex $argv 1]

View File

@ -14,7 +14,6 @@ args = parser.parse_args()
url = "https://"+args.endpoint+"/api/"
endpoint_url = "https://"+args.endpoint
print url
with open("feature_map.json") as f:
feature_map = json.load(f)
@ -108,7 +107,7 @@ class HarborAPI:
"url":""+endpointurl+""
}
body=dict(body=payload)
print body
print(body)
request(url+"/registries", 'post', **body)
else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -131,8 +130,8 @@ class HarborAPI:
else:
registry = r'"dest_registry": { "id": '+str(targetid)+r'},'
body=dict(body=json.loads(r'{"name":"'+replicationrule["rulename"].encode('utf-8')+r'","dest_namespace":"'+replicationrule["dest_namespace"].encode('utf-8')+r'","deletion": '+str(replicationrule["deletion"]).lower()+r',"enabled": '+str(replicationrule["enabled"]).lower()+r',"override": '+str(replicationrule["override"]).lower()+r',"description": "string",'+ registry + r'"trigger":{"type": "'+replicationrule["trigger_type"]+r'", "trigger_settings":{"cron":"'+replicationrule["cron"]+r'"}},"filters":[ {"type":"name","value":"'+replicationrule["name_filters"]+r'"},{"type":"tag","value":"'+replicationrule["tag_filters"]+r'"}]}'))
print body
body=dict(body=json.loads(r'{"name":"'+replicationrule["rulename"]+r'","dest_namespace":"'+replicationrule["dest_namespace"] + r'","deletion": '+str(replicationrule["deletion"]).lower()+r',"enabled": '+str(replicationrule["enabled"]).lower()+r',"override": '+str(replicationrule["override"]).lower()+r',"description": "string",'+ registry + r'"trigger":{"type": "'+replicationrule["trigger_type"]+r'", "trigger_settings":{"cron":"'+replicationrule["cron"]+r'"}},"filters":[ {"type":"name","value":"'+replicationrule["name_filters"]+r'"},{"type":"tag","value":"'+replicationrule["tag_filters"]+r'"}]}'))
print(body)
request(url+"replication/policies", 'post', **body)
else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -151,7 +150,7 @@ class HarborAPI:
}
}
body=dict(body=payload)
print body
print(body)
request(url+"projects/"+projectid+"", 'put', **body)
@get_feature_branch
@ -162,7 +161,7 @@ class HarborAPI:
cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}'
if index != len(cve_id_list["cve"]) - 1:
cve_id_str = cve_id_str + ","
body=dict(body=json.loads(r'{"items":['+cve_id_str.encode('utf-8')+r'],"expires_at":'+cve_id_list["expires_at"]+'}'))
body=dict(body=json.loads(r'{"items":['+cve_id_str + r'],"expires_at":' + cve_id_list["expires_at"] + '}'))
request(url+"system/CVEWhitelist", 'put', **body)
else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
@ -177,12 +176,11 @@ class HarborAPI:
cve_id_str = cve_id_str + '{"cve_id":"' +cve_id["id"] + '"}'
if index != len(cve_id_list["cve"]) - 1:
cve_id_str = cve_id_str + ","
print cve_id_str
if reuse_sys_cve_whitelist == "true":
payload = r'{"metadata":{"reuse_sys_cve_whitelist":"true"}}'
else:
payload = r'{"metadata":{"reuse_sys_cve_whitelist":"false"},"cve_whitelist":{"project_id":'+projectid+',"items":['+cve_id_str.encode('utf-8')+r'],"expires_at":'+cve_id_list["expires_at"]+'}}'
print payload
payload = r'{"metadata":{"reuse_sys_cve_whitelist":"false"},"cve_whitelist":{"project_id":'+projectid+',"items":['+cve_id_str + r'],"expires_at":'+cve_id_list["expires_at"]+'}}'
print(payload)
body=dict(body=json.loads(payload))
request(url+"projects/"+projectid+"", 'put', **body)
else:
@ -248,7 +246,7 @@ class HarborAPI:
raise Exception(r"Error: Robot account count {} is not legal!".format(len(robot_account["access"])))
else:
raise Exception(r"Error: Feature {} has no branch {}.".format(sys._getframe().f_code.co_name, branch))
print payload
print(payload)
body=dict(body=payload)
request(url+"projects/"+projectid+"/robots", 'post', **body)
@ -301,10 +299,10 @@ class HarborAPI:
if not os.path.exists(ca_path):
try:
os.makedirs(ca_path)
except Exception, e:
print str(e)
except Exception as e:
print(str(e))
pass
open(target, 'wb').write(ca_content)
open(target, 'wb').write(ca_content.encode('utf-8'))
def request(url, method, user = None, userp = None, **kwargs):