harbor/tests/apitests/python/library/jobservice.py

68 lines
3.1 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import base
import v2_swagger_client
from v2_swagger_client.rest import ApiException
class Jobservice(base.Base):
def __init__(self):
super(Jobservice, self).__init__(api_type="jobservice")
def get_job_queues(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).list_job_queues_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return dict(zip([job_queue.job_type for job_queue in return_data], return_data))
def action_pending_jobs(self, job_type, action, expect_status_code=200, expect_response_body=None, **kwargs):
try:
action_request = v2_swagger_client.ActionRequest(action=action)
return_data, status_code, _ = self._get_client(**kwargs).action_pending_jobs_with_http_info(job_type, action_request)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_worker_pools(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_worker_pools_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_workers(self, pool_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_workers_with_http_info(pool_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def stop_running_job(self, job_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).stop_running_job_with_http_info(job_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data