2018-11-27 12:17:41 +01:00
# -*- coding: utf-8 -*-
import time
import re
import base
import swagger_client
from swagger_client . rest import ApiException
class System(base.Base):
    """Helpers around the swagger-generated client for system-level endpoints.

    Covers garbage collection (GC) history/status/log/schedule, scan-all
    scheduling, the CVE allowlist and quota lookup.

    Most methods share one calling convention:

    * ``expect_status_code`` is the HTTP status the caller expects.
    * ``expect_response_body`` (optional) is compared, after stripping
      surrounding whitespace, with the body of an ``ApiException`` whose
      status matched ``expect_status_code``.
    * ``**kwargs`` is forwarded unchanged to ``self._get_client``.
    """

    @staticmethod
    def _expected_error_or_raise(error, action, expect_status_code, expect_response_body):
        """Decide whether an ``ApiException`` was the outcome the caller expected.

        Args:
            error: The caught ``ApiException``; its ``status``, ``reason`` and
                ``body`` attributes are read.
            action: Short description of the attempted operation, used as the
                prefix of any error message.
            expect_status_code: Status code the caller expects.
            expect_response_body: Expected error body, or ``None`` to skip the
                body comparison.

        Returns:
            The tuple ``(error.reason, error.body)`` when the status (and the
            body, if one was given) matches the expectation.

        Raises:
            Exception: If the status differs from ``expect_status_code`` or the
                stripped body differs from the stripped expected body.
        """
        if error.status != expect_status_code:
            raise Exception(
                "{} result is not as expected {} actual status is {}.".format(
                    action, expect_status_code, error.status))
        if expect_response_body is not None:
            actual_body = error.body.strip()
            expected_body = expect_response_body.strip()
            if actual_body != expected_body:
                raise Exception(
                    "{} response body is not as expected {} actual body is {}.".format(
                        action, expected_body, actual_body))
        return error.reason, error.body

    def get_gc_history(self, expect_status_code=200, expect_response_body=None, **kwargs):
        """Fetch the GC history via ``system_gc_get_with_http_info``.

        Returns:
            The response data on success, or ``(reason, body)`` when the API
            raised an ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.system_gc_get_with_http_info()
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Get GC history", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def get_gc_status_by_id(self, job_id, expect_status_code=200, expect_response_body=None, **kwargs):
        """Fetch the status of GC job ``job_id`` via ``system_gc_id_get_with_http_info``.

        Returns:
            The response data on success, or ``(reason, body)`` when the API
            raised an ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.system_gc_id_get_with_http_info(job_id)
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Get GC status", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def get_gc_log_by_id(self, job_id, expect_status_code=200, expect_response_body=None, **kwargs):
        """Fetch the log of GC job ``job_id`` via ``system_gc_id_log_get_with_http_info``.

        Returns:
            The response data on success, or ``(reason, body)`` when the API
            raised an ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.system_gc_id_log_get_with_http_info(job_id)
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Get GC log", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def get_gc_schedule(self, expect_status_code=200, expect_response_body=None, **kwargs):
        """Fetch the GC schedule via ``system_gc_schedule_get_with_http_info``.

        Returns:
            The response data on success, or ``(reason, body)`` when the API
            raised an ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.system_gc_schedule_get_with_http_info()
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Get GC schedule", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def set_gc_schedule(self, schedule_type='None', cron=None, expect_status_code=200,
                        expect_response_body=None, **kwargs):
        """Update the GC schedule via ``system_gc_schedule_put_with_http_info``.

        Args:
            schedule_type: Value assigned to the schedule's ``type``; the
                default is the string ``'None'``, not the ``None`` object.
            cron: Optional cron expression; only set when not ``None``.

        Returns:
            The response data on success, or ``(reason, body)`` when the API
            raised an ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        # NOTE(review): type/cron are set directly on AdminJobSchedule here,
        # whereas create_gc_schedule nests them in an AdminJobScheduleObj --
        # confirm this still matches the PUT payload the server expects.
        gc_schedule = swagger_client.AdminJobSchedule()
        gc_schedule.type = schedule_type
        if cron is not None:
            gc_schedule.cron = cron
        try:
            data, status_code, _ = client.system_gc_schedule_put_with_http_info(gc_schedule)
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Set GC schedule", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return data

    def create_gc_schedule(self, schedule_type, is_delete_untagged, cron=None,
                           expect_status_code=201, expect_response_body=None, **kwargs):
        """Create a GC schedule via ``system_gc_schedule_post_with_http_info``.

        Args:
            schedule_type: Value assigned to the schedule's ``type``.
            is_delete_untagged: Sent as the ``delete_untagged`` job parameter.
            cron: Optional cron expression; only set when not ``None``.

        Returns:
            Whatever ``base._get_id_from_header`` derives from the response
            headers on success, or ``(reason, body)`` when the API raised an
            ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)

        gc_schedule = swagger_client.AdminJobScheduleObj()
        gc_schedule.type = schedule_type
        if cron is not None:
            gc_schedule.cron = cron

        gc_job = swagger_client.AdminJobSchedule()
        gc_job.schedule = gc_schedule
        gc_job.parameters = {'delete_untagged': is_delete_untagged}

        try:
            _, status_code, header = client.system_gc_schedule_post_with_http_info(gc_job)
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Create GC schedule", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return base._get_id_from_header(header)

    def wait_until_scans_all_finish(self, **kwargs):
        """Block until the scan-all metrics report ``ongoing`` as ``False``.

        Polls ``scans_all_metrics_get`` up to 50 times, sleeping 5 seconds
        before each poll, and prints every metrics object it receives.

        Raises:
            Exception: If ``ongoing`` never became ``False`` within the
                allotted attempts.
        """
        client = self._get_client(**kwargs)
        max_attempts = 50
        poll_interval_seconds = 5
        for _ in range(max_attempts):
            # Sleep first, then poll, so that every wait is followed by a
            # check and no sleep is spent right before giving up.
            time.sleep(poll_interval_seconds)
            stats = client.scans_all_metrics_get()
            print("Scan all status: ", stats)
            # Identity check on purpose: only an explicit False counts as done.
            if stats.ongoing is False:
                return
        raise Exception("Error: Scan all job is timeout.")

    def create_scan_all_schedule(self, schedule_type, cron=None, expect_status_code=201,
                                 expect_response_body=None, **kwargs):
        """Create a scan-all schedule via ``system_scan_all_schedule_post_with_http_info``.

        Args:
            schedule_type: Value assigned to the schedule's ``type``.
            cron: Optional cron expression; only set when not ``None``.

        Returns:
            Whatever ``base._get_id_from_header`` derives from the response
            headers on success, or ``(reason, body)`` when the API raised an
            ``ApiException`` that matched the expectation.

        Raises:
            Exception: If an ``ApiException`` did not match the expectation.
        """
        client = self._get_client(**kwargs)
        scanschedule = swagger_client.AdminJobScheduleObj()
        scanschedule.type = schedule_type
        if cron is not None:
            scanschedule.cron = cron
        scan_all_schedule = swagger_client.AdminJobSchedule(scanschedule)
        try:
            _, status_code, header = client.system_scan_all_schedule_post_with_http_info(scan_all_schedule)
        except ApiException as e:
            return self._expected_error_or_raise(
                e, "Create Scan All schedule", expect_status_code, expect_response_body)
        base._assert_status_code(expect_status_code, status_code)
        return base._get_id_from_header(header)

    def scan_now(self, **kwargs):
        """Create a scan-all schedule of type ``'Manual'`` and return its result."""
        return self.create_scan_all_schedule('Manual', **kwargs)

    def set_cve_allowlist(self, expires_at=None, expected_status_code=200, *cve_ids, **kwargs):
        """Replace the system CVE allowlist and check the resulting status code.

        Args:
            expires_at: Passed through as the allowlist's ``expires_at``.
            expected_status_code: Status expected from the PUT request,
                whether it succeeds or raises an ``ApiException``.
            *cve_ids: CVE identifiers; each becomes one ``CVEAllowlistItem``.
        """
        client = self._get_client(**kwargs)
        cve_list = [swagger_client.CVEAllowlistItem(cve_id=c) for c in cve_ids]
        allowlist = swagger_client.CVEAllowlist(expires_at=expires_at, items=cve_list)
        try:
            r = client.system_cve_allowlist_put_with_http_info(
                allowlist=allowlist, _preload_content=False)
        except ApiException as e:
            # Only API errors carry a ``status``; any other exception is left
            # to propagate instead of being masked by an AttributeError.
            base._assert_status_code(expected_status_code, e.status)
        else:
            base._assert_status_code(expected_status_code, r.status)

    def get_cve_allowlist(self, **kwargs):
        """Return the result of ``system_cve_allowlist_get``."""
        client = self._get_client(**kwargs)
        return client.system_cve_allowlist_get()

    def get_project_quota(self, reference, reference_id, **kwargs):
        """Query quotas filtered by ``reference`` and ``reference_id``.

        Returns:
            The response data from ``quotas_get_with_http_info``; the status
            code is checked against 200 via ``base._assert_status_code``.
        """
        client = self._get_client(**kwargs)
        data, status_code, _ = client.quotas_get_with_http_info(
            reference=reference, reference_id=reference_id)
        base._assert_status_code(200, status_code)
        return data