Merge remote-tracking branch 'upstream/job-service' into job-service

This commit is contained in:
Tan Jiang 2016-05-19 16:13:03 +08:00
commit 54cd63023b
5 changed files with 67 additions and 51 deletions

View File

@ -1,4 +1,4 @@
package imgout
package replication
type ImgOutParm struct {
Secret string `json:"secret"`

View File

@ -1,4 +1,4 @@
package imgout
package replication
/*
import (

View File

@ -13,7 +13,7 @@
limitations under the License.
*/
package imgout
package replication
import (
"bytes"
@ -43,6 +43,7 @@ const (
StatePushManifest = "push_manifest"
)
// BaseHandler holds informations shared by other state handlers
type BaseHandler struct {
project string // project_name
repository string // prject_name/repo_name
@ -61,13 +62,17 @@ type BaseHandler struct {
manifest distribution.Manifest // manifest of tags[0]
blobs []string // blobs need to be transferred for tags[0]
blobsExistence map[string]bool //key: digest of blob, value: existence
logger *utils.Logger
}
// InitBaseHandler initializes a BaseHandler: creating clients for source and destination registry,
// listing tags of the repository if parameter tags is nil.
func InitBaseHandler(repository, srcURL, srcSecretKey,
dstURL, dstUsr, dstPwd string, tags []string, logger *utils.Logger) (*BaseHandler, error) {
logger.Infof("initializing base handler: repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
logger.Infof("initializing: repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
repository, tags, srcURL, dstURL, dstUsr)
base := &BaseHandler{
@ -78,6 +83,7 @@ func InitBaseHandler(repository, srcURL, srcSecretKey,
dstURL: dstURL,
dstUsr: dstUsr,
dstPwd: dstPwd,
blobsExistence: make(map[string]bool, 10),
logger: logger,
}
@ -106,12 +112,13 @@ func InitBaseHandler(repository, srcURL, srcSecretKey,
base.tags = tags
}
base.logger.Infof("initialization of base handler completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
base.logger.Infof("initialization completed: project: %s, repository: %s, tags: %v, source URL: %s, destination URL: %s, destination user: %s",
base.project, base.repository, base.tags, base.srcURL, base.dstURL, base.dstUsr)
return base, nil
}
// Exit ...
func (b *BaseHandler) Exit() error {
return nil
}
@ -122,11 +129,12 @@ func getProjectName(repository string) string {
return repository[:strings.LastIndex(repository, "/")]
}
// Checker checks the existence of project and the user's privlege to the project
type Checker struct {
*BaseHandler
}
// check existence of project, if it does not exist, create it,
// Enter check existence of project, if it does not exist, create it,
// if it exists, check whether the user has write privilege to it.
func (c *Checker) Enter() (string, error) {
exist, canWrite, err := c.projectExist()
@ -247,13 +255,16 @@ func (c *Checker) createProject() error {
return nil
}
// ManifestPuller pulls the manifest of a tag. And if no tag needs to be pulled,
// the next state that state machine should enter is "finished".
type ManifestPuller struct {
*BaseHandler
}
// Enter pulls manifest of a tag and checks if all blobs exist in the destination registry
func (m *ManifestPuller) Enter() (string, error) {
if len(m.tags) == 0 {
m.logger.Infof("no tag needs to be replicated, entering finish state")
m.logger.Infof("no tag needs to be replicated, next state is \"finished\"")
return models.JobFinished, nil
}
@ -296,30 +307,38 @@ func (m *ManifestPuller) Enter() (string, error) {
m.logger.Infof("all blobs of %s:%s from %s: %v", name, tag, m.srcURL, blobs)
for _, blob := range blobs {
exist, err := m.dstClient.BlobExist(blob)
exist, ok := m.blobsExistence[blob]
if !ok {
exist, err = m.dstClient.BlobExist(blob)
if err != nil {
m.logger.Errorf("an error occurred while checking existence of blob %s of %s:%s on %s: %v", blob, name, tag, m.dstURL, err)
return "", err
}
m.blobsExistence[blob] = exist
}
if !exist {
m.blobs = append(m.blobs, blob)
} else {
m.logger.Infof("blob %s of %s:%s already exists in %s", blob, name, tag, m.dstURL)
}
}
m.logger.Infof("blobs of %s:%s need to be transferred to %s: %v", name, tag, m.dstURL, m.blobs)
m.blobs = blobs
return StateTransferBlob, nil
}
// BlobTransfer transfers blobs of a tag
type BlobTransfer struct {
*BaseHandler
}
// Enter pulls blobs and then pushs them to destination registry.
func (b *BlobTransfer) Enter() (string, error) {
name := b.repository
tag := b.tags[0]
for _, blob := range b.blobs {
b.logger.Infof("transferring blob %s of %s:%s to %s ...", blob, name, tag, b.dstURL)
size, data, err := b.srcClient.PullBlob(blob)
if err != nil {
b.logger.Errorf("an error occurred while pulling blob %s of %s:%s from %s: %v", blob, name, tag, b.srcURL, err)
@ -329,16 +348,20 @@ func (b *BlobTransfer) Enter() (string, error) {
b.logger.Errorf("an error occurred while pushing blob %s of %s:%s to %s : %v", blob, name, tag, b.dstURL, err)
return "", err
}
b.logger.Infof("blob %s of %s:%s tranferred to %s completed", blob, name, tag, b.dstURL)
b.logger.Infof("blob %s of %s:%s transferred to %s completed", blob, name, tag, b.dstURL)
}
return StatePushManifest, nil
}
// ManifestPusher pushs the manifest to destination registry
type ManifestPusher struct {
*BaseHandler
}
// Enter checks the existence of manifest in the source registry first, and if it
// exists, pushs it to destination registry. The checking operation is to avoid
// the situation that the tag is deleted during the blobs transfering
func (m *ManifestPusher) Enter() (string, error) {
name := m.repository
tag := m.tags[0]
@ -365,6 +388,8 @@ func (m *ManifestPusher) Enter() (string, error) {
}
m.tags = m.tags[1:]
m.manifest = nil
m.blobs = nil
return StatePullManifest, nil
}

View File

@ -6,7 +6,7 @@ import (
"github.com/vmware/harbor/dao"
"github.com/vmware/harbor/job/config"
"github.com/vmware/harbor/job/imgout"
"github.com/vmware/harbor/job/replication"
"github.com/vmware/harbor/job/utils"
"github.com/vmware/harbor/models"
"github.com/vmware/harbor/utils/log"
@ -224,17 +224,17 @@ func (sm *JobSM) Reset(jid int64) error {
}
func addImgOutTransition(sm *JobSM) error {
base, err := imgout.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, "",
base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, "",
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
nil, &sm.Logger)
if err != nil {
return err
}
sm.AddTransition(models.JobRunning, imgout.StateCheck, &imgout.Checker{BaseHandler: base})
sm.AddTransition(imgout.StateCheck, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
sm.AddTransition(imgout.StatePullManifest, imgout.StateTransferBlob, &imgout.BlobTransfer{BaseHandler: base})
sm.AddTransition(imgout.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(imgout.StateTransferBlob, imgout.StatePushManifest, &imgout.ManifestPusher{BaseHandler: base})
sm.AddTransition(imgout.StatePushManifest, imgout.StatePullManifest, &imgout.ManifestPuller{BaseHandler: base})
sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base})
sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base})
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base})
sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
return nil
}

View File

@ -19,6 +19,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
@ -385,7 +386,7 @@ func (r *Repository) BlobExist(digest string) (bool, error) {
}
// PullBlob ...
func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error) {
func (r *Repository) PullBlob(digest string) (size int64, data io.ReadCloser, err error) {
req, err := http.NewRequest("GET", buildBlobURL(r.Endpoint.String(), r.Name, digest), nil)
if err != nil {
return
@ -401,19 +402,19 @@ func (r *Repository) PullBlob(digest string) (size int64, data []byte, err error
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
if resp.StatusCode == http.StatusOK {
contengLength := resp.Header.Get(http.CanonicalHeaderKey("Content-Length"))
size, err = strconv.ParseInt(contengLength, 10, 64)
if err != nil {
return
}
data = b
data = resp.Body
return
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
@ -462,8 +463,8 @@ func (r *Repository) initiateBlobUpload(name string) (location, uploadUUID strin
return
}
func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data []byte) error {
req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), bytes.NewReader(data))
func (r *Repository) monolithicBlobUpload(location, digest string, size int64, data io.Reader) error {
req, err := http.NewRequest("PUT", buildMonolithicBlobUploadURL(location, digest), data)
if err != nil {
return err
}
@ -496,17 +497,7 @@ func (r *Repository) monolithicBlobUpload(location, digest string, size int64, d
}
// PushBlob ...
func (r *Repository) PushBlob(digest string, size int64, data []byte) error {
exist, err := r.BlobExist(digest)
if err != nil {
return err
}
if exist {
log.Infof("blob already exists, skip pushing: %s %s", r.Name, digest)
return nil
}
func (r *Repository) PushBlob(digest string, size int64, data io.Reader) error {
location, _, err := r.initiateBlobUpload(r.Name)
if err != nil {
return err