2018-08-16 12:36:27 +02:00
# -*- coding: utf-8 -*-
import base
import swagger_client
2020-08-15 18:09:06 +02:00
import v2_swagger_client
from v2_swagger_client . rest import ApiException
2020-08-17 08:51:18 +02:00
from library . base import _assert_status_code
2018-08-16 12:36:27 +02:00
2018-11-20 07:24:13 +01:00
def is_member_exist_in_project(members, member_user_name, expected_member_role_id=None):
    """Report whether a user appears in a list of project members.

    Args:
        members: Iterable of member objects exposing ``entity_name`` and
            ``role_id`` attributes.
        member_user_name: Name compared against each member's ``entity_name``.
        expected_member_role_id: When not ``None``, a member only counts as a
            match if its ``role_id`` also equals this value.

    Returns:
        ``True`` if at least one member matches, otherwise ``False``.
    """
    for member in members:
        if member.entity_name != member_user_name:
            continue
        # ``None`` means "any role is acceptable"; use an identity test so a
        # falsy-but-real role id is still compared.
        if expected_member_role_id is None or member.role_id == expected_member_role_id:
            return True
    return False
2019-04-11 09:05:46 +02:00
def get_member_id_by_name(members, member_user_name):
    """Return the ``id`` of the first member named *member_user_name*.

    Args:
        members: Iterable of member objects exposing ``entity_name`` and ``id``.
        member_user_name: Name compared against each member's ``entity_name``.

    Returns:
        The ``id`` of the first matching member, or ``None`` if nobody matches.
    """
    matching_ids = (
        candidate.id
        for candidate in members
        if candidate.entity_name == member_user_name
    )
    return next(matching_ids, None)
2018-08-16 12:36:27 +02:00
class Project(base.Base):
    """Test helper wrapping the project and project-member API clients.

    Project-level calls go through the client selected by the ``projectv2``
    API type set in ``__init__``; member-related calls switch the API type to
    ``products`` before asking for a client.
    """

    def __init__(self, username=None, password=None):
        """Bind the helper to the ``projectv2`` API type.

        Basic-auth credentials are attached only when both *username* and
        *password* are truthy.
        """
        kwargs = dict(api_type="projectv2")
        if username and password:
            kwargs["credential"] = base.Credential('basic_auth', username, password)
        super(Project, self).__init__(**kwargs)

    def create_project(self, name=None, registry_id=None, metadata=None,
                       expect_status_code=201, expect_response_body=None, **kwargs):
        """Create a project and check the response status.

        Args:
            name: Project name; a random one is generated via
                ``base._random_name("project")`` when ``None``.
            registry_id: Passed through unchanged to ``ProjectReq``.
            metadata: Project metadata; defaults to an empty dict.
            expect_status_code: Status the call is expected to produce.
            expect_response_body: When not ``None`` and the call raises
                ``ApiException``, checked against the error body.
            **kwargs: Forwarded to ``self._get_client``.

        Returns:
            ``(base._get_id_from_header(header), name)`` when the call does
            not raise; ``None`` when it raised ``ApiException`` (after the
            status/body checks).
        """
        if name is None:
            name = base._random_name("project")
        if metadata is None:
            metadata = {}
        client = self._get_client(**kwargs)

        try:
            _, status_code, header = client.create_project_with_http_info(
                v2_swagger_client.ProjectReq(
                    project_name=name, registry_id=registry_id, metadata=metadata))
        except ApiException as e:
            base._assert_status_code(expect_status_code, e.status)
            if expect_response_body is not None:
                base._assert_status_body(expect_response_body, e.body)
            return
        base._assert_status_code(expect_status_code, status_code)
        # A call that did not raise is additionally required to have returned 201.
        base._assert_status_code(201, status_code)
        return base._get_id_from_header(header), name

    def get_projects(self, params, **kwargs):
        """List projects, passing *params* as keyword arguments to the client.

        Returns:
            The data returned by ``list_projects_with_http_info`` after the
            status code has been checked against 200.
        """
        client = self._get_client(**kwargs)
        data, status_code, _ = client.list_projects_with_http_info(**params)
        base._assert_status_code(200, status_code)
        return data

    def get_project_id(self, project_name, **kwargs):
        """Return the id of the listed project named *project_name*.

        Names are compared as strings. Returns ``None`` when no listed
        project has that name.
        """
        # NOTE(review): the listing is requested without filters, so a
        # server-side page limit could hide the project — confirm.
        for project in self.get_projects(dict(), **kwargs):
            if str(project.project_name) == str(project_name):
                return project.project_id
        return None

    def projects_should_exist(self, params, expected_count=None, expected_project_id=None, **kwargs):
        """Assert properties of the project listing obtained with *params*.

        Args:
            params: Query parameters forwarded to ``get_projects``.
            expected_count: When not ``None``, the exact number of projects
                the listing must contain.
            expected_project_id: When not ``None`` and exactly one project is
                listed, the id (compared as a string) that project must have.

        Raises:
            Exception: If either check fails.
        """
        project_data = self.get_projects(params, **kwargs)
        actual_count = len(project_data)
        if expected_count is not None and actual_count != expected_count:
            raise Exception(r"Private project count should be {}.".format(expected_count))
        # The id check is only meaningful when the listing is unambiguous.
        if expected_project_id is not None and actual_count == 1 \
                and str(project_data[0].project_id) != str(expected_project_id):
            raise Exception(
                r"Project-id check failed, expect {} but got {}, please check this test case.".format(
                    str(expected_project_id), str(project_data[0].project_id)))

    def check_project_name_exist(self, name=None, **kwargs):
        """Return ``True`` only if the HEAD request for *name* answers 200.

        Any ``ApiException`` raised by the client, and any other status
        code, yields ``False``.
        """
        client = self._get_client(**kwargs)
        try:
            _, status_code, _ = client.head_project_with_http_info(name)
        except ApiException:
            return False
        return status_code == 200

    def get_project(self, project_id, expect_status_code=200, expect_response_body=None, **kwargs):
        """Fetch a project and check the response status.

        Returns:
            The project data when the call does not raise; ``None`` when it
            raised ``ApiException`` (after the status/body checks).
        """
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.get_project_with_http_info(project_id)
        except ApiException as e:
            base._assert_status_code(expect_status_code, e.status)
            if expect_response_body is not None:
                base._assert_status_body(expect_response_body, e.body)
            return
        base._assert_status_code(expect_status_code, status_code)
        # A call that did not raise is additionally required to have returned 200.
        base._assert_status_code(200, status_code)
        return data

    def update_project(self, project_id, expect_status_code=200, metadata=None, cve_allowlist=None, **kwargs):
        """Update a project's metadata and/or CVE allowlist.

        The status code — taken from the response, or from the
        ``ApiException`` if one is raised — is checked against
        *expect_status_code*.
        """
        client = self._get_client(**kwargs)
        project = v2_swagger_client.ProjectReq(metadata=metadata, cve_allowlist=cve_allowlist)
        try:
            _, sc, _ = client.update_project_with_http_info(project_id, project)
        except ApiException as e:
            base._assert_status_code(expect_status_code, e.status)
        else:
            base._assert_status_code(expect_status_code, sc)

    def delete_project(self, project_id, expect_status_code=200, **kwargs):
        """Delete a project and check the response status.

        An ``ApiException`` raised by the client is not caught here.
        """
        client = self._get_client(**kwargs)
        _, status_code, _ = client.delete_project_with_http_info(project_id)
        base._assert_status_code(expect_status_code, status_code)

    def get_project_log(self, project_name, expect_status_code=200, **kwargs):
        """Return the logs of *project_name* after checking the status code.

        An ``ApiException`` raised by the client is not caught here.
        """
        client = self._get_client(**kwargs)
        body, status_code, _ = client.get_logs_with_http_info(project_name)
        base._assert_status_code(expect_status_code, status_code)
        return body

    def filter_project_logs(self, project_name, operator, resource, resource_type, operation, **kwargs):
        """Count log entries of *project_name* matching all four criteria.

        An entry matches when its ``username``, ``resource_type``,
        ``resource`` and ``operation`` equal *operator*, *resource_type*,
        *resource* and *operation* respectively.
        """
        access_logs = self.get_project_log(project_name, **kwargs)
        return sum(
            1 for entry in access_logs
            if entry.username == operator
            and entry.resource_type == resource_type
            and entry.resource == resource
            and entry.operation == operation
        )

    def get_project_members(self, project_id, **kwargs):
        """Return the members of a project using the ``products`` client."""
        kwargs['api_type'] = 'products'
        client = self._get_client(**kwargs)
        return client.projects_project_id_members_get(project_id)

    def get_project_member(self, project_id, member_id, expect_status_code=200,
                           expect_response_body=None, **kwargs):
        """Fetch a single project member and check the response status.

        Returns:
            The member data when the call does not raise; ``None`` when it
            raised ``ApiException`` (after the status/body checks).
        """
        # The ``products`` client comes from ``swagger_client``, so its own
        # ApiException must be caught here rather than the v2 one imported at
        # module level.
        from swagger_client.rest import ApiException
        kwargs['api_type'] = 'products'
        client = self._get_client(**kwargs)
        try:
            data, status_code, _ = client.projects_project_id_members_mid_get_with_http_info(
                project_id, member_id)
        except ApiException as e:
            base._assert_status_code(expect_status_code, e.status)
            if expect_response_body is not None:
                base._assert_status_body(expect_response_body, e.body)
            return
        base._assert_status_code(expect_status_code, status_code)
        # A call that did not raise is additionally required to have returned 200.
        base._assert_status_code(200, status_code)
        return data

    def get_project_member_id(self, project_id, member_user_name, **kwargs):
        """Return the member id of *member_user_name* in the project.

        Raises:
            Exception: If no member with that name is found.
        """
        kwargs['api_type'] = 'products'
        members = self.get_project_members(project_id, **kwargs)
        result = get_member_id_by_name(list(members), member_user_name)
        if result is None:
            raise Exception(r"Failed to get member id of member {} in project {}.".format(
                member_user_name, project_id))
        return result

    def check_project_member_not_exist(self, project_id, member_user_name, **kwargs):
        """Raise ``Exception`` if *member_user_name* is a member of the project."""
        kwargs['api_type'] = 'products'
        members = self.get_project_members(project_id, **kwargs)
        if is_member_exist_in_project(list(members), member_user_name):
            raise Exception(r"User {} should not be a member of project with ID {}.".format(
                member_user_name, project_id))

    def check_project_members_exist(self, project_id, member_user_name,
                                    expected_member_role_id=None, **kwargs):
        """Raise ``Exception`` unless *member_user_name* is a member of the project.

        When *expected_member_role_id* is not ``None`` the member must also
        hold that role.
        """
        kwargs['api_type'] = 'products'
        members = self.get_project_members(project_id, **kwargs)
        if not is_member_exist_in_project(
                members, member_user_name, expected_member_role_id=expected_member_role_id):
            raise Exception(r"User {} should be a member of project with ID {}.".format(
                member_user_name, project_id))

    def update_project_member_role(self, project_id, member_id, member_role_id,
                                   expect_status_code=200, **kwargs):
        """Change a member's role and return the response data.

        An ``ApiException`` raised by the client is not caught here.
        """
        kwargs['api_type'] = 'products'
        client = self._get_client(**kwargs)
        role = swagger_client.Role(role_id=member_role_id)
        data, status_code, _ = client.projects_project_id_members_mid_put_with_http_info(
            project_id, member_id, role=role)
        base._assert_status_code(expect_status_code, status_code)
        # The response is additionally required to be 200.
        base._assert_status_code(200, status_code)
        return data

    def delete_project_member(self, project_id, member_id, expect_status_code=200, **kwargs):
        """Remove a member from a project and check the response status.

        An ``ApiException`` raised by the client is not caught here.
        """
        kwargs['api_type'] = 'products'
        client = self._get_client(**kwargs)
        _, status_code, _ = client.projects_project_id_members_mid_delete_with_http_info(
            project_id, member_id)
        base._assert_status_code(expect_status_code, status_code)
        # The response is additionally required to be 200.
        base._assert_status_code(200, status_code)

    def add_project_members(self, project_id, user_id=None, member_role_id=None,
                            _ldap_group_dn=None, expect_status_code=201, **kwargs):
        """Add a user and/or an LDAP group as a member of a project.

        Args:
            project_id: Target project.
            user_id: When not ``None``, converted with ``int()`` and sent as
                the member user.
            member_role_id: Role for the new member; role id 1 is used when
                ``None``.
            _ldap_group_dn: When not ``None``, sent as the member group's
                LDAP group DN.
            expect_status_code: Status the call is expected to produce.
            **kwargs: Forwarded to ``self._get_client``.

        Returns:
            ``base._get_id_from_header(header)`` when the call does not
            raise; ``None`` when it raised ``ApiException`` (after the status
            check).
        """
        kwargs['api_type'] = 'products'
        projectMember = swagger_client.ProjectMember()
        if user_id is not None:
            projectMember.member_user = {"user_id": int(user_id)}
        projectMember.role_id = 1 if member_role_id is None else member_role_id
        if _ldap_group_dn is not None:
            projectMember.member_group = swagger_client.UserGroup(ldap_group_dn=_ldap_group_dn)

        client = self._get_client(**kwargs)
        try:
            _, status_code, header = client.projects_project_id_members_post_with_http_info(
                project_id, project_member=projectMember)
        except swagger_client.rest.ApiException as e:
            base._assert_status_code(expect_status_code, e.status)
        else:
            base._assert_status_code(expect_status_code, status_code)
            return base._get_id_from_header(header)

    def query_user_logs(self, project_name, status_code=200, **kwargs):
        """Return the number of log entries of *project_name*.

        When fetching the logs raises ``ApiException``, its status is checked
        against *status_code* and 0 is returned.
        """
        try:
            logs = self.get_project_log(project_name, expect_status_code=status_code, **kwargs)
        except ApiException as e:
            _assert_status_code(status_code, e.status)
            return 0
        return len(list(logs))