mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-20 06:31:55 +01:00
1b127b16c5
1. Add 7 API python test scripts for tag immutability; 2. Swagger file has object struct defination issue on imuutability policy rule, because this policy is in shared with retention rule, therefore, they should be unified to the same type of rule; 3. Fixed a timeout issue of action keyword, waiting for a long period; 4. Add API test scripts for scan signed image. Signed-off-by: danfengliu <danfengl@vmware.com>
134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import sys
|
|
import time
|
|
import subprocess
|
|
import client
|
|
import swagger_client
|
|
import v2_swagger_client
|
|
try:
|
|
from urllib import getproxies
|
|
except ImportError:
|
|
from urllib.request import getproxies
|
|
|
|
class Server:
|
|
def __init__(self, endpoint, verify_ssl):
|
|
self.endpoint = endpoint
|
|
self.verify_ssl = verify_ssl
|
|
|
|
class Credential:
|
|
def __init__(self, type, username, password):
|
|
self.type = type
|
|
self.username = username
|
|
self.password = password
|
|
|
|
def get_endpoint():
|
|
harbor_server = os.environ.get("HARBOR_HOST", "localhost:8080")
|
|
return os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api/v2.0"
|
|
|
|
def _create_client(server, credential, debug, api_type="products"):
|
|
cfg = None
|
|
if api_type in ('projectv2', 'artifact', 'repository', 'scan'):
|
|
cfg = v2_swagger_client.Configuration()
|
|
else:
|
|
cfg = swagger_client.Configuration()
|
|
|
|
cfg.host = server.endpoint
|
|
cfg.verify_ssl = server.verify_ssl
|
|
# support basic auth only for now
|
|
cfg.username = credential.username
|
|
cfg.password = credential.password
|
|
cfg.debug = debug
|
|
|
|
proxies = getproxies()
|
|
proxy = proxies.get('http', proxies.get('all', None))
|
|
if proxy:
|
|
cfg.proxy = proxy
|
|
|
|
if cfg.username is None and cfg.password is None:
|
|
# returns {} for auth_settings for anonymous access
|
|
import types
|
|
cfg.auth_settings = types.MethodType(lambda self: {}, cfg)
|
|
|
|
return {
|
|
"chart": client.ChartRepositoryApi(client.ApiClient(cfg)),
|
|
"products": swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
|
|
"projectv2": v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
|
|
"artifact": v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
|
|
"repository": v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scan": v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scanner": swagger_client.ScannersApi(swagger_client.ApiClient(cfg)),
|
|
}.get(api_type,'Error: Wrong API type')
|
|
|
|
def _assert_status_code(expect_code, return_code):
|
|
if str(return_code) != str(expect_code):
|
|
raise Exception(r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}.".format(expect_code, return_code))
|
|
|
|
def _assert_status_body(expect_status_body, returned_status_body):
|
|
if str(returned_status_body.strip()).lower().find(expect_status_body.lower()) < 0:
|
|
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body))
|
|
|
|
def _random_name(prefix):
|
|
return "%s-%d" % (prefix, int(round(time.time() * 1000)))
|
|
|
|
def _get_id_from_header(header):
|
|
try:
|
|
location = header["Location"]
|
|
return int(location.split("/")[-1])
|
|
except Exception:
|
|
return None
|
|
|
|
def _get_string_from_unicode(udata):
|
|
result=''
|
|
for u in udata:
|
|
tmp = u.encode('utf8')
|
|
result = result + tmp.strip('\n\r\t')
|
|
return result
|
|
|
|
def run_command(command):
|
|
print("Command: ", subprocess.list2cmdline(command))
|
|
try:
|
|
output = subprocess.check_output(command,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
raise Exception('Error: Exited with error code: %s. Output:%s'% (e.returncode, e.output))
|
|
return output
|
|
|
|
class Base(object):
|
|
def __init__(self, server=None, credential=None, debug=True, api_type="products"):
|
|
if server is None:
|
|
server = Server(endpoint=get_endpoint(), verify_ssl=False)
|
|
if not isinstance(server.verify_ssl, bool):
|
|
server.verify_ssl = server.verify_ssl == "True"
|
|
|
|
if credential is None:
|
|
credential = Credential(type="basic_auth", username="admin", password="Harbor12345")
|
|
|
|
self.server = server
|
|
self.credential = credential
|
|
self.debug = debug
|
|
self.api_type = api_type
|
|
self.client = _create_client(server, credential, debug, api_type=api_type)
|
|
|
|
def _get_client(self, **kwargs):
|
|
if len(kwargs) == 0:
|
|
return self.client
|
|
|
|
server = self.server
|
|
if "endpoint" in kwargs:
|
|
server.endpoint = kwargs.get("endpoint")
|
|
if "verify_ssl" in kwargs:
|
|
if not isinstance(kwargs.get("verify_ssl"), bool):
|
|
server.verify_ssl = kwargs.get("verify_ssl") == "True"
|
|
else:
|
|
server.verify_ssl = kwargs.get("verify_ssl")
|
|
|
|
credential = Credential(
|
|
kwargs.get("type", self.credential.type),
|
|
kwargs.get("username", self.credential.username),
|
|
kwargs.get("password", self.credential.password),
|
|
)
|
|
|
|
return _create_client(server, credential, self.debug, kwargs.get('api_type', self.api_type))
|