mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-01 13:37:47 +01:00
7bbefca8c2
1. Add export scan data API test case 2. Add get scan data export execution list test case 3. Add get scan data export execution test case 4. Add download scan data test case Signed-off-by: Yang Jiao <jiaoya@vmware.com> Signed-off-by: Yang Jiao <jiaoya@vmware.com>
195 lines
8.3 KiB
Python
195 lines
8.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import subprocess
|
|
import time
|
|
|
|
import client
|
|
import swagger_client
|
|
import v2_swagger_client
|
|
|
|
try:
|
|
from urllib import getproxies
|
|
except ImportError:
|
|
from urllib.request import getproxies
|
|
|
|
class Server:
|
|
def __init__(self, endpoint, verify_ssl):
|
|
self.endpoint = endpoint
|
|
self.verify_ssl = verify_ssl
|
|
|
|
class Credential:
|
|
def __init__(self, type, username, password):
|
|
self.type = type
|
|
self.username = username
|
|
self.password = password
|
|
|
|
def get_endpoint():
|
|
harbor_server = os.environ.get("HARBOR_HOST", "localhost:8080")
|
|
return os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api/v2.0"
|
|
|
|
def _create_client(server, credential, debug, api_type="products"):
|
|
cfg = None
|
|
if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota',
|
|
'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist',
|
|
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export'):
|
|
cfg = v2_swagger_client.Configuration()
|
|
else:
|
|
cfg = swagger_client.Configuration()
|
|
|
|
cfg.host = server.endpoint
|
|
cfg.verify_ssl = server.verify_ssl
|
|
# support basic auth only for now
|
|
cfg.username = credential.username
|
|
cfg.password = credential.password
|
|
cfg.debug = debug
|
|
|
|
proxies = getproxies()
|
|
proxy = proxies.get('http', proxies.get('all', None))
|
|
if proxy:
|
|
cfg.proxy = proxy
|
|
|
|
if cfg.username is None and cfg.password is None:
|
|
# returns {} for auth_settings for anonymous access
|
|
import types
|
|
cfg.auth_settings = types.MethodType(lambda self: {}, cfg)
|
|
|
|
return {
|
|
"chart": client.ChartRepositoryApi(client.ApiClient(cfg)),
|
|
"products": swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
|
|
"projectv2":v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
|
|
"artifact": v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
|
|
"preheat": v2_swagger_client.PreheatApi(v2_swagger_client.ApiClient(cfg)),
|
|
"quota": v2_swagger_client.QuotaApi(v2_swagger_client.ApiClient(cfg)),
|
|
"repository": v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scan": v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scanall": v2_swagger_client.ScanAllApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scanner": v2_swagger_client.ScannerApi(v2_swagger_client.ApiClient(cfg)),
|
|
"replication": v2_swagger_client.ReplicationApi(v2_swagger_client.ApiClient(cfg)),
|
|
"registry": v2_swagger_client.RegistryApi(v2_swagger_client.ApiClient(cfg)),
|
|
"robot": v2_swagger_client.RobotApi(v2_swagger_client.ApiClient(cfg)),
|
|
"gc": v2_swagger_client.GcApi(v2_swagger_client.ApiClient(cfg)),
|
|
"retention": v2_swagger_client.RetentionApi(v2_swagger_client.ApiClient(cfg)),
|
|
"immutable": v2_swagger_client.ImmutableApi(v2_swagger_client.ApiClient(cfg)),
|
|
"system_cve_allowlist": v2_swagger_client.SystemCVEAllowlistApi(v2_swagger_client.ApiClient(cfg)),
|
|
"configure": v2_swagger_client.ConfigureApi(v2_swagger_client.ApiClient(cfg)),
|
|
"label": v2_swagger_client.LabelApi(v2_swagger_client.ApiClient(cfg)),
|
|
"user": v2_swagger_client.UserApi(v2_swagger_client.ApiClient(cfg)),
|
|
"member": v2_swagger_client.MemberApi(v2_swagger_client.ApiClient(cfg)),
|
|
"health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)),
|
|
"webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)),
|
|
"purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)),
|
|
"audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)),
|
|
"scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg))
|
|
}.get(api_type,'Error: Wrong API type')
|
|
|
|
def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."):
|
|
if str(return_code) != str(expect_code):
|
|
raise Exception(err_msg.format(expect_code, return_code))
|
|
|
|
def _assert_status_body(expect_status_body, returned_status_body):
|
|
if str(returned_status_body.strip()).lower().find(expect_status_body.lower()) < 0:
|
|
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body))
|
|
|
|
def _random_name(prefix):
|
|
return "%s-%d" % (prefix, int(round(time.time() * 1000)))
|
|
|
|
def _get_id_from_header(header):
|
|
try:
|
|
location = header["Location"]
|
|
return int(location.split("/")[-1])
|
|
except Exception:
|
|
return None
|
|
|
|
def _get_string_from_unicode(udata):
|
|
result=''
|
|
for u in udata:
|
|
tmp = u.encode('utf8')
|
|
result = result + tmp.strip('\n\r\t')
|
|
return result
|
|
|
|
def restart_process(process):
|
|
if process == "dockerd":
|
|
full_process_name = process
|
|
elif process == "containerd":
|
|
full_process_name = "/usr/local/bin/containerd"
|
|
else:
|
|
raise Exception("Please input dockerd or containerd for process retarting.")
|
|
run_command_with_popen("ps aux |grep " + full_process_name)
|
|
for i in range(10):
|
|
pid = run_command_with_popen(["pidof " + full_process_name])
|
|
if pid in [None, ""]:
|
|
break
|
|
run_command_with_popen(["kill " + str(pid)])
|
|
time.sleep(3)
|
|
|
|
run_command_with_popen("ps aux |grep " + full_process_name)
|
|
run_command_with_popen("rm -rf /var/lib/" + process + "/*")
|
|
run_command_with_popen(full_process_name + " > ./daemon-local.log 2>&1 &")
|
|
time.sleep(3)
|
|
pid = run_command_with_popen(["pidof " + full_process_name])
|
|
if pid in [None, ""]:
|
|
raise Exception("Failed to start process {}.".format(full_process_name))
|
|
run_command_with_popen("ps aux |grep " + full_process_name)
|
|
|
|
def run_command_with_popen(command):
|
|
try:
|
|
proc = subprocess.Popen(command, universal_newlines=True, shell=True,
|
|
stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
|
|
output, errors = proc.communicate()
|
|
except Exception as e:
|
|
output = None
|
|
finally:
|
|
proc.stdout.close()
|
|
return output
|
|
|
|
def run_command(command, expected_error_message = None):
|
|
try:
|
|
output = subprocess.check_output(command,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
if expected_error_message is not None:
|
|
if str(e.output).lower().find(expected_error_message.lower()) < 0:
|
|
raise Exception(r"Error message is not as expected {}".format(expected_error_message))
|
|
else:
|
|
raise Exception('Error: Exited with error code: %s.'% (e.returncode))
|
|
else:
|
|
return output
|
|
|
|
class Base(object):
|
|
def __init__(self, server=None, credential=None, debug=True, api_type="products"):
|
|
if server is None:
|
|
server = Server(endpoint=get_endpoint(), verify_ssl=False)
|
|
if not isinstance(server.verify_ssl, bool):
|
|
server.verify_ssl = server.verify_ssl == "True"
|
|
|
|
if credential is None:
|
|
credential = Credential(type="basic_auth", username="admin", password="Harbor12345")
|
|
|
|
self.server = server
|
|
self.credential = credential
|
|
self.debug = debug
|
|
self.api_type = api_type
|
|
self.client = _create_client(server, credential, debug, api_type=api_type)
|
|
|
|
def _get_client(self, **kwargs):
|
|
if len(kwargs) == 0:
|
|
return self.client
|
|
|
|
server = self.server
|
|
if "endpoint" in kwargs:
|
|
server.endpoint = kwargs.get("endpoint")
|
|
if "verify_ssl" in kwargs:
|
|
if not isinstance(kwargs.get("verify_ssl"), bool):
|
|
server.verify_ssl = kwargs.get("verify_ssl") == "True"
|
|
else:
|
|
server.verify_ssl = kwargs.get("verify_ssl")
|
|
|
|
credential = Credential(
|
|
kwargs.get("type", self.credential.type),
|
|
kwargs.get("username", self.credential.username),
|
|
kwargs.get("password", self.credential.password),
|
|
)
|
|
|
|
return _create_client(server, credential, self.debug, kwargs.get('api_type', self.api_type))
|