harbor/contrib/registryapi/registry.py

166 lines
6.6 KiB
Python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# bug-report: feilengcui008@gmail.com
""" api for docker registry """
import urllib2
import urllib
import json
import base64
class RegistryException(Exception):
""" registry api related exception """
pass
class RegistryApi(object):
""" interact with docker registry and harbor """
def __init__(self, username, password, registry_endpoint):
self.username = username
self.password = password
self.basic_token = base64.encodestring("%s:%s" % (str(username), str(password)))[0:-1]
self.registry_endpoint = registry_endpoint.rstrip('/')
auth = self.pingRegistry("%s/v2/_catalog" % (self.registry_endpoint,))
if auth is None:
raise RegistryException("get token realm and service failed")
self.token_endpoint = auth[0]
self.service = auth[1]
def pingRegistry(self, registry_endpoint):
""" ping v2 registry and get realm and service """
headers = dict()
try:
res = urllib2.urlopen(registry_endpoint)
except urllib2.HTTPError as e:
headers = e.hdrs.dict
try:
(realm, service, _) = headers['www-authenticate'].split(',')
return (realm[14:-1:], service[9:-1])
except Exception as e:
return None
def getBearerTokenForScope(self, scope):
""" get bearer token from harbor """
payload = urllib.urlencode({'service': self.service, 'scope': scope})
url = "%s?%s" % (self.token_endpoint, payload)
req = urllib2.Request(url)
req.add_header('Authorization', 'Basic %s' % (self.basic_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())["token"]
except Exception as e:
return None
def getRepositoryList(self, n=None):
""" get repository list """
scope = "registry:catalog:*"
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/_catalog" % (self.registry_endpoint,)
if n is not None:
url = "%s?n=%s" % (url, str(n))
req = urllib2.Request(url)
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def getTagList(self, repository):
""" get tag list for repository """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/tags/list" % (self.registry_endpoint, repository)
req = urllib2.Request(url)
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def getManifest(self, repository, reference="latest", v1=False):
""" get manifest for tag or digest """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
req = urllib2.Request(url)
req.get_method = lambda: 'GET'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
if v1:
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def existManifest(self, repository, reference, v1=False):
""" check to see it manifest exist """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
raise RegistryException("manifestExist failed due to token error")
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
req = urllib2.Request(url)
req.get_method = lambda: 'HEAD'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
if v1:
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
try:
response = urllib2.urlopen(req)
return (True, response.headers.dict["docker-content-digest"])
except Exception as e:
return (False, None)
def deleteManifest(self, repository, reference):
""" delete manifest by tag """
(is_exist, digest) = self.existManifest(repository, reference)
if not is_exist:
raise RegistryException("manifest not exist")
scope = "repository:%s:pull,push" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
raise RegistryException("delete manifest failed due to token error")
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, digest)
req = urllib2.Request(url)
req.get_method = lambda: 'DELETE'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
urllib2.urlopen(req)
except Exception as e:
return False
return True
def getManifestWithConf(self, repository, reference="latest"):
""" get manifest for tag or digest """
manifest = self.getManifest(repository, reference)
if manifest is None:
raise RegistryException("manifest for %s %s not exist" % (repository, reference))
config_digest = manifest["config"]["digest"]
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/blobs/%s" % (self.registry_endpoint, repository, config_digest)
req = urllib2.Request(url)
req.get_method = lambda: 'GET'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
try:
response = urllib2.urlopen(req)
manifest["configContent"] = json.loads(response.read())
return manifest
except Exception as e:
return None