He Weiwei 88fcacd4b7
feat(middleware): add blob middlewares (#10710)
1. Add middleware to record the accepted blob size for stream blob
2. Add middleware to create blob and associate it with project after blob upload
3. Add middleware to sync blobs, create blob for manifest and associate blobs
with the manifest after put manifest.
4. Add middleware to associate blob with project after mount blob.
5. Cleanup associations for the project when artifact deleted.

Signed-off-by: He Weiwei <hweiwei@vmware.com>
2020-02-20 23:20:34 +08:00

42 lines
1.5 KiB

import time
import os
import sys
sys.path.insert(0, os.environ["SWAGGER_CLIENT_PATH"])
from swagger_client.rest import ApiException
import swagger_client.models
from pprint import pprint
admin_user = "admin"
admin_pwd = "Harbor12345"
harbor_server = os.environ["HARBOR_HOST"]
ADMIN_CLIENT=dict(endpoint = os.environ.get("HARBOR_HOST_SCHEMA", "https")+ "://"+harbor_server+"/api/v2.0", username = admin_user, password = admin_pwd)
TEARDOWN = os.environ.get('TEARDOWN', 'true').lower() in ('true', 'yes')
def GetProductApi(username, password, harbor_server= os.environ["HARBOR_HOST"]):
cfg = swagger_client.Configuration()
cfg.host = "https://"+harbor_server+"/api/v2.0"
cfg.username = username
cfg.password = password
cfg.verify_ssl = False
cfg.debug = True
api_client = swagger_client.ApiClient(cfg)
api_instance = swagger_client.ProductsApi(api_client)
return api_instance
class TestResult(object):
def __init__(self):
self.num_errors = 0
self.error_message = []
def add_test_result(self, error_message):
self.num_errors = self.num_errors + 1
def get_final_result(self):
if self.num_errors > 0:
for each_err_msg in self.error_message:
print "Error message:", each_err_msg
raise Exception(r"Test case failed with {} errors.".format(self.num_errors))