# harbor/tests/apitests/python/library/replication.py
# -*- coding: utf-8 -*-
import sys
import time
import base
import swagger_client
class Replication(base.Base):
    """Helpers for the replication-policy and replication-job endpoints.

    Every method obtains a client via ``self._get_client(**kwargs)`` and
    forwards the call to the generated ``swagger_client`` API object.
    """

    def create_replication_rule(self,
                                projectIDList, targetIDList, name=None, desc="",
                                filters=None, trigger=swagger_client.RepTrigger(kind="Manual"),
                                replicate_deletion=True,
                                replicate_existing_image_now=False,
                                expect_status_code = 201,
                                **kwargs):
        """Create a replication policy and return ``(rule_id, name)``.

        :param projectIDList: iterable of project ids; each is passed through ``int()``.
        :param targetIDList: iterable of target ids; each is passed through ``int()``.
        :param name: rule name; when ``None`` one is produced by
            ``base._random_name("rule")``.
        :param desc: rule description.
        :param filters: optional iterable of mappings that each carry a
            ``"value"`` entry convertible with ``int()``. The mappings are
            copied before conversion, so the caller's objects are left untouched.
        :param trigger: trigger model forwarded to ``swagger_client.RepPolicy``.
        :param replicate_deletion: forwarded to ``swagger_client.RepPolicy``.
        :param replicate_existing_image_now: forwarded to ``swagger_client.RepPolicy``.
        :param expect_status_code: status code handed to
            ``base._assert_status_code`` together with the actual one.
        :param kwargs: forwarded to ``self._get_client``.
        :return: tuple of the id extracted by ``base._get_id_from_header``
            from the response header, and the rule name that was used.
        """
        if name is None:
            name = base._random_name("rule")
        projects = [swagger_client.Project(project_id=int(project_id))
                    for project_id in projectIDList]
        targets = [swagger_client.RepTarget(id=int(target_id))
                   for target_id in targetIDList]
        # Work on copies: converting "value" in place would silently rewrite
        # the caller's filter definitions.
        normalized_filters = []
        for rule_filter in (filters or []):
            rule_filter = dict(rule_filter)
            rule_filter["value"] = int(rule_filter["value"])
            normalized_filters.append(rule_filter)
        client = self._get_client(**kwargs)
        policy = swagger_client.RepPolicy(name=name, description=desc,
            projects=projects, targets=targets, filters=normalized_filters,
            trigger=trigger, replicate_deletion=replicate_deletion,
            replicate_existing_image_now=replicate_existing_image_now)
        _, status_code, header = client.policies_replication_post_with_http_info(policy)
        base._assert_status_code(expect_status_code, status_code)
        return base._get_id_from_header(header), name

    def get_replication_rule(self, param = None, rule_id = None, expect_status_code = 200, **kwargs):
        """Fetch replication rule data.

        With ``rule_id`` set, the single-rule endpoint is called with that id.
        Otherwise the list endpoint is called with ``param`` (an empty dict
        when ``param`` is ``None``).

        :param expect_status_code: status code handed to
            ``base._assert_status_code`` together with the actual one.
        :param kwargs: forwarded to ``self._get_client``.
        :return: the data element of the client response.
        """
        client = self._get_client(**kwargs)
        if rule_id is None:
            if param is None:
                param = {}
            data, status_code, _ = client.policies_replication_get_with_http_info(param)
        else:
            data, status_code, _ = client.policies_replication_id_get_with_http_info(rule_id)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def check_replication_rule_should_exist(self, check_rule_id, expect_rule_name, expect_trigger = None, **kwargs):
        """Verify the name (and optionally the trigger kind) of a rule.

        :param check_rule_id: id passed to ``get_replication_rule``.
        :param expect_rule_name: expected name, compared as ``str``.
        :param expect_trigger: expected trigger kind, compared as ``str``.
            When ``None`` the trigger is not checked at all.
        :param kwargs: forwarded to ``get_replication_rule``.
        :raises Exception: if the name, or the requested trigger kind, differs.
        """
        rule_data = self.get_replication_rule(rule_id = check_rule_id, **kwargs)
        actual_name = str(rule_data.name)
        if actual_name != str(expect_rule_name):
            raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, actual_name))
        # Single-argument print(...) emits the same text under the Python 2
        # print statement and the Python 3 print function.
        print(r"Check Replication rule passed, rule name <{}>.".format(actual_name))
        if expect_trigger is None:
            # No trigger expectation was given, so there is nothing to
            # compare; previously this case always ended in the failure branch.
            return
        get_trigger = str(rule_data.trigger.kind)
        if get_trigger != str(expect_trigger):
            raise Exception(r"Check replication rule trigger failed, expect <{}> actual <{}>.".format(expect_trigger, get_trigger))
        print(r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger))

    def start_replication(self, rule_id, **kwargs):
        """Post a ``swagger_client.Replication`` built from ``int(rule_id)``.

        :param kwargs: forwarded to ``self._get_client``.
        :return: whatever ``client.replications_post`` returns.
        """
        client = self._get_client(**kwargs)
        return client.replications_post(swagger_client.Replication(int(rule_id)))

    def list_replication_jobs(self, rule_id, **kwargs):
        """Return the result of ``client.jobs_replication_get(int(rule_id))``.

        :param kwargs: forwarded to ``self._get_client``.
        """
        client = self._get_client(**kwargs)
        return client.jobs_replication_get(int(rule_id))

    def wait_until_jobs_finish(self, rule_id, retry=10, interval=5, **kwargs):
        """Poll the jobs of a rule until every job reports ``"finished"``.

        :param rule_id: id passed to ``list_replication_jobs``.
        :param retry: maximum number of polls. With ``retry <= 0`` no poll is
            made and the method returns without raising.
        :param interval: seconds to sleep between two polls.
        :param kwargs: forwarded to ``list_replication_jobs``.
        :raises Exception: if some job is still not finished after the last poll.
        """
        finished = True
        for attempt in range(retry):
            jobs = self.list_replication_jobs(rule_id, **kwargs)
            finished = all(job.status == "finished" for job in jobs)
            if finished:
                # Stop polling as soon as everything is done instead of
                # spending the remaining retries on redundant API calls.
                return
            if attempt < retry - 1:
                # No point in sleeping after the final attempt.
                time.sleep(interval)
        if not finished:
            raise Exception("The jobs not finished")

    def delete_replication_rule(self, rule_id, expect_status_code = 200, **kwargs):
        """Delete the rule with ``rule_id`` and check the response status.

        :param expect_status_code: status code handed to
            ``base._assert_status_code`` together with the actual one.
        :param kwargs: forwarded to ``self._get_client``.
        """
        client = self._get_client(**kwargs)
        _, status_code, _ = client.policies_replication_id_delete_with_http_info(rule_id)
        base._assert_status_code(expect_status_code, status_code)