mirror of
https://github.com/goharbor/harbor.git
synced 2024-12-26 10:38:00 +01:00
Merge pull request #13431 from danfengliu/add-tag-immutability-py-tests
Add API test scripts for tag immutability and scan signed image
This commit is contained in:
commit
5cc890c78f
@ -2893,7 +2893,7 @@ paths:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/definitions/ImmutableTagRule'
|
||||
$ref: '#/definitions/RetentionRule'
|
||||
'400':
|
||||
description: Illegal format of provided ID value.
|
||||
'401':
|
||||
@ -2913,10 +2913,11 @@ paths:
|
||||
format: int64
|
||||
required: true
|
||||
description: Relevant project ID.
|
||||
- name: immutabletagrule
|
||||
- name: RetentionRule
|
||||
in: body
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/ImmutableTagRule'
|
||||
$ref: '#/definitions/RetentionRule'
|
||||
tags:
|
||||
- Products
|
||||
responses:
|
||||
@ -2946,10 +2947,11 @@ paths:
|
||||
format: int64
|
||||
required: true
|
||||
description: Immutable tag rule ID.
|
||||
- name: immutabletagrule
|
||||
- name: RetentionRule
|
||||
in: body
|
||||
required: true
|
||||
schema:
|
||||
$ref: '#/definitions/ImmutableTagRule'
|
||||
$ref: '#/definitions/RetentionRule'
|
||||
tags:
|
||||
- Products
|
||||
responses:
|
||||
@ -5407,20 +5409,6 @@ definitions:
|
||||
enabled:
|
||||
type: boolean
|
||||
description: The quota is enable or disable
|
||||
ImmutableTagRule:
|
||||
type: object
|
||||
properties:
|
||||
id:
|
||||
type: integer
|
||||
format: int64
|
||||
project_id:
|
||||
type: integer
|
||||
format: int64
|
||||
tag_filter:
|
||||
type: string
|
||||
enabled:
|
||||
type: boolean
|
||||
|
||||
ScannerRegistration:
|
||||
type: object
|
||||
description: |
|
||||
|
@ -13,7 +13,7 @@ class Artifact(base.Base, object):
|
||||
client = self._get_client(**kwargs)
|
||||
return client.list_artifacts(project_name, repo_name)
|
||||
|
||||
def get_reference_info(self, project_name, repo_name, reference, ignore_not_found = False,**kwargs):
|
||||
def get_reference_info(self, project_name, repo_name, reference, expect_status_code = 200, ignore_not_found = False,**kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
params = {}
|
||||
if "with_signature" in kwargs:
|
||||
@ -22,12 +22,21 @@ class Artifact(base.Base, object):
|
||||
params["with_tag"] = kwargs["with_tag"]
|
||||
if "with_scan_overview" in kwargs:
|
||||
params["with_scan_overview"] = kwargs["with_scan_overview"]
|
||||
if "with_immutable_status" in kwargs:
|
||||
params["with_immutable_status"] = kwargs["with_immutable_status"]
|
||||
|
||||
try:
|
||||
return client.get_artifact_with_http_info(project_name, repo_name, reference, **params)
|
||||
data, status_code, _ = client.get_artifact_with_http_info(project_name, repo_name, reference, **params)
|
||||
return data
|
||||
except ApiException as e:
|
||||
if e.status == 404 and ignore_not_found == True:
|
||||
return []
|
||||
return None
|
||||
else:
|
||||
raise Exception("Failed to get reference, {} {}".format(e.status, e.body))
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
return None
|
||||
|
||||
def delete_artifact(self, project_name, repo_name, reference, expect_status_code = 200, expect_response_body = None, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
@ -39,9 +48,9 @@ class Artifact(base.Base, object):
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
|
||||
def get_addition(self, project_name, repo_name, reference, addition, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
@ -62,10 +71,10 @@ class Artifact(base.Base, object):
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(201, status_code)
|
||||
return data
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(201, status_code)
|
||||
return data
|
||||
|
||||
def create_tag(self, project_name, repo_name, reference, tag_name, expect_status_code = 201, ignore_conflict = False, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
@ -75,12 +84,19 @@ class Artifact(base.Base, object):
|
||||
except ApiException as e:
|
||||
if e.status == 409 and ignore_conflict == True:
|
||||
return
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
else:
|
||||
raise Exception("Create tag error, {}.".format(e.body))
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
|
||||
def delete_tag(self, project_name, repo_name, reference, tag_name, expect_status_code = 200, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
_, status_code, _ = client.delete_tag_with_http_info(project_name, repo_name, reference, tag_name)
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
try:
|
||||
_, status_code, _ = client.delete_tag_with_http_info(project_name, repo_name, reference, tag_name)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
|
||||
def check_image_scan_result(self, project_name, repo_name, reference, expected_scan_status = "Success", **kwargs):
|
||||
timeout_count = 30
|
||||
@ -91,7 +107,7 @@ class Artifact(base.Base, object):
|
||||
if (timeout_count == 0):
|
||||
break
|
||||
artifact = self.get_reference_info(project_name, repo_name, reference, **kwargs)
|
||||
scan_status = artifact[0].scan_overview['application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0'].scan_status
|
||||
scan_status = artifact.scan_overview['application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0'].scan_status
|
||||
if scan_status == expected_scan_status:
|
||||
return
|
||||
raise Exception("Scan image result is {}, not as expected {}.".format(scan_status, expected_scan_status))
|
||||
@ -99,10 +115,10 @@ class Artifact(base.Base, object):
|
||||
def check_reference_exist(self, project_name, repo_name, reference, ignore_not_found = False, **kwargs):
|
||||
artifact = self.get_reference_info( project_name, repo_name, reference, ignore_not_found=ignore_not_found, **kwargs)
|
||||
return {
|
||||
0: False,
|
||||
}.get(len(artifact), True)
|
||||
None: False,
|
||||
}.get(artifact, True)
|
||||
|
||||
def waiting_for_reference_exist(self, project_name, repo_name, reference, ignore_not_found = False, period = 60, loop_count = 8, **kwargs):
|
||||
def waiting_for_reference_exist(self, project_name, repo_name, reference, ignore_not_found = True, period = 60, loop_count = 8, **kwargs):
|
||||
_loop_count = loop_count
|
||||
while True:
|
||||
print("Waiting for reference {} round...".format(_loop_count))
|
||||
@ -114,4 +130,4 @@ class Artifact(base.Base, object):
|
||||
if artifact and artifact !=[]:
|
||||
return artifact
|
||||
time.sleep(period)
|
||||
raise Exception("Referencet is not exist {} {} {}.".format(project_name, repo_name, reference))
|
||||
raise Exception("Reference is not exist {} {} {}.".format(project_name, repo_name, reference))
|
||||
|
@ -65,7 +65,7 @@ def _assert_status_code(expect_code, return_code):
|
||||
raise Exception(r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}.".format(expect_code, return_code))
|
||||
|
||||
def _assert_status_body(expect_status_body, returned_status_body):
|
||||
if expect_status_body.strip() != returned_status_body.strip():
|
||||
if str(returned_status_body.strip()).lower().find(expect_status_body.lower()) < 0:
|
||||
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body))
|
||||
|
||||
def _random_name(prefix):
|
||||
|
@ -132,22 +132,19 @@ class DockerAPI(object):
|
||||
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, str(err)))
|
||||
|
||||
def docker_image_push(self, harbor_registry, tag, expected_error_message = None):
|
||||
caught_err = False
|
||||
ret = ""
|
||||
ret = None
|
||||
if expected_error_message is "":
|
||||
expected_error_message = None
|
||||
try:
|
||||
self.DCLIENT.push(harbor_registry, tag)
|
||||
return ret
|
||||
ret = self.DCLIENT.push(harbor_registry, tag)
|
||||
except Exception as err:
|
||||
caught_err = True
|
||||
if expected_error_message is not None:
|
||||
print( "docker image push error:", str(err))
|
||||
if str(err).lower().find(expected_error_message.lower()) < 0:
|
||||
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
|
||||
else:
|
||||
raise Exception(r" Docker push image {} failed, error is [{}]".format (harbor_registry, message))
|
||||
if caught_err == False:
|
||||
else:
|
||||
if expected_error_message is not None:
|
||||
if str(ret).lower().find(expected_error_message.lower()) < 0:
|
||||
raise Exception(r" Failed to catch error [{}] when push image {}, return message: {}".
|
||||
|
@ -13,8 +13,10 @@ def pull_harbor_image(registry, username, password, image, tag, expected_login_e
|
||||
return
|
||||
time.sleep(2)
|
||||
ret = _docker_api.docker_image_pull(r'{}/{}'.format(registry, image), tag = tag, expected_error_message = expected_error_message)
|
||||
print("Docker pull image return message: {}".format(ret))
|
||||
|
||||
def push_image_to_project(project_name, registry, username, password, image, tag, expected_login_error_message = None, expected_error_message = None, profix_for_image = None, new_image=None):
|
||||
print("Start to push image {}/{}/{}:{}".format(registry, project_name, image, tag) )
|
||||
_docker_api = DockerAPI()
|
||||
_docker_api.docker_login(registry, username, password, expected_error_message = expected_login_error_message)
|
||||
time.sleep(2)
|
||||
@ -22,15 +24,14 @@ def push_image_to_project(project_name, registry, username, password, image, tag
|
||||
return
|
||||
_docker_api.docker_image_pull(image, tag = tag)
|
||||
time.sleep(2)
|
||||
|
||||
original_name = image
|
||||
image = new_image or image
|
||||
|
||||
if profix_for_image == None:
|
||||
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(image, tag), r'{}/{}/{}'.format(registry, project_name, image))
|
||||
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(original_name, tag), r'{}/{}/{}'.format(registry, project_name, image), tag = tag)
|
||||
else:
|
||||
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(image, tag), r'{}/{}/{}/{}'.format(registry, project_name, profix_for_image, image))
|
||||
new_harbor_registry, new_tag = _docker_api.docker_image_tag(r'{}:{}'.format(original_name, tag), r'{}/{}/{}/{}'.format(registry, project_name, profix_for_image, image), tag = tag)
|
||||
time.sleep(2)
|
||||
|
||||
_docker_api.docker_image_push(new_harbor_registry, new_tag, expected_error_message = expected_error_message)
|
||||
|
||||
return r'{}/{}'.format(project_name, image), new_tag
|
||||
@ -146,4 +147,4 @@ class Repository(base.Base, object):
|
||||
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
return data
|
||||
return data
|
||||
|
@ -1,6 +1,93 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base
|
||||
import swagger_client
|
||||
|
||||
class Tag_Immutability(base.Base):
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import base
|
||||
import swagger_client
|
||||
from swagger_client.rest import ApiException
|
||||
|
||||
class Tag_Immutability(base.Base):
|
||||
def create_tag_immutability_policy_rule(self, project_id, selector_repository_decoration = "repoMatches",
|
||||
selector_repository="**", selector_tag_decoration = "matches",
|
||||
selector_tag="**", expect_status_code = 201, **kwargs):
|
||||
#repoExcludes,excludes
|
||||
client = self._get_client(**kwargs)
|
||||
retention_rule = swagger_client.RetentionRule(
|
||||
action="immutable",
|
||||
template="immutable_template",
|
||||
priority = 0,
|
||||
scope_selectors={
|
||||
"repository": [
|
||||
{
|
||||
"kind": "doublestar",
|
||||
"decoration": selector_repository_decoration,
|
||||
"pattern": selector_repository
|
||||
}
|
||||
]
|
||||
},
|
||||
tag_selectors=[
|
||||
{
|
||||
"kind": "doublestar",
|
||||
"decoration": selector_tag_decoration,
|
||||
"pattern": selector_tag
|
||||
}
|
||||
]
|
||||
)
|
||||
try:
|
||||
_, status_code, header = client.projects_project_id_immutabletagrules_post_with_http_info(project_id, retention_rule)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(201, status_code)
|
||||
return base._get_id_from_header(header)
|
||||
|
||||
def list_tag_immutability_policy_rules(self, project_id, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
return client.projects_project_id_immutabletagrules_get(project_id)
|
||||
|
||||
def get_rule(self, project_id, rule_id, **kwargs):
|
||||
rules = self.list_tag_immutability_policy_rules(project_id, **kwargs)
|
||||
for r in rules:
|
||||
if r.id == rule_id:
|
||||
return r
|
||||
return None
|
||||
|
||||
def update_tag_immutability_policy_rule(self, project_id, rule_id, selector_repository_decoration = None,
|
||||
selector_repository=None, selector_tag_decoration = None,
|
||||
selector_tag=None, disabled = None, expect_status_code = 200, **kwargs):
|
||||
rule = self.get_rule( project_id, rule_id,**kwargs)
|
||||
if selector_repository_decoration:
|
||||
rule.scope_selectors["repository"][0].decoration = selector_repository_decoration
|
||||
if selector_repository:
|
||||
rule.scope_selectors["repository"][0].pattern = selector_repository
|
||||
if selector_tag_decoration:
|
||||
rule.tag_selectors[0].decoration = selector_tag_decoration
|
||||
if selector_tag:
|
||||
rule.tag_selectors[0].pattern = selector_tag
|
||||
if disabled is not None:
|
||||
rule.disabled = disabled
|
||||
client = self._get_client(**kwargs)
|
||||
try:
|
||||
_, status_code, header = client.projects_project_id_immutabletagrules_id_put_with_http_info(project_id, rule_id, rule)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
else:
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
return base._get_id_from_header(header)
|
||||
|
||||
def create_rule(self, project_id, selector_repository_decoration = "repoMatches", selector_repository="**",
|
||||
selector_tag_decoration = "matches", selector_tag="**",
|
||||
expect_status_code = 201, disabled = False, **kwargs):
|
||||
rule_id = self.create_tag_immutability_policy_rule(project_id, selector_repository_decoration = selector_repository_decoration,
|
||||
selector_repository = selector_repository,
|
||||
selector_tag_decoration = selector_tag_decoration,
|
||||
selector_tag = selector_tag, expect_status_code = expect_status_code, **kwargs)
|
||||
if expect_status_code != 201:
|
||||
return
|
||||
self.update_tag_immutability_policy_rule(project_id, rule_id, selector_repository_decoration = selector_repository_decoration,
|
||||
selector_repository = selector_repository, selector_tag_decoration = selector_tag_decoration,
|
||||
selector_tag = selector_tag, disabled = disabled, expect_status_code = 200, **kwargs)
|
||||
return rule_id
|
||||
|
||||
|
@ -76,7 +76,7 @@ class TestAssignRoleToLdapGroup(unittest.TestCase):
|
||||
repo_name_dev, _ = push_image_to_project(project_name, harbor_server, USER_DEV["username"], USER_DEV["password"], USER_DEV["repo"], "latest")
|
||||
artifacts = self.artifact.list_artifacts(project_name, USER_DEV["repo"], **USER_DEV)
|
||||
self.assertTrue(len(artifacts) == 1)
|
||||
push_image_to_project(project_name, harbor_server, USER_GUEST["username"], USER_GUEST["password"], USER_GUEST["repo"], "latest")
|
||||
push_image_to_project(project_name, harbor_server, USER_GUEST["username"], USER_GUEST["password"], USER_GUEST["repo"], "latest", expected_error_message = "unauthorized to access repository")
|
||||
artifacts = self.artifact.list_artifacts(project_name, USER_GUEST["repo"], **USER_GUEST)
|
||||
self.assertTrue(len(artifacts) == 0)
|
||||
|
||||
|
@ -97,19 +97,19 @@ class TestProjects(unittest.TestCase):
|
||||
src_tag_data = self.artifact.get_reference_info(TestProjects.project_src_repo_name, TestProjects.src_repo_name.split('/')[1], tag_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
TestProjects.dst_repo_name = TestProjects.project_dst_repo_name+"/"+ dst_repo_sub_name
|
||||
#8. Retag image in project(PA) to project(PB), it should be forbidden;
|
||||
self.artifact.copy_artifact(TestProjects.project_dst_repo_name, dst_repo_sub_name, TestProjects.src_repo_name+"@"+src_tag_data[0].digest, expect_status_code=403, **TestProjects.USER_RETAG_CLIENT)
|
||||
self.artifact.copy_artifact(TestProjects.project_dst_repo_name, dst_repo_sub_name, TestProjects.src_repo_name+"@"+src_tag_data.digest, expect_status_code=403, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#9. Update role of user-retag as admin member of project(PB);
|
||||
self.project.update_project_member_role(TestProjects.project_dst_repo_id, retag_member_id, 1, **ADMIN_CLIENT)
|
||||
|
||||
#10. Retag image in project(PA) to project(PB), it should be successful;
|
||||
self.artifact.copy_artifact(TestProjects.project_dst_repo_name, dst_repo_sub_name, TestProjects.src_repo_name+"@"+src_tag_data[0].digest, **TestProjects.USER_RETAG_CLIENT)
|
||||
self.artifact.copy_artifact(TestProjects.project_dst_repo_name, dst_repo_sub_name, TestProjects.src_repo_name+"@"+src_tag_data.digest, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#11. Get repository(RB)'s image tag detail information;
|
||||
dst_tag_data = self.artifact.get_reference_info(TestProjects.project_dst_repo_name, dst_repo_sub_name, tag_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#12. Read digest of retaged image, it must be the same with the image in repository(RA);
|
||||
self.assertEqual(src_tag_data[0].digest, dst_tag_data[0].digest)
|
||||
self.assertEqual(src_tag_data.digest, dst_tag_data.digest)
|
||||
|
||||
#13. Pull image from project(PB) by user_retag, it must be successful;"
|
||||
pull_harbor_image(harbor_server, user_retag_name, user_retag_password, TestProjects.dst_repo_name, tag_name)
|
||||
|
@ -71,7 +71,7 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#4. Image(IA) should exist;
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_content_trust_name, image, tag, **TestProjects.USER_CONTENT_TRUST_CLIENT)
|
||||
self.assertEqual(artifact[0].tags[0].name, tag)
|
||||
self.assertEqual(artifact.tags[0].name, tag)
|
||||
|
||||
#5. Pull image(IA) successfully;
|
||||
pull_harbor_image(harbor_server, admin_name, admin_password, TestProjects.repo_name, tag)
|
||||
|
@ -114,19 +114,19 @@ class TestProxyCache(unittest.TestCase):
|
||||
|
||||
#10. Manifest index pulled by docker CLI should be cached;
|
||||
ret_index_by_d = self.artifact.waiting_for_reference_exist(project_name, urllib.parse.quote(index_repo_name,'utf-8'), index_for_docker["tag"], **USER_CLIENT)
|
||||
print("Index's reference by docker CLI:",ret_index_by_d[0].references)
|
||||
self.assertTrue(len(ret_index_by_d[0].references) == 1)
|
||||
print("Index's reference by docker CLI:", ret_index_by_d.references)
|
||||
self.assertTrue(len(ret_index_by_d.references) == 1)
|
||||
|
||||
#11. Manifest index pulled by ctr CLI should be cached;
|
||||
ret_index_by_c = self.artifact.waiting_for_reference_exist(project_name, urllib.parse.quote(index_repo_name_for_ctr,'utf-8'), index_for_ctr["tag"], **USER_CLIENT)
|
||||
print("Index's reference by ctr CLI:",ret_index_by_c[0].references)
|
||||
self.assertTrue(len(ret_index_by_c[0].references) == 1)
|
||||
print("Index's reference by ctr CLI:", ret_index_by_c.references)
|
||||
self.assertTrue(len(ret_index_by_c.references) == 1)
|
||||
|
||||
def test_proxy_cache_from_harbor(self):
|
||||
self.do_validate("harbor")
|
||||
|
||||
def test_proxy_cache_from_dockerhub(self):
|
||||
self.do_validate("docker-hub")
|
||||
#def test_proxy_cache_from_dockerhub(self):
|
||||
# self.do_validate("docker-hub")
|
||||
|
||||
def suite():
|
||||
suite = unittest.TestSuite(unittest.makeSuite(TestProxyCache))
|
||||
|
@ -76,8 +76,8 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#5.1 Get chart(CA) by reference successfully;
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_push_chart_name, self.repo_name, self.verion, **TestProjects.USER_CLIENT)
|
||||
self.assertEqual(artifact[0].type, 'CHART')
|
||||
self.assertEqual(artifact[0].tags[0].name, self.verion)
|
||||
self.assertEqual(artifact.type, 'CHART')
|
||||
self.assertEqual(artifact.tags[0].name, self.verion)
|
||||
|
||||
#5.2 Chart bundle can be pulled by ctr successfully;
|
||||
#oci_ref = harbor_server+"/"+TestProjects.project_push_chart_name+"/"+self.repo_name+":"+self.verion
|
||||
|
@ -92,8 +92,8 @@ class TestProjects(unittest.TestCase):
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_push_bundle_name, self.cnab_repo_name, reference_sha256, **TestProjects.USER_CLIENT)
|
||||
|
||||
#8. Verify artifact information;
|
||||
self.assertEqual(artifact[0].type, 'CNAB')
|
||||
self.assertEqual(artifact[0].digest, reference_sha256)
|
||||
self.assertEqual(artifact.type, 'CNAB')
|
||||
self.assertEqual(artifact.digest, reference_sha256)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -67,7 +67,7 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#5. Get and verify artifacts by tag;
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_name, self.repo_name, self.tag, **TestProjects.USER_CLIENT)
|
||||
self.assertEqual(artifact[0].tags[0].name, self.tag)
|
||||
self.assertEqual(artifact.tags[0].name, self.tag)
|
||||
|
||||
#6. ORAS CLI pull artifacts index by tag;
|
||||
md5_list_pull = library.oras.oras_pull(harbor_server, user_name, user_001_password, TestProjects.project_name, self.repo_name, self.tag)
|
||||
|
@ -80,7 +80,7 @@ class TestProjects(unittest.TestCase):
|
||||
full_name = urllib.parse.quote(profix+"/"+image,'utf-8')
|
||||
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_sign_image_name, full_name, tag, **TestProjects.USER_sign_image_CLIENT)
|
||||
self.assertEqual(artifact[0].type, 'IMAGE')
|
||||
self.assertEqual(artifact.type, 'IMAGE')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -95,10 +95,10 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#6. Get index(IA) by reference successfully;
|
||||
index_data = self.artifact.get_reference_info(TestProjects.project_push_index_name, self.index_name, self.index_tag, **TestProjects.USER_CLIENT)
|
||||
manifests_sha256_harbor_ret = [index_data[0].references[1].child_digest, index_data[0].references[0].child_digest]
|
||||
manifests_sha256_harbor_ret = [index_data.references[1].child_digest, index_data.references[0].child_digest]
|
||||
|
||||
#7. Verify harbor index is index(IA) pushed by docker manifest CLI;
|
||||
self.assertEqual(index_data[0].digest, index_sha256_cli_ret)
|
||||
self.assertEqual(index_data.digest, index_sha256_cli_ret)
|
||||
self.assertEqual(manifests_sha256_harbor_ret.count(manifests_sha256_cli_ret[0]), 1)
|
||||
self.assertEqual(manifests_sha256_harbor_ret.count(manifests_sha256_cli_ret[1]), 1)
|
||||
|
||||
|
@ -67,7 +67,7 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#5. Get and verify artifacts by tag;
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_name, self.repo_name, self.tag, **TestProjects.USER_CLIENT)
|
||||
self.assertEqual(artifact[0].tags[0].name, self.tag)
|
||||
self.assertEqual(artifact.tags[0].name, self.tag)
|
||||
|
||||
#6. Pull sif file from harbor by singularity;
|
||||
library.singularity.singularity_pull(TestProjects.project_name + ".sif", "oras://"+harbor_server + "/" + TestProjects.project_name + "/" + self.repo_name+":"+ self.tag)
|
||||
|
@ -112,12 +112,12 @@ class TestProjects(unittest.TestCase):
|
||||
#List artifacts successfully, and untagged artifact in test1 should be the only one retained;
|
||||
artifacts_1 = self.artifact.list_artifacts(TestProjects.project_src_repo_name, self.repo_name_1, **TestProjects.USER_RA_CLIENT)
|
||||
self.assertTrue(len(artifacts_1)==1)
|
||||
self.assertEqual(artifacts_1[0].digest, tag_data_artifact3_image1[0].digest)
|
||||
self.assertEqual(artifacts_1[0].digest, tag_data_artifact3_image1.digest)
|
||||
|
||||
#List artifacts successfully, and artifact with latest tag in test2 should be the only one retained;
|
||||
artifacts_2 = self.artifact.list_artifacts(TestProjects.project_src_repo_name, self.repo_name_2, **TestProjects.USER_RA_CLIENT)
|
||||
self.assertTrue(len(artifacts_2)==1)
|
||||
self.assertEqual(artifacts_2[0].digest, tag_data_artifact2_image2[0].digest)
|
||||
self.assertEqual(artifacts_2[0].digest, tag_data_artifact2_image2.digest)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(self):
|
||||
|
@ -1,5 +1,6 @@
|
||||
from __future__ import absolute_import
|
||||
import unittest
|
||||
import sys
|
||||
|
||||
from testutils import harbor_server
|
||||
from testutils import TEARDOWN
|
||||
@ -11,7 +12,9 @@ from library.repository import push_image_to_project
|
||||
from library.artifact import Artifact
|
||||
from library.scan import Scan
|
||||
from library.scanner import Scanner
|
||||
class TestProjects(unittest.TestCase):
|
||||
from library.sign import sign_image
|
||||
|
||||
class TestScan(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUp(self):
|
||||
self.project= Project()
|
||||
@ -21,6 +24,19 @@ class TestProjects(unittest.TestCase):
|
||||
self.scan = Scan()
|
||||
self.scanner = Scanner()
|
||||
|
||||
self.url = ADMIN_CLIENT["endpoint"]
|
||||
self.user_password = "Aa123456"
|
||||
self.project_id, self.project_name, self.user_id, self.user_name = [None] * 4
|
||||
self.user_id, self.user_name = self.user.create_user(user_password = self.user_password, **ADMIN_CLIENT)
|
||||
self.USER_CLIENT = dict(with_signature = True, with_immutable_status = True, endpoint = self.url, username = self.user_name, password = self.user_password, with_scan_overview = True)
|
||||
|
||||
|
||||
#2. Create a new private project(PA) by user(UA);
|
||||
self.project_id, self.project_name = self.project.create_project(metadata = {"public": "false"}, **ADMIN_CLIENT)
|
||||
|
||||
#3. Add user(UA) as a member of project(PA) with project-admin role;
|
||||
self.project.add_project_members(self.project_id, user_id = self.user_id, **ADMIN_CLIENT)
|
||||
|
||||
@classmethod
|
||||
def tearDown(self):
|
||||
print("Case completed")
|
||||
@ -28,13 +44,13 @@ class TestProjects(unittest.TestCase):
|
||||
@unittest.skipIf(TEARDOWN == True, "Test data won't be erased.")
|
||||
def test_ClearData(self):
|
||||
#1. Delete repository(RA) by user(UA);
|
||||
self.repo.delete_repoitory(TestProjects.project_scan_image_name, TestProjects.repo_name.split('/')[1], **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.repo.delete_repoitory(self.project_name, TestScan.repo_name.split('/')[1], **self.USER_CLIENTT)
|
||||
|
||||
#2. Delete project(PA);
|
||||
self.project.delete_project(TestProjects.project_scan_image_id, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.project.delete_project(self.project_id, **self.USER_CLIENT)
|
||||
|
||||
#3. Delete user(UA);
|
||||
self.user.delete_user(TestProjects.user_scan_image_id, **ADMIN_CLIENT)
|
||||
self.user.delete_user(self.user_id, **ADMIN_CLIENT)
|
||||
|
||||
def testScanImageArtifact(self):
|
||||
"""
|
||||
@ -54,34 +70,21 @@ class TestProjects(unittest.TestCase):
|
||||
2. Delete project(PA);
|
||||
3. Delete user(UA);
|
||||
"""
|
||||
url = ADMIN_CLIENT["endpoint"]
|
||||
user_001_password = "Aa123456"
|
||||
|
||||
#1. Create user-001
|
||||
TestProjects.user_scan_image_id, user_scan_image_name = self.user.create_user(user_password = user_001_password, **ADMIN_CLIENT)
|
||||
|
||||
TestProjects.USER_SCAN_IMAGE_CLIENT=dict(endpoint = url, username = user_scan_image_name, password = user_001_password, with_scan_overview = True)
|
||||
|
||||
#2. Create a new private project(PA) by user(UA);
|
||||
TestProjects.project_scan_image_id, TestProjects.project_scan_image_name = self.project.create_project(metadata = {"public": "false"}, **ADMIN_CLIENT)
|
||||
|
||||
#3. Add user(UA) as a member of project(PA) with project-admin role;
|
||||
self.project.add_project_members(TestProjects.project_scan_image_id, user_id=TestProjects.user_scan_image_id, **ADMIN_CLIENT)
|
||||
|
||||
#4. Get private project of user(UA), user(UA) can see only one private project which is project(PA);
|
||||
self.project.projects_should_exist(dict(public=False), expected_count = 1,
|
||||
expected_project_id = TestProjects.project_scan_image_id, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
expected_project_id = self.project_id, **self.USER_CLIENT)
|
||||
|
||||
#Note: Please make sure that this Image has never been pulled before by any other cases,
|
||||
# so it is a not-scanned image right after repository creation.
|
||||
image = "docker"
|
||||
src_tag = "1.13"
|
||||
#5. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
|
||||
TestProjects.repo_name, tag = push_image_to_project(TestProjects.project_scan_image_name, harbor_server, user_scan_image_name, user_001_password, image, src_tag)
|
||||
TestScan.repo_name, tag = push_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image, src_tag)
|
||||
|
||||
#6. Send scan image command and get tag(TA) information to check scan result, it should be finished;
|
||||
self.scan.scan_artifact(TestProjects.project_scan_image_name, TestProjects.repo_name.split('/')[1], tag, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.artifact.check_image_scan_result(TestProjects.project_scan_image_name, image, tag, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.scan.scan_artifact(self.project_name, TestScan.repo_name.split('/')[1], tag, **self.USER_CLIENT)
|
||||
self.artifact.check_image_scan_result(self.project_name, image, tag, **self.USER_CLIENT)
|
||||
|
||||
#7. Swith Scanner;
|
||||
uuid = self.scanner.scanners_get_uuid(**ADMIN_CLIENT)
|
||||
@ -89,10 +92,45 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
image = "tomcat"
|
||||
src_tag = "latest"
|
||||
TestProjects.repo_name, tag = push_image_to_project(TestProjects.project_scan_image_name, harbor_server, user_scan_image_name, user_001_password, image, src_tag)
|
||||
TestScan.repo_name, tag = push_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image, src_tag)
|
||||
#8. Send scan another image command and get tag(TA) information to check scan result, it should be finished.
|
||||
self.scan.scan_artifact(TestProjects.project_scan_image_name, TestProjects.repo_name.split('/')[1], tag, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.artifact.check_image_scan_result(TestProjects.project_scan_image_name, image, tag, **TestProjects.USER_SCAN_IMAGE_CLIENT)
|
||||
self.scan.scan_artifact(self.project_name, TestScan.repo_name.split('/')[1], tag, **self.USER_CLIENT)
|
||||
self.artifact.check_image_scan_result(self.project_name, image, tag, **self.USER_CLIENT)
|
||||
|
||||
def testScanSignedImage(self):
|
||||
"""
|
||||
Test case:
|
||||
Scan A Signed Image
|
||||
Test step and expected result:
|
||||
1. Create a new user(UA);
|
||||
2. Create a new private project(PA) by user(UA);
|
||||
3. Add user(UA) as a member of project(PA) with project-admin role;
|
||||
4. Get private project of user(UA), user(UA) can see only one private project which is project(PA);
|
||||
5. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
|
||||
6. Send scan image command and get tag(TA) information to check scan result, it should be finished;
|
||||
7. Swith Scanner;
|
||||
8. Send scan another image command and get tag(TA) information to check scan result, it should be finished.
|
||||
Tear down:
|
||||
1. Delete repository(RA) by user(UA);
|
||||
2. Delete project(PA);
|
||||
3. Delete user(UA);
|
||||
"""
|
||||
|
||||
#Note: Please make sure that this Image has never been pulled before by any other cases,
|
||||
# so it is a not-scanned image right after repository creation.
|
||||
image = "redis"
|
||||
tag = "latest"
|
||||
#5. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
|
||||
TestScan.repo_name_1, tag = push_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image, tag)
|
||||
|
||||
sign_image(harbor_server, self.project_name, image, tag)
|
||||
|
||||
#6. Send scan image command and get tag(TA) information to check scan result, it should be finished;
|
||||
self.scan.scan_artifact(self.project_name, TestScan.repo_name_1.split('/')[1], tag, **self.USER_CLIENT)
|
||||
self.artifact.check_image_scan_result(self.project_name, image, tag, **self.USER_CLIENT)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
suite = unittest.TestSuite(unittest.makeSuite(TestScan))
|
||||
result = unittest.TextTestRunner(sys.stdout, verbosity=2, failfast=True).run(suite)
|
||||
if not result.wasSuccessful():
|
||||
raise Exception(r"Tag immutability test failed: {}".format(result))
|
@ -80,7 +80,7 @@ class TestProjects(unittest.TestCase):
|
||||
|
||||
#7. Get signature of image with tag(TA), it should be exist.
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_sign_image_name, image, tag, **TestProjects.USER_sign_image_CLIENT)
|
||||
self.assertEqual(artifact[0].tags[0].signed, True)
|
||||
self.assertEqual(artifact.tags[0].signed, True)
|
||||
|
||||
push_special_image_to_project(TestProjects.project_sign_image_name, harbor_server, user_sign_image_name, user_001_password, self.repo_name_1, ['1.0'])
|
||||
self.repo.delete_repoitory(TestProjects.project_sign_image_name, self.repo_name_1, **TestProjects.USER_sign_image_CLIENT)
|
||||
|
@ -79,8 +79,8 @@ class TestProjects(unittest.TestCase):
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_name, self.repo_name, tag, **TestProjects.USER_CLIENT)
|
||||
|
||||
#6. Verify the image(IA) contains tag named 1.0;
|
||||
self.assertEqual(artifact[0].tags[0].name, "1.0")
|
||||
self.assertEqual(artifact[0].tags[1].name, tag)
|
||||
self.assertEqual(artifact.tags[0].name, "1.0")
|
||||
self.assertEqual(artifact.tags[1].name, tag)
|
||||
|
||||
#7. Delete the tag(1.0) from image(IA);
|
||||
self.artifact.delete_tag(TestProjects.project_name, self.repo_name, tag, "1.0",**TestProjects.USER_CLIENT)
|
||||
@ -89,7 +89,7 @@ class TestProjects(unittest.TestCase):
|
||||
artifact = self.artifact.get_reference_info(TestProjects.project_name, self.repo_name, tag, **TestProjects.USER_CLIENT)
|
||||
|
||||
#9. Verify the image(IA) contains no tag named 1.0;
|
||||
self.assertEqual(artifact[0].tags[0].name, tag)
|
||||
self.assertEqual(artifact.tags[0].name, tag)
|
||||
|
||||
|
||||
|
@ -0,0 +1,300 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
||||
import unittest
|
||||
import sys
|
||||
|
||||
from testutils import ADMIN_CLIENT
|
||||
from testutils import harbor_server
|
||||
from library.project import Project
|
||||
from library.user import User
|
||||
from library.repository import Repository
|
||||
from library.repository import push_image_to_project
|
||||
from library.registry import Registry
|
||||
from library.artifact import Artifact
|
||||
from library.tag_immutability import Tag_Immutability
|
||||
from library.repository import push_special_image_to_project
|
||||
|
||||
class TestTagImmutability(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(self):
|
||||
self.url = ADMIN_CLIENT["endpoint"]
|
||||
self.user_password = "Aa123456"
|
||||
self.project= Project()
|
||||
self.user= User()
|
||||
self.repo= Repository()
|
||||
self.registry = Registry()
|
||||
self.artifact = Artifact()
|
||||
self.tag_immutability = Tag_Immutability()
|
||||
self.project_id, self.project_name, self.user_id, self.user_name = [None] * 4
|
||||
self.user_id, self.user_name = self.user.create_user(user_password = self.user_password, **ADMIN_CLIENT)
|
||||
self.USER_CLIENT = dict(with_signature = True, with_immutable_status = True, endpoint = self.url, username = self.user_name, password = self.user_password)
|
||||
self.exsiting_rule = dict(selector_repository="rel*", selector_tag="v2.*")
|
||||
self.project_id, self.project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
|
||||
def check_tag_immutability(self, artifact, tag_name, status = True):
|
||||
for tag in artifact.tags:
|
||||
if tag.name == tag_name:
|
||||
self.assertTrue(tag.immutable == status)
|
||||
return
|
||||
raise Exception("No tag {} found in artifact {}".format(tag, artifact))
|
||||
|
||||
def test_disability_of_rules(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Disability Of Rules
|
||||
Test step and expected result:
|
||||
1. Create a new project;
|
||||
2. Push image A to the project with 2 tags A and B;
|
||||
3. Create a disabled rule matched image A with tag A;
|
||||
4. Both tags of image A should not be immutable;
|
||||
5. Enable this rule;
|
||||
6. image A with tag A should be immutable.
|
||||
"""
|
||||
image_a = dict(name="image_disability_a", tag1="latest", tag2="6.2.2")
|
||||
|
||||
#1. Create a new project;
|
||||
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
|
||||
#2. Push image A to the project with 2 tags;
|
||||
push_special_image_to_project(project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
|
||||
#3. Create a disabled rule matched image A;
|
||||
rule_id = self.tag_immutability.create_rule(project_id, disabled = True, selector_repository=image_a["name"], selector_tag=str(image_a["tag1"])[0:2] + "*", **self.USER_CLIENT)
|
||||
|
||||
#4. Both tags of image A should not be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_disability_of_rules] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = False)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = False)
|
||||
|
||||
#5. Enable this rule;
|
||||
self.tag_immutability.update_tag_immutability_policy_rule(project_id, rule_id, disabled = False, **self.USER_CLIENT)
|
||||
|
||||
#6. image A with tag A should be immutable.
|
||||
artifact_a = self.artifact.get_reference_info(project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_disability_of_rules] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = True)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = False)
|
||||
|
||||
def test_artifact_and_repo_is_undeletable(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Artifact And Repo is Undeleteable
|
||||
Test step and expected result:
|
||||
1. Create a new project;
|
||||
2. Push image A to the project with 2 tags A and B;
|
||||
3. Create a enabled rule matched image A with tag A;
|
||||
4. Tag A should be immutable;
|
||||
5. Artifact is undeletable;
|
||||
6. Repository is undeletable.
|
||||
"""
|
||||
image_a = dict(name="image_repo_undeletable_a", tag1="latest", tag2="1.3.2")
|
||||
|
||||
#1. Create a new project;
|
||||
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
|
||||
#2. Push image A to the project with 2 tags A and B;
|
||||
push_special_image_to_project(project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
|
||||
#3. Create a enabled rule matched image A with tag A;
|
||||
self.tag_immutability.create_rule(project_id, selector_repository=image_a["name"], selector_tag=str(image_a["tag1"])[0:2] + "*", **self.USER_CLIENT)
|
||||
|
||||
#4. Tag A should be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_artifact_and_repo_is_undeletable] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = True)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = False)
|
||||
|
||||
#5. Artifact is undeletable;
|
||||
self.artifact.delete_artifact(project_name, image_a["name"], image_a["tag1"], expect_status_code = 412,expect_response_body = "configured as immutable, cannot be deleted", **self.USER_CLIENT)
|
||||
|
||||
#6. Repository is undeletable.
|
||||
self.repo.delete_repoitory(project_name, image_a["name"], expect_status_code = 412, expect_response_body = "configured as immutable, cannot be deleted", **self.USER_CLIENT)
|
||||
|
||||
def test_tag_is_undeletable(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Tag is Undeleteable
|
||||
Test step and expected result:
|
||||
1. Push image A to the project with 2 tags A and B;
|
||||
2. Create a enabled rule matched image A with tag A;
|
||||
3. Tag A should be immutable;
|
||||
4. Tag A is undeletable;
|
||||
5. Tag B is deletable.
|
||||
"""
|
||||
image_a = dict(name="image_undeletable_a", tag1="latest", tag2="9.3.2")
|
||||
|
||||
#1. Push image A to the project with 2 tags A and B;
|
||||
push_special_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
|
||||
#2. Create a enabled rule matched image A with tag A;
|
||||
self.tag_immutability.create_rule(self.project_id, selector_repository=image_a["name"], selector_tag=str(image_a["tag2"])[0:2] + "*", **self.USER_CLIENT)
|
||||
|
||||
#3. Tag A should be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(self.project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_tag_is_undeletable] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = True)
|
||||
|
||||
#4. Tag A is undeletable;
|
||||
self.artifact.delete_tag(self.project_name, image_a["name"], image_a["tag1"], image_a["tag2"], expect_status_code = 412, **self.USER_CLIENT)
|
||||
|
||||
#5. Tag B is deletable.
|
||||
self.artifact.delete_tag(self.project_name, image_a["name"], image_a["tag1"], image_a["tag1"], **self.USER_CLIENT)
|
||||
|
||||
def test_image_is_unpushable(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Image is Unpushable
|
||||
Test step and expected result:
|
||||
1. Create a new project;
|
||||
2. Push image A to the project with 2 tags A and B;
|
||||
3. Create a enabled rule matched image A with tag A;
|
||||
4. Tag A should be immutable;
|
||||
5. Can not push image with the same image name and with the same tag name.
|
||||
"""
|
||||
image_a = dict(name="image_unpushable_a", tag1="latest", tag2="1.3.2")
|
||||
|
||||
#1. Create a new project;
|
||||
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
|
||||
#2. Push image A to the project with 2 tags A and B;
|
||||
push_special_image_to_project(project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
|
||||
#3. Create a enabled rule matched image A with tag A;
|
||||
self.tag_immutability.create_rule(project_id, selector_repository=image_a["name"], selector_tag=str(image_a["tag1"])[0:2] + "*", **self.USER_CLIENT)
|
||||
|
||||
#4. Tag A should be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_image_is_unpushable] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = True)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = False)
|
||||
|
||||
#5. Can not push image with the same image name and with the same tag name.
|
||||
push_image_to_project(project_name, harbor_server, self.user_name, self.user_password, "tomcat", image_a["tag1"],
|
||||
new_image = image_a["name"], expected_error_message = "configured as immutable")
|
||||
|
||||
def test_copy_disability(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Copy Disability
|
||||
Test step and expected result:
|
||||
1. Create 2 projects;
|
||||
2. Push image A with tag A and B to project A, push image B which has the same image name and tag name to project B;
|
||||
3. Create a enabled rule matched image A with tag A;
|
||||
4. Tag A should be immutable;
|
||||
5. Can not copy artifact from project A to project B with the same repository name.
|
||||
"""
|
||||
image_a = dict(name="image_copy_disability_a", tag1="latest", tag2="1.3.2")
|
||||
|
||||
#1. Create 2 projects;
|
||||
project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
_, project_name_src = self.project.create_project(metadata = {"public": "false"}, **self.USER_CLIENT)
|
||||
|
||||
#2. Push image A with tag A and B to project A, push image B which has the same image name and tag name to project B;
|
||||
push_special_image_to_project(project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
push_special_image_to_project(project_name_src, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"], image_a["tag2"]])
|
||||
|
||||
#3. Create a enabled rule matched image A with tag A;
|
||||
self.tag_immutability.create_rule(project_id, selector_repository=image_a["name"], selector_tag=str(image_a["tag1"])[0:2] + "*", **self.USER_CLIENT)
|
||||
|
||||
#4. Tag A should be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_copy_disability] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = True)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = False)
|
||||
|
||||
#5. Can not copy artifact from project A to project B with the same repository name.
|
||||
artifact_a_src = self.artifact.get_reference_info(project_name_src, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_copy_disability] - artifact_a_src:{}".format(artifact_a_src))
|
||||
self.artifact.copy_artifact(project_name, image_a["name"], project_name_src+"/"+ image_a["name"] + "@" + artifact_a_src.digest, expect_status_code=412, expect_response_body = "configured as immutable, cannot be updated", **self.USER_CLIENT)
|
||||
|
||||
#def test_replication_disability(self):
|
||||
# pass
|
||||
|
||||
def test_priority_of_rules(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Priority Of Rules(excluding rule will not affect matching rule)
|
||||
Test step and expected result:
|
||||
1. Push image A, B and C, image A has only 1 tag named tag1;
|
||||
2. Create a matching rule that matches image A and tag named tag2 which is not exist;
|
||||
3. Create a excluding rule to exlude image A and B;
|
||||
4. Add a tag named tag2 to image A, tag2 should be immutable;
|
||||
5. Tag2 should be immutable;
|
||||
6. All tags in image B should be immutable;
|
||||
7. All tags in image C should not be immutable;
|
||||
8. Disable all rules.
|
||||
"""
|
||||
image_a = dict(name="image_priority_a", tag1="latest", tag2="6.3.2")
|
||||
image_b = dict(name="image_priority_b", tag1="latest", tag2="0.12.0")
|
||||
image_c = dict(name="image_priority_c", tag1="latest", tag2="3.12.0")
|
||||
|
||||
#1. Push image A, B and C, image A has only 1 tag named tag1;
|
||||
push_special_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image_a["name"], [image_a["tag1"]])
|
||||
push_special_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image_b["name"], [image_b["tag1"],image_b["tag2"]])
|
||||
push_special_image_to_project(self.project_name, harbor_server, self.user_name, self.user_password, image_c["name"], [image_c["tag1"],image_c["tag2"]])
|
||||
|
||||
#2. Create a matching rule that matches image A and tag named tag2 which is not exist;
|
||||
rule_id_1 = self.tag_immutability.create_rule(self.project_id, selector_repository=image_a["name"], selector_tag=image_a["tag2"], **self.USER_CLIENT)
|
||||
|
||||
#3. Create a excluding rule to exlude image A and B;
|
||||
rule_id_2 = self.tag_immutability.create_rule(self.project_id, selector_repository_decoration = "repoExcludes",
|
||||
selector_repository="{image_priority_a,image_priority_b}", selector_tag="**", **self.USER_CLIENT)
|
||||
|
||||
#4. Add a tag named tag2 to image A, tag2 should be immutable;
|
||||
self.artifact.create_tag(self.project_name, image_a["name"], image_a["tag1"], image_a["tag2"], **self.USER_CLIENT)
|
||||
|
||||
#5. Tag2 should be immutable;
|
||||
artifact_a = self.artifact.get_reference_info(self.project_name, image_a["name"], image_a["tag2"], **self.USER_CLIENT)
|
||||
print("[test_priority_of_rules] - artifact:{}".format(artifact_a))
|
||||
self.assertTrue(artifact_a)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag2"], status = True)
|
||||
self.check_tag_immutability(artifact_a, image_a["tag1"], status = False)
|
||||
|
||||
#6. All tags in image B should be immutable;
|
||||
artifact_b = self.artifact.get_reference_info(self.project_name, image_b["name"], image_b["tag2"], **self.USER_CLIENT)
|
||||
print("[test_priority_of_rules] - artifact:{}".format(artifact_b))
|
||||
self.assertTrue(artifact_b)
|
||||
self.check_tag_immutability(artifact_b, image_b["tag2"], status = False)
|
||||
self.check_tag_immutability(artifact_b, image_b["tag1"], status = False)
|
||||
|
||||
#7. All tags in image C should not be immutable;
|
||||
artifact_c = self.artifact.get_reference_info(self.project_name, image_c["name"], image_c["tag2"], **self.USER_CLIENT)
|
||||
print("[test_priority_of_rules] - artifact:{}".format(artifact_c))
|
||||
self.assertTrue(artifact_c)
|
||||
self.check_tag_immutability(artifact_c, image_c["tag2"], status = True)
|
||||
self.check_tag_immutability(artifact_c, image_c["tag1"], status = True)
|
||||
|
||||
#8. Disable all rules.
|
||||
self.tag_immutability.update_tag_immutability_policy_rule(self.project_id, rule_id_1, disabled = True, **self.USER_CLIENT)
|
||||
self.tag_immutability.update_tag_immutability_policy_rule(self.project_id, rule_id_2, disabled = True, **self.USER_CLIENT)
|
||||
|
||||
def test_add_exsiting_rule(self):
|
||||
"""
|
||||
Test case:
|
||||
Test Priority Of Rules(excluding rule will not affect matching rule)
|
||||
Test step and expected result:
|
||||
1. Push image A and B with no tag;
|
||||
2. Create a immutability policy rule A;
|
||||
3. Fail to create rule B which has the same config as rule A;
|
||||
"""
|
||||
self.tag_immutability.create_tag_immutability_policy_rule(self.project_id, **self.exsiting_rule, **self.USER_CLIENT)
|
||||
self.tag_immutability.create_tag_immutability_policy_rule(self.project_id, **self.exsiting_rule, expect_status_code = 409, **self.USER_CLIENT)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(self):
|
||||
print("Case completed")
|
||||
|
||||
if __name__ == '__main__':
|
||||
suite = unittest.TestSuite(unittest.makeSuite(TestTagImmutability))
|
||||
result = unittest.TextTestRunner(sys.stdout, verbosity=2, failfast=True).run(suite)
|
||||
if not result.wasSuccessful():
|
||||
raise Exception(r"Tag immutability test failed: {}".format(result))
|
||||
|
@ -88,7 +88,7 @@ Wait Until Element Is Visible And Enabled
|
||||
|
||||
Retry Action Keyword
|
||||
[Arguments] ${keyword} @{param}
|
||||
Retry Keyword N Times When Error 3 ${keyword} @{param}
|
||||
Retry Keyword N Times When Error 8 ${keyword} @{param}
|
||||
|
||||
Retry Wait Element
|
||||
[Arguments] ${element_xpath}
|
||||
|
@ -105,16 +105,16 @@ Test Case - Push Cnab Bundle
|
||||
[Tags] push_cnab
|
||||
Harbor API Test ./tests/apitests/python/test_push_cnab_bundle.py
|
||||
|
||||
Test Case - Create/Delete tag
|
||||
[Tags] tag_cuid
|
||||
Harbor API Test ./tests/apitests/python/test_create_delete_tag.py
|
||||
Test Case - Tag CRUD
|
||||
[Tags] tag_crud
|
||||
Harbor API Test ./tests/apitests/python/test_tag_crud.py
|
||||
|
||||
Test Case - Scan Image
|
||||
[Tags] scan
|
||||
Harbor API Test ./tests/apitests/python/test_scan_image_artifact.py
|
||||
|
||||
Test Case - Scan Image In Public Project
|
||||
[Tags] scan
|
||||
[Tags] scan_public_project
|
||||
Harbor API Test ./tests/apitests/python/test_scan_image_artifact_in_public_project.py
|
||||
|
||||
Test Case - Scan All Images
|
||||
@ -148,3 +148,7 @@ Test Case - Replication From Dockerhub
|
||||
Test Case - Proxy Cache
|
||||
[Tags] proxy_cache
|
||||
Harbor API Test ./tests/apitests/python/test_proxy_cache.py
|
||||
|
||||
Test Case - Tag Immutability
|
||||
[Tags] tag_immutability
|
||||
Harbor API Test ./tests/apitests/python/test_tag_immutability.py
|
||||
|
Loading…
Reference in New Issue
Block a user