Add API test case of pushing index by docker manifest

Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
danfengliu 2020-03-01 17:57:40 +08:00
parent 5f2544941e
commit 2d6e18a895
7 changed files with 159 additions and 3 deletions

View File

@ -2,6 +2,7 @@
import sys import sys
import time import time
import subprocess
import swagger_client import swagger_client
import v2_swagger_client import v2_swagger_client
try: try:
@ -64,6 +65,16 @@ def _get_string_from_unicode(udata):
result = result + tmp.strip('\n\r\t') result = result + tmp.strip('\n\r\t')
return result return result
def run_command(command):
print "Command: ", subprocess.list2cmdline(command)
try:
output = subprocess.check_output(command,
stderr=subprocess.STDOUT,
universal_newlines=True)
except subprocess.CalledProcessError as e:
raise Exception('Error: Exited with error code: %s. Output:%s'% (e.returncode, e.output))
return output
class Base: class Base:
def __init__(self, def __init__(self,
server = Server(endpoint="http://localhost:8080/api", verify_ssl=False), server = Server(endpoint="http://localhost:8080/api", verify_ssl=False),

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import base import base
import subprocess
try: try:
import docker import docker
@ -9,6 +10,39 @@ except ImportError:
pip.main(['install', 'docker']) pip.main(['install', 'docker'])
import docker import docker
def docker_login(harbor_host, user, password):
command = ["sudo", "docker", "login", harbor_host, "-u", user, "-p", password]
print "Docker Login Command: ", command
base.run_command(command)
try:
ret = subprocess.check_output(["./tests/apitests/python/update_docker_cfg.sh"], shell=False)
except subprocess.CalledProcessError, exc:
raise Exception("Failed to update docker config, error is {} {}.".format(exc.returncode, exc.output))
def docker_manifest_create(index, manifests):
command = ["sudo", "docker","manifest","create",index]
command.extend(manifests)
print "Docker Manifest Command: ", command
base.run_command(command)
def docker_manifest_push(index):
command = ["sudo", "docker","manifest","push",index]
print "Docker Manifest Command: ", command
ret = base.run_command(command)
index_sha256=""
manifest_list=[]
for line in ret.split("\n"):
if line[:7] == "sha256:":
index_sha256 = line
if line.find('Pushed ref') == 0:
manifest_list.append(line[-71:])
return index_sha256, manifest_list
def docker_manifest_push_to_harbor(index, manifests, harbor_server, user, password):
docker_login(harbor_server, user, password)
docker_manifest_create(index, manifests)
return docker_manifest_push(index)
class DockerAPI(object): class DockerAPI(object):
def __init__(self): def __init__(self):
self.DCLIENT = docker.APIClient(base_url='unix://var/run/docker.sock',version='auto',timeout=30) self.DCLIENT = docker.APIClient(base_url='unix://var/run/docker.sock',version='auto',timeout=30)
@ -38,6 +72,7 @@ class DockerAPI(object):
ret = "" ret = ""
try: try:
ret = base._get_string_from_unicode(self.DCLIENT.pull(r'{}:{}'.format(image, _tag))) ret = base._get_string_from_unicode(self.DCLIENT.pull(r'{}:{}'.format(image, _tag)))
return ret
except Exception, err: except Exception, err:
caught_err = True caught_err = True
if expected_error_message is not None: if expected_error_message is not None:
@ -71,6 +106,7 @@ class DockerAPI(object):
expected_error_message = None expected_error_message = None
try: try:
ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True)) ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
return ret
except Exception, err: except Exception, err:
caught_err = True caught_err = True
if expected_error_message is not None: if expected_error_message is not None:
@ -129,3 +165,5 @@ class DockerAPI(object):
else: else:
if str(ret).lower().find("errorDetail".lower()) >= 0: if str(ret).lower().find("errorDetail".lower()) >= 0:
raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret)) raise Exception(r" It's was not suppose to catch error when push image {}, return message is [{}]".format (harbor_registry, ret))

View File

@ -12,7 +12,8 @@ def pull_harbor_image(registry, username, password, image, tag, expected_login_e
if expected_login_error_message != None: if expected_login_error_message != None:
return return
time.sleep(2) time.sleep(2)
_docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message) ret = _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
print ret
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None): def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None):
_docker_api = DockerAPI() _docker_api = DockerAPI()

View File

@ -0,0 +1,99 @@
from __future__ import absolute_import
import unittest
import library.repository
import library.docker_api
from library.base import _assert_status_code
from testutils import ADMIN_CLIENT
from testutils import harbor_server
from testutils import TEARDOWN
from library.project import Project
from library.user import User
from library.repository import Repository
from library.artifact import Artifact
from library.repository import push_image_to_project
from library.repository import pull_harbor_image
class TestProjects(unittest.TestCase):
@classmethod
def setUpClass(self):
self.project= Project()
self.user= User()
self.artifact = Artifact(api_type='artifact')
self.repo= Repository(api_type='repository')
self.url = ADMIN_CLIENT["endpoint"]
self.user_push_index_password = "Aa123456"
self.index_name = "ci_test_index"
self.index_tag = "test_tag"
self.image_a = "alpine"
self.image_b = "busybox"
@classmethod
def tearDownClass(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
#1. Delete repository(RA,RB,IA) by user(UA);
self.repo.delete_repoitory(TestProjects.project_push_index_name, self.index_name, **TestProjects.USER_CLIENT)
self.repo.delete_repoitory(TestProjects.project_push_index_name, self.image_a, **TestProjects.USER_CLIENT)
self.repo.delete_repoitory(TestProjects.project_push_index_name, self.image_b, **TestProjects.USER_CLIENT)
#2. Delete project(PA);
self.project.delete_project(TestProjects.project_push_index_id, **TestProjects.USER_CLIENT)
#3. Delete user(UA).
self.user.delete_user(TestProjects.user_id, **ADMIN_CLIENT)
def testAddIndexByDockerManifest(self):
"""
Test case:
Push Index By Docker Manifest
Test step and expected result:
1. Create a new user(UA);
2. Create a new project(PA) by user(UA);
3. Create 2 new repositorys(RA,RB) in project(PA) by user(UA);
4. Push an index(IA) to Harbor by docker manifest CLI successfully;
5. Get index(IA) from Harbor successfully;
6. Verify harbor index is index(IA) pushed by docker manifest CLI;
7. Verify harbor index(IA) can be pulled by docker CLI successfully.
Tear down:
1. Delete repository(RA,RB,IA) by user(UA);
2. Delete project(PA);
3. Delete user(UA).
"""
#1. Create a new user(UA);
TestProjects.user_id, user_name = self.user.create_user(user_password = self.user_push_index_password, **ADMIN_CLIENT)
TestProjects.USER_CLIENT=dict(endpoint = self.url, username = user_name, password = self.user_push_index_password)
#2. Create a new project(PA) by user(UA);
TestProjects.project_push_index_id, TestProjects.project_push_index_name = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_CLIENT)
#3. Create 2 new repositorys(RA,RB) in project(PA) by user(UA);
repo_name_a, tag_a = push_image_to_project(TestProjects.project_push_index_name, harbor_server, 'admin', 'Harbor12345', self.image_a, "latest")
repo_name_b, tag_b = push_image_to_project(TestProjects.project_push_index_name, harbor_server, 'admin', 'Harbor12345', self.image_b, "latest")
#4. Push an index(IA) to Harbor by docker manifest CLI successfully;
manifests = [harbor_server+"/"+repo_name_a+":"+tag_a, harbor_server+"/"+repo_name_b+":"+tag_b]
index = harbor_server+"/"+TestProjects.project_push_index_name+"/"+self.index_name+":"+self.index_tag
index_sha256_cli_ret, manifests_sha256_cli_ret = library.docker_api.docker_manifest_push_to_harbor(index, manifests, harbor_server, user_name, self.user_push_index_password)
#5. Get index(IA) from Harbor successfully;
index_data = self.artifact.get_reference_info(TestProjects.project_push_index_name, self.index_name, self.index_tag, **TestProjects.USER_CLIENT)
manifests_sha256_harbor_ret = [index_data[0].references[1].child_digest, index_data[0].references[0].child_digest]
#6. Verify harbor index is index(IA) pushed by docker manifest CLI;
self.assertEqual(index_data[0].digest, index_sha256_cli_ret)
self.assertEqual(manifests_sha256_harbor_ret.count(manifests_sha256_cli_ret[0]), 1)
self.assertEqual(manifests_sha256_harbor_ret.count(manifests_sha256_cli_ret[1]), 1)
#7. Verify harbor index(IA) can be pulled by docker CLI successfully;
pull_harbor_image(harbor_server, user_name, self.user_push_index_password, TestProjects.project_push_index_name+"/"+self.index_name, self.index_tag)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,5 @@
#!/bin/sh
sudo sed -i '$d' /$HOME/.docker/config.json
sudo sed -i '$d' /$HOME/.docker/config.json
sudo echo -e "\n },\n \"experimental\": \"enabled\"\n}" >> /$HOME/.docker/config.json

View File

@ -68,4 +68,6 @@ Test Case - Tag Retention
Harbor API Test ./tests/apitests/python/test_retention.py Harbor API Test ./tests/apitests/python/test_retention.py
Test Case - Health Check Test Case - Health Check
Harbor API Test ./tests/apitests/python/test_health_check.py Harbor API Test ./tests/apitests/python/test_health_check.py
Test Case - Push Index By Docker Manifest
Harbor API Test ./tests/apitests/python/test_push_index_by_docker_manifest.py

View File

@ -18,5 +18,5 @@ Test Case - LDAP Group Admin Role
Test Case - LDAP Group User Group Test Case - LDAP Group User Group
Harbor API Test ./tests/apitests/python/test_user_group.py Harbor API Test ./tests/apitests/python/test_user_group.py
### Test Case - Run LDAP Group Related API Test Test Case - Run LDAP Group Related API Test
### Harbor API Test ./tests/apitests/python/test_assign_role_to_ldap_group.py Harbor API Test ./tests/apitests/python/test_assign_role_to_ldap_group.py