Add purge audit log API test cases (#17175)

Added test cases for the following APIs:
1. PUT /system/purgeaudit/{purge_id}  Stop the specific purge audit log execution
2. GET /system/purgeaudit/{purge_id}  Get purge job status
3. GET /system/purgeaudit/{purge_id}/log  Get purge job log
4. PUT /system/purgeaudit/schedule   Update purge job's schedule
5. POST /system/purgeaudit/schedule  Create a purge job schedule
6. GET /system/purgeaudit/schedule  Get purge's schedule
7. GET /system/purgeaudit  Get purge job results.

Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
Yang Jiao 2022-08-01 16:52:57 +08:00 committed by GitHub
parent bf741ad381
commit 1dd2b0bc7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 248 additions and 3 deletions

View File

@ -31,7 +31,7 @@ def _create_client(server, credential, debug, api_type="products"):
cfg = None
if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota',
'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist',
'configure', 'user', 'member', 'health', 'label', 'webhook'):
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge'):
cfg = v2_swagger_client.Configuration()
else:
cfg = swagger_client.Configuration()
@ -76,7 +76,8 @@ def _create_client(server, credential, debug, api_type="products"):
"user": v2_swagger_client.UserApi(v2_swagger_client.ApiClient(cfg)),
"member": v2_swagger_client.MemberApi(v2_swagger_client.ApiClient(cfg)),
"health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)),
"webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg))
"webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)),
"purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg))
}.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."):

View File

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import base
import v2_swagger_client
from v2_swagger_client.rest import ApiException
class Purge(base.Base):
def __init__(self):
super(Purge, self).__init__(api_type="purge")
def create_purge_schedule(self, type, cron, dry_run, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs):
scheduleObj = v2_swagger_client.ScheduleObj(type=type)
if cron is not None:
scheduleObj.cron = cron
parameters = {
"audit_retention_hour": audit_retention_hour,
"include_operations": include_operations,
"dry_run": dry_run
}
schedule = v2_swagger_client.Schedule(schedule=scheduleObj, parameters=parameters)
try:
_, status_code, _ = self._get_client(**kwargs).create_purge_schedule_with_http_info(schedule)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
print(e.body)
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
def update_purge_schedule(self, type, cron, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=200, expect_response_body=None, **kwargs):
scheduleObj = v2_swagger_client.ScheduleObj(type=type, cron=cron)
parameters = {
"audit_retention_hour": audit_retention_hour,
"include_operations": include_operations,
"dry_run": False
}
schedule = v2_swagger_client.Schedule(schedule=scheduleObj, parameters=parameters)
try:
_, status_code, _ = self._get_client(**kwargs).update_purge_schedule_with_http_info(schedule)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
def stop_purge_execution(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, h = self._get_client(**kwargs).stop_purge_with_http_info(purge_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
def get_latest_purge_job(self, **kwargs):
return self.get_purge_jobs(sort="-creation_time", page_size=1, page=1)[0]
def get_purge_jobs(self, sort, page_size, page, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_purge_history_with_http_info(sort=sort, page_size=page_size, page=page)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_purge_job(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_purge_job_with_http_info(purge_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_purge_job_log(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_purge_job_log_with_http_info(purge_id)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data
def get_purge_schedule(self, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).get_purge_schedule_with_http_info()
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data

View File

@ -0,0 +1,135 @@
from __future__ import absolute_import
import json
import time
import unittest
from testutils import ADMIN_CLIENT, suppress_urllib3_warning
from library.purge import Purge
from library.user import User
class TestLogRotation(unittest.TestCase, object):
@suppress_urllib3_warning
def setUp(self):
self.purge = Purge()
self.user = User()
def tearDown(self):
# 1. Reset schedule
self.purge.update_purge_schedule(type=None, cron="", audit_retention_hour=0)
def testLogRotation(self):
"""
Test case:
Log Rotaion API
Test step and expected result:
1. Create a purge audit log job;
2. Stop this purge audit log job;
3. Verify purge audit log job status is Stopped;
4. Create a purge audit log job;
5. Verify purge audit log job status is Success;
6. Verify the log of the purge audit log job;
7. Create purge audit log schedule;
8. Verify purge audit log schedule;
9. Update purge audit log schedule;
10. Verify purge audit log schedule.
Tear down:
1 Reset schedule.
"""
# 1. Create a purge audit log job
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=True)
# 2. Stop this purge audit log job
latest_job = self.purge.get_latest_purge_job()
self.purge.stop_purge_execution(latest_job.id)
# 3. Verify purge audit log job status is Stopped
job_status = self.purge.get_purge_job(latest_job.id).job_status
self.assertEqual(self.purge.get_purge_job(latest_job.id).job_status, "Stopped")
# 4. Create a purge audit log job
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False, audit_retention_hour=1)
# 5. Verify purge audit log job status is Success
job_status = None
job_id = None
for i in range(10):
print("wait for the job to finish:", i)
if job_id == None:
latest_job = self.purge.get_latest_purge_job()
job_status = latest_job.job_status
job_id = latest_job.id
else:
job_status = self.purge.get_purge_job(job_id).job_status
if job_status == "Success":
break
time.sleep(2)
self.assertEqual(job_status, "Success")
# 6. Verify the log of the purge audit log job
job_logs = self.purge.get_purge_job_log(job_id)
self.assertIn("Purge audit job start", job_logs)
self.assertIn("rows of audit logs", job_logs)
# 7. Create a schedule
schedule_type = "Weekly"
schedule_cron = "0 0 0 * * 0"
audit_retention_hour = 24
include_operations = "create,delete,pull"
self.purge.create_purge_schedule(type=schedule_type, cron=schedule_cron, dry_run=False, audit_retention_hour=audit_retention_hour, include_operations=include_operations)
# 8. Verify schedule
self.verifySchedule(schedule_type, schedule_cron, audit_retention_hour, include_operations)
# 9. Update schedule
schedule_type = "Custom"
schedule_cron = "0 15 10 ? * *"
audit_retention_hour = 12
include_operations = "create,delete"
self.purge.update_purge_schedule(type=schedule_type, cron=schedule_cron, audit_retention_hour=audit_retention_hour, include_operations=include_operations)
# 10. Verify schedule
self.verifySchedule(schedule_type, schedule_cron, audit_retention_hour, include_operations)
def testLogRotationAPIPermission(self):
"""
Test case:
Log Rotaion Permission API
Test step and expected result:
1. Create a new user(UA);
2. User(UA) should not have permission to create purge schedule API;
3. Create a purge audit log job;
4. User(UA) should not have permission to stop purge execution API;
5. User(UA) should not have permission to get purge job API;
6. User(UA) should not have permission to get purge job log API;
7. User(UA) should not have permission to get purge jobs API;
8. User(UA) should not have permission to get purge schedule API;
9. User(UA) should not have permission to update purge schedule API;
"""
expect_status_code = 403
expect_response_body = "FORBIDDEN"
# 1. Create a new user(UA)
user_password = "Aa123456"
_, user_name = self.user.create_user(user_password = user_password)
USER_CLIENT = dict(endpoint = ADMIN_CLIENT["endpoint"], username = user_name, password = user_password)
# 2. User(UA) should not have permission to create purge schedule API
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 3. Create a purge audit log job
self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False)
latest_job = self.purge.get_latest_purge_job()
# 4. User(UA) should not have permission to stop purge execution API
self.purge.stop_purge_execution(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 5. User(UA) should not have permission to get purge job API
self.purge.get_purge_job(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 6. User(UA) should not have permission to get purge job log API
self.purge.get_purge_job_log(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 7. User(UA) should not have permission to get purge jobs API
self.purge.get_purge_jobs("creation_time", 10, 1, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 8. User(UA) should not have permission to get purge schedule API
self.purge.get_purge_schedule(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
# 9. User(UA) should not have permission to update purge schedule API
self.purge.update_purge_schedule(type="Custom", cron="0 15 10 ? * *", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT)
def verifySchedule(self, schedule_type, schedule_cron, audit_retention_hour, include_operations):
purge_schedule = self.purge.get_purge_schedule()
job_parameters = json.loads(purge_schedule.job_parameters)
self.assertEqual(purge_schedule.schedule.type, schedule_type)
self.assertEqual(purge_schedule.schedule.cron, schedule_cron)
self.assertEqual(job_parameters["audit_retention_hour"], audit_retention_hour)
self.assertEqual(job_parameters["include_operations"], include_operations)
if __name__ == '__main__':
unittest.main()

View File

@ -179,4 +179,8 @@ Test Case - Webhook CRUD
Test Case - Cosign Sign Artifact
[Tags] cosign
Harbor API Test ./tests/apitests/python/test_cosign_sign_artifact.py
Harbor API Test ./tests/apitests/python/test_cosign_sign_artifact.py
Test Case - Log Rotation
[Tags] log_rotation
Harbor API Test ./tests/apitests/python/test_log_rotation.py