add tc add-sys-label-to-tag and del-repo

Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
danfengliu 2018-11-01 03:26:04 -07:00
parent 4894d1f46f
commit 1979d17550
11 changed files with 343 additions and 31 deletions

View File

@ -33,8 +33,11 @@ def _random_name(prefix):
return "%s-%d" % (prefix, int(round(time.time() * 1000)))
def _get_id_from_header(header):
location = header["Location"]
return location.split("/")[-1]
try:
location = header["Location"]
return location.split("/")[-1]
except Exception:
return None
def _get_string_from_unicode(udata):
result=''
@ -73,4 +76,4 @@ class Base:
credential.username = kwargs.get("username")
if "password" in kwargs:
credential.password = kwargs.get("password")
return _create_client(server, credential, self.debug)
return _create_client(server, credential, self.debug)

View File

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
import base
try:
import docker
except ImportError:
import pip
pip.main(['install', 'docker'])
import docker
class DockerAPI(object):
def __init__(self):
self.DCLIENT = docker.APIClient(base_url='unix://var/run/docker.sock',version='auto',timeout=10)
def docker_login(self, registry, username, password):
try:
self.DCLIENT.login(registry = registry, username=username, password=password)
except docker.errors.APIError, e:
raise Exception(r" Docker login failed, error is [{}]".format (e.message))
def docker_image_pull(self, image, tag = None):
_tag = "latest"
if tag is not None:
_tag = tag
try:
tag = base._random_name("tag")
pull_ret = base._get_string_from_unicode(self.DCLIENT.pull('{}:{}'.format(image, _tag)))
print "pull_ret:", pull_ret
except docker.errors.APIError, e:
raise Exception(r" Docker pull image {} failed, error is [{}]".format (image, e.message))
def docker_image_tag(self, image, harbor_registry, tag = None):
_tag = base._random_name("tag")
if tag is not None:
_tag = tag
try:
tag_ret = self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
print "tag_ret:", tag_ret
return harbor_registry, _tag
except docker.errors.APIError, e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))
def docker_image_push(self, harbor_registry, tag):
try:
push_ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
print "push_ret:", push_ret
except docker.errors.APIError, e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))

View File

@ -5,7 +5,7 @@ import base
import swagger_client
class Label(base.Base):
def create_label(self, name=base._random_name("label"), desc="",
def create_label(self, name=base._random_name("label"), desc="",
color="", scope="g", project_id=0, **kwargs):
label = swagger_client.Label(name=name,
description=desc, color=color,
@ -17,4 +17,8 @@ class Label(base.Base):
def add_label_to_image(self, label_id, repository, tag, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_tag_labels_post(repository,
tag, swagger_client.Label(id=int(label_id)))
tag, swagger_client.Label(id=int(label_id)))
def delete_label(self, label_id, **kwargs):
client = self._get_client(**kwargs)
return client.labels_id_delete_with_http_info(int(label_id))

View File

@ -15,7 +15,6 @@ class Project(base.Base):
swagger_client.ProjectReq(name, metadata))
base._assert_status_code(201, status_code)
project_id = base._get_id_from_header(header)
return name, project_id
def get_projects(self, params, **kwargs):
@ -25,10 +24,17 @@ class Project(base.Base):
base._assert_status_code(200, status_code)
return data
def projects_should_exist(self, params, expected_count = None, expected_project_id = None, **kwargs):
project_data = self.get_projects(params, **kwargs)
actual_count = len(project_data)
if expected_count is not None and actual_count!= expected_count:
raise Exception(r"Private project count should be {}.".format(expected_count))
if expected_project_id is not None and actual_count == 1 and str(project_data[0].project_id) != str(expected_project_id):
raise Exception(r"Project-id check failed, expect {} but got {}, please check this test case.".format(str(expected_project_id), str(project_data[0].project_id)))
def check_project_name_exist(self, name=None, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.projects_head_with_http_info(name)
return {
200: True,
404: False,
@ -39,7 +45,6 @@ class Project(base.Base):
client = self._get_client(**kwargs)
data, status_code, _ = client.projects_project_id_get_with_http_info(project_id)
base._assert_status_code(200, status_code)
return data
def update_project(self, project_id, metadata, **kwargs):
@ -48,17 +53,15 @@ class Project(base.Base):
_, status_code, _ = client.projects_project_id_put_with_http_info(project_id, project)
base._assert_status_code(200, status_code)
def delete_project(self, project_id, **kwargs):
def delete_project(self, project_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.projects_project_id_delete_with_http_info(project_id)
base._assert_status_code(200, status_code)
base._assert_status_code(expect_status_code, status_code)
def get_project_metadata_by_name(self, project_id, meta_name, **kwargs):
client = self._get_client(**kwargs)
ProjectMetadata = swagger_client.ProjectMetadata()
ProjectMetadata, status_code, _ = client.projects_project_id_metadatas_meta_name_get_with_http_info(project_id, meta_name)
base._assert_status_code(200, status_code)
return {
'public': ProjectMetadata.public,
@ -68,16 +71,14 @@ class Project(base.Base):
'severity': ProjectMetadata.severity,
}.get(meta_name,'error')
def get_project_members(self, project_id, **kwargs):
client = self._get_client(**kwargs)
data = []
data, status_code, _ = client.projects_project_id_members_get_with_http_info(project_id)
print "****************data for get_project_members:", data
base._assert_status_code(200, status_code)
return data
def add_project_members(self, project_id, user_id, member_role_id = None, **kwargs):
def add_project_members(self, project_id, user_id, member_role_id = None, expect_status_code = 201, **kwargs):
if member_role_id is None:
member_role_id = 1
_member_user = {"user_id": int(user_id)}
@ -87,4 +88,3 @@ class Project(base.Base):
data, status_code, _ = client.projects_project_id_members_post_with_http_info(project_id, project_member = projectMember)
base._assert_status_code(201, status_code)
return data

View File

@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
import time
import base
import swagger_client
from docker_api import DockerAPI
def create_repository(project_name, registry, username, password, image, tag):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password)
time.sleep(2)
_docker_api.docker_image_pull(image, tag)
time.sleep(2)
new_harbor_registry, new_tag = _docker_api.docker_image_tag(image, r'{}/{}/{}'.format(registry, project_name, image))
time.sleep(2)
_docker_api.docker_image_push(new_harbor_registry, new_tag)
return r'{}/{}'.format(project_name, image), new_tag
class Repository(base.Base):
def list_tags(self, repository, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_get_with_http_info(repository)
def image_exists(self, repository, tag, **kwargs):
tags = self.list_tags(repository, **kwargs)
exist = False
for t in tags:
if t.name == tag:
exist = True
break
return exist
def image_should_exist(self, repository, tag, **kwargs):
if not self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s not exist" % (repository, tag))
def image_should_not_exist(self, repository, tag, **kwargs):
if self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s exists" % (repository, tag))
def delete_repoitory(self, repo_name, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.repositories_repo_name_delete_with_http_info(repo_name)
base._assert_status_code(200, status_code)
def get_repository(self, project_id, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.repositories_get_with_http_info(project_id)
base._assert_status_code(200, status_code)
return data
def add_label_to_tag(self, repo_name, tag, label_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
label = swagger_client.Label(id=label_id)
_, status_code, _ = client.repositories_repo_name_tags_tag_labels_post_with_http_info(repo_name, tag, label)
base._assert_status_code(expect_status_code, status_code)

View File

@ -6,7 +6,7 @@ import swagger_client
class User(base.Base):
def create_user(self, name=None,
email = None, user_password=None,realname = None, **kwargs):
email = None, user_password=None, realname = None, role_id = None, **kwargs):
if name is None:
name = base._random_name("user")
if realname is None:
@ -15,15 +15,24 @@ class User(base.Base):
email = '%s@%s.com' % (realname,"vmware")
if user_password is None:
user_password = "Harbor12345678"
if role_id is None:
role_id = 0
client = self._get_client(**kwargs)
user = swagger_client.User(None, name, email, user_password, realname, None, None, None, None, None, None, None, None, None)
user = swagger_client.User(username = name, email = email, password = user_password, realname = realname, role_id = role_id)
_, status_code, header = client.users_post_with_http_info(user)
base._assert_status_code(201, status_code)
return base._get_id_from_header(header), name
def create_user_success(self, name=None,
email = None, user_password=None, realname = None, role_id = None, **kwargs):
user_id, user_name = self.create_user(name, email, user_password, realname, role_id, **kwargs)
if user_id != None:
return user_id, user_name
else:
raise Exception("user id is not exist, please contact developer to solve this problem")
def get_users(self, username=None, email=None, page=None, page_size=None, **kwargs):
client = self._get_client(**kwargs)
@ -54,10 +63,10 @@ class User(base.Base):
base._assert_status_code(200, status_code)
return data
def delete_user(self, user_id, **kwargs):
def delete_user(self, user_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.users_user_id_delete_with_http_info(user_id)
base._assert_status_code(200, status_code)
base._assert_status_code(expect_status_code, status_code)
return user_id
def update_user_pwd(self, user_id, new_password=None, old_password=None, **kwargs):

View File

@ -1,5 +1,6 @@
from __future__ import absolute_import
import unittest
from testutils import CLIENT
@ -40,16 +41,14 @@ class TestProjects(unittest.TestCase):
user_001_id, user_001_name = self.user.create_user(user_password = user_001_password, **ADMIN_CLIENT)
self.assertNotEqual(user_001_id, None, msg="Failed to create user, return user is {}".format(user_001_id))
USER_001_CLIENT=dict(endpoint = url, username = user_001_name, password = user_001_password)
#2. Create private project-001
_metadata = {"public": "false"}
project_001_name, project_001_id = self.project.create_project(metadata = _metadata, **ADMIN_CLIENT)
project_001_name, project_001_id = self.project.create_project(metadata = {"public": "false"}, **ADMIN_CLIENT)
self.assertNotEqual(project_001_name, None, msg="Project was created failed, return project name is {} and id is {}.".format(project_001_name, project_001_id))
#3.1 Get private projects of user-001
USER_001_CLIENT=dict(endpoint = url, username = user_001_name, password = user_001_password)
params = {}
params["public"] = False
project_001_data = self.project.get_projects(params, **USER_001_CLIENT)
project_001_data = self.project.get_projects(dict(public=False), **USER_001_CLIENT)
#3.2 Check user-001 has no any private project
self.assertEqual(project_001_data, None, msg="user-001 should has no any private project, but we got {}".format(project_001_data))
@ -60,9 +59,7 @@ class TestProjects(unittest.TestCase):
#5 Get private project of uesr-001, uesr-001 can see only one private project which is project-001
params = {}
params["public"] = False
project_data = self.project.get_projects(params, **USER_001_CLIENT)
project_data = self.project.get_projects(dict(public=False), **USER_001_CLIENT)
self.assertEqual(len(project_data), 1, msg="Private project count should be 1.")
self.assertEqual(str(project_data[0].project_id), str(project_001_id), msg="Project-id check failed, please check this test case.")

View File

@ -0,0 +1,95 @@
from __future__ import absolute_import
import unittest
from testutils import CLIENT
from testutils import harbor_server
from testutils import TEARDOWN
from library.project import Project
from library.user import User
from library.repository import Repository
from library.repository import create_repository
from library.label import Label
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
label = Label()
self.label= label
@classmethod
def tearDown(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data should be remain in the harbor.")
def test_ClearData(self):
#1. Delete repository(RA) by user(UA);
self.repo.delete_repoitory(TestProjects.repo_name, **TestProjects.USER_add_g_lbl_CLIENT)
#2. Delete project(PA);
self.project.delete_project(TestProjects.project_add_g_lbl_id, **TestProjects.USER_add_g_lbl_CLIENT)
#3. Delete user(UA);
self.user.delete_user(TestProjects.user_add_g_lbl_id, **TestProjects.ADMIN_CLIENT)
#4. Delete label(LA).
self.label.delete_label(TestProjects.label_id, **TestProjects.ADMIN_CLIENT)
def testAddSysLabelToRepo(self):
"""
Test case:
Delete a repository
Test step & Expectation:
1. Create a new user(UA);
2. Create a new private project(PA) by user(UA);
3. Add user(UA) as a member of project(PA) with project-admin role;
4. Get private project of uesr-001, uesr-001 can see only one private project which is project-001;
5. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
6. Create a new label(LA) in project(PA) by admin;;
7. Add this system global label to repository(RA)/tag(TA);
Tear down:
1. Delete repository(RA) by user(UA);
2. Delete project(PA);
3. Delete user(UA);
4. Delete label(LA).
"""
admin_user = "admin"
admin_pwd = "Harbor12345"
url = CLIENT["endpoint"]
user_001_password = "Aa123456"
TestProjects.ADMIN_CLIENT=dict(endpoint = url, username = admin_user, password = admin_pwd)
#1. Create user-001
TestProjects.user_add_g_lbl_id, user_add_g_lbl_name = self.user.create_user_success(user_password = user_001_password, **TestProjects.ADMIN_CLIENT)
TestProjects.USER_add_g_lbl_CLIENT=dict(endpoint = url, username = user_add_g_lbl_name, password = user_001_password)
#2. Create private project-001
project_add_g_lbl_name, TestProjects.project_add_g_lbl_id = self.project.create_project(metadata = {"public": "false"}, **TestProjects.ADMIN_CLIENT)
#3. Add user-001 as a member of project-001 with project-admin role
self.project.add_project_members(TestProjects.project_add_g_lbl_id, TestProjects.user_add_g_lbl_id, **TestProjects.ADMIN_CLIENT)
#4. Get private project of uesr-001, uesr-001 can see only one private project which is project-001
self.project.projects_should_exist(dict(public=False), expected_count = 1,
expected_project_id = TestProjects.project_add_g_lbl_id, **TestProjects.USER_add_g_lbl_CLIENT)
#5. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
TestProjects.repo_name, tag = create_repository(project_add_g_lbl_name, harbor_server, user_add_g_lbl_name, user_001_password, "hello-world", "latest")
#6. Create a new label(LA) in project(PA) by admin;
TestProjects.label_id, _ = self.label.create_label(**TestProjects.ADMIN_CLIENT)
#7. Add this system global label to repository(RA)/tag(TA).
self.repo.add_label_to_tag(TestProjects.repo_name, tag, int(TestProjects.label_id), **TestProjects.USER_add_g_lbl_CLIENT)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,86 @@
from __future__ import absolute_import
import unittest
from library.base import _assert_status_code
from testutils import CLIENT
from testutils import harbor_server
from testutils import TEARDOWN
from library.project import Project
from library.user import User
from library.repository import Repository
from library.repository import create_repository
class TestProjects(unittest.TestCase):
@classmethod
def setUpClass(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod
def tearDownClass(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data should be remain in the harbor.")
def test_ClearData(self):
#1. Delete project(PA);
self.project.delete_project(TestProjects.project_del_repo_id, **TestProjects.USER_del_repo_CLIENT)
#2. Delete user(UA).
self.user.delete_user(TestProjects.user_del_repo_id, **TestProjects.ADMIN_CLIENT)
def testDelRepo(self):
"""
Test case:
Delete a repository
Test step & Expectation:
1. Create a new user(UA);
2. Create a new project(PA) by user(UA);
3. Create a new repository(RA) in project(PA) by user(UA);
4. Get repository in project(PA), there should be one repository which was created by user(UA);
5. Delete repository(RA) by user(UA);
6. Get repository by user(UA), it should get nothing;
Tear down:
1. Delete project(PA);
2. Delete user(UA).
"""
admin_user = "admin"
admin_pwd = "Harbor12345"
url = CLIENT["endpoint"]
user_del_repo_password = "Aa123456"
TestProjects.ADMIN_CLIENT=dict(endpoint = url, username = admin_user, password = admin_pwd)
#1. Create a new user(UA);
TestProjects.user_del_repo_id, user_del_repo_name = self.user.create_user_success(user_password = user_del_repo_password, **TestProjects.ADMIN_CLIENT)
TestProjects.USER_del_repo_CLIENT=dict(endpoint = url, username = user_del_repo_name, password = user_del_repo_password)
#2. Create a new project(PA) by user(UA);
project_del_repo_name, TestProjects.project_del_repo_id = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_del_repo_CLIENT)
#3. Create a new repository(RA) in project(PA) by user(UA);
repo_name, _ = create_repository(project_del_repo_name, harbor_server, 'admin', 'Harbor12345', "hello-world", "latest")
#4. Get repository in project(PA), there should be one repository which was created by user(UA);
repo_data = self.repo.get_repository(TestProjects.project_del_repo_id, **TestProjects.USER_del_repo_CLIENT)
_assert_status_code(repo_name, repo_data[0].name)
#5. Delete repository(RA) by user(UA);
self.repo.delete_repoitory(repo_name, **TestProjects.USER_del_repo_CLIENT)
#6. Get repository by user(UA), it should get nothing;
repo_data = self.repo.get_repository(TestProjects.project_del_repo_id, **TestProjects.USER_del_repo_CLIENT)
_assert_status_code(len(repo_data), 0)
if __name__ == '__main__':
unittest.main()

View File

@ -9,7 +9,8 @@ from pprint import pprint
harbor_server = os.environ["HARBOR_HOST"]
CLIENT=dict(endpoint="https://"+harbor_server+"/api")
USER_ROLE=dict(admin=0,normal=1)
TEARDOWN = True
def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]):

View File

@ -18,4 +18,9 @@ ${SERVER_API_ENDPOINT} ${SERVER_URL}/api
*** Test Cases ***
Test Case - Add Private Project Member and Check User Can See It
Harbor API Test ./tests/apitests/python/test_add_member_to_private_project.py
Harbor API Test ./tests/apitests/python/test_add_member_to_private_project.py
Test Case - Delete a Repository of a Certain Project Created by Normal User
Harbor API Test ./tests/apitests/python/test_del_repo.py
Test Case - Add a System Global Label to a Certain Tag
Harbor API Test ./tests/apitests/python/test_add_sys_label_to_tag.py