mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-16 20:01:35 +01:00
Updating Permission Test Scripts (#19687)
Support for test cases that run multiple resources in a single run Signed-off-by: Yang Jiao <jiaoya@vmware.com>
This commit is contained in:
parent
8859f69668
commit
52d2d5c303
@ -1,54 +1,47 @@
|
||||
import copy
|
||||
import json
|
||||
import time
|
||||
import random
|
||||
import requests
|
||||
import urllib3
|
||||
import os
|
||||
|
||||
admin_name = os.environ.get("ADMIN_NAME")
|
||||
admin_user_name = os.environ.get("ADMIN_USER_NAME")
|
||||
admin_password = os.environ.get("ADMIN_PASSWORD")
|
||||
user_name = os.environ.get("USER_NAME")
|
||||
password = os.environ.get("PASSWORD")
|
||||
harbor_base_url = os.environ.get("HARBOR_BASE_URL")
|
||||
resource = os.environ.get("RESOURCE")
|
||||
resources = os.environ.get("RESOURCES")
|
||||
project_id = os.environ.get("PROJECT_ID")
|
||||
project_name = os.environ.get("PROJECT_NAME")
|
||||
# the source artifact should belong to the provided project, e.g. "nginx"
|
||||
source_artifact_name = os.environ.get("SOURCE_ARTIFACT_NAME")
|
||||
# the source artifact tag should belong to the provided project, e.g. "latest"
|
||||
source_artifact_tag = os.environ.get("SOURCE_ARTIFACT_TAG")
|
||||
id_or_name = None
|
||||
|
||||
ID_PLACEHOLDER = "(id)"
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
|
||||
class Permission:
|
||||
|
||||
def __init__(self, url, method, expect_status_code, payload=None, need_id_or_name=False, res_id_field=None,
|
||||
payload_id_field=None):
|
||||
def __init__(self, url, method, expect_status_code, payload=None, res_id_field=None, payload_id_field=None, id_from_header=False):
|
||||
self.url = url
|
||||
self.method = method
|
||||
self.expect_status_code = expect_status_code
|
||||
self.payload = payload
|
||||
self.res_id_field = res_id_field
|
||||
self.need_id_or_name = need_id_or_name
|
||||
self.payload_id_field = payload_id_field if payload_id_field else res_id_field
|
||||
self.id_from_header = id_from_header
|
||||
|
||||
|
||||
def call(self):
|
||||
global id_or_name
|
||||
url = self.url
|
||||
if self.need_id_or_name:
|
||||
url = self.url.format(id_or_name)
|
||||
response = requests.request(self.method, url, data=json.dumps(self.payload), verify=False,
|
||||
auth=(user_name, password), headers={
|
||||
"Content-Type": "application/json"
|
||||
})
|
||||
print("response: {}".format(response.text))
|
||||
assert response.status_code == self.expect_status_code, ("Failed to call the {} {}, expected status code is {"
|
||||
"}, but got {}, error msg is {}").format(
|
||||
self.method, self.url, self.expect_status_code, response.status_code, response.text)
|
||||
if self.res_id_field and self.payload_id_field and len(json.loads(response.text)) > 0:
|
||||
id_or_name = json.loads(response.text)[0][self.res_id_field]
|
||||
if ID_PLACEHOLDER in self.url:
|
||||
self.url = self.url.replace(ID_PLACEHOLDER, str(self.payload.get(self.payload_id_field)))
|
||||
response = requests.request(self.method, self.url, data=json.dumps(self.payload), verify=False, auth=(user_name, password), headers={"Content-Type": "application/json"})
|
||||
assert response.status_code == self.expect_status_code, "Failed to call the {} {}, expected status code is {}, but got {}, error msg is {}".format(self.method, self.url, self.expect_status_code, response.status_code, response.text)
|
||||
if self.res_id_field and self.payload_id_field and self.id_from_header == False:
|
||||
self.payload[self.payload_id_field] = int(json.loads(response.text)[self.res_id_field])
|
||||
elif self.res_id_field and self.payload_id_field and self.id_from_header == True:
|
||||
self.payload[self.payload_id_field] = int(response.headers["Location"].split("/")[-1])
|
||||
return response
|
||||
|
||||
|
||||
# Project permissions:
|
||||
@ -56,164 +49,101 @@ class Permission:
|
||||
label_payload = {
|
||||
"color": "#FFFFFF",
|
||||
"description": "Just for testing",
|
||||
"name": "label-name-{}".format(int(round(time.time() * 1000))),
|
||||
"name": "label-name-{}".format(int(random.randint(1000, 9999))),
|
||||
"project_id": int(project_id),
|
||||
"scope": "p",
|
||||
"id": None
|
||||
"scope": "p"
|
||||
}
|
||||
create_label = Permission("{}/labels".format(harbor_base_url), "POST", 201, label_payload)
|
||||
list_label = Permission("{}/labels?scope=p&project_id={}".format(harbor_base_url, project_id), "GET", 200,
|
||||
label_payload, False, "id", "id")
|
||||
read_label = Permission("{}/labels/{}".format(harbor_base_url, "{}"), "GET", 200, label_payload, True)
|
||||
label_payload_for_update = copy.deepcopy(label_payload)
|
||||
label_payload_for_update["description"] = "For update"
|
||||
update_label = Permission("{}/labels/{}".format(harbor_base_url, "{}"), "PUT", 200, label_payload_for_update, True)
|
||||
delete_label = Permission("{}/labels/{}".format(harbor_base_url, "{}"), "DELETE", 200, label_payload, True)
|
||||
create_label = Permission("{}/labels".format(harbor_base_url), "POST", 201, label_payload, "id", id_from_header=True)
|
||||
list_label = Permission("{}/labels?scope=p&project_id={}".format(harbor_base_url, project_id), "GET", 200)
|
||||
read_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, label_payload, payload_id_field="id")
|
||||
update_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "PUT", 200, label_payload, payload_id_field="id")
|
||||
delete_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "DELETE", 200, label_payload, payload_id_field="id")
|
||||
|
||||
# 2. Resource: project, actions: ['read', 'update', 'delete']
|
||||
project_payload_for_update = {"project_name": "test", "metadata": {"public": "false"}, "storage_limit": -1}
|
||||
read_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "GET", 200, project_payload_for_update,
|
||||
False)
|
||||
update_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "PUT", 200,
|
||||
project_payload_for_update, False)
|
||||
delete_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "DELETE", 412,
|
||||
project_payload_for_update, False)
|
||||
deletable_project = Permission("{}/projects/{}/_deletable".format(harbor_base_url, project_id), "GET", 200,
|
||||
project_payload_for_update, False)
|
||||
project_payload = {"project_name": "test", "metadata": {"public": "false"}, "storage_limit": -1}
|
||||
read_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "GET", 200)
|
||||
update_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "PUT", 200, project_payload)
|
||||
delete_project = Permission("{}/projects/{}".format(harbor_base_url, project_id), "DELETE", 412)
|
||||
deletable_project = Permission("{}/projects/{}/_deletable".format(harbor_base_url, project_id), "GET", 200)
|
||||
|
||||
# 3. Resource: metadata actions: ['read', 'list', 'create', 'update', 'delete'],
|
||||
metadata_payload = {
|
||||
"auto_scan": "true"
|
||||
}
|
||||
create_metadata = Permission("{}/projects/{}/metadatas".format(harbor_base_url, project_id), "POST", 200,
|
||||
metadata_payload, False)
|
||||
list_metadata = Permission("{}/projects/{}/metadatas".format(harbor_base_url, project_id), "GET", 200, metadata_payload,
|
||||
False, )
|
||||
read_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "GET", 200,
|
||||
metadata_payload, False)
|
||||
metadata_payload_for_update = {
|
||||
"auto_scan": "false"
|
||||
}
|
||||
update_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "PUT", 200,
|
||||
metadata_payload_for_update, False)
|
||||
delete_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "DELETE", 200,
|
||||
metadata_payload, False)
|
||||
metadata_payload = { "auto_scan": "true" }
|
||||
create_metadata = Permission("{}/projects/{}/metadatas".format(harbor_base_url, project_id), "POST", 200, metadata_payload)
|
||||
list_metadata = Permission("{}/projects/{}/metadatas".format(harbor_base_url, project_id), "GET", 200, metadata_payload)
|
||||
read_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "GET", 200, metadata_payload)
|
||||
metadata_payload_for_update = { "auto_scan": "false" }
|
||||
update_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "PUT", 200, metadata_payload_for_update)
|
||||
delete_metadata = Permission("{}/projects/{}/metadatas/auto_scan".format(harbor_base_url, project_id), "DELETE", 200, metadata_payload)
|
||||
|
||||
# 4. Resource: repository actions: ['read', 'list', 'update', 'delete', 'pull', 'push']
|
||||
# note: pull and push are for docker cli, no API needs them
|
||||
list_repo = Permission("{}/projects/{}/repositories".format(harbor_base_url, project_name), "GET", 200)
|
||||
read_repo = Permission("{}/projects/{}/repositories/does_not_exist".format(harbor_base_url, project_name), "GET", 404)
|
||||
repo_payload_for_update = {
|
||||
}
|
||||
update_repo = Permission("{}/projects/{}/repositories/does_not_exist".format(harbor_base_url, project_name), "PUT", 404,
|
||||
repo_payload_for_update)
|
||||
delete_repo = Permission("{}/projects/{}/repositories/does_not_exist".format(harbor_base_url, project_name), "DELETE",
|
||||
404)
|
||||
update_repo = Permission("{}/projects/{}/repositories/does_not_exist".format(harbor_base_url, project_name), "PUT", 404, {})
|
||||
delete_repo = Permission("{}/projects/{}/repositories/does_not_exist".format(harbor_base_url, project_name), "DELETE", 404)
|
||||
|
||||
# 5. Resource artifact actions: ['read', 'list', 'create', 'delete'],
|
||||
list_artifact = Permission("{}/projects/{}/repositories/does_not_exist/artifacts".format(harbor_base_url, project_name),
|
||||
"GET", 200)
|
||||
read_artifact = Permission(
|
||||
"{}/projects/{}/repositories/does_not_exist/artifacts/reference_does_not_exist".format(harbor_base_url,
|
||||
project_name), "GET", 404)
|
||||
copy_artifact = Permission(
|
||||
"{}/projects/{}/repositories/target_repo/artifacts?from={}/{}:{}".format(harbor_base_url, project_name,
|
||||
project_name, source_artifact_name,
|
||||
source_artifact_tag), "POST", 201)
|
||||
delete_artifact = Permission(
|
||||
"{}/projects/{}/repositories/target_repo/artifacts/{}".format(harbor_base_url, project_name, source_artifact_tag),
|
||||
"DELETE", 200)
|
||||
list_artifact = Permission("{}/projects/{}/repositories/{}/artifacts".format(harbor_base_url, project_name, source_artifact_name), "GET", 200)
|
||||
read_artifact = Permission("{}/projects/{}/repositories/{}/artifacts/{}".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "GET", 200)
|
||||
copy_artifact = Permission("{}/projects/{}/repositories/target_repo/artifacts?from={}/{}:{}".format(harbor_base_url, project_name, project_name, source_artifact_name, source_artifact_tag), "POST", 201)
|
||||
delete_artifact = Permission("{}/projects/{}/repositories/target_repo/artifacts/{}".format(harbor_base_url, project_name, source_artifact_tag), "DELETE", 200)
|
||||
|
||||
# 6. Resource scan actions: ['read', 'create', 'stop']
|
||||
create_scan = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/scan".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "POST", 202)
|
||||
stop_scan = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/scan/stop".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "POST", 202)
|
||||
read_scan = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/scan/0/log".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "get", 404)
|
||||
create_scan = Permission("{}/projects/{}/repositories/{}/artifacts/{}/scan".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "POST", 202)
|
||||
stop_scan = Permission("{}/projects/{}/repositories/{}/artifacts/{}/scan/stop".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "POST", 202)
|
||||
read_scan = Permission("{}/projects/{}/repositories/{}/artifacts/{}/scan/0/log".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "get", 404)
|
||||
|
||||
# 7. Resource tag actions: ['list', 'create', 'delete']
|
||||
tag_payload = {
|
||||
"name": "test-{}".format(int(round(time.time() * 1000)))
|
||||
}
|
||||
create_tag = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/tags".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "POST", 201, tag_payload)
|
||||
list_tag = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/tags".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "GET", 200)
|
||||
delete_tag = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/tags/tag_name_does_not_exist".format(harbor_base_url, project_name,
|
||||
source_artifact_name,
|
||||
source_artifact_tag), "DELETE",
|
||||
404)
|
||||
tag_payload = { "name": "test-{}".format(int(random.randint(1000, 9999))) }
|
||||
create_tag = Permission("{}/projects/{}/repositories/{}/artifacts/{}/tags".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "POST", 201, tag_payload)
|
||||
list_tag = Permission("{}/projects/{}/repositories/{}/artifacts/{}/tags".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "GET", 200)
|
||||
delete_tag = Permission("{}/projects/{}/repositories/{}/artifacts/{}/tags/{}".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag, tag_payload['name']), "DELETE", 200)
|
||||
|
||||
# 8. Resource accessory actions: ['list']
|
||||
list_accessory = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/accessories".format(harbor_base_url, project_name,
|
||||
source_artifact_name, source_artifact_tag), "GET",
|
||||
200)
|
||||
list_accessory = Permission("{}/projects/{}/repositories/{}/artifacts/{}/accessories".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "GET", 200)
|
||||
|
||||
# 9. Resource artifact-addition actions: ['read']
|
||||
read_artifact_addition_vul = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/additions/vulnerabilities".format(harbor_base_url, project_name,
|
||||
source_artifact_name,
|
||||
source_artifact_tag), "GET", 200)
|
||||
read_artifact_addition_dependencies = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/additions/dependencies".format(harbor_base_url, project_name,
|
||||
source_artifact_name,
|
||||
source_artifact_tag), "GET", 400)
|
||||
read_artifact_addition_vul = Permission("{}/projects/{}/repositories/{}/artifacts/{}/additions/vulnerabilities".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "GET", 200)
|
||||
read_artifact_addition_dependencies = Permission("{}/projects/{}/repositories/{}/artifacts/{}/additions/dependencies".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "GET", 400)
|
||||
|
||||
# 10. Resource artifact-label actions: ['create', 'delete'],
|
||||
artifact_label_payload = copy.deepcopy(label_payload)
|
||||
artifact_label_payload["description"] = "Add label to an artifact"
|
||||
add_label_to_artifact = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/labels".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "POST", 404,
|
||||
artifact_label_payload)
|
||||
delete_artifact_label = Permission(
|
||||
"{}/projects/{}/repositories/{}/artifacts/{}/labels/0".format(harbor_base_url, project_name, source_artifact_name,
|
||||
source_artifact_tag), "DELETE", 404,
|
||||
artifact_label_payload)
|
||||
label_id = None
|
||||
artifact_label_payload = None
|
||||
if "artifact-label" in resources or "all" == resources:
|
||||
label_payload = {
|
||||
"name": "label-name-{}".format(int(random.randint(1000, 9999))),
|
||||
"project_id": int(project_id),
|
||||
"scope": "p"
|
||||
}
|
||||
response = requests.post("{}/labels".format(harbor_base_url), data=json.dumps(label_payload), verify=False, auth=(admin_user_name, admin_password), headers={"Content-Type": "application/json"})
|
||||
label_id = int(response.headers["Location"].split("/")[-1])
|
||||
artifact_label_payload = { "id": label_id }
|
||||
add_label_to_artifact = Permission("{}/projects/{}/repositories/{}/artifacts/{}/labels".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag), "POST", 200, artifact_label_payload)
|
||||
delete_artifact_label = Permission("{}/projects/{}/repositories/{}/artifacts/{}/labels/{}".format(harbor_base_url, project_name, source_artifact_name, source_artifact_tag, label_id), "DELETE", 200)
|
||||
|
||||
# 11. Resource scanner actions: ['create', 'read']
|
||||
update_project_scanner = Permission("{}/projects/{}/scanner".format(harbor_base_url, project_id), "PUT", 200,
|
||||
{"uuid": "faked_uuid"})
|
||||
update_project_scanner = Permission("{}/projects/{}/scanner".format(harbor_base_url, project_id), "PUT", 200, {"uuid": "faked_uuid"})
|
||||
read_project_scanner = Permission("{}/projects/{}/scanner".format(harbor_base_url, project_id), "GET", 200)
|
||||
read_project_scanner_candidates = Permission("{}/projects/{}/scanner/candidates".format(harbor_base_url, project_id),
|
||||
"GET", 200)
|
||||
read_project_scanner_candidates = Permission("{}/projects/{}/scanner/candidates".format(harbor_base_url, project_id), "GET", 200)
|
||||
|
||||
# 12. Resource preheat-policy actions: ['read', 'list', 'create', 'update', 'delete']
|
||||
create_preheat_policy = Permission("{}/projects/{}/preheat/policies".format(harbor_base_url, project_name), "POST", 500,
|
||||
{})
|
||||
create_preheat_policy = Permission("{}/projects/{}/preheat/policies".format(harbor_base_url, project_name), "POST", 500, {})
|
||||
list_preheat_policy = Permission("{}/projects/{}/preheat/policies".format(harbor_base_url, project_name), "GET", 200)
|
||||
read_preheat_policy = Permission(
|
||||
"{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "GET", 404)
|
||||
update_preheat_policy = Permission(
|
||||
"{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "PUT", 500)
|
||||
delete_preheat_policy = Permission(
|
||||
"{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "DELETE", 404)
|
||||
read_preheat_policy = Permission("{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "GET", 404)
|
||||
update_preheat_policy = Permission("{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "PUT", 500)
|
||||
delete_preheat_policy = Permission("{}/projects/{}/preheat/policies/policy_name_does_not_exist".format(harbor_base_url, project_name), "DELETE", 404)
|
||||
|
||||
# 13. Resource immutable-tag actions: ['list', 'create', 'update', 'delete']
|
||||
immutable_tag_rule_payload = {
|
||||
"disabled": False,
|
||||
"scope_selectors": {
|
||||
"repository": [{"kind": "doublestar", "decoration": "repoMatches",
|
||||
"pattern": "{}".format(int(round(time.time() * 1000)))}]},
|
||||
"tag_selectors": [
|
||||
{"kind": "doublestar", "decoration": "matches", "pattern": "{}".format(int(round(time.time() * 1000)))}],
|
||||
"repository": [{"kind": "doublestar", "decoration": "repoMatches", "pattern": "{}".format(int(random.randint(1000, 9999)))}]},
|
||||
"tag_selectors": [{"kind": "doublestar", "decoration": "matches", "pattern": "{}".format(int(random.randint(1000, 9999)))}],
|
||||
}
|
||||
create_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules".format(harbor_base_url, project_id), "POST",
|
||||
201,
|
||||
immutable_tag_rule_payload)
|
||||
create_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules".format(harbor_base_url, project_id), "POST", 201, immutable_tag_rule_payload)
|
||||
list_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules".format(harbor_base_url, project_id), "GET", 200)
|
||||
update_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules/0".format(harbor_base_url, project_id), "PUT",
|
||||
404)
|
||||
delete_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules/0".format(harbor_base_url, project_id),
|
||||
"DELETE", 404)
|
||||
update_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules/0".format(harbor_base_url, project_id), "PUT", 404)
|
||||
delete_immutable_tag_rule = Permission("{}/projects/{}/immutabletagrules/0".format(harbor_base_url, project_id), "DELETE", 404)
|
||||
|
||||
# 14. Resource tag-retention actions: ['read', 'list', 'create', 'update', 'delete']
|
||||
tag_retention_rule_payload = {
|
||||
@ -256,53 +186,26 @@ tag_retention_rule_payload = {
|
||||
}
|
||||
}
|
||||
|
||||
# 15. Resource tag-retention actions: ['read', 'list', 'create', 'update', 'delete']
|
||||
if "tag-retention" in resources or "all" == resources:
|
||||
requests.delete("{}/projects/{}/metadatas/retention_id".format(harbor_base_url, project_id), verify=False, auth=(admin_user_name, admin_password))
|
||||
create_tag_retention_rule = Permission("{}/retentions".format(harbor_base_url), "POST", 201, tag_retention_rule_payload, "id", id_from_header=True)
|
||||
read_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, tag_retention_rule_payload, payload_id_field="id")
|
||||
update_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, ID_PLACEHOLDER), "PUT", 200, tag_retention_rule_payload, payload_id_field="id")
|
||||
execute_tag_retention = Permission("{}/retentions/888/executions".format(harbor_base_url), "POST", 400, tag_retention_rule_payload, payload_id_field="id")
|
||||
list_tag_retention_execution = Permission("{}/retentions/{}/executions".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, tag_retention_rule_payload, payload_id_field="id")
|
||||
tag_retention_rule_payload["action"] = "stop"
|
||||
stop_tag_retention = Permission("{}/retentions/{}/executions/888".format(harbor_base_url, ID_PLACEHOLDER), "PATCH", 404, tag_retention_rule_payload, payload_id_field="id")
|
||||
list_tag_retention_tasks = Permission("{}/retentions/{}/executions/888/tasks".format(harbor_base_url, ID_PLACEHOLDER), "GET", 404, tag_retention_rule_payload, payload_id_field="id")
|
||||
read_tag_retention_tasks = Permission("{}/retentions/{}/executions/888/tasks/888".format(harbor_base_url, ID_PLACEHOLDER), "GET", 404, tag_retention_rule_payload, payload_id_field="id")
|
||||
delete_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, ID_PLACEHOLDER), "DELETE", 200, tag_retention_rule_payload, payload_id_field="id")
|
||||
|
||||
def get_retention_id() -> str:
|
||||
# create retention rule fist
|
||||
# this request can be failed(retention rule existed) or succeeded, but we can finally get the retention id
|
||||
requests.request("POST", "{}/retentions".format(harbor_base_url),
|
||||
data=json.dumps(tag_retention_rule_payload), verify=False,
|
||||
auth=(admin_name, admin_password), headers={"Content-Type": "application/json"})
|
||||
response1 = requests.request("GET", "{}/projects/{}/metadatas/retention_id".format(harbor_base_url, project_id),
|
||||
data=None, verify=False,
|
||||
auth=(admin_name, admin_password), headers={"Content-Type": "application/json"})
|
||||
retention_id = project_id
|
||||
if "retention_id" in json.loads(response1.text):
|
||||
retention_id = json.loads(response1.text)["retention_id"]
|
||||
return retention_id
|
||||
|
||||
|
||||
# because get_retention_id() has been called, so the expected status code is 400
|
||||
create_tag_retention_rule = Permission("{}/retentions".format(harbor_base_url), "POST",
|
||||
400,
|
||||
tag_retention_rule_payload)
|
||||
|
||||
update_retention_payload = copy.deepcopy(tag_retention_rule_payload)
|
||||
update_retention_payload["rules"][0]["disabled"] = True
|
||||
read_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, get_retention_id()), "GET", 200)
|
||||
update_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, get_retention_id()), "PUT", 200,
|
||||
update_retention_payload)
|
||||
delete_tag_retention = Permission("{}/retentions/{}".format(harbor_base_url, get_retention_id()), "DELETE", 200)
|
||||
execute_tag_retention = Permission("{}/retentions/{}/executions".format(harbor_base_url, get_retention_id()), "POST",
|
||||
201)
|
||||
list_tag_retention_execution = Permission("{}/retentions/{}/executions".format(harbor_base_url, get_retention_id()),
|
||||
"GET",
|
||||
200)
|
||||
stop_tag_retention = Permission("{}/retentions/{}/executions/0".format(harbor_base_url, get_retention_id()), "PATCH",
|
||||
404,
|
||||
{"action": "stop"})
|
||||
list_tag_retention_tasks = Permission("{}/retentions/{}/executions/0/tasks".format(harbor_base_url, get_retention_id()),
|
||||
"GET", 404)
|
||||
read_tag_retention_tasks = Permission(
|
||||
"{}/retentions/{}/executions/0/tasks/0".format(harbor_base_url, get_retention_id()),
|
||||
"GET", 404)
|
||||
|
||||
# 15. Resource log actions: ['list']
|
||||
# 16. Resource log actions: ['list']
|
||||
list_log = Permission("{}/projects/{}/logs".format(harbor_base_url, project_name), "GET", 200)
|
||||
|
||||
# 16. Resource notification-policy actions: ['read', 'list', 'create', 'update', 'delete']
|
||||
# 17. Resource notification-policy actions: ['read', 'list', 'create', 'update', 'delete']
|
||||
webhook_payload = {
|
||||
"name": "webhook-{}".format(int(round(time.time() * 1000))),
|
||||
"name": "webhook-{}".format(int(random.randint(1000, 9999))),
|
||||
"description": "Just for test",
|
||||
"project_id": int(project_id),
|
||||
"targets": [
|
||||
@ -318,26 +221,15 @@ webhook_payload = {
|
||||
],
|
||||
"enabled": True
|
||||
}
|
||||
|
||||
create_webhook = Permission("{}/projects/{}/webhook/policies".format(harbor_base_url, project_id), "POST",
|
||||
201,
|
||||
webhook_payload)
|
||||
list_webhook = Permission("{}/projects/{}/webhook/policies".format(harbor_base_url, project_id), "GET",
|
||||
200)
|
||||
read_webhook = Permission("{}/projects/{}/webhook/policies/0".format(harbor_base_url, project_id), "GET",
|
||||
404)
|
||||
update_webhook = Permission("{}/projects/{}/webhook/policies/0".format(harbor_base_url, project_id), "PUT",
|
||||
404, {})
|
||||
delete_webhook = Permission("{}/projects/{}/webhook/policies/0".format(harbor_base_url, project_id), "DELETE",
|
||||
404)
|
||||
|
||||
list_webhook_executions = Permission("{}/projects/{}/webhook/policies/0/executions".format(harbor_base_url, project_id),
|
||||
"GET", 404)
|
||||
list_webhook_executions_tasks = Permission(
|
||||
"{}/projects/{}/webhook/policies/0/executions/0/tasks".format(harbor_base_url, project_id), "GET", 404)
|
||||
read_webhook_executions_tasks = Permission(
|
||||
"{}/projects/{}/webhook/policies/0/executions/0/tasks/0/log".format(harbor_base_url, project_id), "GET", 404)
|
||||
create_webhook = Permission("{}/projects/{}/webhook/policies".format(harbor_base_url, project_id), "POST", 201, webhook_payload, "id", id_from_header=True)
|
||||
list_webhook = Permission("{}/projects/{}/webhook/policies".format(harbor_base_url, project_id), "GET", 200)
|
||||
read_webhook = Permission("{}/projects/{}/webhook/policies/{}".format(harbor_base_url, project_id, ID_PLACEHOLDER), "GET", 200, webhook_payload, payload_id_field="id")
|
||||
update_webhook = Permission("{}/projects/{}/webhook/policies/{}".format(harbor_base_url, project_id, ID_PLACEHOLDER), "PUT", 200, webhook_payload, payload_id_field="id")
|
||||
list_webhook_executions = Permission("{}/projects/{}/webhook/policies/{}/executions".format(harbor_base_url, project_id, ID_PLACEHOLDER), "GET", 200, webhook_payload, payload_id_field="id")
|
||||
list_webhook_executions_tasks = Permission("{}/projects/{}/webhook/policies/{}/executions/888/tasks".format(harbor_base_url, project_id, ID_PLACEHOLDER), "GET", 404, webhook_payload, payload_id_field="id")
|
||||
read_webhook_executions_tasks = Permission("{}/projects/{}/webhook/policies/{}/executions/888/tasks/888/log".format(harbor_base_url, project_id, ID_PLACEHOLDER), "GET", 404, webhook_payload, payload_id_field="id")
|
||||
list_webhook_events = Permission("{}/projects/{}/webhook/events".format(harbor_base_url, project_id), "GET", 200)
|
||||
delete_webhook = Permission("{}/projects/{}/webhook/policies/{}".format(harbor_base_url, project_id, ID_PLACEHOLDER), "DELETE", 200, webhook_payload, payload_id_field="id")
|
||||
|
||||
resource_permissions = {
|
||||
"label": [create_label, list_label, read_label, update_label, delete_label],
|
||||
@ -351,27 +243,23 @@ resource_permissions = {
|
||||
"artifact-addition": [read_artifact_addition_vul, read_artifact_addition_dependencies],
|
||||
"artifact-label": [add_label_to_artifact, delete_artifact_label],
|
||||
"scanner": [update_project_scanner, read_project_scanner, read_project_scanner_candidates],
|
||||
"preheat-policy": [create_preheat_policy, list_preheat_policy, read_preheat_policy, update_preheat_policy,
|
||||
delete_preheat_policy],
|
||||
"immutable-tag": [create_immutable_tag_rule, list_immutable_tag_rule, update_immutable_tag_rule,
|
||||
delete_immutable_tag_rule],
|
||||
"tag-retention": [create_tag_retention_rule, read_tag_retention, update_tag_retention, execute_tag_retention,
|
||||
list_tag_retention_execution, stop_tag_retention, list_tag_retention_tasks,
|
||||
read_tag_retention_tasks, delete_tag_retention],
|
||||
"preheat-policy": [create_preheat_policy, list_preheat_policy, read_preheat_policy, update_preheat_policy, delete_preheat_policy],
|
||||
"immutable-tag": [create_immutable_tag_rule, list_immutable_tag_rule, update_immutable_tag_rule, delete_immutable_tag_rule],
|
||||
"tag-retention": [create_tag_retention_rule, read_tag_retention, update_tag_retention, execute_tag_retention, list_tag_retention_execution, stop_tag_retention, list_tag_retention_tasks, read_tag_retention_tasks, delete_tag_retention],
|
||||
"log": [list_log],
|
||||
"notification-policy": [create_webhook, list_webhook, read_webhook, update_webhook, delete_webhook,
|
||||
list_webhook_executions, list_webhook_executions_tasks, read_webhook_executions_tasks,
|
||||
list_webhook_events]
|
||||
"notification-policy": [create_webhook, list_webhook, read_webhook, update_webhook, list_webhook_executions, list_webhook_executions_tasks, read_webhook_executions_tasks, list_webhook_events, delete_webhook]
|
||||
}
|
||||
resource_permissions["all"] = [item for sublist in resource_permissions.values() for item in sublist]
|
||||
|
||||
|
||||
def main():
|
||||
for permission in resource_permissions[resource]:
|
||||
print("=================================================")
|
||||
print("call: {} {}".format(permission.method, permission.url))
|
||||
print("payload: {}".format(json.dumps(permission.payload)))
|
||||
print("=================================================\n")
|
||||
permission.call()
|
||||
for resource in resources.split(","):
|
||||
for permission in resource_permissions[resource]:
|
||||
print("=================================================")
|
||||
print("call: {} {}".format(permission.method, permission.url))
|
||||
print("payload: {}".format(json.dumps(permission.payload)))
|
||||
print("response: {}".format(permission.call().text))
|
||||
print("=================================================\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -11,7 +11,7 @@ password = os.environ.get("PASSWORD")
|
||||
admin_user_name = os.environ.get("ADMIN_USER_NAME")
|
||||
admin_password = os.environ.get("ADMIN_PASSWORD")
|
||||
harbor_base_url = os.environ.get("HARBOR_BASE_URL")
|
||||
resource = os.environ.get("RESOURCE")
|
||||
resources = os.environ.get("RESOURCES")
|
||||
ID_PLACEHOLDER = "(id)"
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
@ -38,13 +38,11 @@ class Permission:
|
||||
self.payload[self.payload_id_field] = int(json.loads(response.text)[self.res_id_field])
|
||||
elif self.res_id_field and self.payload_id_field and self.id_from_header == True:
|
||||
self.payload[self.payload_id_field] = int(response.headers["Location"].split("/")[-1])
|
||||
return response
|
||||
|
||||
|
||||
resource_permissions = {}
|
||||
# audit logs permissions start
|
||||
list_audit_logs = Permission("{}/audit-logs".format(harbor_base_url), "GET", 200)
|
||||
audit_log = [ list_audit_logs ]
|
||||
resource_permissions["audit-log"] = audit_log
|
||||
# audit logs permissions end
|
||||
|
||||
# preheat instance permissions start
|
||||
@ -62,8 +60,6 @@ read_preheat_instance = Permission("{}/p2p/preheat/instances/{}".format(harbor_b
|
||||
update_preheat_instance = Permission("{}/p2p/preheat/instances/{}".format(harbor_base_url, preheat_instance_payload["name"]), "PUT", 200, preheat_instance_payload)
|
||||
delete_preheat_instance = Permission("{}/p2p/preheat/instances/{}".format(harbor_base_url, preheat_instance_payload["name"]), "DELETE", 200, preheat_instance_payload)
|
||||
ping_preheat_instance = Permission("{}/p2p/preheat/instances/ping".format(harbor_base_url), "POST", 500, preheat_instance_payload)
|
||||
preheat_instances = [ create_preheat_instance, list_preheat_instance, read_preheat_instance, update_preheat_instance, delete_preheat_instance ]
|
||||
resource_permissions["preheat-instance"] = preheat_instances
|
||||
# preheat instance permissions end
|
||||
|
||||
# project permissions start
|
||||
@ -76,8 +72,6 @@ project_payload = {
|
||||
}
|
||||
create_project = Permission("{}/projects".format(harbor_base_url), "POST", 201, project_payload)
|
||||
list_project = Permission("{}/projects".format(harbor_base_url), "GET", 200, project_payload)
|
||||
project = [ create_project, list_project ]
|
||||
resource_permissions["project"] = project
|
||||
# project permissions end
|
||||
|
||||
# registry permissions start
|
||||
@ -100,21 +94,17 @@ registry_ping_payload = {
|
||||
"url": "https://hub.docker.com"
|
||||
}
|
||||
ping_registry = Permission("{}/registries/ping".format(harbor_base_url), "POST", 200, registry_ping_payload)
|
||||
registry = [ create_registry, list_registry, read_registry, info_registry, update_registry, delete_registry, ping_registry ]
|
||||
resource_permissions["registry"] = registry
|
||||
# registry permissions end
|
||||
|
||||
# replication-adapter permissions start
|
||||
list_replication_adapters = Permission("{}/replication/adapters".format(harbor_base_url), "GET", 200)
|
||||
list_replication_adapterinfos = Permission("{}/replication/adapterinfos".format(harbor_base_url), "GET", 200)
|
||||
replication_adapter = [ list_replication_adapters, list_replication_adapterinfos ]
|
||||
resource_permissions["replication-adapter"] = replication_adapter
|
||||
# replication-adapter permissions end
|
||||
|
||||
# replication policy permissions start
|
||||
replication_registry_id = None
|
||||
replication_registry_name = "replication-registry-{}".format(random.randint(1000, 9999))
|
||||
if resource == "replication-policy":
|
||||
if "replication-policy" in resources or "all" == resources:
|
||||
result = urlsplit(harbor_base_url)
|
||||
endpoint_URL = "{}://{}".format(result.scheme, result.netloc)
|
||||
replication_registry_payload = {
|
||||
@ -162,8 +152,6 @@ list_replication_policy = Permission("{}/replication/policies".format(harbor_bas
|
||||
read_replication_policy = Permission("{}/replication/policies/{}".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, replication_policy_payload, payload_id_field="id")
|
||||
update_replication_policy = Permission("{}/replication/policies/{}".format(harbor_base_url, ID_PLACEHOLDER), "PUT", 200, replication_policy_payload, payload_id_field="id")
|
||||
delete_replication_policy = Permission("{}/replication/policies/{}".format(harbor_base_url, ID_PLACEHOLDER), "DELETE", 200, replication_policy_payload, payload_id_field="id")
|
||||
replication_and_policy = [ create_replication_policy, list_replication_policy, read_replication_policy, update_replication_policy, delete_replication_policy ]
|
||||
resource_permissions["replication-policy"] = replication_and_policy
|
||||
# replication policy permissions end
|
||||
|
||||
# replication permissions start
|
||||
@ -171,7 +159,7 @@ replication_policy_id = None
|
||||
replication_policy_name = "replication-policy-{}".format(random.randint(1000, 9999))
|
||||
result = urlsplit(harbor_base_url)
|
||||
endpoint_URL = "{}://{}".format(result.scheme, result.netloc)
|
||||
if resource == "replication":
|
||||
if "replication" in resources or "all" == resources:
|
||||
replication_registry_payload = {
|
||||
"credential": {
|
||||
"access_key": admin_user_name,
|
||||
@ -223,8 +211,6 @@ read_replication_execution = Permission("{}/replication/executions/{}".format(ha
|
||||
stop_replication_execution = Permission("{}/replication/executions/{}".format(harbor_base_url, ID_PLACEHOLDER), "PUT", 200, replication_execution_payload, payload_id_field="id")
|
||||
list_replication_execution_tasks = Permission("{}/replication/executions/{}/tasks".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, replication_execution_payload, payload_id_field="id")
|
||||
read_replication_execution_task = Permission("{}/replication/executions/{}/tasks/{}".format(harbor_base_url, ID_PLACEHOLDER, 1), "GET", 404, replication_execution_payload, payload_id_field="id")
|
||||
replication = [ create_replication_execution, list_replication_execution, read_replication_execution, stop_replication_execution, list_replication_execution_tasks, read_replication_execution_task ]
|
||||
resource_permissions["replication"] = replication
|
||||
# replication permissions end
|
||||
|
||||
# scan all permissions start
|
||||
@ -245,14 +231,10 @@ update_scan_all_schedule = Permission("{}/system/scanAll/schedule".format(harbor
|
||||
stop_scan_all = Permission("{}/system/scanAll/stop".format(harbor_base_url), "POST", 202)
|
||||
scan_all_metrics = Permission("{}/scans/all/metrics".format(harbor_base_url), "GET", 200)
|
||||
scan_all_schedule_metrics = Permission("{}/scans/schedule/metrics".format(harbor_base_url), "GET", 200)
|
||||
scan_all = [ create_scan_all_schedule, update_scan_all_schedule, stop_scan_all, scan_all_metrics, scan_all_schedule_metrics ]
|
||||
resource_permissions["scan-all"] = scan_all
|
||||
# scan all permissions end
|
||||
|
||||
# system volumes permissions start
|
||||
read_system_volumes = Permission("{}/systeminfo/volumes".format(harbor_base_url), "GET", 200)
|
||||
system_volumes = [ read_system_volumes ]
|
||||
resource_permissions["system-volumes"] = system_volumes
|
||||
# system volumes permissions end
|
||||
|
||||
# jobservice monitor permissions start
|
||||
@ -262,8 +244,6 @@ stop_jobservice_job = Permission("{}/jobservice/jobs/{}".format(harbor_base_url,
|
||||
get_jobservice_job_log = Permission("{}/jobservice/jobs/{}/log".format(harbor_base_url, "88888888"), "GET", 500)
|
||||
list_jobservice_queue = Permission("{}/jobservice/queues".format(harbor_base_url), "GET", 200)
|
||||
stop_jobservice = Permission("{}/jobservice/queues/{}".format(harbor_base_url, "88888888"), "PUT", 200, payload={ "action": "stop" })
|
||||
jobservice_monitor = [ list_jobservice_pool, list_jobservice_pool_worker, stop_jobservice_job, get_jobservice_job_log, list_jobservice_queue, stop_jobservice ]
|
||||
resource_permissions["jobservice-monitor"] = jobservice_monitor
|
||||
# jobservice monitor permissions end
|
||||
|
||||
# scanner permissions start
|
||||
@ -283,8 +263,6 @@ update_scanner = Permission("{}/scanners/{}".format(harbor_base_url, "88888888")
|
||||
delete_scanner = Permission("{}/scanners/{}".format(harbor_base_url, "88888888"), "DELETE", 404)
|
||||
set_default_scanner = Permission("{}/scanners/{}".format(harbor_base_url, "88888888"), "PATCH", 404, payload={ "is_default": True })
|
||||
get_scanner_metadata = Permission("{}/scanners/{}/metadata".format(harbor_base_url, "88888888"), "GET", 404)
|
||||
scanner = [ list_scanner, create_scanner, ping_scanner, read_scanner, update_scanner, delete_scanner, set_default_scanner, get_scanner_metadata ]
|
||||
resource_permissions["scanner"] = scanner
|
||||
# scanner permissions end
|
||||
|
||||
# system label permissions start
|
||||
@ -299,31 +277,44 @@ create_label = Permission("{}/labels".format(harbor_base_url), "POST", 201, labe
|
||||
read_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "GET", 200, payload=label_payload, payload_id_field="id")
|
||||
update_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "PUT", 200, payload=label_payload, payload_id_field="id")
|
||||
delete_label = Permission("{}/labels/{}".format(harbor_base_url, ID_PLACEHOLDER), "DELETE", 200, payload=label_payload, payload_id_field="id")
|
||||
label = [ create_label, read_label, update_label, delete_label ]
|
||||
resource_permissions["label"] = label
|
||||
# system label permissions end
|
||||
|
||||
# security hub permissions start
|
||||
read_summary = Permission("{}/security/summary".format(harbor_base_url), "GET", 200)
|
||||
list_vul = Permission("{}/security/vul".format(harbor_base_url), "GET", 200)
|
||||
security_hub = [ read_summary, list_vul ]
|
||||
resource_permissions["security-hub"] = security_hub
|
||||
# security hub permissions end
|
||||
|
||||
# catalog permissions start
|
||||
read_catalog = Permission("{}/v2/_catalog".format(endpoint_URL), "GET", 200)
|
||||
catalog = [ read_catalog ]
|
||||
resource_permissions["catalog"] = catalog
|
||||
# catalog permissions end
|
||||
|
||||
resource_permissions = {
|
||||
"audit-log": [list_audit_logs],
|
||||
"preheat-instance": [create_preheat_instance, list_preheat_instance, read_preheat_instance, update_preheat_instance, delete_preheat_instance],
|
||||
"project": [create_project, list_project],
|
||||
"registry": [create_registry, list_registry, read_registry, info_registry, update_registry, delete_registry, ping_registry],
|
||||
"replication-adapter": [list_replication_adapters, list_replication_adapterinfos],
|
||||
"replication-policy": [create_replication_policy, list_replication_policy, read_replication_policy, update_replication_policy, delete_replication_policy],
|
||||
"replication": [create_replication_execution, list_replication_execution, read_replication_execution, stop_replication_execution, list_replication_execution_tasks, read_replication_execution_task],
|
||||
"scan-all": [create_scan_all_schedule, update_scan_all_schedule, stop_scan_all, scan_all_metrics, scan_all_schedule_metrics],
|
||||
"system-volumes": [read_system_volumes],
|
||||
"jobservice-monitor": [list_jobservice_pool, list_jobservice_pool_worker, stop_jobservice_job, get_jobservice_job_log, list_jobservice_queue, stop_jobservice],
|
||||
"scanner": [list_scanner, create_scanner, ping_scanner, read_scanner, update_scanner, delete_scanner, set_default_scanner, get_scanner_metadata],
|
||||
"label": [create_label, read_label, update_label, delete_label],
|
||||
"security-hub": [read_summary, list_vul],
|
||||
"catalog": [read_catalog]
|
||||
}
|
||||
resource_permissions["all"] = [item for sublist in resource_permissions.values() for item in sublist]
|
||||
|
||||
|
||||
def main():
|
||||
for permission in resource_permissions[resource]:
|
||||
print("=================================================")
|
||||
print("call: {} {}".format(permission.method, permission.url))
|
||||
print("payload: {}".format(json.dumps(permission.payload)))
|
||||
print("=================================================\n")
|
||||
permission.call()
|
||||
for resource in resources.split(","):
|
||||
for permission in resource_permissions[resource]:
|
||||
print("=================================================")
|
||||
print("call: {} {}".format(permission.method, permission.url))
|
||||
print("payload: {}".format(json.dumps(permission.payload)))
|
||||
print("response: {}".format(permission.call().text))
|
||||
print("=================================================\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
Loading…
Reference in New Issue
Block a user