Don't fail the process when one tag replication failed

Don't fail the process when one tag replication failed

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-04-28 17:22:55 +08:00
parent c26f655bce
commit c9c2db7c13
13 changed files with 100 additions and 105 deletions

View File

@ -66,21 +66,13 @@ func newAdapter(registry *model.Registry) (*adapter, error) {
modifiers = append(modifiers, authorizer)
}
// The registry URL and core service URL are different when the adapter
// is created for a local Harbor. If the "registry.CoreURL" is null, the
// registry URL will be used as the coreServiceURL instead
url := registry.URL
if len(registry.CoreURL) > 0 {
url = registry.CoreURL
}
reg, err := adp.NewDefaultImageRegistry(registry)
if err != nil {
return nil, err
}
return &adapter{
registry: registry,
coreServiceURL: url,
coreServiceURL: registry.URL,
client: common_http.NewClient(
&http.Client{
Transport: transport,

View File

@ -16,7 +16,6 @@ package adapter
import (
"errors"
"fmt"
"io"
"net/http"
"strings"
@ -72,15 +71,9 @@ func NewDefaultImageRegistry(registry *model.Registry) (*DefaultImageRegistry, e
registry.Credential.AccessKey,
registry.Credential.AccessSecret)
}
tokenServiceURL := ""
// the registry is a local Harbor instance if the core URL is specified,
// use the internal token service URL instead
if len(registry.CoreURL) > 0 {
tokenServiceURL = fmt.Sprintf("%s/service/token", registry.CoreURL)
}
authorizer = auth.NewStandardTokenAuthorizer(&http.Client{
Transport: util.GetHTTPTransport(registry.Insecure),
}, cred, tokenServiceURL)
}, cred, registry.TokenServiceURL)
}
return NewDefaultImageRegistryWithCustomizedAuthorizer(registry, authorizer)
}

View File

@ -22,7 +22,6 @@ var (
// Configuration holds the configuration information for replication
type Configuration struct {
CoreURL string
RegistryURL string
TokenServiceURL string
JobserviceURL string
SecretKey string

View File

@ -234,7 +234,7 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error
func AddTask(task *models.Task) (int64, error) {
o := dao.GetOrmer()
now := time.Now()
task.StartTime = now
task.StartTime = &now
return o.Insert(task)
}

View File

@ -91,6 +91,7 @@ func TestMethodOfExecution(t *testing.T) {
}
func TestMethodOfTask(t *testing.T) {
now := time.Now()
task1 := &models.Task{
ExecutionID: 112200,
ResourceType: "resourceType1",
@ -98,7 +99,7 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StartTime: time.Now(),
StartTime: &now,
}
task2 := &models.Task{
ExecutionID: 112200,
@ -107,8 +108,8 @@ func TestMethodOfTask(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: time.Now(),
StartTime: &now,
EndTime: &now,
}
// test add
@ -143,7 +144,7 @@ func TestMethodOfTask(t *testing.T) {
taskNew := &models.Task{
ID: id1,
Status: "Failed",
EndTime: time.Now(),
EndTime: &now,
}
n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime)
require.Nil(t, err)
@ -171,6 +172,7 @@ func TestMethodOfTask(t *testing.T) {
}
func TestExecutionFill(t *testing.T) {
now := time.Now()
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
@ -190,8 +192,8 @@ func TestExecutionFill(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Succeed",
StartTime: time.Now(),
EndTime: et1,
StartTime: &now,
EndTime: &et1,
}
task2 := &models.Task{
ID: 20192,
@ -201,8 +203,8 @@ func TestExecutionFill(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: et2,
StartTime: &now,
EndTime: &et2,
}
AddTask(task1)
AddTask(task2)
@ -224,6 +226,7 @@ func TestExecutionFill(t *testing.T) {
}
func TestExecutionFill2(t *testing.T) {
now := time.Now()
execution := &models.Execution{
PolicyID: 11209,
Status: "InProgress",
@ -241,7 +244,7 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: models.TaskStatusInProgress,
StartTime: time.Now(),
StartTime: &now,
}
task2 := &models.Task{
ID: 20192,
@ -251,8 +254,8 @@ func TestExecutionFill2(t *testing.T) {
DstResource: "dstResource2",
JobID: "jobID2",
Status: "Stopped",
StartTime: time.Now(),
EndTime: time.Now(),
StartTime: &now,
EndTime: &now,
}
taskID1, _ := AddTask(task1)
AddTask(task2)

View File

@ -117,8 +117,8 @@ type Task struct {
Operation string `orm:"column(operation)" json:"operation"`
JobID string `orm:"column(job_id)" json:"job_id"`
Status string `orm:"column(status)" json:"status"`
StartTime time.Time `orm:"column(start_time)" json:"start_time"`
EndTime time.Time `orm:"column(end_time)" json:"end_time"`
StartTime *time.Time `orm:"column(start_time)" json:"start_time"`
EndTime *time.Time `orm:"column(end_time)" json:"end_time,omitempty"`
}
// TableName is required by by beego orm to map Execution to table replication_execution

View File

@ -183,8 +183,8 @@ func GetLocalRegistry() *model.Registry {
return &model.Registry{
Type: model.RegistryTypeHarbor,
Name: "Local",
URL: config.Config.RegistryURL,
CoreURL: config.Config.CoreURL,
URL: config.Config.CoreURL,
TokenServiceURL: config.Config.TokenServiceURL,
Status: "healthy",
Credential: &model.Credential{
Type: model.CredentialTypeSecret,

View File

@ -81,9 +81,9 @@ type Registry struct {
Description string `json:"description"`
Type RegistryType `json:"type"`
URL string `json:"url"`
// CoreURL is only used for local harbor instance to
// avoid the requests passing through the external proxy
CoreURL string `json:"core_url"`
// TokenServiceURL is only used for local harbor instance to
// avoid the requests passing through the external proxy for now
TokenServiceURL string `json:"token_service_url"`
Credential *Credential `json:"credential"`
Insecure bool `json:"insecure"`
Status string `json:"status"`

View File

@ -91,6 +91,7 @@ func TestMethodOfExecutionManager(t *testing.T) {
}
func TestMethodOfTaskManager(t *testing.T) {
now := time.Now()
task := &models.Task{
ExecutionID: 112200,
ResourceType: "resourceType1",
@ -98,7 +99,7 @@ func TestMethodOfTaskManager(t *testing.T) {
DstResource: "dstResource1",
JobID: "jobID1",
Status: "Initialized",
StartTime: time.Now(),
StartTime: &now,
}
defer func() {

View File

@ -292,10 +292,11 @@ func schedule(scheduler scheduler.Scheduler, executionMgr execution.Manager, ite
if err = executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending, models.TaskStatusInitialized); err != nil {
log.Errorf("failed to update the task status %d: %v", result.TaskID, err)
}
now := time.Now()
if err = executionMgr.UpdateTask(&models.Task{
ID: result.TaskID,
JobID: result.JobID,
StartTime: time.Now(),
StartTime: &now,
}, "JobID", "StartTime"); err != nil {
log.Errorf("failed to update the task %d: %v", result.TaskID, err)
}

View File

@ -192,7 +192,7 @@ func (f *fakedExecutionManager) GetTaskLog(int64) ([]byte, error) {
func TestMain(m *testing.M) {
url := "https://registry.harbor.local"
config.Config = &config.Configuration{
RegistryURL: url,
CoreURL: url,
}
if err := adapter.RegisterFactory(model.RegistryTypeHarbor, fakedAdapterFactory); err != nil {
os.Exit(1)

View File

@ -51,17 +51,12 @@ var (
// Init the global variables and configurations
func Init(closing chan struct{}) error {
// init config
registryURL, err := cfg.RegistryURL()
if err != nil {
return err
}
secretKey, err := cfg.SecretKey()
if err != nil {
return err
}
config.Config = &config.Configuration{
CoreURL: cfg.InternalCoreURL(),
RegistryURL: registryURL,
TokenServiceURL: cfg.InternalTokenServiceEndpoint(),
JobserviceURL: cfg.InternalJobServiceURL(),
SecretKey: secretKey,

View File

@ -134,9 +134,23 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error {
dstRepo := dst.repository
t.logger.Infof("copying %s:[%s](source registry) to %s:[%s](destination registry)...",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
var err error
for i := range src.tags {
srcTag := src.tags[i]
dstTag := dst.tags[i]
if e := t.copyTag(srcRepo, src.tags[i], dstRepo, dst.tags[i], override); e != nil {
t.logger.Errorf(e.Error())
err = e
}
}
if err != nil {
return err
}
t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
return nil
}
func (t *transfer) copyTag(srcRepo, srcTag, dstRepo, dstTag string, override bool) error {
t.logger.Infof("copying %s:%s(source registry) to %s:%s(destination registry)...",
srcRepo, srcTag, dstRepo, dstTag)
// pull the manifest from the source registry
@ -155,13 +169,13 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error {
if digest == digest2 {
t.logger.Infof("the image %s:%s already exists on the destination registry, skip",
dstRepo, dstTag)
continue
return nil
}
// the same name image exists, but not allowed to override
if !override {
t.logger.Warningf("the same name image %s:%s exists on the destination registry, but the \"override\" is set to false, skip",
dstRepo, dstTag)
continue
return nil
}
// the same name image exists, but allowed to override
t.logger.Warningf("the same name image %s:%s exists on the destination registry and the \"override\" is set to true, continue...",
@ -180,9 +194,6 @@ func (t *transfer) copy(src *repository, dst *repository, override bool) error {
t.logger.Infof("copy %s:%s(source registry) to %s:%s(destination registry) completed",
srcRepo, srcTag, dstRepo, dstTag)
}
t.logger.Infof("copy %s:[%s](source registry) to %s:[%s](destination registry) completed",
srcRepo, strings.Join(src.tags, ","), dstRepo, strings.Join(dst.tags, ","))
return nil
}