harbor/tests/apitests/python/library/robot.py

133 lines
6.6 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
import time
import base
import v2_swagger_client
from v2_swagger_client.rest import ApiException
from base import _assert_status_code
class Robot(base.Base, object):
def __init__(self):
super(Robot,self).__init__(api_type = "robot")
def list_robot(self, expect_status_code = 200, **kwargs):
try:
body, status_code, _ = self._get_client(**kwargs).list_robot_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
return []
else:
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
return body
def create_access_list(self, right_map = [True] * 7):
_assert_status_code(7, len(right_map), r"Please input full access list for system robot account. Expected {}, while actual input count is {}.")
action_pull = "pull"
action_push = "push"
action_read = "read"
action_create = "create"
action_del = "delete"
access_def_list = [
("repository", action_pull),
("repository", action_push),
("artifact", action_del),
("tag", action_create),
("tag", action_del),
("artifact-label", action_create),
("scan", action_create)
]
access_list = []
for i in range(len(access_def_list)):
if right_map[i] is True:
robotAccountAccess = v2_swagger_client.Access(resource = access_def_list[i][0], action = access_def_list[i][1])
access_list.append(robotAccountAccess)
return access_list
def create_project_robot(self, project_name, duration, robot_name = None, robot_desc = None,
has_pull_right = True, has_push_right = True, expect_status_code = 201, expect_response_body = None,
**kwargs):
if robot_name is None:
robot_name = base._random_name("robot")
if robot_desc is None:
robot_desc = base._random_name("robot_desc")
if has_pull_right is False and has_push_right is False:
has_pull_right = True
access_list = []
action_pull = "pull"
action_push = "push"
if has_pull_right is True:
robotAccountAccess = v2_swagger_client.Access(resource = "repository", action = action_pull)
access_list.append(robotAccountAccess)
if has_push_right is True:
robotAccountAccess = v2_swagger_client.Access(resource = "repository", action = action_push)
access_list.append(robotAccountAccess)
robotaccountPermissions = v2_swagger_client.RobotPermission(kind = "project", namespace = project_name, access = access_list)
permission_list = []
permission_list.append(robotaccountPermissions)
robotAccountCreate = v2_swagger_client.RobotCreate(name=robot_name, description=robot_desc, duration=duration, level="project", permissions = permission_list)
data = []
try:
data, status_code, header = self._get_client(**kwargs).create_robot_with_http_info(robotAccountCreate)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
else:
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(201, status_code)
return base._get_id_from_header(header), data
def get_robot_account_by_id(self, robot_id, **kwargs):
data, status_code, _ = self._get_client(**kwargs).get_robot_by_id_with_http_info(robot_id)
return data
def disable_robot_account(self, robot_id, disable, expect_status_code = 200, **kwargs):
data = self.get_robot_account_by_id(robot_id, **kwargs)
robotAccountUpdate = v2_swagger_client.RobotCreate(name=data.name, description=data.description, duration=data.duration, level=data.level, permissions = data.permissions, disable = disable)
_, status_code, _ = self._get_client(**kwargs).update_robot_with_http_info(robot_id, robotAccountUpdate)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def delete_robot_account(self, robot_id, expect_status_code = 200, **kwargs):
_, status_code, _ = self._get_client(**kwargs).delete_robot_with_http_info(robot_id)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def create_system_robot(self, permission_list, duration, robot_name = None, robot_desc = None, expect_status_code = 201, **kwargs):
if robot_name is None:
robot_name = base._random_name("robot")
if robot_desc is None:
robot_desc = base._random_name("robot_desc")
robotAccountCreate = v2_swagger_client.RobotCreate(name=robot_name, description=robot_desc, duration=duration, level="system", disable = False, permissions = permission_list)
data = []
data, status_code, header = self._get_client(**kwargs).create_robot_with_http_info(robotAccountCreate)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(201, status_code)
return base._get_id_from_header(header), data
def update_robot_account(self, robot_id, robot, expect_status_code = 200, **kwargs):
_, status_code, _ = self._get_client(**kwargs).update_robot_with_http_info(robot_id, robot)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
def update_system_robot_account(self, robot_id, robot_name, robot_account_Permissions_list, disable = None, expect_status_code = 200, **kwargs):
robot = v2_swagger_client.Robot(id = robot_id, name = robot_name, level = "system", permissions = robot_account_Permissions_list)
if disable in (True, False):
robot.disable = disable
self.update_robot_account(robot_id, robot, expect_status_code = expect_status_code, **kwargs)
def refresh_robot_account_secret(self, robot_id, robot_new_sec, expect_status_code = 200, **kwargs):
robot_sec = v2_swagger_client.RobotSec(secret = robot_new_sec)
data, status_code, _ = self._get_client(**kwargs).refresh_sec_with_http_info(robot_id, robot_sec)
base._assert_status_code(expect_status_code, status_code)
base._assert_status_code(200, status_code)
print("Refresh new secret:", data)
return data