harbor/tests/apitests/python/library/repository.py
danfengliu 7fb9dbd0fa Upgrade docker and containerd
1. Fix E2E quotas issue, push the same image but with different name;
2. Add checkpoint for robot account test;
3. Upgraded docker and containerd in E2E image;
4. Package base image sample(busybox) into E2E image, so in E2E
container, all local docker images can be cleaned up, once base image is needed for
building image, it can be loaded locally;
5. Adapt OIDC service of supporting LDAP user, and add OIDC group user
test;
6. Restart docker deamon before content trust test, both in API and UI
test;
7. Add retry for keyword "Add A Tag Immutability Rule";
8. Fix tag retention test issue, missing click angle icon, and enhance
checkpoint of dry run and real run;
9. Fix schedule test issue for wrong cron string;
10. Disable quotas verification, it's not stable for script defect;

Signed-off-by: danfengliu <danfengl@vmware.com>
2021-02-24 15:43:11 +08:00

171 lines
8.2 KiB
Python

# -*- coding: utf-8 -*-
import time
import base
import swagger_client
import docker_api
from docker_api import DockerAPI
from swagger_client.rest import ApiException
from testutils import DOCKER_USER, DOCKER_PWD
def pull_harbor_image(registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
if expected_login_error_message != None:
return
time.sleep(2)
_docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None, profix_for_image = None, new_image=None):
print("Start to push image {}/{}/{}:{}".format(registry, project_name, image, tag) )
_docker_api = DockerAPI()
_docker_api.docker_login("docker", DOCKER_USER, DOCKER_PWD)
_docker_api.docker_image_pull(image, tag = tag, is_clean_all_img = False)
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
time.sleep(2)
if expected_login_error_message != None:
return
time.sleep(2)
original_name = image
image = new_image or image
if profix_for_image == None:
target_image = r'{}/{}/{}'.format(registry, project_name, image)
else:
target_image = r'{}/{}/{}/{}'.format(registry, project_name, profix_for_image, image)
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(original_name, tag), target_image, tag = tag)
time.sleep(2)
_docker_api.docker_image_push(new_harbor_registry, new_tag, expected_error_message = expected_error_message)
docker_api.docker_image_clean_all()
return r'{}/{}'.format(project_name, image), new_tag
def push_self_build_image_to_project(project_name, registry, username, password, image, tag, size=2, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
time.sleep(2)
if expected_login_error_message in [None, ""]:
push_special_image_to_project(project_name, registry, username, password, image, tags=[tag], size=size, expected_login_error_message = expected_login_error_message, expected_error_message = expected_error_message)
return r'{}/{}'.format(project_name, image), tag
def push_special_image_to_project(project_name, registry, username, password, image, tags=None, size=1, expected_login_error_message=None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
time.sleep(2)
if expected_login_error_message in [None, ""]:
return _docker_api.docker_image_build(r'{}/{}/{}'.format(registry, project_name, image), tags = tags, size=int(size), expected_error_message=expected_error_message)
class Repository(base.Base, object):
def __init__(self):
super(Repository,self).__init__(api_type = "repository")
def list_tags(self, repository, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_get(repository)
def get_tag(self, repo_name, tag, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_tag_get(repo_name, tag)
def image_exists(self, repository, tag, **kwargs):
tags = self.list_tags(repository, **kwargs)
exist = False
for t in tags:
if t.name == tag:
exist = True
break
return exist
def image_should_exist(self, repository, tag, **kwargs):
if not self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s not exist" % (repository, tag))
def image_should_not_exist(self, repository, tag, **kwargs):
if self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s exists" % (repository, tag))
def delete_repository(self, project_name, repo_name, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
try:
_, status_code, _ = client.delete_repository_with_http_info(project_name, repo_name)
except Exception as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
else:
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def list_repositories(self, project_name, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.list_repositories_with_http_info(project_name)
base._assert_status_code(200, status_code)
return data
def clear_repositories(self, project_name, **kwargs):
repos = self.list_repositories(project_name, **kwargs)
print("Repos to be cleared:", repos)
for repo in repos:
self.delete_repository(project_name, repo.name.split('/')[1])
def get_repository(self, project_name, repo_name, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.get_repository_with_http_info(project_name, repo_name)
base._assert_status_code(200, status_code)
return data
def add_label_to_tag(self, repo_name, tag, label_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
label = swagger_client.Label(id=label_id)
_, status_code, _ = client.repositories_repo_name_tags_tag_labels_post_with_http_info(repo_name, tag, label)
base._assert_status_code(expect_status_code, status_code)
def get_repo_signatures(self, repo_name, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.repositories_repo_name_signatures_get_with_http_info(repo_name)
base._assert_status_code(expect_status_code, status_code)
return data
def check_image_not_scanned(self, repo_name, tag, **kwargs):
tag = self.get_tag(repo_name, tag, **kwargs)
if tag.scan_overview != None:
raise Exception("Image should be <Not Scanned> state!")
def repository_should_exist(self, project_Name, repo_name, **kwargs):
repositories = self.list_repositories(project_Name, **kwargs)
if repo_name not in repositories:
raise Exception("Repository {} is not exist.".format(repo_name))
def check_repository_exist(self, project_Name, repo_name, **kwargs):
repositories = self.list_repositories(project_Name, **kwargs)
for repo in repositories:
if repo.name == project_Name+"/"+repo_name:
return True
return False
def signature_should_exist(self, repo_name, tag, **kwargs):
signatures = self.get_repo_signatures(repo_name, **kwargs)
for each_sign in signatures:
if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44:
return
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))
def retag_image(self, repo_name, tag, src_image, override=True, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
request = swagger_client.RetagReq(tag=tag, src_image=src_image, override=override)
try:
data, status_code, _ = client.repositories_repo_name_tags_post_with_http_info(repo_name, request)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
return data