2020-07-30 10:04:14 +02:00
# -*- coding: utf-8 -*-
2020-08-15 18:09:06 +02:00
import os
2020-07-30 10:04:14 +02:00
import sys
import time
import subprocess
import client
import swagger_client
import v2_swagger_client
try :
from urllib import getproxies
except ImportError :
from urllib . request import getproxies
class Server:
    """Connection settings for one API server.

    Attributes:
        endpoint: Base URL of the API; copied to the client configuration's
            ``host`` when a client is created.
        verify_ssl: Whether TLS certificates should be verified; copied to
            the client configuration's ``verify_ssl``.
    """

    def __init__(self, endpoint, verify_ssl):
        # Values are stored exactly as given; no validation happens here.
        self.endpoint, self.verify_ssl = endpoint, verify_ssl
class Credential:
    """Login information used when building an API client.

    Attributes:
        type: Label for the authentication scheme (stored, not interpreted
            by this class).
        username: Account name, or ``None``.
        password: Account password, or ``None``.
    """

    def __init__(self, type, username, password):
        # ``type`` shadows the builtin, but it is part of the keyword
        # interface callers use, so the parameter name has to stay.
        self.type, self.username, self.password = type, username, password
2020-08-15 18:09:06 +02:00
def get_endpoint():
    """Return the API base URL assembled from environment variables.

    The host and the URL scheme are each read from the environment with a
    built-in fallback, then concatenated with the scheme separator and the
    API path suffix.
    """
    env = os.environ
    host = env.get(" HARBOR_HOST ", " localhost:8080 ")
    scheme = env.get(" HARBOR_HOST_SCHEMA ", " https ")
    return "".join((scheme, " :// ", host, " /api/v2.0 "))
2020-07-30 10:04:14 +02:00
def _create_client(server, credential, debug, api_type=" products "):
    """Build the API object for ``api_type``, configured for ``server``.

    Args:
        server: Object exposing ``endpoint`` and ``verify_ssl``.
        credential: Object exposing ``username`` and ``password``. When both
            are ``None`` the configuration is patched for anonymous access.
        debug: Value assigned to the configuration's ``debug`` flag.
        api_type: Key selecting which API wrapper to build.

    Returns:
        The requested API object, or the string
        ``' Error: Wrong API type '`` when ``api_type`` is not a known key.

    Only the requested API object is constructed: each table entry is a
    factory that is invoked after lookup, so no ApiClient instances are
    created for API types the caller did not ask for.
    """
    # API types served by the v2 generated client need its Configuration
    # class; every other type uses the legacy generated client's.
    v2_api_types = (' projectv2 ', ' artifact ', ' repository ', ' scan ', ' scanall ',
                    ' preheat ', ' replication ', ' robot ', ' gc ')
    if api_type in v2_api_types:
        cfg = v2_swagger_client.Configuration()
    else:
        cfg = swagger_client.Configuration()

    cfg.host = server.endpoint
    cfg.verify_ssl = server.verify_ssl
    # support basic auth only for now
    cfg.username = credential.username
    cfg.password = credential.password
    cfg.debug = debug

    # Honour a proxy from the environment, falling back to the catch-all entry.
    proxies = getproxies()
    proxy = proxies.get(' http ', proxies.get(' all ', None))
    if proxy:
        cfg.proxy = proxy

    if cfg.username is None and cfg.password is None:
        # returns {} for auth_settings for anonymous access
        import types
        cfg.auth_settings = types.MethodType(lambda self: {}, cfg)

    # Factories rather than instances: nothing is built until the lookup
    # below has picked the one entry that is actually needed.
    factories = {
        " chart ": lambda: client.ChartRepositoryApi(client.ApiClient(cfg)),
        " products ": lambda: swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
        " projectv2 ": lambda: v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
        " artifact ": lambda: v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
        " preheat ": lambda: v2_swagger_client.PreheatApi(v2_swagger_client.ApiClient(cfg)),
        " repository ": lambda: v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
        " scan ": lambda: v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
        " scanall ": lambda: v2_swagger_client.ScanAllApi(v2_swagger_client.ApiClient(cfg)),
        " scanner ": lambda: swagger_client.ScannersApi(swagger_client.ApiClient(cfg)),
        " replication ": lambda: v2_swagger_client.ReplicationApi(v2_swagger_client.ApiClient(cfg)),
        " robot ": lambda: v2_swagger_client.RobotApi(v2_swagger_client.ApiClient(cfg)),
        " gc ": lambda: v2_swagger_client.GcApi(v2_swagger_client.ApiClient(cfg)),
    }
    factory = factories.get(api_type)
    if factory is None:
        # Unknown types yield this marker string instead of raising.
        return ' Error: Wrong API type '
    return factory()
2020-12-16 11:06:41 +01:00
def _assert_status_code(expect_code, return_code, err_msg=r" HTTPS status code s not as we expected. Expected {} , while actual HTTPS status code is {} . "):
    """Raise ``Exception`` unless the two status codes match.

    Both codes are compared by their string form, so ``200`` and ``"200"``
    are treated as equal. ``err_msg`` is a ``str.format`` template that
    receives the expected code and then the actual code.
    """
    codes_match = str(return_code) == str(expect_code)
    if codes_match:
        return
    raise Exception(err_msg.format(expect_code, return_code))
2020-07-30 10:04:14 +02:00
def _assert_status_body(expect_status_body, returned_status_body):
    """Raise ``Exception`` unless the returned body contains the expected text.

    The returned body is stripped, and the containment check ignores case.
    """
    haystack = str(returned_status_body.strip()).lower()
    needle = expect_status_body.lower()
    if needle in haystack:
        return
    raise Exception(r" HTTPS status body s not as we expected. Expected {} , while actual HTTPS status body is {} . ".format(expect_status_body, returned_status_body))
def _random_name(prefix):
    """Return ``prefix`` combined with the current time in milliseconds."""
    now_ms = int(round(time.time() * 1000))
    return " %s - %d " % (prefix, now_ms)
def _get_id_from_header(header):
    """Return the integer ID at the end of the header's location value.

    The location is split on the path separator and the last piece is
    converted to ``int``. Any failure (missing key, non-numeric tail,
    ``header`` not subscriptable) results in ``None`` rather than an error.
    """
    try:
        segments = header[" Location "].split(" / ")
        return int(segments[-1])
    except Exception:
        # Best-effort lookup: callers receive None when no ID can be read.
        return None
def _get_string_from_unicode(udata):
    """Concatenate the items of ``udata`` into one native string.

    Each item has the characters listed in the strip set removed from both
    ends before it is appended.

    Args:
        udata: Iterable of text items (iterating a single string yields its
            characters, which also works).

    Returns:
        A native ``str`` on both Python 2 and Python 3.
    """
    result = ' '
    cleaned = []
    for u in udata:
        # Items that are not already the native ``str`` type (Python 2
        # ``unicode``) are encoded. Native strings are used as-is, because
        # encoding on Python 3 would produce ``bytes``, which can neither be
        # stripped with a text argument nor concatenated to a ``str``.
        tmp = u if isinstance(u, str) else u.encode(' utf8 ')
        cleaned.append(tmp.strip(' \n \r \t '))
    return result + ''.join(cleaned)
2020-12-16 11:06:41 +01:00
def run_command(command, expected_error_message=None):
    """Run ``command`` and return its combined stdout/stderr text.

    Args:
        command: Argument list passed to ``subprocess.check_output``.
        expected_error_message: When given, a non-zero exit is tolerated as
            long as the command output contains this text (case-insensitive).

    Returns:
        The command output on success; ``None`` when the command failed with
        the expected error message.

    Raises:
        Exception: The command failed and no error was expected, or the
            failure output does not contain the expected message.
    """
    print(" Command: ", subprocess.list2cmdline(command))
    try:
        output = subprocess.check_output(
            command,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        print(" Run command error: ", str(e))
        print(" expected_error_message: ", expected_error_message)
        if expected_error_message is None:
            raise Exception(' Error: Exited with error code: %s . Output: %s ' % (e.returncode, e.output))
        captured = str(e.output)
        if expected_error_message.lower() not in captured.lower():
            raise Exception(r" Error message {} is not as expected {} ".format(captured, expected_error_message))
        # The failure was the anticipated one; there is no output to return.
        return None
    print(" output: ", output)
    return output
2020-07-30 10:04:14 +02:00
2020-08-15 18:09:06 +02:00
class Base(object):
    """Common base holding a server, a credential and a ready API client.

    Attributes:
        server: ``Server`` the default client talks to.
        credential: ``Credential`` the default client authenticates with.
        debug: Debug flag forwarded to every client that is created.
        api_type: API type used when a call does not override it.
        client: Client built once at construction time from the above.
    """

    def __init__(self, server=None, credential=None, debug=True, api_type=" products "):
        """Store the settings and build the default client.

        Args:
            server: ``Server`` to use; when ``None`` one is created from
                ``get_endpoint()`` with certificate verification disabled.
            credential: ``Credential`` to use; when ``None`` a built-in
                default account is used.
            debug: Debug flag forwarded to the client configuration.
            api_type: Which API wrapper the default client should be.
        """
        if server is None:
            server = Server(endpoint=get_endpoint(), verify_ssl=False)
        # verify_ssl may arrive as text (e.g. from configuration); coerce
        # anything that is not already a bool by comparing with the literal.
        if not isinstance(server.verify_ssl, bool):
            server.verify_ssl = server.verify_ssl == " True "
        if credential is None:
            credential = Credential(type=" basic_auth ", username=" admin ", password=" Harbor12345 ")

        self.server = server
        self.credential = credential
        self.debug = debug
        self.api_type = api_type
        self.client = _create_client(server, credential, debug, api_type=api_type)

    def _get_client(self, **kwargs):
        """Return the default client, or a new one with per-call overrides.

        With no keyword arguments the client built in ``__init__`` is
        returned. Otherwise a new client is created from this object's
        settings with the given overrides applied (endpoint, verify_ssl,
        type, username, password, api_type).

        Overrides apply to the returned client only: ``self.server`` and
        ``self.credential`` are left untouched, so one call's endpoint or
        verify_ssl override cannot leak into later calls.
        """
        if not kwargs:
            return self.client

        # NOTE(review): option names are matched exactly as written in the
        # literals below; confirm they agree with the keyword names callers pass.
        endpoint = kwargs.get(" endpoint ", self.server.endpoint)
        verify_ssl = self.server.verify_ssl
        if " verify_ssl " in kwargs:
            verify_ssl = kwargs[" verify_ssl "]
            if not isinstance(verify_ssl, bool):
                verify_ssl = verify_ssl == " True "
        # Build a fresh Server instead of editing the shared one in place.
        server = Server(endpoint=endpoint, verify_ssl=verify_ssl)

        credential = Credential(
            kwargs.get(" type ", self.credential.type),
            kwargs.get(" username ", self.credential.username),
            kwargs.get(" password ", self.credential.password),
        )
        return _create_client(server, credential, self.debug, kwargs.get(' api_type ', self.api_type))