caching the existence of blobs to avoid duplicate checking

This commit is contained in:
Wenkai Yin 2016-05-18 16:37:11 +08:00
parent e4cb015d34
commit 4ee50bc8b6

View File

@ -43,6 +43,7 @@ const (
StatePushManifest = "push_manifest" StatePushManifest = "push_manifest"
) )
// BaseHandler holds informations shared by other state handlers
type BaseHandler struct { type BaseHandler struct {
project string // project_name project string // project_name
repository string // prject_name/repo_name repository string // prject_name/repo_name
@ -61,9 +62,13 @@ type BaseHandler struct {
manifest distribution.Manifest // manifest of tags[0] manifest distribution.Manifest // manifest of tags[0]
blobs []string // blobs need to be transferred for tags[0] blobs []string // blobs need to be transferred for tags[0]
blobsExistence map[string]bool //key: digest of blob, value: existence
logger *utils.Logger logger *utils.Logger
} }
// InitBaseHandler initializes a BaseHandler: creating clients for source and destination registry,
// listing tags of the repository if parameter tags is nil.
func InitBaseHandler(repository, srcURL, srcSecretKey, func InitBaseHandler(repository, srcURL, srcSecretKey,
dstURL, dstUsr, dstPwd string, tags []string, logger *utils.Logger) (*BaseHandler, error) { dstURL, dstUsr, dstPwd string, tags []string, logger *utils.Logger) (*BaseHandler, error) {
@ -71,14 +76,15 @@ func InitBaseHandler(repository, srcURL, srcSecretKey,
repository, tags, srcURL, dstURL, dstUsr) repository, tags, srcURL, dstURL, dstUsr)
base := &BaseHandler{ base := &BaseHandler{
repository: repository, repository: repository,
tags: tags, tags: tags,
srcURL: srcURL, srcURL: srcURL,
srcSecretKey: srcSecretKey, srcSecretKey: srcSecretKey,
dstURL: dstURL, dstURL: dstURL,
dstUsr: dstUsr, dstUsr: dstUsr,
dstPwd: dstPwd, dstPwd: dstPwd,
logger: logger, blobsExistence: make(map[string]bool, 10),
logger: logger,
} }
base.project = getProjectName(base.repository) base.project = getProjectName(base.repository)
@ -112,6 +118,7 @@ func InitBaseHandler(repository, srcURL, srcSecretKey,
return base, nil return base, nil
} }
// Exit ...
func (b *BaseHandler) Exit() error { func (b *BaseHandler) Exit() error {
return nil return nil
} }
@ -122,11 +129,12 @@ func getProjectName(repository string) string {
return repository[:strings.LastIndex(repository, "/")] return repository[:strings.LastIndex(repository, "/")]
} }
// Checker checks the existence of project and the user's privlege to the project
type Checker struct { type Checker struct {
*BaseHandler *BaseHandler
} }
// check existence of project, if it does not exist, create it, // Enter check existence of project, if it does not exist, create it,
// if it exists, check whether the user has write privilege to it. // if it exists, check whether the user has write privilege to it.
func (c *Checker) Enter() (string, error) { func (c *Checker) Enter() (string, error) {
exist, canWrite, err := c.projectExist() exist, canWrite, err := c.projectExist()
@ -247,10 +255,13 @@ func (c *Checker) createProject() error {
return nil return nil
} }
// ManifestPuller pulls the manifest of a tag. And if no tag needs to be pulled,
// the next state that state machine should enter is "finished".
type ManifestPuller struct { type ManifestPuller struct {
*BaseHandler *BaseHandler
} }
// Enter pulls manifest of a tag and checks if all blobs exist in the destination registry
func (m *ManifestPuller) Enter() (string, error) { func (m *ManifestPuller) Enter() (string, error) {
if len(m.tags) == 0 { if len(m.tags) == 0 {
m.logger.Infof("no tag needs to be replicated, entering finish state") m.logger.Infof("no tag needs to be replicated, entering finish state")
@ -296,11 +307,16 @@ func (m *ManifestPuller) Enter() (string, error) {
m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs) m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs)
for _, blob := range blobs { for _, blob := range blobs {
exist, err := m.dstClient.BlobExist(blob) exist, ok := m.blobsExistence[blob]
if err != nil { if !ok {
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err) exist, err = m.dstClient.BlobExist(blob)
return "", err if err != nil {
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err)
return "", err
}
m.blobsExistence[blob] = exist
} }
if !exist { if !exist {
m.blobs = append(m.blobs, blob) m.blobs = append(m.blobs, blob)
} }
@ -312,10 +328,12 @@ func (m *ManifestPuller) Enter() (string, error) {
return StateTransferBlob, nil return StateTransferBlob, nil
} }
// BlobTransfer transfers blobs of a tag
type BlobTransfer struct { type BlobTransfer struct {
*BaseHandler *BaseHandler
} }
// Enter pulls blobs and then pushs them to destination registry.
func (b *BlobTransfer) Enter() (string, error) { func (b *BlobTransfer) Enter() (string, error) {
name := b.repository name := b.repository
tag := b.tags[0] tag := b.tags[0]
@ -335,10 +353,14 @@ func (b *BlobTransfer) Enter() (string, error) {
return StatePushManifest, nil return StatePushManifest, nil
} }
// ManifestPusher pushs the manifest to destination registry
type ManifestPusher struct { type ManifestPusher struct {
*BaseHandler *BaseHandler
} }
// Enter checks the existence of manifest in the source registry first, and if it
// exists, pushs it to destination registry. The checking operation is to avoid
// the situation that the tag is deleted during the blobs transfering
func (m *ManifestPusher) Enter() (string, error) { func (m *ManifestPusher) Enter() (string, error) {
name := m.repository name := m.repository
tag := m.tags[0] tag := m.tags[0]