harbor/tests/apitests/python/library/repository.py
danfengliu 0e752d8cf8 Add docker credential for harbor 1.10.0 and fix docker api issue
1. Move LDAP API test out of robot script, LDAP API test will be triggered in Jenkins job independently;
2. Docker api returned from function before analyze the return message for expected error.

Signed-off-by: danfengliu <danfengl@vmware.com>
2021-01-11 14:06:25 +08:00

159 lines
7.3 KiB
Python

# -*- coding: utf-8 -*-
import time
import base
import swagger_client
from docker_api import DockerAPI
from swagger_client.rest import ApiException
def pull_harbor_image(registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
if expected_login_error_message != None:
return
time.sleep(2)
ret = _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
time.sleep(2)
if expected_login_error_message != None:
return
ret = _docker_api.docker_image_pull(image, tag = tag)
print("docker image pull ret:", ret)
time.sleep(2)
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(image, tag), r'{}/{}/{}'.format(registry, project_name, image))
time.sleep(2)
_docker_api.docker_image_push(new_harbor_registry, new_tag, expected_error_message = expected_error_message)
return r'{}/{}'.format(project_name, image), new_tag
def push_special_image_to_project(project_name, registry, username, password, image, tags=None, size=1, expected_login_error_message=None, expected_error_message = None):
_docker_api = DockerAPI()
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
time.sleep(2)
if expected_login_error_message != None:
return
_docker_api.docker_image_build(r'{}/{}/{}'.format(registry, project_name, image), tags = tags, size=size, expected_error_message=expected_error_message)
def is_repo_exist_in_project(repositories, repo_name):
result = False
for reop in repositories:
if reop.name == repo_name:
return True
return result
class Repository(base.Base):
def list_tags(self, repository, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_get(repository)
def get_tag(self, repo_name, tag, **kwargs):
client = self._get_client(**kwargs)
return client.repositories_repo_name_tags_tag_get(repo_name, tag)
def image_exists(self, repository, tag, **kwargs):
tags = self.list_tags(repository, **kwargs)
exist = False
for t in tags:
if t.name == tag:
exist = True
break
return exist
def image_should_exist(self, repository, tag, **kwargs):
if not self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s not exist" % (repository, tag))
def image_should_not_exist(self, repository, tag, **kwargs):
if self.image_exists(repository, tag, **kwargs):
raise Exception("image %s:%s exists" % (repository, tag))
def delete_repoitory(self, repo_name, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.repositories_repo_name_delete_with_http_info(repo_name)
base._assert_status_code(200, status_code)
def get_repository(self, project_id, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.repositories_get_with_http_info(project_id)
base._assert_status_code(200, status_code)
return data
def add_label_to_tag(self, repo_name, tag, label_id, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
label = swagger_client.Label(id=label_id)
_, status_code, _ = client.repositories_repo_name_tags_tag_labels_post_with_http_info(repo_name, tag, label)
base._assert_status_code(expect_status_code, status_code)
def get_repo_signatures(self, repo_name, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.repositories_repo_name_signatures_get_with_http_info(repo_name)
base._assert_status_code(expect_status_code, status_code)
return data
def check_image_not_scanned(self, repo_name, tag, **kwargs):
tag = self.get_tag(repo_name, tag, **kwargs)
if tag.scan_overview != None:
raise Exception("Image should be <Not Scanned> state!")
def check_image_scan_result(self, repo_name, tag, expected_scan_status = "Success", **kwargs):
timeout_count = 30
while True:
time.sleep(5)
timeout_count = timeout_count - 1
if (timeout_count == 0):
break
_tag = self.get_tag(repo_name, tag, **kwargs)
if _tag.name == tag and _tag.scan_overview != None:
for report in _tag.scan_overview.values():
if report.scan_status == expected_scan_status:
return
raise Exception("Scan image result is not as expected {}.".format(expected_scan_status))
def scan_image(self, repo_name, tag, expect_status_code = 202, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.repositories_repo_name_tags_tag_scan_post_with_http_info(repo_name, tag)
base._assert_status_code(expect_status_code, status_code)
return data
def repository_should_exist(self, project_id, repo_name, **kwargs):
repositories = self.get_repository(project_id, **kwargs)
print("repositories:", repositories)
if is_repo_exist_in_project(repositories, repo_name) == False:
raise Exception("Repository {} is not exist.".format(repo_name))
def repository_should_not_exist(self, project_id, repo_name, **kwargs):
repositories = self.get_repository(project_id, **kwargs)
print("repositories:", repositories)
if is_repo_exist_in_project(repositories, repo_name) == True:
raise Exception("Repository {} should not exist.".format(repo_name))
def signature_should_exist(self, repo_name, tag, **kwargs):
signatures = self.get_repo_signatures(repo_name, **kwargs)
for each_sign in signatures:
if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44:
print("sha256:", len(each_sign.hashes["sha256"]))
return
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))
def retag_image(self, repo_name, tag, src_image, override=True, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
request = swagger_client.RetagReq(tag=tag, src_image=src_image, override=override)
try:
data, status_code, _ = client.repositories_repo_name_tags_post_with_http_info(repo_name, request)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
return data