fix(scan): fix the permission checking for artifact scanning

Closes #12778

Signed-off-by: He Weiwei <hweiwei@vmware.com>
This commit is contained in:
He Weiwei 2020-08-17 09:55:38 +00:00
parent 0921beaf4c
commit f659523f50
6 changed files with 92 additions and 11 deletions

View File

@ -48,7 +48,7 @@ func (s *scanAPI) Prepare(ctx context.Context, operation string, params interfac
}
func (s *scanAPI) ScanArtifact(ctx context.Context, params operation.ScanArtifactParams) middleware.Responder {
if err := s.RequireProjectAccess(ctx, params.ProjectName, rbac.ActionRead, rbac.ResourceScan); err != nil {
if err := s.RequireProjectAccess(ctx, params.ProjectName, rbac.ActionCreate, rbac.ResourceScan); err != nil {
return s.SendError(ctx, err)
}

View File

@ -44,6 +44,12 @@ def _create_client(server, credential, debug, api_type="products"):
proxy = proxies.get('http', proxies.get('all', None))
if proxy:
cfg.proxy = proxy
if cfg.username is None and cfg.password is None:
# returns {} for auth_settings for anonymous access
import types
cfg.auth_settings = types.MethodType(lambda self: {}, cfg)
return {
"chart": client.ChartRepositoryApi(client.ApiClient(cfg)),
"products": swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
@ -108,6 +114,7 @@ class Base(object):
def _get_client(self, **kwargs):
if len(kwargs) == 0:
return self.client
server = self.server
if "endpoint" in kwargs:
server.endpoint = kwargs.get("endpoint")
@ -116,11 +123,11 @@ class Base(object):
server.verify_ssl = kwargs.get("verify_ssl") == "True"
else:
server.verify_ssl = kwargs.get("verify_ssl")
credential = self.credential
if "type" in kwargs:
credential.type = kwargs.get("type")
if "username" in kwargs:
credential.username = kwargs.get("username")
if "password" in kwargs:
credential.password = kwargs.get("password")
credential = Credential(
kwargs.get("type", self.credential.type),
kwargs.get("username", self.credential.username),
kwargs.get("password", self.credential.password),
)
return _create_client(server, credential, self.debug, kwargs.get('api_type', self.api_type))

View File

@ -9,8 +9,16 @@ class Scan(base.Base, object):
def __init__(self):
super(Scan,self).__init__(api_type = "scan")
def scan_artifact(self, project_name, repo_name, reference, expect_status_code = 202, **kwargs):
def scan_artifact(self, project_name, repo_name, reference, expect_status_code = 202, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
data, status_code, _ = client.scan_artifact_with_http_info(project_name, repo_name, reference)
try:
data, status_code, _ = client.scan_artifact_with_http_info(project_name, repo_name, reference)
except ApiException as e:
base._assert_status_code(expect_status_code, e.status)
if expect_response_body is not None:
base._assert_status_body(expect_response_body, e.body)
return
base._assert_status_code(expect_status_code, status_code)
return data

View File

@ -0,0 +1,62 @@
from __future__ import absolute_import
import unittest
from testutils import harbor_server
from testutils import created_user, created_project
from library.artifact import Artifact
from library.repository import Repository, push_image_to_project
from library.scan import Scan
class TestScanImageInPublicProject(unittest.TestCase):
@classmethod
def setUp(self):
self.artifact = Artifact()
self.repo = Repository()
self.scan = Scan()
@classmethod
def tearDown(self):
print("Case completed")
def testScanImageInPublicProject(self):
"""
Test case:
Scan An Image Artifact In Public Project
Test step and expected result:
1. Create a new user(UA);
2. Create a new public project(PA) by user(UA);
3. Add user(UA) as a member of project(PA) with project-admin role;
4. Create a new repository(RA) and tag(TA) in project(PA) by user(UA);
5. Send scan image command without credential (anonymous), the API response should be 401;
6. Create a new user(UB) which is non member of the project(PA);
7. Send scan image command with credential of the new created user(UB), the API response should be 403;
8. Delete user(UB);
9. Send scan image command with credential of the user(UA) and get tag(TA) information to check scan result, it should be finished;
10. Delete repository(RA) by user(UA);
11. Delete project(PA);
12. Delete user(UA);
"""
password = 'Aa123456' # nosec
with created_user(password) as (user_id, username):
with created_project(metadata={"public": "true"}, user_id=user_id) as (_, project_name):
image, src_tag = "docker", "1.13"
full_name, tag = push_image_to_project(project_name, harbor_server, username, password, image, src_tag)
repo_name = full_name.split('/')[1]
# scan image with anonymous user
self.scan.scan_artifact(project_name, repo_name, tag, expect_status_code=401, username=None, password=None)
with created_user(password) as (_, username1):
# scan image with non project memeber
self.scan.scan_artifact(project_name, repo_name, tag, expect_status_code=403, username=username1, password=password)
self.scan.scan_artifact(project_name, repo_name, tag, username=username, password=password)
self.artifact.check_image_scan_result(project_name, image, tag, username=username, password=password, with_scan_overview=True)
self.repo.delete_repoitory(project_name, repo_name)
if __name__ == '__main__':
unittest.main()

View File

@ -84,7 +84,7 @@ def created_project(name=None, metadata=None, user_id=None, member_role_id=None)
api = Project()
project_id, project_name = api.create_project(name=None, metadata=None, **ADMIN_CLIENT)
project_id, project_name = api.create_project(name=name, metadata=metadata, **ADMIN_CLIENT)
if user_id:
api.add_project_members(project_id, user_id, member_role_id=member_role_id, **ADMIN_CLIENT)

View File

@ -114,6 +114,10 @@ Test Case - Scan Image
[Tags] scan
Harbor API Test ./tests/apitests/python/test_scan_image_artifact.py
Test Case - Scan Image In Public Project
[Tags] scan
Harbor API Test ./tests/apitests/python/test_scan_image_artifact_in_public_project.py
Test Case - Scan All Images
[Tags] scan_all
Harbor API Test ./tests/apitests/python/test_system_level_scan_all.py