mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-27 04:35:16 +01:00
166 lines
6.6 KiB
Python
166 lines
6.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding:utf-8 -*-
|
|
# bug-report: feilengcui008@gmail.com
|
|
|
|
""" api for docker registry """
|
|
|
|
import urllib2
|
|
import urllib
|
|
import json
|
|
import base64
|
|
|
|
|
|
class RegistryException(Exception):
|
|
""" registry api related exception """
|
|
pass
|
|
|
|
|
|
class RegistryApi(object):
|
|
""" interact with docker registry and harbor """
|
|
def __init__(self, username, password, registry_endpoint):
|
|
self.username = username
|
|
self.password = password
|
|
self.basic_token = base64.encodestring("%s:%s" % (str(username), str(password)))[0:-1]
|
|
self.registry_endpoint = registry_endpoint.rstrip('/')
|
|
auth = self.pingRegistry("%s/v2/_catalog" % (self.registry_endpoint,))
|
|
if auth is None:
|
|
raise RegistryException("get token realm and service failed")
|
|
self.token_endpoint = auth[0]
|
|
self.service = auth[1]
|
|
|
|
def pingRegistry(self, registry_endpoint):
|
|
""" ping v2 registry and get realm and service """
|
|
headers = dict()
|
|
try:
|
|
res = urllib2.urlopen(registry_endpoint)
|
|
except urllib2.HTTPError as e:
|
|
headers = e.hdrs.dict
|
|
try:
|
|
(realm, service, _) = headers['www-authenticate'].split(',')
|
|
return (realm[14:-1:], service[9:-1])
|
|
except Exception as e:
|
|
return None
|
|
|
|
def getBearerTokenForScope(self, scope):
|
|
""" get bearer token from harbor """
|
|
payload = urllib.urlencode({'service': self.service, 'scope': scope})
|
|
url = "%s?%s" % (self.token_endpoint, payload)
|
|
req = urllib2.Request(url)
|
|
req.add_header('Authorization', 'Basic %s' % (self.basic_token,))
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
return json.loads(response.read())["token"]
|
|
except Exception as e:
|
|
return None
|
|
|
|
def getRepositoryList(self, n=None):
|
|
""" get repository list """
|
|
scope = "registry:catalog:*"
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
return None
|
|
url = "%s/v2/_catalog" % (self.registry_endpoint,)
|
|
if n is not None:
|
|
url = "%s?n=%s" % (url, str(n))
|
|
req = urllib2.Request(url)
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
return json.loads(response.read())
|
|
except Exception as e:
|
|
return None
|
|
|
|
def getTagList(self, repository):
|
|
""" get tag list for repository """
|
|
scope = "repository:%s:pull" % (repository,)
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
return None
|
|
url = "%s/v2/%s/tags/list" % (self.registry_endpoint, repository)
|
|
req = urllib2.Request(url)
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
return json.loads(response.read())
|
|
except Exception as e:
|
|
return None
|
|
|
|
def getManifest(self, repository, reference="latest", v1=False):
|
|
""" get manifest for tag or digest """
|
|
scope = "repository:%s:pull" % (repository,)
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
return None
|
|
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
|
|
req = urllib2.Request(url)
|
|
req.get_method = lambda: 'GET'
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
|
|
if v1:
|
|
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
return json.loads(response.read())
|
|
except Exception as e:
|
|
return None
|
|
|
|
def existManifest(self, repository, reference, v1=False):
|
|
""" check to see it manifest exist """
|
|
scope = "repository:%s:pull" % (repository,)
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
raise RegistryException("manifestExist failed due to token error")
|
|
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
|
|
req = urllib2.Request(url)
|
|
req.get_method = lambda: 'HEAD'
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
|
|
if v1:
|
|
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
return (True, response.headers.dict["docker-content-digest"])
|
|
except Exception as e:
|
|
return (False, None)
|
|
|
|
def deleteManifest(self, repository, reference):
|
|
""" delete manifest by tag """
|
|
(is_exist, digest) = self.existManifest(repository, reference)
|
|
if not is_exist:
|
|
raise RegistryException("manifest not exist")
|
|
scope = "repository:%s:pull,push" % (repository,)
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
raise RegistryException("delete manifest failed due to token error")
|
|
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, digest)
|
|
req = urllib2.Request(url)
|
|
req.get_method = lambda: 'DELETE'
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
try:
|
|
urllib2.urlopen(req)
|
|
except Exception as e:
|
|
return False
|
|
return True
|
|
|
|
def getManifestWithConf(self, repository, reference="latest"):
|
|
""" get manifest for tag or digest """
|
|
manifest = self.getManifest(repository, reference)
|
|
if manifest is None:
|
|
raise RegistryException("manifest for %s %s not exist" % (repository, reference))
|
|
config_digest = manifest["config"]["digest"]
|
|
scope = "repository:%s:pull" % (repository,)
|
|
bear_token = self.getBearerTokenForScope(scope)
|
|
if bear_token is None:
|
|
return None
|
|
url = "%s/v2/%s/blobs/%s" % (self.registry_endpoint, repository, config_digest)
|
|
req = urllib2.Request(url)
|
|
req.get_method = lambda: 'GET'
|
|
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
|
|
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
|
|
try:
|
|
response = urllib2.urlopen(req)
|
|
manifest["configContent"] = json.loads(response.read())
|
|
return manifest
|
|
except Exception as e:
|
|
return None
|