Add JobService Dashboard API test cases (#18234)

Added test cases for the following APIs:
1. GET /jobservice/pools/{pool_id}/workers Get workers
2. PUT /jobservice/jobs/{job_id} Stop running jc
3. PUT /jobservice/queues/{job_type} stop and clean, pause, resume pending jobs in the queue
4. GET /jobservice/queues list job queues
5. GET /jobservice/pools Get worker pools
6. GET /schedules List schedules
7. GET /schedules/{job_type}/paused Get scheduler paused status

Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
Yang Jiao 2023-02-17 14:56:13 +08:00 committed by GitHub
parent 73d8e68921
commit 2b82f99986
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 514 additions and 13 deletions

View File

@ -32,7 +32,7 @@ def _create_client(server, credential, debug, api_type="products"):
if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota', if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota',
'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist', 'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist',
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export', 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export',
'statistic', "system_info"): 'statistic', "system_info", "jobservice", "schedule"):
cfg = v2_swagger_client.Configuration() cfg = v2_swagger_client.Configuration()
else: else:
cfg = swagger_client.Configuration() cfg = swagger_client.Configuration()
@ -81,7 +81,9 @@ def _create_client(server, credential, debug, api_type="products"):
"audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)), "audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)),
"scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg)), "scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg)),
"statistic": v2_swagger_client.StatisticApi(v2_swagger_client.ApiClient(cfg)), "statistic": v2_swagger_client.StatisticApi(v2_swagger_client.ApiClient(cfg)),
"system_info": v2_swagger_client.SysteminfoApi(v2_swagger_client.ApiClient(cfg)) "system_info": v2_swagger_client.SysteminfoApi(v2_swagger_client.ApiClient(cfg)),
"jobservice": v2_swagger_client.JobserviceApi(v2_swagger_client.ApiClient(cfg)),
"schedule": v2_swagger_client.ScheduleApi(v2_swagger_client.ApiClient(cfg)),
}.get(api_type,'Error: Wrong API type') }.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."):

View File

@ -91,6 +91,28 @@ class GC(base.Base, object):
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header) return base._get_id_from_header(header)
def update_gc_schedule(self, schedule_type, cron = None, expect_status_code = 200, expect_response_body = None, **kwargs):
gc_schedule = v2_swagger_client.ScheduleObj()
gc_schedule.type = schedule_type
if cron is not None:
gc_schedule.cron = cron
gc_job = v2_swagger_client.Schedule()
gc_job.schedule = gc_schedule
try:
_, status_code, header = self._get_client(**kwargs).update_gc_schedule_with_http_info(gc_job)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Update GC schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Update GC schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header)
def gc_now(self, is_delete_untagged=False, **kwargs): def gc_now(self, is_delete_untagged=False, **kwargs):
gc_id = self.create_gc_schedule('Manual', is_delete_untagged, **kwargs) gc_id = self.create_gc_schedule('Manual', is_delete_untagged, **kwargs)
return gc_id return gc_id

View File

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
import base
import v2_swagger_client
from v2_swagger_client.rest import ApiException
class Jobservice(base.Base):
def __init__(self):
super(Jobservice, self).__init__(api_type="jobservice")
def get_job_queues(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).list_job_queues_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return dict(zip([job_queue.job_type for job_queue in return_data], return_data))
def action_pending_jobs(self, job_type, action, expect_status_code=200, expect_response_body=None, **kwargs):
try:
action_request = v2_swagger_client.ActionRequest(action=action)
return_data, status_code, _ = self._get_client(**kwargs).action_pending_jobs_with_http_info(job_type, action_request)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_worker_pools(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_worker_pools_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_workers(self, pool_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_workers_with_http_info(pool_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def stop_running_job(self, job_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).stop_running_job_with_http_info(job_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data

View File

@ -16,7 +16,6 @@ class Preheat(base.Base, object):
name = base._random_name("instance") name = base._random_name("instance")
instance = v2_swagger_client.Instance(name=name, description=description,vendor=vendor, instance = v2_swagger_client.Instance(name=name, description=description,vendor=vendor,
endpoint=endpoint_url, auth_mode=auth_mode, enabled=enabled) endpoint=endpoint_url, auth_mode=auth_mode, enabled=enabled)
print("instance:",instance)
try: try:
_, status_code, header = self._get_client(**kwargs).create_instance_with_http_info(instance) _, status_code, header = self._get_client(**kwargs).create_instance_with_http_info(instance)
except ApiException as e: except ApiException as e:
@ -36,7 +35,6 @@ class Preheat(base.Base, object):
policy = v2_swagger_client.PreheatPolicy(name=name, project_id=project_id, provider_id=provider_id, policy = v2_swagger_client.PreheatPolicy(name=name, project_id=project_id, provider_id=provider_id,
description=description,filters=filters, description=description,filters=filters,
trigger=trigger, enabled=enabled) trigger=trigger, enabled=enabled)
print("policy:",policy)
try: try:
data, status_code, header = self._get_client(**kwargs).create_policy_with_http_info(project_name, policy) data, status_code, header = self._get_client(**kwargs).create_policy_with_http_info(project_name, policy)
except ApiException as e: except ApiException as e:

View File

@ -10,7 +10,7 @@ class Purge(base.Base):
def __init__(self): def __init__(self):
super(Purge, self).__init__(api_type="purge") super(Purge, self).__init__(api_type="purge")
def create_purge_schedule(self, type, cron, dry_run, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs): def create_purge_schedule(self, type, cron, dry_run=True, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs):
scheduleObj = v2_swagger_client.ScheduleObj(type=type) scheduleObj = v2_swagger_client.ScheduleObj(type=type)
if cron is not None: if cron is not None:
scheduleObj.cron = cron scheduleObj.cron = cron

View File

@ -63,7 +63,7 @@ class Retention(base.Base):
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
return policy return policy
def update_retention_policy(self, retention_id, selector_repository="**", selector_tag="**", expect_status_code = 200, **kwargs): def update_retention_policy(self, retention_id, project_id, selector_repository="**", selector_tag="**", cron="", expect_status_code = 200, **kwargs):
policy=v2_swagger_client.RetentionPolicy( policy=v2_swagger_client.RetentionPolicy(
id=retention_id, id=retention_id,
algorithm='or', algorithm='or',
@ -72,9 +72,7 @@ class Retention(base.Base):
disabled=False, disabled=False,
action="retain", action="retain",
template="always", template="always",
params= { params= {},
},
scope_selectors={ scope_selectors={
"repository": [ "repository": [
{ {
@ -96,7 +94,7 @@ class Retention(base.Base):
trigger= { trigger= {
"kind": "Schedule", "kind": "Schedule",
"settings": { "settings": {
"cron": "" "cron": cron
}, },
"references": { "references": {
} }

View File

@ -30,6 +30,27 @@ class ScanAll(base.Base):
raise Exception(r"Create scan all schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status)) raise Exception(r"Create scan all schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code) base._assert_status_code(expect_status_code, status_code)
def update_scan_all_schedule(self, schedule_type, cron=None, expect_status_code=200, expect_response_body=None, **kwargs):
schedule_obj = v2_swagger_client.ScheduleObj()
schedule_obj.type = schedule_type
if cron is not None:
schedule_obj.cron = cron
schedule = v2_swagger_client.Schedule()
schedule.schedule = schedule_obj
try:
_, status_code, _ = self._get_client(**kwargs).update_scan_all_schedule_with_http_info(schedule)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Update scan all schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Update scan all schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
def scan_all_now(self, **kwargs): def scan_all_now(self, **kwargs):
self.create_scan_all_schedule('Manual', **kwargs) self.create_scan_all_schedule('Manual', **kwargs)

View File

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import base
from v2_swagger_client.rest import ApiException
class Schedule(base.Base):
def __init__(self):
super(Schedule, self).__init__(api_type="schedule")
def get_schedule_paused(self, job_type, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_schedule_paused_with_http_info(job_type)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def list_schedules(self, page_size=50, page=1, expect_status_code=200, expect_response_body=None, **kwargs):
try:
schedules, status_code, _ = self._get_client(**kwargs).list_schedules_with_http_info(page_size=50, page=1)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
schedule_dict = {}
for schedule in schedules:
if schedule.vendor_id in [ None, -1]:
schedule_dict[schedule.vendor_type] = schedule
else:
schedule_dict["%s-%d" % (schedule.vendor_type, schedule.vendor_id)] = schedule
return schedule_dict

View File

@ -0,0 +1,351 @@
from __future__ import absolute_import
import time
import unittest
import v2_swagger_client
from library import base
from testutils import harbor_server, ADMIN_CLIENT, suppress_urllib3_warning
from library.jobservice import Jobservice
from library.gc import GC
from library.purge import Purge
from library.user import User
from library.project import Project
from library.retention import Retention
from library.preheat import Preheat
from library.replication import Replication
from library.registry import Registry
from library.scan_all import ScanAll
from library.schedule import Schedule
class TestJobServiceDashboard(unittest.TestCase, object):
@suppress_urllib3_warning
def setUp(self):
self.jobservice = Jobservice()
self.gc = GC()
self.purge = Purge()
self.user = User()
self.project = Project()
self.retention = Retention()
self.preheat = Preheat()
self.replication = Replication()
self.registry = Registry()
self.scan_all = ScanAll()
self.schedule = Schedule()
self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"]
self.cron_type = "Custom"
self.cron = "0 0 0 * * 0"
def testJobQueues(self):
"""
Test case:
Job Service Dashboard Job Queues
Test step and expected result:
1. List job queue;
2. Pause GC Job and purge audit Job;
3. Verify that the Job status is Paused;
4. Run GC and purge audit;
5. Verify pending jobs of job queues;
6. Resume GC Job and purge audit Job;
7. Verify pending jobs of job queues;
8. Pause GC Job and purge audit Job;
9. Verify that the Job status is Paused;
10. Run GC and purge audit;
11. Verify pending jobs of job queues;
12. Stop GC Job and purge audit Job;
13. Verify pending jobs of job queues;
14. Run GC and purge audit;
15. Verify pending jobs of job queues;
16. Stop all Job;
17. Verify pending jobs of job queues;
18. Resume all Job;
"""
# 1. List job queue
job_queues = self.jobservice.get_job_queues()
self.assertSetEqual(set(self.job_types), set(job_queues.keys()))
# 2. Pause GC Job and purge audit Job
self.jobservice.action_pending_jobs(self.job_types[0], "pause")
self.jobservice.action_pending_jobs(self.job_types[1], "pause")
# 3. Verify that the Job status is Paused
job_queues = self.jobservice.get_job_queues()
self.assertTrue(job_queues[self.job_types[0]].paused)
self.assertTrue(job_queues[self.job_types[1]].paused)
# 4. Run GC and purge audit
self.gc.gc_now()
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False)
time.sleep(2)
# 5. Verify pending jobs of job queues
self.verifyPendingJobs([self.job_types[0], self.job_types[1]])
# 6. Resume GC Job and purge audit Job
self.jobservice.action_pending_jobs(self.job_types[0], "resume")
self.jobservice.action_pending_jobs(self.job_types[1], "resume")
# 7. Verify pending jobs of job queues
self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]])
# 8. Pause GC Job and purge audit Job;
self.jobservice.action_pending_jobs(self.job_types[0], "pause")
self.jobservice.action_pending_jobs(self.job_types[1], "pause")
# 9. Verify that the Job status is Paused
job_queues = self.jobservice.get_job_queues()
self.assertTrue(job_queues[self.job_types[0]].paused)
self.assertTrue(job_queues[self.job_types[1]].paused)
# 10. Run GC and purge audit
self.gc.gc_now()
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False)
time.sleep(2)
# 11. Verify pending jobs of job queues
self.verifyPendingJobs([self.job_types[0], self.job_types[1]])
# 12. Stop GC Job and purge audit Job
self.jobservice.action_pending_jobs(self.job_types[0], "stop")
self.jobservice.action_pending_jobs(self.job_types[1], "stop")
# 13. Verify pending jobs of job queues
self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]])
# 14. Run GC and purge audit
self.gc.gc_now()
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False)
time.sleep(2)
# 15. Verify pending jobs of job queues
self.verifyPendingJobs([self.job_types[0], self.job_types[1]])
# 16. Stop all Job
self.jobservice.action_pending_jobs("all", "stop")
# 17. Verify pending jobs of job queues
self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]])
# 18. Resume all Job
self.jobservice.action_pending_jobs("all", "resume")
job_queues = self.jobservice.get_job_queues()
self.assertFalse(job_queues[self.job_types[0]].paused)
self.assertFalse(job_queues[self.job_types[1]].paused)
def verifyPendingJobs(self, job_types):
job_queues = self.jobservice.get_job_queues()
for job_type in job_types:
self.assertTrue(job_queues[job_type].count > 0)
self.assertTrue(job_queues[job_type].latency > 0)
self.assertTrue(job_queues[job_type].count > 0)
self.assertTrue(job_queues[job_type].latency > 0)
def waitJobQueuesStopToComplete(self, job_types):
is_success = False
for i in range(10):
print("Wait for queues to be consumed:", i)
job_queues = self.jobservice.get_job_queues()
for job_type in job_types:
if job_queues[job_type].count is not None or job_queues[job_type].latency is not None:
is_success = False
break
else:
is_success = True
break
time.sleep(2)
self.assertTrue(is_success)
def testSchedules(self):
"""
Test case:
Job Service Dashboard Schedules
Test step and expected result:
1. Create a new project;
2. Create a retention policy triggered by schedule;
3. Create a new distribution;
4. Create a preheat policy triggered by schedule;
5. Create a new registry;
6. Create a replication policy triggered by schedule;
7. Set up a schedule to scan all;
8. Set up a schedule to GC;
9. Set up a schedule to log rotation;
10. Verify schedules;
11. Pause all schedules;
12. Verify schedules is Paused;
13. Resume all schedules;
14. Verify schedules is not Paused;
15. Reset the schedule for scan all, GC, and log rotation;
16. Verify schedules;
"""
# 1. Create a new project(PA) by user(UA)
project_id, project_name = self.project.create_project(metadata = {"public": "false"})
# 2. Create a retention policy
retention_id = self.retention.create_retention_policy(project_id, selector_repository="**", selector_tag="**")
self.retention.update_retention_policy(retention_id, project_id, cron=self.cron)
# 3. Create a new distribution
_, distribution_name = self.preheat.create_instance(endpoint_url=base._random_name("https://"))
# 4. Create a new preheat policy
distribution = self.preheat.get_instance(distribution_name)
_, preheat_policy_name = self.preheat.create_policy(project_name, project_id, distribution.id, trigger=r'{"type":"scheduled","trigger_setting":{"cron":"%s"}}' % (self.cron))
preheat_policy = self.preheat.get_policy(project_name, preheat_policy_name)
# 5. Create a new registry
registry_id, _ = self.registry.create_registry("https://" + harbor_server)
# 6. Create a replication policy triggered by schedule
replication_id, _ = self.replication.create_replication_policy(dest_registry=v2_swagger_client.Registry(id=int(registry_id)), trigger=v2_swagger_client.ReplicationTrigger(type="scheduled",trigger_settings=v2_swagger_client.ReplicationTriggerSettings(cron=self.cron)))
# 7. Set up a schedule to scan all
self.scan_all.create_scan_all_schedule(self.cron_type, cron=self.cron)
# 8. Set up a schedule to GC
self.gc.create_gc_schedule(self.cron_type, is_delete_untagged=True, cron=self.cron)
# 9. Set up a schedule to Log Rotation
self.purge.create_purge_schedule(self.cron_type, self.cron, True)
# 10. Verify schedules
schedules = self.schedule.list_schedules(page_size=50, page=1)
# 10.1 Verify retention schedule
retention_schedule = schedules["%s-%d" % (self.job_types[5], retention_id)]
self.assertEqual(retention_schedule.cron, self.cron)
# 10.2 Verify preheat schedule
preheat_schedule = schedules["%s-%d" % (self.job_types[2], preheat_policy.id)]
self.assertEqual(preheat_schedule.cron, self.cron)
# 10.3 Verify replication schedule
replication_schedule = schedules["%s-%d" % (self.job_types[4], replication_id)]
self.assertEqual(replication_schedule.vendor_type, self.job_types[4])
self.assertEqual(replication_schedule.cron, self.cron)
# 10.4 Verify scan all schedule
scan_all_schedule = schedules["SCAN_ALL"]
self.assertEqual(scan_all_schedule.cron, self.cron)
# 10.5 Verify GC schedule
gc_schedule = schedules[self.job_types[0]]
self.assertEqual(gc_schedule.cron, self.cron)
# 10.6 Verify log rotation
log_rotation_schedule = schedules["PURGE_AUDIT_LOG"]
self.assertEqual(log_rotation_schedule.cron, self.cron)
# 11. Pause all schedules
self.jobservice.action_pending_jobs("scheduler", "pause")
# 12. Verify schedules is Paused;
self.assertTrue(self.schedule.get_schedule_paused("all").paused)
# 13. Resume all schedules
self.jobservice.action_pending_jobs("scheduler", "resume")
# 14. Verify schedules is not Paused
self.assertFalse(self.schedule.get_schedule_paused("all").paused)
# 15. Reset the schedule for scan all, GC, and log rotation
self.scan_all.update_scan_all_schedule("None", cron="")
self.gc.update_gc_schedule("None", cron="")
self.purge.update_purge_schedule("None", "")
# 16. Verify schedules
schedules = self.schedule.list_schedules(page_size=50, page=1)
# 16.1 Verify retention schedule
retention_schedule = schedules["%s-%d" % (self.job_types[5], retention_id)]
self.assertEqual(retention_schedule.cron, self.cron)
# 16.2 Verify preheat schedule
preheat_schedule = schedules["%s-%d" % (self.job_types[2], preheat_policy.id)]
self.assertEqual(preheat_schedule.cron, self.cron)
# 16.3 Verify replication schedule
replication_schedule = schedules["%s-%d" % (self.job_types[4], replication_id)]
self.assertEqual(replication_schedule.vendor_type, self.job_types[4])
self.assertEqual(replication_schedule.cron, self.cron)
# 16.4 Verify scan all schedule
self.assertNotIn("SCAN_ALL", schedules)
# 16.5 Verify GC schedule
self.assertNotIn(self.job_types[0], schedules)
# 16.6 Verify log rotation
self.assertNotIn("PURGE_AUDIT_LOG", schedules)
def testWorkers(self):
"""
Test case:
Job Service Dashboard Workers
Test step and expected result:
1. Get worker pools;
2. Get workers in current pool;
3. Stop running job;
"""
# 1. Get worker pools
worker_pools = self.jobservice.get_worker_pools()
for worker_pool in worker_pools:
self.assertIsNotNone(worker_pool.pid)
self.assertIsNotNone(worker_pool.worker_pool_id)
self.assertIsNotNone(worker_pool.concurrency)
self.assertIsNotNone(worker_pool.start_at)
self.assertIsNotNone(worker_pool.heartbeat_at)
# 2. Get workers in current pool
workers = self.jobservice.get_workers(worker_pool.worker_pool_id)
self.assertEqual(len(workers), worker_pool.concurrency)
for worker in workers:
self.assertIsNotNone(worker.id)
self.assertEqual(worker_pool.worker_pool_id, worker.pool_id)
# 3. Stop running job
self.jobservice.stop_running_job("966a49aa2278b67d743f8aca")
def testJobServiceDashboardAPIPermission(self):
"""
Test case:
Log Rotaion Permission API
Test step and expected result:
1. Create a new user(UA);
2. User(UA) should not have permission to list job queue API;
3. User(UA) should not have permission to action_pending_jobs API;
4. User(UA) should not have permission to stop running job API;
5. User(UA) should not have permission to get worker pools API;
6. User(UA) should not have permission to get workers in current pool API;
7. User(UA) should not have permission to list schedules API;
8. User(UA) should have permission to get scheduler paused status API;
9. Verify that the get scheduler paused status API parameter job_type only support all;
"""
expect_status_code = 403
expect_response_body = "FORBIDDEN"
# 1. Create a new user(UA)
user_password = "Aa123456"
_, user_name = self.user.create_user(user_password = user_password)
USER_CLIENT = dict(endpoint = ADMIN_CLIENT["endpoint"], username = user_name, password = user_password)
# 2. User(UA) should not have permission to list job queue API
self.jobservice.get_job_queues(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 3. User(UA) should not have permission to action_pending_jobs API
self.jobservice.action_pending_jobs(self.job_types[0], "pause", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 4. User(UA) should not have permission to stop running job API
self.jobservice.stop_running_job("966a49aa2278b67d743f8aca", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 5. User(UA) should not have permission to get worker pools API
self.jobservice.get_worker_pools(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 6. User(UA) should not have permission to get workers in current pool API
self.jobservice.get_workers("13e0cfe999715102c47614ec", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 7. User(UA) should not have permission to list schedules API
self.schedule.list_schedules(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 8. User(UA) should have permission to get scheduler paused status API
self.schedule.get_schedule_paused("all")
# 9. Verify that the get scheduler paused status API parameter job_type only support all
self.schedule.get_schedule_paused(self.job_types[0], expect_status_code=400, expect_response_body="job_type can only be 'all'")
if __name__ == '__main__':
unittest.main()

View File

@ -182,3 +182,7 @@ Test Case - Log Forward
Test Case - Scan Data Export Test Case - Scan Data Export
[Tags] scan_data_export [Tags] scan_data_export
Harbor API Test ./tests/apitests/python/test_scan_data_export.py Harbor API Test ./tests/apitests/python/test_scan_data_export.py
Test Case - Job Service Dashboard
[Tags] job_service_dashboard
Harbor API Test ./tests/apitests/python/test_job_service_dashboard.py