harbor/tests/apitests/python/library/system.py
Yan 8d3946a0e2
Refactor scan all api (#7120)
* Refactor scan all api

This commit is to let scan all api using admin job to handle schedule
management. After the PR, GC and scan all share unified code path.

Signed-off-by: wang yan <wangyan@vmware.com>

* update admin job api code according to review comments

Signed-off-by: wang yan <wangyan@vmware.com>

* Update test code and comments per review

Signed-off-by: wang yan <wangyan@vmware.com>
2019-03-22 17:52:21 +08:00

169 lines
8.9 KiB
Python

# -*- coding: utf-8 -*-
import time
import re
import base
import swagger_client
from swagger_client.rest import ApiException
class System(base.Base):
def get_gc_history(self, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
try:
data, status_code, _ = client.system_gc_get_with_http_info()
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Get configuration response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Get configuration result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return data
def get_gc_status_by_id(self, job_id, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
try:
data, status_code, _ = client.system_gc_id_get_with_http_info(job_id)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Get configuration response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Get configuration result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return data
def get_gc_log_by_id(self, job_id, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
try:
data, status_code, _ = client.system_gc_id_log_get_with_http_info(job_id)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Get configuration response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Get configuration result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return data
def get_gc_schedule(self, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
try:
data, status_code, _ = client.system_gc_schedule_get_with_http_info()
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Get configuration response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Get configuration result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return data
def set_gc_schedule(self, schedule_type = 'None', cron = None, expect_status_code = 200, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
gc_schedule = swagger_client.AdminJobSchedule()
gc_schedule.type = schedule_type
if cron is not None:
gc_schedule.cron = cron
try:
data, status_code, _ = client.system_gc_schedule_put_with_http_info(gc_schedule)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Get configuration response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Get configuration result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return data
def create_gc_schedule(self, schedule_type, cron = None, expect_status_code = 201, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
gcscheduleschedule = swagger_client.AdminJobScheduleObj()
gcscheduleschedule.type = schedule_type
if cron is not None:
gcscheduleschedule.cron = cron
gc_schedule = swagger_client.AdminJobSchedule(gcscheduleschedule)
try:
_, status_code, header = client.system_gc_schedule_post_with_http_info(gc_schedule)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Create GC schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Create GC schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header)
def create_scan_all_schedule(self, schedule_type, cron = None, expect_status_code = 201, expect_response_body = None, **kwargs):
client = self._get_client(**kwargs)
scanschedule = swagger_client.AdminJobScheduleObj()
scanschedule.type = schedule_type
if cron is not None:
scanschedule.cron = cron
scan_all_schedule = swagger_client.AdminJobSchedule(scanschedule)
try:
_, status_code, header = client.system_scan_all_schedule_post_with_http_info(scan_all_schedule)
except ApiException as e:
if e.status == expect_status_code:
if expect_response_body is not None and e.body.strip() != expect_response_body.strip():
raise Exception(r"Create Scan All schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip()))
else:
return e.reason, e.body
else:
raise Exception(r"Create Scan All schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status))
base._assert_status_code(expect_status_code, status_code)
return base._get_id_from_header(header)
def scan_now(self, **kwargs):
scan_all_id = self.create_scan_all_schedule('Manual', **kwargs)
return scan_all_id
def gc_now(self, **kwargs):
gc_id = self.create_gc_schedule('Manual', **kwargs)
return gc_id
def validate_gc_job_status(self, gc_id, expected_gc_status, **kwargs):
get_gc_status_finish = False
timeout_count = 20
while not (get_gc_status_finish):
time.sleep(5)
status = self.get_gc_status_by_id(gc_id, **kwargs)
if status.job_status == expected_gc_status:
get_gc_status_finish = True
timeout_count = timeout_count - 1
if not (get_gc_status_finish):
raise Exception("Scan image result is not as expected {} actual scan status is {}".format(expected_scan_status, actual_scan_status))
def validate_deletion_success(self, gc_id, **kwargs):
log_content = self.get_gc_log_by_id(gc_id, **kwargs)
key_message = "manifests eligible for deletion"
key_message_pos = log_content.find(key_message)
full_message = log_content[key_message_pos-30 : key_message_pos + len(key_message)]
deleted_files_count_list = re.findall(r'\s+(\d+)\s+blobs\s+and\s+\d+\s+manifests\s+eligible\s+for\s+deletion', full_message)
if len(deleted_files_count_list) != 1:
raise Exception(r"Fail to get blobs eligible for deletion in log file, failure is {}.".format(len(deleted_files_count_list)))
deleted_files_count = int(deleted_files_count_list[0])
if deleted_files_count == 0:
raise Exception(r"Get blobs eligible for deletion count is {}, while we expect more than 1.".format(deleted_files_count))