harbor/tests/apitests/python/testutils.py
danfengliu acc027afd7 Remove deploytment of python3.6 and set dns for docker
1. Docker v20 is the default version for git action, dns should be set manually.
Python v3.8 is the default version for git action, so remove deployment for python v3.6.
2. Some of API tests would be affect by docker v20, like manifest get
message will not be triggered if there is one locally, to avoid this
impact, API tests will be tested with docker E2E image, in this image,
docker v19 is enabled.

Signed-off-by: danfengliu <danfengl@vmware.com>
2021-03-11 17:53:36 +08:00

85 lines
2.8 KiB
Python

import time
import os
import sys
sys.path.insert(0, os.environ["SWAGGER_CLIENT_PATH"])
path=os.getcwd() + "/library"
sys.path.insert(0, path)
path=os.getcwd() + "/tests/apitests/python/library"
sys.path.insert(0, path)
from swagger_client.rest import ApiException
import swagger_client.models
from pprint import pprint
admin_user = "admin"
admin_pwd = "Harbor12345"
harbor_server = os.environ["HARBOR_HOST"]
#CLIENT=dict(endpoint="https://"+harbor_server+"/api")
ADMIN_CLIENT=dict(endpoint = os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api", username = admin_user, password = admin_pwd)
USER_ROLE=dict(admin=0,normal=1)
TEARDOWN = os.environ.get('TEARDOWN', 'true').lower() in ('true', 'yes')
notary_url = os.environ.get('NOTARY_URL', 'https://'+harbor_server+':4443')
DOCKER_USER = os.environ.get('DOCKER_USER', '')
DOCKER_PWD = os.environ.get('DOCKER_PWD', '')
METRIC_URL = os.environ.get('METRIC_URL', 'http://'+harbor_server+':9090')
BASE_IMAGE = dict(name='busybox', tag='latest')
BASE_IMAGE_ABS_PATH_NAME = '/' + BASE_IMAGE['name'] + '.tar'
def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]):
cfg = swagger_client.Configuration()
cfg.host = "https://"+harbor_server+"/api"
cfg.username = username
cfg.password = password
cfg.verify_ssl = False
cfg.debug = True
api_client = swagger_client.ApiClient(cfg)
api_instance = swagger_client.ProductsApi(api_client)
return api_instance
class TestResult(object):
def __init__(self):
self.num_errors = 0
self.error_message = []
def add_test_result(self, error_message):
self.num_errors = self.num_errors + 1
self.error_message.append(error_message)
def get_final_result(self):
if self.num_errors > 0:
for each_err_msg in self.error_message:
print("Error message:", each_err_msg)
raise Exception(r"Test case failed with {} errors.".format(self.num_errors))
from contextlib import contextmanager
@contextmanager
def created_user(password):
from library.user import User
api = User()
user_id, user_name = api.create_user(user_password=password, **ADMIN_CLIENT)
try:
yield (user_id, user_name)
finally:
if TEARDOWN:
api.delete_user(user_id, **ADMIN_CLIENT)
@contextmanager
def created_project(name=None, metadata=None, user_id=None, member_role_id=None):
from library.project import Project
api = Project()
project_id, project_name = api.create_project(name=name, metadata=metadata, **ADMIN_CLIENT)
if user_id:
api.add_project_members(project_id, user_id=user_id, member_role_id=member_role_id, **ADMIN_CLIENT)
try:
yield (project_id, project_name)
finally:
if TEARDOWN:
api.delete_project(project_id, **ADMIN_CLIENT)