Add API test case scan all images and test case API test case list helm charts. (#6388)

Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
danfengliu 2018-12-03 17:05:06 +08:00 committed by GitHub
parent ec32603917
commit b4c7663f5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 227 additions and 32 deletions

View File

@ -1333,6 +1333,8 @@ paths:
in: query
type: integer
description: When this parm is set only the images under the project identified by the project_id will be scanned.
tags:
- Products
responses:
'202':
description: The action is successully taken in the background. If some images are failed to scan it will only be reflected in the job status.

View File

@ -0,0 +1,26 @@
import base
class Chart(base.Base):
def upload_chart(self, repository, chart, prov = None, expect_status_code = 201, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.chartrepo_repo_charts_post_with_http_info(repository, chart)
base._assert_status_code(expect_status_code, status_code)
def get_charts(self, repository, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
body, status_code, _ = client.chartrepo_repo_charts_get_with_http_info(repository)
base._assert_status_code(expect_status_code, status_code)
return body
def chart_should_exist(self, repository, chart_name, **kwargs):
charts_data = self.get_charts(repository, **kwargs)
print "charts_data:", charts_data
for chart in charts_data:
if chart.name == chart_name:
return True
raise Exception(r"Chart {} does not exist in project {}.".format(chart_name, repository))
def delete_chart_with_version(self, repository, chart_name, version, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.chartrepo_repo_charts_name_version_delete_with_http_info(repository, chart_name, version)
base._assert_status_code(expect_status_code, status_code)

View File

@ -39,15 +39,13 @@ class DockerAPI(object):
if tag is not None:
_tag = tag
try:
tag_ret = self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
print "tag_ret:", tag_ret
self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
return harbor_registry, _tag
except docker.errors.APIError, e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))
def docker_image_push(self, harbor_registry, tag):
try:
push_ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
print "push_ret:", push_ret
base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
except docker.errors.APIError, e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))

View File

@ -89,32 +89,23 @@ class Repository(base.Base):
base._assert_status_code(expect_status_code, status_code)
return data
def get_not_scanned_image_init_state_success(self, repo_name, tag, **kwargs):
def check_image_not_scanned(self, repo_name, tag, **kwargs):
tag = self.get_tag(repo_name, tag, **kwargs)
if tag.scan_overview != None:
raise Exception("Image should be <Not Scanned> state!")
def check_image_scan_result(self, repo_name, tag, expected_scan_status = "finished", **kwargs):
scan_finish = False
timeout_count = 20
actual_scan_status = "NULL"
while not (scan_finish):
while True:
time.sleep(5)
_tag = self.get_tag(repo_name, tag, **kwargs)
scan_result = False
print "t.name:{} tag:{}, t.scan_overview.scan_status:{}".format(_tag.name, tag, _tag.scan_overview.scan_status)
if _tag.name == tag and _tag.scan_overview !=None:
if _tag.scan_overview.scan_status == expected_scan_status:
scan_finish = True
scan_result = True
break
else:
actual_scan_status = _tag.scan_overview.scan_status
timeout_count = timeout_count - 1
if (timeout_count == 0):
scan_finish = True
if not (scan_result):
raise Exception("Scan image result is not as expected {} actual scan status is {}".format(expected_scan_status, actual_scan_status))
break
_tag = self.get_tag(repo_name, tag, **kwargs)
if _tag.name == tag and _tag.scan_overview !=None:
if _tag.scan_overview.scan_status == expected_scan_status:
return
raise Exception("Scan image result is not as expected {}.".format(expected_scan_status))
def scan_image(self, repo_name, tag, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
@ -122,10 +113,10 @@ class Repository(base.Base):
base._assert_status_code(expect_status_code, status_code)
return data
def scan_not_scanned_image_success(self, repo_name, tag, **kwargs):
self.get_not_scanned_image_init_state_success(repo_name, tag, **kwargs)
self.scan_image(repo_name, tag, **kwargs)
self.check_image_scan_result(repo_name, tag, **kwargs)
def scan_all_image_now(self, expect_status_code = 202, **kwargs):
client = self._get_client(**kwargs)
_, status_code, _ = client.repositories_scan_all_post_with_http_info()
base._assert_status_code(expect_status_code, status_code)
def repository_should_exist(self, project_id, repo_name, **kwargs):
repositories = self.get_repository(project_id, **kwargs)

View File

@ -0,0 +1,73 @@
from __future__ import absolute_import
import unittest
from testutils import ADMIN_CLIENT
from testutils import TEARDOWN
from library.user import User
from library.project import Project
from library.chart import Chart
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
chart = Chart()
self.chart= chart
project = Project()
self.project= project
user = User()
self.user= user
@classmethod
def tearDown(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
def test_ClearData(self):
#1. Delete chart file;
self.chart.delete_chart_with_version(TestProjects.project_chart_name, TestProjects.CHART_NAME, TestProjects.VERSION, **ADMIN_CLIENT)
#2. Delete project(PA);
self.project.delete_project(TestProjects.project_chart_id, **TestProjects.USER_CHART_CLIENT)
#3. Delete user(UA);
self.user.delete_user(TestProjects.user_chart_id, **ADMIN_CLIENT)
def testListHelmCharts(self):
"""
Test case:
List Helm Charts
Test step and expected result:
1. Create a new user(UA);
2. Create a new project(PA) by user(UA);
3. Upload a chart file to project(PA);
4. Chart file should be exist in project(PA).
Tear down:
1. Delete chart file;
2. Delete project(PA);
3. Delete user(UA).
"""
url = ADMIN_CLIENT["endpoint"]
user_chart_password = "Aa123456"
TestProjects.CHART_NAME = 'mariadb'
TestProjects.VERSION = '4.3.1'
#1. Create a new user(UA);
TestProjects.user_chart_id, user_chart_name = self.user.create_user(user_password = user_chart_password, **ADMIN_CLIENT)
TestProjects.USER_CHART_CLIENT=dict(endpoint = url, username = user_chart_name, password = user_chart_password)
#2. Create a new project(PA) by user(UA);
TestProjects.project_chart_id, TestProjects.project_chart_name = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_CHART_CLIENT)
#3. Upload a chart file to project(PA);
self.chart.upload_chart(TestProjects.project_chart_name, r'./tests/apitests/python/mariadb-{}.tgz'.format(TestProjects.VERSION), **TestProjects.USER_CHART_CLIENT)
#4. Chart file should be exist in project(PA).
self.chart.chart_should_exist(TestProjects.project_chart_name, TestProjects.CHART_NAME, **TestProjects.USER_CHART_CLIENT)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,96 @@
from __future__ import absolute_import
import unittest
from testutils import harbor_server
from testutils import TEARDOWN
from testutils import ADMIN_CLIENT
from library.project import Project
from library.user import User
from library.repository import Repository
from library.repository import push_image_to_project
class TestProjects(unittest.TestCase):
@classmethod
def setUp(self):
project = Project()
self.project= project
user = User()
self.user= user
repo = Repository()
self.repo= repo
@classmethod
def tearDown(self):
print "Case completed"
@unittest.skipIf(TEARDOWN == False, "Test data should be remain in the harbor.")
def test_ClearData(self):
#1. Delete Alice's repository and Luca's repository;
self.repo.delete_repoitory(TestProjects.repo_Alice_name, **ADMIN_CLIENT)
self.repo.delete_repoitory(TestProjects.repo_Luca_name, **ADMIN_CLIENT)
#2. Delete Alice's project and Luca's project;
self.project.delete_project(TestProjects.project_Alice_id, **ADMIN_CLIENT)
self.project.delete_project(TestProjects.project_Luca_id, **ADMIN_CLIENT)
#3. Delete user Alice and Luca.
self.user.delete_user(TestProjects.user_Alice_id, **ADMIN_CLIENT)
self.user.delete_user(TestProjects.user_Luca_id, **ADMIN_CLIENT)
def testScanImage(self):
"""
Test case:
Scan All Image
Test step & Expectation:
1. Create user Alice and Luca;
2. Create 2 new private projects project_Alice and project_Luca;
3. Push a image to project_Alice and push another image to project_Luca;
4. Trigger scan all event;
5. Check if image in project_Alice and another image in project_Luca were both scanned.
Tear down:
1. Delete Alice's repository and Luca's repository;
2. Delete Alice's project and Luca's project;
3. Delete user Alice and Luca.
"""
url = ADMIN_CLIENT["endpoint"]
user_common_password = "Aa123456"
#1. Create user Alice and Luca;
TestProjects.user_Alice_id, user_Alice_name = self.user.create_user(user_password = user_common_password, **ADMIN_CLIENT)
TestProjects.user_Luca_id, user_Luca_name = self.user.create_user(user_password = user_common_password, **ADMIN_CLIENT)
USER_ALICE_CLIENT=dict(endpoint = url, username = user_Alice_name, password = user_common_password)
USER_LUCA_CLIENT=dict(endpoint = url, username = user_Luca_name, password = user_common_password)
#2. Create 2 new private projects project_Alice and project_Luca;
TestProjects.project_Alice_id, project_Alice_name = self.project.create_project(metadata = {"public": "false"}, **USER_ALICE_CLIENT)
TestProjects.project_Luca_id, project_Luca_name = self.project.create_project(metadata = {"public": "false"}, **USER_LUCA_CLIENT)
#3. Push a image to project_Alice and push another image to project_Luca;
#Note: Please make sure that this Image has never been pulled before by any other cases,
# so it is a not-scanned image rigth after repository creation.
#image = "tomcat"
image = "mariadb"
src_tag = "latest"
#3.1 Push a image to project_Alice;
TestProjects.repo_Alice_name, tag_Alice = push_image_to_project(project_Alice_name, harbor_server, user_Alice_name, user_common_password, image, src_tag)
#Note: Please make sure that this Image has never been pulled before by any other cases,
# so it is a not-scanned image rigth after repository creation.
image = "mysql"
src_tag = "latest"
#3.2 push another image to project_Luca;
TestProjects.repo_Luca_name, tag_Luca = push_image_to_project(project_Luca_name, harbor_server, user_Luca_name, user_common_password, image, src_tag)
#4. Trigger scan all event;
self.repo.scan_all_image_now(**ADMIN_CLIENT)
#5. Check if image in project_Alice and another image in project_Luca were both scanned.
self.repo.check_image_scan_result(TestProjects.repo_Alice_name, tag_Alice, expected_scan_status = "finished", **USER_ALICE_CLIENT)
self.repo.check_image_scan_result(TestProjects.repo_Luca_name, tag_Luca, expected_scan_status = "finished", **USER_LUCA_CLIENT)
if __name__ == '__main__':
unittest.main()

View File

@ -28,10 +28,10 @@ class TestProjects(unittest.TestCase):
@unittest.skipIf(TEARDOWN == True, "Test data should be remain in the harbor.")
def test_ClearData(self):
#1. Delete repository(RA) by user(UA);
self.repo.delete_repoitory(TestProjects.repo_name, **TestProjects.USER_scan_image_CLIENT)
self.repo.delete_repoitory(TestProjects.repo_name, **TestProjects.USER_SCAN_IMAGE_CLIENT)
#2. Delete project(PA);
self.project.delete_project(TestProjects.project_scan_image_id, **TestProjects.USER_scan_image_CLIENT)
self.project.delete_project(TestProjects.project_scan_image_id, **TestProjects.USER_SCAN_IMAGE_CLIENT)
#3. Delete user(UA);
self.user.delete_user(TestProjects.user_scan_image_id, **ADMIN_CLIENT)
@ -58,7 +58,7 @@ class TestProjects(unittest.TestCase):
#1. Create user-001
TestProjects.user_scan_image_id, user_scan_image_name = self.user.create_user(user_password = user_001_password, **ADMIN_CLIENT)
TestProjects.USER_scan_image_CLIENT=dict(endpoint = url, username = user_scan_image_name, password = user_001_password)
TestProjects.USER_SCAN_IMAGE_CLIENT=dict(endpoint = url, username = user_scan_image_name, password = user_001_password)
#2. Create a new private project(PA) by user(UA);
TestProjects.project_scan_image_id, project_scan_image_name = self.project.create_project(metadata = {"public": "false"}, **ADMIN_CLIENT)
@ -68,7 +68,7 @@ class TestProjects(unittest.TestCase):
#4. Get private project of user(UA), user(UA) can see only one private project which is project(PA);
self.project.projects_should_exist(dict(public=False), expected_count = 1,
expected_project_id = TestProjects.project_scan_image_id, **TestProjects.USER_scan_image_CLIENT)
expected_project_id = TestProjects.project_scan_image_id, **TestProjects.USER_SCAN_IMAGE_CLIENT)
#Note: Please make sure that this Image has never been pulled before by any other cases,
# so it is a not-scanned image rigth after rpository creation.
@ -79,7 +79,8 @@ class TestProjects(unittest.TestCase):
TestProjects.repo_name, tag = push_image_to_project(project_scan_image_name, harbor_server, user_scan_image_name, user_001_password, image, src_tag)
#6. Send scan image command and get tag(TA) infomation to check scan result, it should be finished;
self.repo.scan_not_scanned_image_success(TestProjects.repo_name, tag, **TestProjects.USER_scan_image_CLIENT)
self.repo.scan_image(TestProjects.repo_name, tag, **TestProjects.USER_SCAN_IMAGE_CLIENT)
self.repo.check_image_scan_result(TestProjects.repo_name, tag, expected_scan_status = "finished", **TestProjects.USER_SCAN_IMAGE_CLIENT)
if __name__ == '__main__':
unittest.main()

View File

@ -37,3 +37,7 @@ Test Case - Garbage Collection
Harbor API Test ./tests/apitests/python/test_garbage_collection.py
Test Case - User View Logs
Harbor API Test ./tests/apitests/python/test_user_view_logs.py
Test Case - Scan All Images
Harbor API Test ./tests/apitests/python/test_scan_all_images.py
Test Case - List Helm Charts
Harbor API Test ./tests/apitests/python/test_list_helm_charts.py

View File

@ -19,6 +19,9 @@ if [ "$2" = 'DB' ]; then
sudo ./tests/hostcfg.sh
fi
# prepare a chart file for API_DB test...
sudo curl -o /home/travis/gopath/src/github.com/goharbor/harbor/tests/apitests/python/mariadb-4.3.1.tgz https://storage.googleapis.com/harbor-builds/bin/charts/mariadb-4.3.1.tgz
sudo apt-get update && sudo apt-get install -y --no-install-recommends python-dev openjdk-7-jdk libssl-dev && sudo apt-get autoremove -y && sudo rm -rf /var/lib/apt/lists/*
sudo wget https://bootstrap.pypa.io/get-pip.py && sudo python ./get-pip.py && sudo pip install --ignore-installed urllib3 chardet requests && sudo pip install robotframework robotframework-httplibrary requests dbbot robotframework-pabot --upgrade
sudo make swagger_client

View File

@ -26,6 +26,7 @@ function uploader {
set +e
docker ps
# run db auth api cases
if [ "$1" = 'DB' ]; then
pybot -v ip:$2 -v HARBOR_PASSWORD:Harbor12345 /home/travis/gopath/src/github.com/goharbor/harbor/tests/robot-cases/Group0-BAT/API_DB.robot