mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-22 18:25:56 +01:00
add test case of add a replication rule
Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
parent
55c609b1a7
commit
660bfdf677
@ -11,6 +11,7 @@ class Replication(base.Base):
|
||||
filters=[], trigger=swagger_client.RepTrigger(kind="Manual"),
|
||||
replicate_deletion=True,
|
||||
replicate_existing_image_now=False,
|
||||
expect_status_code = 201,
|
||||
**kwargs):
|
||||
projects = []
|
||||
for projectID in projectIDList:
|
||||
@ -25,9 +26,32 @@ class Replication(base.Base):
|
||||
projects=projects, targets=targets, filters=filters,
|
||||
trigger=trigger, replicate_deletion=replicate_deletion,
|
||||
replicate_existing_image_now=replicate_existing_image_now)
|
||||
_, _, header = client.policies_replication_post_with_http_info(policy)
|
||||
_, status_code, header = client.policies_replication_post_with_http_info(policy)
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return base._get_id_from_header(header), name
|
||||
|
||||
def get_replication_rule(self, param = dict(), rule_id = None, expect_status_code = 200, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
if rule_id is None:
|
||||
data, status_code, _ = client.policies_replication_get_with_http_info(param)
|
||||
else:
|
||||
data, status_code, _ = client.policies_replication_id_get_with_http_info(rule_id)
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
return data
|
||||
|
||||
def check_replication_rule_should_exist(self, check_rule_id, expect_rule_name, expect_trigger = None, **kwargs):
|
||||
rule_data = self.get_replication_rule(rule_id = check_rule_id, **kwargs)
|
||||
if str(rule_data.name) != str(expect_rule_name):
|
||||
raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, str(rule_data.name)))
|
||||
else:
|
||||
print r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name))
|
||||
get_trigger = str(rule_data.trigger.kind)
|
||||
if expect_trigger is not None and get_trigger == str(expect_trigger):
|
||||
print r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger)
|
||||
else:
|
||||
raise Exception(r"Check replication rule trigger failed, expect <{}> actual <{}>.".format(expect_trigger, get_trigger))
|
||||
|
||||
|
||||
def start_replication(self, rule_id, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
return client.replications_post(swagger_client.Replication(int(rule_id)))
|
||||
@ -49,3 +73,8 @@ class Replication(base.Base):
|
||||
time.sleep(interval)
|
||||
if not finished:
|
||||
raise Exception("The jobs not finished")
|
||||
|
||||
def delete_replication_rule(self, rule_id, expect_status_code = 200, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
_, status_code, _ = client.policies_replication_id_delete_with_http_info(rule_id)
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
|
99
tests/apitests/python/test_add_replication_rule.py
Normal file
99
tests/apitests/python/test_add_replication_rule.py
Normal file
@ -0,0 +1,99 @@
|
||||
from __future__ import absolute_import
|
||||
import unittest
|
||||
|
||||
from testutils import CLIENT
|
||||
from testutils import harbor_server
|
||||
from testutils import TEARDOWN
|
||||
from library.project import Project
|
||||
from library.user import User
|
||||
from library.replication import Replication
|
||||
from library.target import Target
|
||||
import swagger_client
|
||||
|
||||
class TestProjects(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(self):
|
||||
project = Project()
|
||||
self.project= project
|
||||
|
||||
user = User()
|
||||
self.user= user
|
||||
|
||||
replication = Replication()
|
||||
self.replication= replication
|
||||
|
||||
target = Target()
|
||||
self.target= target
|
||||
|
||||
@classmethod
|
||||
def tearDown(self):
|
||||
print "Case completed"
|
||||
|
||||
@unittest.skipIf(TEARDOWN == False, "Test data should be remain in the harbor.")
|
||||
def test_ClearData(self):
|
||||
#1. Delete rule(RA);
|
||||
for i in range(len(TestProjects.rule_id_list)):
|
||||
self.replication.delete_replication_rule(TestProjects.rule_id_list[i], **TestProjects.ADMIN_CLIENT)
|
||||
|
||||
#2. Delete target(TA);
|
||||
self.target.delete_target(TestProjects.target_id, **TestProjects.ADMIN_CLIENT)
|
||||
|
||||
#3. Delete project(PA);
|
||||
self.project.delete_project(TestProjects.project_add_rule_id, **TestProjects.USER_add_rule_CLIENT)
|
||||
|
||||
#4. Delete user(UA);
|
||||
self.user.delete_user(TestProjects.user_add_rule_id, **TestProjects.ADMIN_CLIENT)
|
||||
|
||||
def testAddSysLabelToRepo(self):
|
||||
"""
|
||||
Test case:
|
||||
Delete a repository
|
||||
Test step & Expectation:
|
||||
1. Create a new user(UA);
|
||||
2. Create a new private project(PA) by user(UA);
|
||||
3. Create a new target(TA)/registry;
|
||||
4. Create a new rule for project(PA) and target(TA);
|
||||
5. Check if rule is exist.
|
||||
Tear down:
|
||||
1. Delete rule(RA);
|
||||
2. Delete targe(TA);
|
||||
3. Delete project(PA);
|
||||
4. Delete user(UA).
|
||||
"""
|
||||
admin_user = "admin"
|
||||
admin_pwd = "Harbor12345"
|
||||
url = CLIENT["endpoint"]
|
||||
user_add_rule_password = "Aa123456"
|
||||
TestProjects.ADMIN_CLIENT=dict(endpoint = url, username = admin_user, password = admin_pwd)
|
||||
|
||||
#1. Create user(UA)
|
||||
TestProjects.user_add_rule_id, user_add_rule_name = self.user.create_user_success(user_password = user_add_rule_password, **TestProjects.ADMIN_CLIENT)
|
||||
|
||||
TestProjects.USER_add_rule_CLIENT=dict(endpoint = url, username = user_add_rule_name, password = user_add_rule_password)
|
||||
|
||||
#2.1. Create private project(PA) by user(UA)
|
||||
project_add_rule_name, TestProjects.project_add_rule_id = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_add_rule_CLIENT)
|
||||
|
||||
#2.2. Get private project of uesr-001, uesr-001 can see only one private project which is project-001
|
||||
self.project.projects_should_exist(dict(public=False), expected_count = 1,
|
||||
expected_project_id = TestProjects.project_add_rule_id, **TestProjects.USER_add_rule_CLIENT)
|
||||
|
||||
#3. Create a new target(TA)/registry
|
||||
TestProjects.target_id, _ = self.target.create_target(**TestProjects.ADMIN_CLIENT)
|
||||
print "TestProjects.target_id:", TestProjects.target_id
|
||||
|
||||
TestProjects.rule_id_list = []
|
||||
|
||||
trigger_values_to_set = ["Manual", "Immediate"]
|
||||
for i in range(len(trigger_values_to_set)):
|
||||
#4. Create a new rule for project(PA) and target(TA)
|
||||
rule_id, rule_name = self.replication.create_replication_rule([TestProjects.project_add_rule_id],
|
||||
[TestProjects.target_id], trigger=swagger_client.RepTrigger(kind=trigger_values_to_set[i]), **TestProjects.ADMIN_CLIENT)
|
||||
TestProjects.rule_id_list.append(rule_id)
|
||||
|
||||
#5. Check rule should be exist
|
||||
self.replication.check_replication_rule_should_exist(rule_id, rule_name, expect_trigger = trigger_values_to_set[i], **TestProjects.ADMIN_CLIENT)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user