Add audit log forword API test case (#17589)

Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
Yang Jiao 2022-09-27 10:45:08 +08:00 committed by GitHub
parent 83bc24f6c1
commit b8b2c2a4ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 4 deletions

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
import base
from v2_swagger_client.rest import ApiException
class Audit_Log(base.Base):
def __init__(self):
super(Audit_Log, self).__init__(api_type="audit_log")
def get_latest_audit_log(self):
return self.list_audit_logs(sort="-creation_time", page_size=1, page=1)[0]
def list_audit_logs(self, sort, page_size, page, expect_status_code=200, expect_response_body=None, **kwargs):
try:
return_data, status_code, _ = self._get_client(**kwargs).list_audit_logs_with_http_info(sort=sort, page_size=page_size, page=page)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return return_data

View File

@ -31,7 +31,7 @@ def _create_client(server, credential, debug, api_type="products"):
cfg = None cfg = None
if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota', if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota',
'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist', 'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist',
'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge'): 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log'):
cfg = v2_swagger_client.Configuration() cfg = v2_swagger_client.Configuration()
else: else:
cfg = swagger_client.Configuration() cfg = swagger_client.Configuration()
@ -77,7 +77,8 @@ def _create_client(server, credential, debug, api_type="products"):
"member": v2_swagger_client.MemberApi(v2_swagger_client.ApiClient(cfg)), "member": v2_swagger_client.MemberApi(v2_swagger_client.ApiClient(cfg)),
"health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)), "health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)),
"webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)), "webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)),
"purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)) "purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)),
"audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg))
}.get(api_type,'Error: Wrong API type') }.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."):

View File

@ -25,6 +25,10 @@ def set_configurations(client, expect_status_code = 200, expect_response_body =
conf["ldap_group_search_scope"] = config.get("ldap_group_search_scope") conf["ldap_group_search_scope"] = config.get("ldap_group_search_scope")
if "ldap_group_admin_dn" in config and config.get("ldap_group_admin_dn") is not None: if "ldap_group_admin_dn" in config and config.get("ldap_group_admin_dn") is not None:
conf["ldap_group_admin_dn"] = config.get("ldap_group_admin_dn") conf["ldap_group_admin_dn"] = config.get("ldap_group_admin_dn")
if "audit_log_forward_endpoint" in config and config.get("audit_log_forward_endpoint") is not None:
conf["audit_log_forward_endpoint"] = config.get("audit_log_forward_endpoint")
if "skip_audit_log_database" in config and config.get("skip_audit_log_database") is not None:
conf["skip_audit_log_database"] = config.get("skip_audit_log_database")
try: try:
_, status_code, _ = client.update_configurations_with_http_info(conf) _, status_code, _ = client.update_configurations_with_http_info(conf)
@ -81,3 +85,7 @@ class Configurations(base.Base, object):
ldap_group_base_dn=ldap_group_base_dn, ldap_group_search_filter=ldap_group_search_filter, ldap_group_admin_dn=ldap_group_admin_dn, ldap_group_search_scope=ldap_group_search_scope) ldap_group_base_dn=ldap_group_base_dn, ldap_group_search_filter=ldap_group_search_filter, ldap_group_admin_dn=ldap_group_admin_dn, ldap_group_search_scope=ldap_group_search_scope)
set_configurations(client, expect_status_code = expect_status_code, **config) set_configurations(client, expect_status_code = expect_status_code, **config)
def set_configurations_of_audit_log_forword(self, audit_log_forward_endpoint=None, skip_audit_log_database=None, expect_status_code = 200, **kwargs):
client = self._get_client(**kwargs)
config=dict(audit_log_forward_endpoint=audit_log_forward_endpoint, skip_audit_log_database=skip_audit_log_database)
set_configurations(client, expect_status_code = expect_status_code, **config)

View File

@ -0,0 +1,129 @@
from __future__ import absolute_import
import unittest
from testutils import ADMIN_CLIENT, LOG_PATH, harbor_server, suppress_urllib3_warning
from library.audit_log import Audit_Log
from library.user import User
from library.project import Project
from library.artifact import Artifact
from library.configurations import Configurations
from library.repository import Repository, push_self_build_image_to_project
class TestAuditLogForword(unittest.TestCase, object):
@suppress_urllib3_warning
def setUp(self):
self.project= Project()
self.user= User()
self.artifact = Artifact()
self.repo = Repository()
self.config = Configurations()
self.audit_log = Audit_Log()
self.image = "hello-world"
self.tag = "latest"
self.audit_log_path = LOG_PATH + "audit.log"
self.audit_log_forward_endpoint = "harbor-log:10514"
# 1. Reset audit log forword
self.config.set_configurations_of_audit_log_forword("", False)
def tearDown(self):
# 1. Reset audit log forword
self.config.set_configurations_of_audit_log_forword("", False)
# 2. Close audit log file
TestAuditLogForword.audit_log_file.close
def testAuditLogForword(self):
"""
Test case:
Audit Log Forword
Test step and expected result:
1. Create a new user(UA);
2. Create a new project(PA) by user(UA);
3. Verify that Skip Audit Log Database cannot be enabled without Audit Log Forward;
4. Enable Audit Log Forward;
5. Push a new image(IA) in project(PA) by user(UA);
6. Verify that the Audit Log should be in the log database;
7. Verify that the Audit Log should be in the audit.log;
8. Enable Skip Audit Log Database;
9. Delete image(IA);
10. Verify that the Audit Log should not be in log database;
11. Verify that the Audit Log should be in the audit.log;
12. Verify that Skip Audit Log Database cannot be enabled without Audit Log Forward;
Tear down:
1 Reset audit log forword.
"""
url = ADMIN_CLIENT["endpoint"]
user_password = "Aa123456"
# 1. Create user(UA)
user_id, user_name = self.user.create_user(user_password = user_password, **ADMIN_CLIENT)
user_client = dict(endpoint = url, username = user_name, password = user_password, with_accessory = True)
# 2.1. Create private project(PA) by user(UA)
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **user_client)
# 2.2. Get private project of user(UA), user(UA) can see only one private project which is project(PA)
self.project.projects_should_exist(dict(public=False), expected_count = 1, expected_project_id = project_id, **user_client)
# 3 Verify that Skip Audit Log Database cannot be enabled without Audit Log Forward
self.config.set_configurations_of_audit_log_forword(skip_audit_log_database=True, expect_status_code=400)
# 4 Enable Audit Log Forward
self.config.set_configurations_of_audit_log_forword(audit_log_forward_endpoint=self.audit_log_forward_endpoint, expect_status_code=200)
# 4.1 Verify configuration
configurations = self.config.get_configurations()
self.assertEqual(configurations.audit_log_forward_endpoint.value, self.audit_log_forward_endpoint)
self.assertFalse(configurations.skip_audit_log_database.value)
# 5 Push a new image(IA) in project(PA) by user(UA)
repo_name, tag = push_self_build_image_to_project(project_name, harbor_server, user_name, user_password, self.image, self.tag)
# 6. Verify that the Audit Log should be in the log database
first_audit_log = self.audit_log.get_latest_audit_log()
self.assertEqual(first_audit_log.operation, "create")
self.assertEqual(first_audit_log.resource, "{}:{}".format(repo_name, tag))
self.assertEqual(first_audit_log.resource_type, "artifact")
self.assertEqual(first_audit_log.username, user_name)
self.assertIsNotNone(first_audit_log.op_time)
# 7. Verify that the Audit Log should be in the audit.log
TestAuditLogForword.audit_log_file = open(self.audit_log_path, "r")
latest_line = TestAuditLogForword.audit_log_file.readlines()[-1]
self.assertIn('operator="{}"'.format(user_name), latest_line)
self.assertIn('resourceType="artifact"', latest_line)
self.assertIn('action:create', latest_line)
self.assertIn('resource:{}:{}'.format(repo_name, tag), latest_line)
self.assertIn('time="20', latest_line)
# 8.1 Enable Skip Audit Log Database
self.config.set_configurations_of_audit_log_forword(skip_audit_log_database=True)
# 8.1 Verify configuration
configurations = self.config.get_configurations()
self.assertEqual(configurations.audit_log_forward_endpoint.value, self.audit_log_forward_endpoint)
self.assertTrue(configurations.skip_audit_log_database.value)
# 9. Delete image(IA)
self.artifact.delete_artifact(project_name, self.image, self.tag, **user_client)
# 10. Verify that the Audit Log should not be in log database
second_audit_log = self.audit_log.get_latest_audit_log()
self.assertEqual(first_audit_log.operation, second_audit_log.operation)
self.assertEqual(first_audit_log.resource, second_audit_log.resource)
self.assertEqual(first_audit_log.resource_type,second_audit_log.resource_type)
self.assertEqual(first_audit_log.username, second_audit_log.username)
self.assertEqual(first_audit_log.op_time, second_audit_log.op_time)
# 11. Verify that the Audit Log should be in the audit.log
latest_line = TestAuditLogForword.audit_log_file.readlines()[-1]
self.assertIn('operator="{}"'.format(user_name), latest_line)
self.assertIn('resourceType="artifact"', latest_line)
self.assertIn('action:delete', latest_line)
self.assertIn('resource:{}'.format(repo_name), latest_line)
self.assertIn('time="20', latest_line)
# 12 Verify that Skip Audit Log Database cannot be enabled without Audit Log Forward
self.config.set_configurations_of_audit_log_forword(audit_log_forward_endpoint="", expect_status_code=400)
if __name__ == '__main__':
unittest.main()

View File

@ -32,6 +32,7 @@ notary_url = os.environ.get('NOTARY_URL', 'https://'+harbor_server+':4443')
DOCKER_USER = os.environ.get('DOCKER_USER', '') DOCKER_USER = os.environ.get('DOCKER_USER', '')
DOCKER_PWD = os.environ.get('DOCKER_PWD', '') DOCKER_PWD = os.environ.get('DOCKER_PWD', '')
METRIC_URL = os.environ.get('METRIC_URL', 'http://'+harbor_server+':9090') METRIC_URL = os.environ.get('METRIC_URL', 'http://'+harbor_server+':9090')
LOG_PATH = os.environ.get('LOG_PATH', '/var/log/harbor/')
BASE_IMAGE = dict(name='busybox', tag='latest') BASE_IMAGE = dict(name='busybox', tag='latest')
BASE_IMAGE_ABS_PATH_NAME = '/' + BASE_IMAGE['name'] + '.tar' BASE_IMAGE_ABS_PATH_NAME = '/' + BASE_IMAGE['name'] + '.tar'

View File

@ -21,7 +21,7 @@ set +e
docker ps docker ps
# run db auth api cases # run db auth api cases
if [ "$1" = 'DB' ]; then if [ "$1" = 'DB' ]; then
docker run -i --privileged -v $DIR/../../:/drone -v $DIR/../:/ca -w /drone $E2E_IMAGE robot --exclude proxy_cache -v DOCKER_USER:${DOCKER_USER} -v DOCKER_PWD:${DOCKER_PWD} -v ip:$2 -v ip1: -v http_get_ca:false -v HARBOR_PASSWORD:Harbor12345 /drone/tests/robot-cases/Group1-Nightly/Setup.robot /drone/tests/robot-cases/Group0-BAT/API_DB.robot docker run -i --privileged -v $DIR/../../:/drone -v $DIR/../:/ca -v /var/log/harbor/:/var/log/harbor/ -w /drone $E2E_IMAGE robot --exclude proxy_cache -v DOCKER_USER:${DOCKER_USER} -v DOCKER_PWD:${DOCKER_PWD} -v ip:$2 -v ip1: -v http_get_ca:false -v HARBOR_PASSWORD:Harbor12345 /drone/tests/robot-cases/Group1-Nightly/Setup.robot /drone/tests/robot-cases/Group0-BAT/API_DB.robot
elif [ "$1" = 'PROXY_CACHE' ]; then elif [ "$1" = 'PROXY_CACHE' ]; then
docker run -i --privileged -v $DIR/../../:/drone -v $DIR/../:/ca -w /drone $E2E_IMAGE robot --include setup --include proxy_cache -v DOCKER_USER:${DOCKER_USER} -v DOCKER_PWD:${DOCKER_PWD} -v ip:$2 -v ip1: -v http_get_ca:false -v HARBOR_PASSWORD:Harbor12345 /drone/tests/robot-cases/Group1-Nightly/Setup.robot /drone/tests/robot-cases/Group0-BAT/API_DB.robot docker run -i --privileged -v $DIR/../../:/drone -v $DIR/../:/ca -w /drone $E2E_IMAGE robot --include setup --include proxy_cache -v DOCKER_USER:${DOCKER_USER} -v DOCKER_PWD:${DOCKER_PWD} -v ip:$2 -v ip1: -v http_get_ca:false -v HARBOR_PASSWORD:Harbor12345 /drone/tests/robot-cases/Group1-Nightly/Setup.robot /drone/tests/robot-cases/Group0-BAT/API_DB.robot
elif [ "$1" = 'LDAP' ]; then elif [ "$1" = 'LDAP' ]; then

View File

@ -184,3 +184,7 @@ Test Case - Cosign Sign Artifact
Test Case - Log Rotation Test Case - Log Rotation
[Tags] log_rotation [Tags] log_rotation
Harbor API Test ./tests/apitests/python/test_log_rotation.py Harbor API Test ./tests/apitests/python/test_log_rotation.py
Test Case - Log Forward
[Tags] log_forward
Harbor API Test ./tests/apitests/python/test_audit_log_forward.py