From 258b22a9a59c89469d1e52db00272aa552b836ae Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Sun, 10 Mar 2019 11:14:23 +0800 Subject: [PATCH 1/6] Fix bug in replication This commit fixes bugs found in the implement of replciation NG Signed-off-by: Wenkai Yin --- src/common/job/const.go | 4 +- src/core/api/replication_execution.go | 5 +- .../job/impl/replication/ng/replication.go | 2 + src/jobservice/runtime/bootstrap.go | 14 +-- src/replication/ng/adapter/harbor/adapter.go | 43 ++++--- src/replication/ng/flow/controller.go | 7 ++ src/replication/ng/flow/flow.go | 109 +++++++++++------- src/replication/ng/replication.go | 16 ++- src/replication/ng/scheduler/scheduler.go | 22 ++-- .../ng/scheduler/scheduler_test.go | 14 +-- 10 files changed, 136 insertions(+), 100 deletions(-) diff --git a/src/common/job/const.go b/src/common/job/const.go index 08876ec68..fa492a477 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -13,8 +13,8 @@ const ( ImageReplicate = "IMAGE_REPLICATE" // ImageGC the name of image garbage collection job in job service ImageGC = "IMAGE_GC" - // ImageReplication : the name of image replication job in job service - ImageReplication = "IMAGE_REPLICATION" + // Replication : the name of the replication job in job service + Replication = "REPLICATION" // JobKindGeneric : Kind of generic job JobKindGeneric = "Generic" diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index f250ba40e..45af40ef6 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -166,7 +166,10 @@ func (r *ReplicationOperationAPI) ListTasks() { query := &models.TaskQuery{ ExecutionID: executionID, ResourceType: r.GetString("resource_type"), - Statuses: []string{r.GetString("status")}, + } + status := r.GetString("status") + if len(status) > 0 { + query.Statuses = []string{status} } query.Page, query.Size = r.GetPaginationParams() total, tasks, err := ng.OperationCtl.ListTasks(query) diff --git a/src/jobservice/job/impl/replication/ng/replication.go b/src/jobservice/job/impl/replication/ng/replication.go index ec0ec5a08..da49eb829 100644 --- a/src/jobservice/job/impl/replication/ng/replication.go +++ b/src/jobservice/job/impl/replication/ng/replication.go @@ -26,6 +26,8 @@ import ( _ "github.com/goharbor/harbor/src/replication/ng/transfer/chart" // import repository transfer _ "github.com/goharbor/harbor/src/replication/ng/transfer/repository" + // register the Harbor adapter + _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" ) // Replication implements the job interface diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index c4b3ed4cb..79e7c0d04 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -206,13 +206,13 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con } if err := redisWorkerPool.RegisterJobs( map[string]interface{}{ - job.ImageScanJob: (*scan.ClairJob)(nil), - job.ImageScanAllJob: (*scan.All)(nil), - job.ImageTransfer: (*replication.Transfer)(nil), - job.ImageDelete: (*replication.Deleter)(nil), - job.ImageReplicate: (*replication.Replicator)(nil), - job.ImageGC: (*gc.GarbageCollector)(nil), - job.ImageReplication: (*ng.Replication)(nil), + job.ImageScanJob: (*scan.ClairJob)(nil), + job.ImageScanAllJob: (*scan.All)(nil), + job.ImageTransfer: (*replication.Transfer)(nil), + job.ImageDelete: (*replication.Deleter)(nil), + job.ImageReplicate: (*replication.Replicator)(nil), + job.ImageGC: (*gc.GarbageCollector)(nil), + job.Replication: (*ng.Replication)(nil), }); err != nil { // exit return nil, err diff --git a/src/replication/ng/adapter/harbor/adapter.go b/src/replication/ng/adapter/harbor/adapter.go index b394486c4..135d1b574 100644 --- a/src/replication/ng/adapter/harbor/adapter.go +++ b/src/replication/ng/adapter/harbor/adapter.go @@ -17,7 +17,7 @@ package harbor import ( "fmt" "net/http" - "strconv" + // "strconv" common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/http/modifier" @@ -96,27 +96,32 @@ func (a *adapter) CreateNamespace(namespace *model.Namespace) error { Name string `json:"project_name"` Metadata map[string]interface{} `json:"metadata"` }{ - Name: namespace.Name, + Name: namespace.Name, + Metadata: namespace.Metadata, } - // handle the public of the project - if meta, exist := namespace.Metadata["public"]; exist { - public := true - // if one of them is "private", the set the public as false - for _, value := range meta.(map[string]interface{}) { - b, err := strconv.ParseBool(value.(string)) - if err != nil { - return err - } - if !b { - public = false - break - } + // TODO + /* + // handle the public of the project + if meta, exist := namespace.Metadata["public"]; exist { + public := true + // if one of them is "private", the set the public as false + for _, value := range meta.(map[string]interface{}) { + b, err := strconv.ParseBool(value.(string)) + if err != nil { + return err + } + if !b { + public = false + break + } + + } + project.Metadata = map[string]interface{}{ + "public": public, + } } - project.Metadata = map[string]interface{}{ - "public": public, - } - } + */ err := a.client.Post(a.coreServiceURL+"/api/projects", project) if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict { diff --git a/src/replication/ng/flow/controller.go b/src/replication/ng/flow/controller.go index dcb35c0b3..49240249b 100644 --- a/src/replication/ng/flow/controller.go +++ b/src/replication/ng/flow/controller.go @@ -77,6 +77,13 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error return id, nil } + // if no resources need to be replicated, mark the execution success and return + if len(flow.srcResources) == 0 { + flow.markExecutionSuccess("no resources need to be replicated") + log.Infof("no resources need to be replicated for the execution %d, skip", id) + return id, nil + } + // create the namespace on the destination registry if err = flow.createNamespace(); err != nil { log.Errorf("failed to create the namespace %s for the execution %d on the destination registry: %v", policy.DestNamespace, id, err) diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index ec43667fb..d45bd852f 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -39,7 +39,8 @@ type flow struct { srcAdapter adapter.Adapter dstAdapter adapter.Adapter executionID int64 - resources []*model.Resource + srcResources []*model.Resource + dstResources []*model.Resource executionMgr execution.Manager scheduler scheduler.Scheduler scheduleItems []*scheduler.ScheduleItem @@ -151,7 +152,7 @@ func (f *flow) fetchResources() error { } // TODO consider whether the logic can be refactored by using reflect - resources := []*model.Resource{} + srcResources := []*model.Resource{} for _, typ := range resTypes { if typ == model.ResourceTypeRepository { reg, ok := f.srcAdapter.(adapter.ImageRegistry) @@ -165,12 +166,38 @@ func (f *flow) fetchResources() error { f.markExecutionFailure(err) return err } - resources = append(resources, res...) + srcResources = append(srcResources, res...) continue } // TODO add support for chart } - f.resources = resources + + dstResources := []*model.Resource{} + for _, srcResource := range srcResources { + dstResource := &model.Resource{ + Type: srcResource.Type, + Metadata: &model.ResourceMetadata{ + Name: srcResource.Metadata.Name, + Namespace: srcResource.Metadata.Namespace, + Vtags: srcResource.Metadata.Vtags, + }, + Registry: f.dstRegistry, + ExtendedInfo: srcResource.ExtendedInfo, + Deleted: srcResource.Deleted, + Override: f.policy.Override, + } + // TODO check whether the logic is applied to chart + // if the destination namespace is specified, use the specified one + if len(f.policy.DestNamespace) > 0 { + dstResource.Metadata.Name = strings.Replace(srcResource.Metadata.Name, + srcResource.Metadata.Namespace, f.policy.DestNamespace, 1) + dstResource.Metadata.Namespace = f.policy.DestNamespace + } + dstResources = append(dstResources, dstResource) + } + + f.srcResources = srcResources + f.dstResources = dstResources log.Debugf("resources for the execution %d fetched from the source registry", f.executionID) return nil @@ -201,55 +228,36 @@ func (f *flow) createNamespace() error { // }, // }, // } - metadata := map[string]interface{}{} - for _, srcNamespace := range f.policy.SrcNamespaces { - namespace, err := f.srcAdapter.GetNamespace(srcNamespace) + // TODO merge the metadata of different namespaces + namespaces := []*model.Namespace{} + for i, resource := range f.dstResources { + namespace := &model.Namespace{ + Name: resource.Metadata.Namespace, + } + // get the metadata of the namespace from the source registry + ns, err := f.srcAdapter.GetNamespace(f.srcResources[i].Metadata.Namespace) if err != nil { f.markExecutionFailure(err) return err } - for key, value := range namespace.Metadata { - var m map[string]interface{} - if metadata[key] == nil { - m = map[string]interface{}{} - } else { - m = metadata[key].(map[string]interface{}) - } - m[namespace.Name] = value + namespace.Metadata = ns.Metadata + namespaces = append(namespaces, namespace) + } + + for _, namespace := range namespaces { + if err := f.dstAdapter.CreateNamespace(namespace); err != nil { + f.markExecutionFailure(err) + return err } + + log.Debugf("namespace %s for the execution %d created on the destination registry", namespace.Name, f.executionID) } - if err := f.dstAdapter.CreateNamespace(&model.Namespace{ - Name: f.policy.DestNamespace, - Metadata: metadata, - }); err != nil { - f.markExecutionFailure(err) - return err - } - - log.Debugf("namespace %s for the execution %d created on the destination registry", f.policy.DestNamespace, f.executionID) return nil } func (f *flow) preprocess() error { - dstResources := []*model.Resource{} - for _, srcResource := range f.resources { - dstResource := &model.Resource{ - Type: srcResource.Type, - Metadata: &model.ResourceMetadata{ - Name: srcResource.Metadata.Name, - Namespace: f.policy.DestNamespace, - Vtags: srcResource.Metadata.Vtags, - }, - Registry: f.dstRegistry, - ExtendedInfo: srcResource.ExtendedInfo, - Deleted: srcResource.Deleted, - Override: f.policy.Override, - } - dstResources = append(dstResources, dstResource) - } - - items, err := f.scheduler.Preprocess(f.resources, dstResources) + items, err := f.scheduler.Preprocess(f.srcResources, f.dstResources) if err != nil { f.markExecutionFailure(err) return err @@ -277,6 +285,7 @@ func (f *flow) createTasks() error { f.markExecutionFailure(err) return err } + item.TaskID = id log.Debugf("task record %d for the execution %d created", id, f.executionID) @@ -311,7 +320,7 @@ func (f *flow) schedule() error { ID: result.TaskID, JobID: result.JobID, StartTime: time.Now(), - }); err != nil { + }, "JobID", "StartTime"); err != nil { log.Errorf("failed to update task %d: %v", result.TaskID, err) } log.Debugf("the task %d scheduled", result.TaskID) @@ -344,6 +353,20 @@ func (f *flow) markExecutionFailure(err error) { } } +func (f *flow) markExecutionSuccess(msg string) { + log.Debugf("the execution %d is marked as success", f.executionID) + err := f.executionMgr.Update( + &models.Execution{ + ID: f.executionID, + Status: models.ExecutionStatusSucceed, + StatusText: msg, + EndTime: time.Now(), + }, "Status", "StatusText", "EndTime") + if err != nil { + log.Errorf("failed to update the execution %d: %v", f.executionID, err) + } +} + // return the name with format "res_name" or "res_name:[vtag1,vtag2,vtag3]" // if the resource has vtags func getResourceName(res *model.Resource) string { diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index e5e79ed31..8f12a7577 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -19,17 +19,15 @@ package ng import ( "fmt" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/execution" - - "github.com/goharbor/harbor/src/replication/ng/policy" - - "github.com/goharbor/harbor/src/replication/ng/scheduler" - - // register the Harbor adapter - _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/operation" + "github.com/goharbor/harbor/src/replication/ng/policy" "github.com/goharbor/harbor/src/replication/ng/registry" + "github.com/goharbor/harbor/src/replication/ng/scheduler" + // register the Harbor adapter + _ "github.com/goharbor/harbor/src/replication/ng/adapter/harbor" ) var ( @@ -49,8 +47,8 @@ func Init() error { PolicyMgr = policy.NewDefaultManager() // init ExecutionMgr executionMgr := execution.NewDefaultManager() - // TODO init scheduler - var scheduler scheduler.Scheduler + // init scheduler + scheduler := scheduler.NewScheduler(config.InternalJobServiceURL(), config.CoreSecret()) flowCtl, err := flow.NewController(RegistryMgr, executionMgr, scheduler) if err != nil { diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index 17a2013af..47a93d83f 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -27,15 +27,16 @@ import ( "github.com/goharbor/harbor/src/replication/ng/model" ) -// DefaultReplicator provides a default implement for Replicator -type DefaultReplicator struct { +type defaultScheduler struct { client job.Client } -// NewDefaultReplicator returns an instance of DefaultReplicator -func NewDefaultReplicator(client job.Client) *DefaultReplicator { - return &DefaultReplicator{ - client: client, +// TODO use the service account? + +// NewScheduler returns an instance of Scheduler +func NewScheduler(jobserviceURL, secret string) Scheduler { + return &defaultScheduler{ + client: job.NewDefaultClient(jobserviceURL, secret), } } @@ -66,7 +67,7 @@ type Scheduler interface { } // Preprocess the resources and returns the item list that can be scheduled -func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { +func (d *defaultScheduler) Preprocess(srcResources []*model.Resource, destResources []*model.Resource) ([]*ScheduleItem, error) { if len(srcResources) != len(destResources) { err := errors.New("srcResources has different length with destResources") return nil, err @@ -85,7 +86,7 @@ func (d *DefaultReplicator) Preprocess(srcResources []*model.Resource, destResou } // Schedule transfer the tasks to jobs,and then submit these jobs to job service. -func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { +func (d *defaultScheduler) Schedule(items []*ScheduleItem) ([]*ScheduleResult, error) { var results []*ScheduleResult for _, item := range items { result := &ScheduleResult{ @@ -103,7 +104,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/task/%d", config.InternalCoreURL(), item.TaskID), } - job.Name = common_job.ImageTransfer + job.Name = common_job.Replication src, err := json.Marshal(item.SrcResource) if err != nil { result.Error = err @@ -133,8 +134,7 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, } // Stop the transfer job -func (d *DefaultReplicator) Stop(id string) error { - +func (d *defaultScheduler) Stop(id string) error { err := d.client.PostAction(id, opm.CtlCommandStop) if err != nil { return err diff --git a/src/replication/ng/scheduler/scheduler_test.go b/src/replication/ng/scheduler/scheduler_test.go index 1d620757f..1ff75b6e8 100644 --- a/src/replication/ng/scheduler/scheduler_test.go +++ b/src/replication/ng/scheduler/scheduler_test.go @@ -8,10 +8,8 @@ import ( "github.com/goharbor/harbor/src/replication/ng/model" ) -var replicator *DefaultReplicator - -func init() { - replicator = NewDefaultReplicator(TestClient{}) +var scheduler = &defaultScheduler{ + client: TestClient{}, } type TestClient struct { @@ -27,7 +25,7 @@ func (client TestClient) PostAction(uuid, action string) error { return nil } -func TestDefaultReplicator_Preprocess(t *testing.T) { +func TestPreprocess(t *testing.T) { items, err := generateData() if err != nil { t.Error(err) @@ -42,8 +40,8 @@ func TestDefaultReplicator_Preprocess(t *testing.T) { } -func TestDefaultReplicator_Stop(t *testing.T) { - err := replicator.Stop("id") +func TestStop(t *testing.T) { + err := scheduler.Stop("id") if err != nil { t.Error(err) } @@ -70,6 +68,6 @@ func generateData() ([]*ScheduleItem, error) { Credential: &model.Credential{}, }, } - items, err := replicator.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) + items, err := scheduler.Preprocess([]*model.Resource{srcResource}, []*model.Resource{destResource}) return items, err } From 7ab21db26a072a92e379c266b72b0187d510240f Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 12 Mar 2019 15:42:16 +0800 Subject: [PATCH 2/6] Update the job ID in flow controller Update the job ID for the task if it is scheduled successfully Signed-off-by: Wenkai Yin --- src/replication/ng/flow/flow.go | 3 ++- src/replication/ng/scheduler/scheduler.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index 529763836..29a5cd121 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -246,12 +246,13 @@ func (f *flow) schedule() error { continue } allFailed = false - // if the task is submitted successfully, update the status and start time + // if the task is submitted successfully, update the status, job ID and start time if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } if err = f.executionMgr.UpdateTask(&model.Task{ ID: result.TaskID, + JobID: result.JobID, StartTime: time.Now(), }); err != nil { log.Errorf("failed to update task %d: %v", result.TaskID, err) diff --git a/src/replication/ng/scheduler/scheduler.go b/src/replication/ng/scheduler/scheduler.go index d7e51eba7..17a2013af 100644 --- a/src/replication/ng/scheduler/scheduler.go +++ b/src/replication/ng/scheduler/scheduler.go @@ -49,6 +49,7 @@ type ScheduleItem struct { // ScheduleResult is the result of the schedule for one item type ScheduleResult struct { TaskID int64 + JobID string Error error } @@ -119,16 +120,16 @@ func (d *DefaultReplicator) Schedule(items []*ScheduleItem) ([]*ScheduleResult, "src_resource": string(src), "dst_resource": string(dest), } - _, joberr := d.client.SubmitJob(job) + id, joberr := d.client.SubmitJob(job) if joberr != nil { result.Error = joberr results = append(results, result) continue } + result.JobID = id results = append(results, result) } return results, nil - } // Stop the transfer job From 185525e9c854dd8c24fac2ea4c10bb72adcd0b49 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 12 Mar 2019 19:06:39 +0800 Subject: [PATCH 3/6] Implement a default image registry client Provide a default implemmentation for image registry interface, other adapters can use it directly Signed-off-by: Wenkai Yin --- src/replication/ng/adapter/image_registry.go | 149 ++++++++++++++++-- .../ng/transfer/repository/registry.go | 118 -------------- 2 files changed, 135 insertions(+), 132 deletions(-) delete mode 100644 src/replication/ng/transfer/repository/registry.go diff --git a/src/replication/ng/adapter/image_registry.go b/src/replication/ng/adapter/image_registry.go index 0d8717ef4..1b0ebe00e 100644 --- a/src/replication/ng/adapter/image_registry.go +++ b/src/replication/ng/adapter/image_registry.go @@ -15,12 +15,26 @@ package adapter import ( + "errors" "io" + "net/http" + "strings" + "sync" "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + "github.com/goharbor/harbor/src/common/http/modifier" + registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" + "github.com/goharbor/harbor/src/common/utils/registry/auth" "github.com/goharbor/harbor/src/replication/ng/model" ) +// const definition +const ( + // TODO: add filter for the agent in registry webhook handler + UserAgentReplicator = "harbor-replicator" +) + // ImageRegistry defines the capabilities that an image registry should have type ImageRegistry interface { FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) @@ -32,42 +46,149 @@ type ImageRegistry interface { PushBlob(repository, digest string, size int64, blob io.Reader) error } -// TODO implement the functions - // DefaultImageRegistry provides a default implementation for interface ImageRegistry -type DefaultImageRegistry struct{} +type DefaultImageRegistry struct { + sync.RWMutex + client *http.Client + url string + clients map[string]*registry_pkg.Repository +} + +// TODO: passing the tokenServiceURL + +// NewDefaultImageRegistry returns an instance of DefaultImageRegistry +func NewDefaultImageRegistry(registry *model.Registry, tokenServiceURL ...string) *DefaultImageRegistry { + // use the same HTTP connection pool for all clients + transport := registry_pkg.GetHTTPTransport(registry.Insecure) + modifiers := []modifier.Modifier{ + &auth.UserAgentModifier{ + UserAgent: UserAgentReplicator, + }, + } + if registry.Credential != nil { + cred := auth.NewBasicAuthCredential( + registry.Credential.AccessKey, + registry.Credential.AccessSecret) + authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ + Transport: transport, + }, cred, tokenServiceURL...) + + modifiers = append(modifiers, authorizer) + } + client := &http.Client{ + Transport: registry_pkg.NewTransport(transport, modifiers...), + } + return &DefaultImageRegistry{ + client: client, + clients: map[string]*registry_pkg.Repository{}, + url: registry.URL, + } +} + +func (d *DefaultImageRegistry) getClient(repository string) (*registry_pkg.Repository, error) { + client := d.get(repository) + if client != nil { + return client, nil + } + + return d.create(repository) +} + +func (d *DefaultImageRegistry) get(repository string) *registry_pkg.Repository { + d.RLock() + defer d.RUnlock() + client, exist := d.clients[repository] + if exist { + return client + } + return nil +} + +func (d *DefaultImageRegistry) create(repository string) (*registry_pkg.Repository, error) { + d.Lock() + defer d.Unlock() + // double check + client, exist := d.clients[repository] + if exist { + return client, nil + } + + client, err := registry_pkg.NewRepository(repository, d.url, d.client) + if err != nil { + return nil, err + } + d.clients[repository] = client + return client, nil +} // FetchImages ... func (d *DefaultImageRegistry) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) { - return nil, nil + return nil, errors.New("not implemented") } // ManifestExist ... -func (d *DefaultImageRegistry) ManifestExist(repository, reference string) (exist bool, digest string, err error) { - return false, "", nil +func (d *DefaultImageRegistry) ManifestExist(repository, reference string) (bool, string, error) { + client, err := d.getClient(repository) + if err != nil { + return false, "", err + } + digest, exist, err := client.ManifestExist(reference) + return exist, digest, err } // PullManifest ... -func (d *DefaultImageRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) { - return nil, "", nil +func (d *DefaultImageRegistry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { + client, err := d.getClient(repository) + if err != nil { + return nil, "", err + } + digest, mediaType, payload, err := client.PullManifest(reference, accepttedMediaTypes) + if err != nil { + return nil, "", err + } + if strings.Contains(mediaType, "application/json") { + mediaType = schema1.MediaTypeManifest + } + manifest, _, err := registry_pkg.UnMarshal(mediaType, payload) + if err != nil { + return nil, "", err + } + return manifest, digest, nil } // PushManifest ... func (d *DefaultImageRegistry) PushManifest(repository, reference, mediaType string, payload []byte) error { - return nil + client, err := d.getClient(repository) + if err != nil { + return err + } + _, err = client.PushManifest(reference, mediaType, payload) + return err } // BlobExist ... -func (d *DefaultImageRegistry) BlobExist(repository, digest string) (exist bool, err error) { - return false, nil +func (d *DefaultImageRegistry) BlobExist(repository, digest string) (bool, error) { + client, err := d.getClient(repository) + if err != nil { + return false, err + } + return client.BlobExist(digest) } // PullBlob ... -func (d *DefaultImageRegistry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { - return 0, nil, nil +func (d *DefaultImageRegistry) PullBlob(repository, digest string) (int64, io.ReadCloser, error) { + client, err := d.getClient(repository) + if err != nil { + return 0, nil, err + } + return client.PullBlob(digest) } // PushBlob ... func (d *DefaultImageRegistry) PushBlob(repository, digest string, size int64, blob io.Reader) error { - return nil + client, err := d.getClient(repository) + if err != nil { + return err + } + return client.PushBlob(digest, size, blob) } diff --git a/src/replication/ng/transfer/repository/registry.go b/src/replication/ng/transfer/repository/registry.go deleted file mode 100644 index 31ff25c67..000000000 --- a/src/replication/ng/transfer/repository/registry.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package repository - -import ( - "io" - "net/http" - "strings" - - "github.com/goharbor/harbor/src/common/http/modifier" - - "github.com/docker/distribution" - "github.com/docker/distribution/manifest/schema1" - pkg_registry "github.com/goharbor/harbor/src/common/utils/registry" - "github.com/goharbor/harbor/src/common/utils/registry/auth" - "github.com/goharbor/harbor/src/replication/ng/model" -) - -// TODO remove the file - -// const definition -const ( - // TODO: add filter for the agent in registry webhook handler - UserAgentReplicator = "harbor-replicator" -) - -// Registry defines an the interface for registry service -type Registry interface { - ManifestExist(repository, reference string) (exist bool, digest string, err error) - PullManifest(repository, reference string, accepttedMediaTypes []string) (manifest distribution.Manifest, digest string, err error) - PushManifest(repository, reference, mediaType string, payload []byte) error - BlobExist(repository, digest string) (exist bool, err error) - PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) - PushBlob(repository, digest string, size int64, blob io.Reader) error -} - -// NewRegistry returns an instance of the default registry implementation -// TODO: passing the tokenServiceURL -func NewRegistry(reg *model.Registry, repository string, - tokenServiceURL ...string) (Registry, error) { - // use the same HTTP connection pool for all clients - transport := pkg_registry.GetHTTPTransport(reg.Insecure) - modifiers := []modifier.Modifier{ - &auth.UserAgentModifier{ - UserAgent: UserAgentReplicator, - }, - } - if reg.Credential != nil { - cred := auth.NewBasicAuthCredential( - reg.Credential.AccessKey, - reg.Credential.AccessSecret) - authorizer := auth.NewStandardTokenAuthorizer(&http.Client{ - Transport: transport, - }, cred, tokenServiceURL...) - - modifiers = append(modifiers, authorizer) - } - - client, err := pkg_registry.NewRepository(repository, reg.URL, - &http.Client{ - Transport: pkg_registry.NewTransport(transport, modifiers...), - }) - if err != nil { - return nil, err - } - - return ®istry{ - client: client, - }, nil -} - -type registry struct { - client *pkg_registry.Repository -} - -func (r *registry) ManifestExist(repository, reference string) (bool, string, error) { - digest, exist, err := r.client.ManifestExist(reference) - return exist, digest, err -} -func (r *registry) PullManifest(repository, reference string, accepttedMediaTypes []string) (distribution.Manifest, string, error) { - digest, mediaType, payload, err := r.client.PullManifest(reference, accepttedMediaTypes) - if err != nil { - return nil, "", err - } - if strings.Contains(mediaType, "application/json") { - mediaType = schema1.MediaTypeManifest - } - manifest, _, err := pkg_registry.UnMarshal(mediaType, payload) - if err != nil { - return nil, "", err - } - return manifest, digest, nil -} -func (r *registry) PushManifest(repository, reference, mediaType string, payload []byte) error { - _, err := r.client.PushManifest(reference, mediaType, payload) - return err -} -func (r *registry) BlobExist(repository, digest string) (bool, error) { - return r.client.BlobExist(digest) -} -func (r *registry) PullBlob(repository, digest string) (size int64, blob io.ReadCloser, err error) { - return r.client.PullBlob(digest) -} -func (r *registry) PushBlob(repository, digest string, size int64, blob io.Reader) error { - return r.client.PushBlob(digest, size, blob) -} From 4efad287ce26e3c7d3e00c9b71ab4d8c5ed5024d Mon Sep 17 00:00:00 2001 From: peimingming Date: Wed, 13 Mar 2019 09:35:01 +0800 Subject: [PATCH 4/6] Add execution and hooks Signed-off-by: peimingming --- .../postgresql/0004_1.8.0_schema.up.sql | 33 +- src/common/models/base.go | 4 - src/core/api/models/execution.go | 15 + src/core/api/replication_execution.go | 16 +- src/core/api/replication_execution_test.go | 24 +- src/core/api/replication_policy_ng.go | 5 +- src/core/router.go | 1 + .../service/notifications/jobs/handler.go | 11 + src/replication/ng/dao/base.go | 13 + src/replication/ng/dao/dao_test.go | 3 +- src/replication/ng/dao/execution.go | 370 ++++++++++++++++++ src/replication/ng/dao/execution_test.go | 237 +++++++++++ src/replication/ng/dao/models/base.go | 19 + src/replication/ng/dao/models/execution.go | 146 +++++++ src/replication/ng/execution/execution.go | 147 ++++++- .../ng/execution/execution_test.go | 149 +++++++ src/replication/ng/flow/controller_test.go | 17 +- src/replication/ng/flow/flow.go | 21 +- src/replication/ng/model/execution.go | 88 ----- src/replication/ng/operation/controller.go | 21 +- .../ng/operation/controller_test.go | 23 +- 21 files changed, 1203 insertions(+), 160 deletions(-) create mode 100644 src/core/api/models/execution.go create mode 100644 src/replication/ng/dao/base.go create mode 100644 src/replication/ng/dao/execution.go create mode 100644 src/replication/ng/dao/execution_test.go create mode 100644 src/replication/ng/dao/models/base.go create mode 100644 src/replication/ng/dao/models/execution.go create mode 100644 src/replication/ng/execution/execution_test.go delete mode 100644 src/replication/ng/model/execution.go diff --git a/make/migrations/postgresql/0004_1.8.0_schema.up.sql b/make/migrations/postgresql/0004_1.8.0_schema.up.sql index bdebac2c5..7aa4407e3 100644 --- a/make/migrations/postgresql/0004_1.8.0_schema.up.sql +++ b/make/migrations/postgresql/0004_1.8.0_schema.up.sql @@ -60,4 +60,35 @@ CREATE TABLE "replication_policy_ng" ( "creation_time" timestamp(6) DEFAULT now(), "update_time" timestamp(6) DEFAULT now(), CONSTRAINT unique_policy_ng_name UNIQUE ("name") -); \ No newline at end of file +); + +create table replication_execution ( + id SERIAL NOT NULL, + policy_id int NOT NULL, + status varchar(32), + status_text varchar(256), + total int NOT NULL DEFAULT 0, + failed int NOT NULL DEFAULT 0, + succeed int NOT NULL DEFAULT 0, + in_progress int NOT NULL DEFAULT 0, + stopped int NOT NULL DEFAULT 0, + trigger varchar(64), + start_time timestamp default CURRENT_TIMESTAMP, + end_time timestamp NULL, + PRIMARY KEY (id) + ); +CREATE INDEX execution_policy ON replication_execution (policy_id); + +create table replication_task ( + id SERIAL NOT NULL, + execution_id int NOT NULL, + resource_type varchar(64), + src_resource varchar(256), + dst_resource varchar(256), + job_id varchar(64), + status varchar(32), + start_time timestamp default CURRENT_TIMESTAMP, + end_time timestamp NULL, + PRIMARY KEY (id) +); +CREATE INDEX task_execution ON replication_task (execution_id); \ No newline at end of file diff --git a/src/common/models/base.go b/src/common/models/base.go index 50b2cc0f2..6bf3e525f 100644 --- a/src/common/models/base.go +++ b/src/common/models/base.go @@ -16,14 +16,10 @@ package models import ( "github.com/astaxie/beego/orm" - - "github.com/goharbor/harbor/src/replication/ng/dao/models" ) func init() { orm.RegisterModel( - new(models.Registry), - new(models.RepPolicy), new(RepPolicy), new(RepJob), new(User), diff --git a/src/core/api/models/execution.go b/src/core/api/models/execution.go new file mode 100644 index 000000000..e5445c27f --- /dev/null +++ b/src/core/api/models/execution.go @@ -0,0 +1,15 @@ +package models + +import ( + "time" +) + +// Execution defines the data model used in API level +type Execution struct { + ID int64 `json:"id"` + Status string `json:"status"` + TriggerMode string `json:"trigger_mode"` + Duration int `json:"duration"` + SuccessRate string `json:"success_rate"` + StartTime time.Time `json:"start_time"` +} diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 1049bf12d..0652fa541 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -20,7 +20,7 @@ import ( "strconv" "github.com/goharbor/harbor/src/replication/ng" - "github.com/goharbor/harbor/src/replication/ng/model" + "github.com/goharbor/harbor/src/replication/ng/dao/models" ) // ReplicationOperationAPI handles the replication operation requests @@ -73,9 +73,9 @@ func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac // ListExecutions ... func (r *ReplicationOperationAPI) ListExecutions() { - query := &model.ExecutionQuery{ - Status: r.GetString("status"), - Trigger: r.GetString("trigger"), + query := &models.ExecutionQuery{ + Statuses: []string{r.GetString("status")}, + Trigger: r.GetString("trigger"), } if len(r.GetString("policy_id")) > 0 { policyID, err := r.GetInt64("policy_id") @@ -97,7 +97,7 @@ func (r *ReplicationOperationAPI) ListExecutions() { // CreateExecution starts a replication func (r *ReplicationOperationAPI) CreateExecution() { - execution := &model.Execution{} + execution := &models.Execution{} r.DecodeJSONReq(execution) policy, err := ng.PolicyMgr.Get(execution.PolicyID) if err != nil { @@ -160,10 +160,10 @@ func (r *ReplicationOperationAPI) ListTasks() { return } - query := &model.TaskQuery{ + query := &models.TaskQuery{ ExecutionID: executionID, - ResourceType: (model.ResourceType)(r.GetString("resource_type")), - Status: r.GetString("status"), + ResourceType: r.GetString("resource_type"), + Statuses: []string{r.GetString("status")}, } query.Page, query.Size = r.GetPaginationParams() total, tasks, err := ng.OperationCtl.ListTasks(query) diff --git a/src/core/api/replication_execution_test.go b/src/core/api/replication_execution_test.go index ec9bafdbe..e9ad45e55 100644 --- a/src/core/api/replication_execution_test.go +++ b/src/core/api/replication_execution_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/goharbor/harbor/src/replication/ng" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -30,40 +31,43 @@ func (f *fakedOperationController) StartReplication(policy *model.Policy) (int64 func (f *fakedOperationController) StopReplication(int64) error { return nil } -func (f *fakedOperationController) ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) { - return 1, []*model.Execution{ +func (f *fakedOperationController) ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 1, []*models.Execution{ { ID: 1, PolicyID: 1, }, }, nil } -func (f *fakedOperationController) GetExecution(id int64) (*model.Execution, error) { +func (f *fakedOperationController) GetExecution(id int64) (*models.Execution, error) { if id == 1 { - return &model.Execution{ + return &models.Execution{ ID: 1, PolicyID: 1, }, nil } return nil, nil } -func (f *fakedOperationController) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { - return 1, []*model.Task{ +func (f *fakedOperationController) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 1, []*models.Task{ { ID: 1, ExecutionID: 1, }, }, nil } -func (f *fakedOperationController) GetTask(id int64) (*model.Task, error) { +func (f *fakedOperationController) GetTask(id int64) (*models.Task, error) { if id == 1 { - return &model.Task{ + return &models.Task{ ID: 1, ExecutionID: 1, }, nil } return nil, nil } +func (f *fakedOperationController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { + return nil +} func (f *fakedOperationController) GetTaskLog(int64) ([]byte, error) { return []byte("success"), nil } @@ -174,7 +178,7 @@ func TestCreateExecution(t *testing.T) { request: &testingRequest{ method: http.MethodPost, url: "/api/replication/executions", - bodyJSON: &model.Execution{ + bodyJSON: &models.Execution{ PolicyID: 2, }, credential: sysAdmin, @@ -186,7 +190,7 @@ func TestCreateExecution(t *testing.T) { request: &testingRequest{ method: http.MethodPost, url: "/api/replication/executions", - bodyJSON: &model.Execution{ + bodyJSON: &models.Execution{ PolicyID: 1, }, credential: sysAdmin, diff --git a/src/core/api/replication_policy_ng.go b/src/core/api/replication_policy_ng.go index f7410ada8..8ac6feca1 100644 --- a/src/core/api/replication_policy_ng.go +++ b/src/core/api/replication_policy_ng.go @@ -20,6 +20,7 @@ import ( "strconv" "github.com/goharbor/harbor/src/replication/ng" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" ) @@ -192,7 +193,7 @@ func (r *ReplicationPolicyAPI) Delete() { return } - _, executions, err := ng.OperationCtl.ListExecutions(&model.ExecutionQuery{ + _, executions, err := ng.OperationCtl.ListExecutions(&models.ExecutionQuery{ PolicyID: id, }) if err != nil { @@ -201,7 +202,7 @@ func (r *ReplicationPolicyAPI) Delete() { } for _, execution := range executions { - if execution.Status == model.ExecutionStatusInProgress { + if execution.Status == models.ExecutionStatusInProgress { r.HandleStatusPreconditionFailed(fmt.Sprintf("the policy %d has running executions, can not be deleted", id)) return } diff --git a/src/core/router.go b/src/core/router.go index a954ff950..b491845af 100644 --- a/src/core/router.go +++ b/src/core/router.go @@ -131,6 +131,7 @@ func initRouters() { beego.Router("/service/notifications/jobs/scan/:id([0-9]+)", &jobs.Handler{}, "post:HandleScan") beego.Router("/service/notifications/jobs/replication/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplication") beego.Router("/service/notifications/jobs/adminjob/:id([0-9]+)", &admin.Handler{}, "post:HandleAdminJob") + beego.Router("/service/notifications/jobs/replication/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleReplicationTask") beego.Router("/service/token", &token.Handler{}) beego.Router("/api/registries", &api.RegistryAPI{}, "get:List;post:Post") diff --git a/src/core/service/notifications/jobs/handler.go b/src/core/service/notifications/jobs/handler.go index af621bb10..ceb5f5a27 100644 --- a/src/core/service/notifications/jobs/handler.go +++ b/src/core/service/notifications/jobs/handler.go @@ -23,6 +23,7 @@ import ( "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/core/api" + "github.com/goharbor/harbor/src/replication/ng" ) var statusMap = map[string]string{ @@ -87,3 +88,13 @@ func (h *Handler) HandleReplication() { return } } + +// HandleReplicationTask handles the webhook of replication task +func (h *Handler) HandleReplicationTask() { + log.Debugf("received replication task status update event: task-%d, status-%s", h.id, h.status) + if err := ng.OperationCtl.UpdateTaskStatus(h.id, h.status); err != nil { + log.Errorf("Failed to update replication task status, id: %d, status: %s", h.id, h.status) + h.HandleInternalServerError(err.Error()) + return + } +} diff --git a/src/replication/ng/dao/base.go b/src/replication/ng/dao/base.go new file mode 100644 index 000000000..29a567d8b --- /dev/null +++ b/src/replication/ng/dao/base.go @@ -0,0 +1,13 @@ +package dao + +import "github.com/astaxie/beego/orm" + +func paginateForQuerySetter(qs orm.QuerySeter, page, size int64) orm.QuerySeter { + if size > 0 { + qs = qs.Limit(size) + if page > 0 { + qs = qs.Offset((page - 1) * size) + } + } + return qs +} diff --git a/src/replication/ng/dao/dao_test.go b/src/replication/ng/dao/dao_test.go index d27c52f37..2f36ac1f6 100644 --- a/src/replication/ng/dao/dao_test.go +++ b/src/replication/ng/dao/dao_test.go @@ -32,7 +32,8 @@ func TestMain(m *testing.M) { "harbor_label", "harbor_resource_label", "harbor_user", "img_scan_job", "img_scan_overview", "job_log", "project", "project_member", "project_metadata", "properties", "registry", "replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng", - "replication_target", "repository", "robot", "role", "schema_migrations", "user_group";`, + "replication_target", "repository", "robot", "role", "schema_migrations", "user_group", + "replication_execution", "replication_task";`, `DROP FUNCTION "update_update_time_at_column"();`, } dao.PrepareTestData(clearSqls, nil) diff --git a/src/replication/ng/dao/execution.go b/src/replication/ng/dao/execution.go new file mode 100644 index 000000000..8a0e7ef2e --- /dev/null +++ b/src/replication/ng/dao/execution.go @@ -0,0 +1,370 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "time" + + "fmt" + "github.com/astaxie/beego/orm" + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/dao/models" +) + +// AddExecution ... +func AddExecution(execution *models.Execution) (int64, error) { + o := dao.GetOrmer() + now := time.Now() + execution.StartTime = now + + return o.Insert(execution) +} + +// GetTotalOfExecutions returns the total count of replication execution +func GetTotalOfExecutions(query ...*models.ExecutionQuery) (int64, error) { + qs := executionQueryConditions(query...) + return qs.Count() +} + +// GetExecutions ... +func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error) { + executions := []*models.Execution{} + + qs := executionQueryConditions(query...) + if len(query) > 0 && query[0] != nil { + qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size) + } + + qs = qs.OrderBy("-StartTime") + + _, err := qs.All(&executions) + return executions, err +} + +func executionQueryConditions(query ...*models.ExecutionQuery) orm.QuerySeter { + qs := dao.GetOrmer().QueryTable(new(models.Execution)) + if len(query) == 0 || query[0] == nil { + return qs + } + + q := query[0] + if q.PolicyID != 0 { + qs = qs.Filter("PolicyID", q.PolicyID) + } + if len(q.Trigger) > 0 { + qs = qs.Filter("Trigger", q.Trigger) + } + if len(q.Statuses) > 0 { + qs = qs.Filter("Status__in", q.Statuses) + } + return qs +} + +// GetExecution ... +func GetExecution(id int64) (*models.Execution, error) { + o := dao.GetOrmer() + t := models.Execution{ID: id} + err := o.Read(&t) + if err == orm.ErrNoRows { + return nil, nil + } + return &t, err +} + +// DeleteExecution ... +func DeleteExecution(id int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Execution{ID: id}) + return err +} + +// DeleteAllExecutions ... +func DeleteAllExecutions(policyID int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Execution{PolicyID: policyID}, "PolicyID") + return err +} + +// UpdateExecution ... +func UpdateExecution(execution *models.Execution, props ...string) (int64, error) { + if execution.ID == 0 { + return 0, fmt.Errorf("execution ID is empty") + } + o := dao.GetOrmer() + return o.Update(execution, props...) +} + +// AddTask ... +func AddTask(task *models.Task) (int64, error) { + o := dao.GetOrmer() + sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status) + values (?, ?, ?, ?, ?, ?) RETURNING id` + + args := []interface{}{} + args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status) + + var taskID int64 + err := o.Raw(sql, args).QueryRow(&taskID) + if err != nil { + return 0, err + } + + return taskID, nil +} + +// GetTask ... +func GetTask(id int64) (*models.Task, error) { + o := dao.GetOrmer() + sql := `select * from replication_task where id = ?` + + var task models.Task + + if err := o.Raw(sql, id).QueryRow(&task); err != nil { + if err == orm.ErrNoRows { + return nil, nil + } + return nil, err + } + + return &task, nil +} + +// GetTotalOfTasks ... +func GetTotalOfTasks(query ...*models.TaskQuery) (int64, error) { + qs := taskQueryConditions(query...) + return qs.Count() +} + +// GetTasks ... +func GetTasks(query ...*models.TaskQuery) ([]*models.Task, error) { + tasks := []*models.Task{} + + qs := taskQueryConditions(query...) + if len(query) > 0 && query[0] != nil { + qs = paginateForQuerySetter(qs, query[0].Page, query[0].Size) + } + + qs = qs.OrderBy("-StartTime") + + _, err := qs.All(&tasks) + return tasks, err +} + +func taskQueryConditions(query ...*models.TaskQuery) orm.QuerySeter { + qs := dao.GetOrmer().QueryTable(new(models.Task)) + if len(query) == 0 || query[0] == nil { + return qs + } + + q := query[0] + if q.ExecutionID != 0 { + qs = qs.Filter("ExecutionID", q.ExecutionID) + } + if len(q.JobID) > 0 { + qs = qs.Filter("JobID", q.JobID) + } + if len(q.ResourceType) > 0 { + qs = qs.Filter("ResourceType", q.ResourceType) + } + if len(q.Statuses) > 0 { + qs = qs.Filter("Status__in", q.Statuses) + } + return qs +} + +// DeleteTask ... +func DeleteTask(id int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Task{ID: id}) + return err +} + +// DeleteAllTasks ... +func DeleteAllTasks(executionID int64) error { + o := dao.GetOrmer() + _, err := o.Delete(&models.Task{ExecutionID: executionID}, "ExecutionID") + return err +} + +// UpdateTask ... +func UpdateTask(task *models.Task, props ...string) (int64, error) { + if task.ID == 0 { + return 0, fmt.Errorf("task ID is empty") + } + o := dao.GetOrmer() + return o.Update(task, props...) +} + +// UpdateTaskStatus ... +func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { + // can not use the globalOrm + o := orm.NewOrm() + o.Begin() + + // query the task status + var task models.Task + sql := `select * from replication_task where id = ?` + if err := o.Raw(sql, id).QueryRow(&task); err != nil { + if err == orm.ErrNoRows { + o.Rollback() + return 0, err + } + } + + // check status + satisfy := false + if len(statusCondition) == 0 { + satisfy = true + } else { + for _, stCondition := range statusCondition { + if task.Status == stCondition { + satisfy = true + break + } + } + } + if !satisfy { + o.Rollback() + return 0, fmt.Errorf("Status condition not match ") + } + + // update status + params := []interface{}{} + sql = `update replication_task set status = ?` + params = append(params, status) + if taskFinished(status) { // should update endTime + sql += ` ,end_time = ?` + params = append(params, time.Now()) + } + sql += ` where id = ?` + params = append(params, id) + _, err := o.Raw(sql, params).Exec() + log.Infof("Update task %d: %s -> %s", id, task.Status, status) + if err != nil { + log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status) + o.Rollback() + return 0, err + } + + // query the execution + var execution models.Execution + sql = `select * from replication_execution where id = ?` + if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil { + if err == orm.ErrNoRows { + log.Errorf("Execution not found id: %d", task.ExecutionID) + o.Rollback() + return 0, err + } + } + // check execution data + execuStatus, _ := getStatus(task.Status) + count := getStatusCount(&execution, execuStatus) + if count <= 0 { + log.Errorf("Task statistics in execution inconsistent") + o.Commit() + return 1, nil + } + + // update execution data + updateStatusCount(&execution, execuStatus, -1) + execuStatusUp, _ := getStatus(status) + updateStatusCount(&execution, execuStatusUp, 1) + + resetExecutionStatus(&execution) + _, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped, + models.ExecutionPropsName.EndTime) + if err != nil { + log.Errorf("Update execution status failed %d: %v", execution.ID, err) + o.Rollback() + return 0, err + } + o.Commit() + return 1, nil +} + +func taskFinished(status string) bool { + if status == models.TaskStatusFailed || status == models.TaskStatusStopped || status == models.TaskStatusSucceed { + return true + } + return false +} + +func getStatus(status string) (string, error) { + switch status { + case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: + return models.ExecutionStatusInProgress, nil + case models.TaskStatusSucceed: + return models.ExecutionStatusSucceed, nil + case models.TaskStatusStopped: + return models.ExecutionStatusStopped, nil + case models.TaskStatusFailed: + return models.ExecutionStatusFailed, nil + } + return "", fmt.Errorf("Not support task status ") +} + +func getStatusCount(execution *models.Execution, status string) int { + switch status { + case models.ExecutionStatusInProgress: + return execution.InProgress + case models.ExecutionStatusSucceed: + return execution.Succeed + case models.ExecutionStatusStopped: + return execution.Stopped + case models.ExecutionStatusFailed: + return execution.Failed + } + return 0 +} + +func updateStatusCount(execution *models.Execution, status string, delta int) error { + switch status { + case models.ExecutionStatusInProgress: + execution.InProgress += delta + case models.ExecutionStatusSucceed: + execution.Succeed += delta + case models.ExecutionStatusStopped: + execution.Stopped += delta + case models.ExecutionStatusFailed: + execution.Failed += delta + } + return nil +} + +func resetExecutionStatus(execution *models.Execution) error { + status := generateStatus(execution) + if status != execution.Status { + execution.Status = status + log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status) + } + if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 { + // execution finished in this time + execution.EndTime = time.Now() + } + return nil +} + +func generateStatus(execution *models.Execution) string { + if execution.InProgress > 0 { + return models.ExecutionStatusInProgress + } else if execution.Failed > 0 { + return models.ExecutionStatusFailed + } else if execution.Stopped > 0 { + return models.ExecutionStatusStopped + } + return models.ExecutionStatusSucceed +} diff --git a/src/replication/ng/dao/execution_test.go b/src/replication/ng/dao/execution_test.go new file mode 100644 index 000000000..a9adec642 --- /dev/null +++ b/src/replication/ng/dao/execution_test.go @@ -0,0 +1,237 @@ +package dao + +import ( + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestMethodOfExecution(t *testing.T) { + execution1 := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 7, + InProgress: 5, + Stopped: 0, + Trigger: "Event", + StartTime: time.Now(), + } + execution2 := &models.Execution{ + PolicyID: 11209, + Status: "Failed", + StatusText: "Network error", + Total: 9, + Failed: 1, + Succeed: 8, + InProgress: 0, + Stopped: 0, + Trigger: "Manual", + StartTime: time.Now(), + } + + // test add + id1, err := AddExecution(execution1) + require.Nil(t, err) + + _, err = AddExecution(execution2) + require.Nil(t, err) + + // test list + query := &models.ExecutionQuery{ + Statuses: []string{"InProgress", "Failed"}, + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + executions, err := GetExecutions(query) + require.Nil(t, err) + assert.Equal(t, 2, len(executions)) + + total, err := GetTotalOfExecutions(query) + require.Nil(t, err) + assert.Equal(t, int64(2), total) + + // test get + execution, err := GetExecution(id1) + require.Nil(t, err) + assert.Equal(t, execution1.Status, execution.Status) + + // test update + executionNew := &models.Execution{ + ID: id1, + Status: "Succeed", + Succeed: 12, + InProgress: 0, + EndTime: time.Now(), + } + n, err := UpdateExecution(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.EndTime) + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + // test delete + require.Nil(t, DeleteExecution(execution1.ID)) + execution, err = GetExecution(execution1.ID) + require.Nil(t, err) + require.Nil(t, execution) + + // test delete all + require.Nil(t, DeleteAllExecutions(execution1.PolicyID)) + query = &models.ExecutionQuery{} + n, err = GetTotalOfExecutions(query) + require.Nil(t, err) + assert.Equal(t, int64(0), n) +} + +func TestMethodOfTask(t *testing.T) { + task1 := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StartTime: time.Now(), + } + task2 := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + + // test add + id1, err := AddTask(task1) + require.Nil(t, err) + + _, err = AddTask(task2) + require.Nil(t, err) + + // test list + query := &models.TaskQuery{ + ResourceType: "resourceType1", + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + tasks, err := GetTasks(query) + require.Nil(t, err) + assert.Equal(t, 1, len(tasks)) + + total, err := GetTotalOfTasks(query) + require.Nil(t, err) + assert.Equal(t, int64(1), total) + + // test get + task, err := GetTask(id1) + require.Nil(t, err) + assert.Equal(t, task1.Status, task.Status) + + // test update + taskNew := &models.Task{ + ID: id1, + Status: "Failed", + EndTime: time.Now(), + } + n, err := UpdateTask(taskNew, models.TaskPropsName.Status, models.TaskPropsName.EndTime) + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + // test delete + require.Nil(t, DeleteTask(id1)) + task, err = GetTask(id1) + require.Nil(t, err) + require.Nil(t, task) + + // test delete all + require.Nil(t, DeleteAllTasks(task1.ExecutionID)) + query = &models.TaskQuery{} + n, err = GetTotalOfTasks(query) + require.Nil(t, err) + assert.Equal(t, int64(0), n) +} + +func TestUpdateJobStatus(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 10, + InProgress: 1, + Stopped: 1, + Trigger: "Event", + StartTime: time.Now(), + } + executionID, _ := AddExecution(execution) + task1 := &models.Task{ + ID: 20191, + ExecutionID: executionID, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Pending", + StartTime: time.Now(), + } + task2 := &models.Task{ + ID: 20192, + ExecutionID: executionID, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + taskID1, _ := AddTask(task1) + taskID2, _ := AddTask(task2) + + defer func() { + DeleteAllTasks(executionID) + DeleteAllExecutions(11209) + }() + + // update Pending->InProgress + n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err := GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, execution.InProgress, execu.InProgress) + assert.Equal(t, execution.Status, execu.Status) + + // update InProgress->Failed: Execution.InProgress-1, Failed+1 + n, err = UpdateTaskStatus(taskID1, "Failed") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err = GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, 1, execu.Failed) + assert.Equal(t, "Failed", execu.Status) + + // update Stopped->Pending: Execution.Stopped-1, InProgress+1 + n, err = UpdateTaskStatus(taskID2, "Pending") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + + execu, err = GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, 1, execu.InProgress) + assert.Equal(t, "InProgress", execu.Status) +} diff --git a/src/replication/ng/dao/models/base.go b/src/replication/ng/dao/models/base.go new file mode 100644 index 000000000..da1221e2f --- /dev/null +++ b/src/replication/ng/dao/models/base.go @@ -0,0 +1,19 @@ +package models + +import ( + "github.com/astaxie/beego/orm" +) + +func init() { + orm.RegisterModel( + new(Registry), + new(RepPolicy), + new(Execution), + new(Task)) +} + +// Pagination ... +type Pagination struct { + Page int64 + Size int64 +} diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go new file mode 100644 index 000000000..d73c3e21e --- /dev/null +++ b/src/replication/ng/dao/models/execution.go @@ -0,0 +1,146 @@ +package models + +import ( + "time" +) + +const ( + // ExecutionTable is the table name for replication executions + ExecutionTable = "replication_execution" + // TaskTable is table name for replication tasks + TaskTable = "replication_task" +) + +// execution/task status/trigger const +const ( + ExecutionStatusFailed string = "Failed" + ExecutionStatusSucceed string = "Succeed" + ExecutionStatusStopped string = "Stopped" + ExecutionStatusInProgress string = "InProgress" + + ExecutionTriggerManual string = "Manual" + ExecutionTriggerEvent string = "Event" + ExecutionTriggerSchedule string = "Schedule" + + // The task has been persisted in db but not submitted to Jobservice + TaskStatusInitialized string = "Initialized" + TaskStatusPending string = "Pending" + TaskStatusInProgress string = "InProgress" + TaskStatusSucceed string = "Succeed" + TaskStatusFailed string = "Failed" + TaskStatusStopped string = "Stopped" +) + +// ExecutionPropsName defines the names of fields of Execution +var ExecutionPropsName = ExecutionFieldsName{ + ID: "ID", + PolicyID: "PolicyID", + Status: "Status", + StatusText: "StatusText", + Total: "Total", + Failed: "Failed", + Succeed: "Succeed", + InProgress: "InProgress", + Stopped: "Stopped", + Trigger: "Trigger", + StartTime: "StartTime", + EndTime: "EndTime", +} + +// ExecutionFieldsName defines the props of Execution +type ExecutionFieldsName struct { + ID string + PolicyID string + Status string + StatusText string + Total string + Failed string + Succeed string + InProgress string + Stopped string + Trigger string + StartTime string + EndTime string +} + +// Execution holds information about once replication execution. +type Execution struct { + ID int64 `orm:"pk;auto;column(id)" json:"id"` + PolicyID int64 `orm:"column(policy_id)" json:"policy_id"` + Status string `orm:"column(status)" json:"status"` + StatusText string `orm:"column(status_text)" json:"status_text"` + Total int `orm:"column(total)" json:"total"` + Failed int `orm:"column(failed)" json:"failed"` + Succeed int `orm:"column(succeed)" json:"succeed"` + InProgress int `orm:"column(in_progress)" json:"in_progress"` + Stopped int `orm:"column(stopped)" json:"stopped"` + Trigger string `orm:"column(trigger)" json:"trigger"` + StartTime time.Time `orm:"column(start_time)" json:"start_time"` + EndTime time.Time `orm:"column(end_time)" json:"end_time"` +} + +// TaskPropsName defines the names of fields of Task +var TaskPropsName = TaskFieldsName{ + ID: "ID", + ExecutionID: "ExecutionID", + ResourceType: "ResourceType", + SrcResource: "SrcResource", + DstResource: "DstResource", + JobID: "JobID", + Status: "Status", + StartTime: "StartTime", + EndTime: "EndTime", +} + +// TaskFieldsName defines the props of Task +type TaskFieldsName struct { + ID string + ExecutionID string + ResourceType string + SrcResource string + DstResource string + JobID string + Status string + StartTime string + EndTime string +} + +// Task represent the tasks in one execution. +type Task struct { + ID int64 `orm:"pk;auto;column(id)" json:"id"` + ExecutionID int64 `orm:"column(execution_id)" json:"execution_id"` + ResourceType string `orm:"column(resource_type)" json:"resource_type"` + SrcResource string `orm:"column(src_resource)" json:"src_resource"` + DstResource string `orm:"column(dst_resource)" json:"dst_resource"` + JobID string `orm:"column(job_id)" json:"job_id"` + Status string `orm:"column(status)" json:"status"` + StartTime time.Time `orm:"column(start_time)" json:"start_time"` + EndTime time.Time `orm:"column(end_time)" json:"end_time"` +} + +// TableName is required by by beego orm to map Execution to table replication_execution +func (r *Execution) TableName() string { + return ExecutionTable +} + +// TableName is required by by beego orm to map Task to table replication_task +func (r *Task) TableName() string { + return TaskTable +} + +// ExecutionQuery holds the query conditions for replication executions +type ExecutionQuery struct { + PolicyID int64 + Statuses []string + Trigger string + Pagination +} + +// TaskQuery holds the query conditions for replication task +type TaskQuery struct { + ExecutionID int64 + JobID string + Statuses []string + ResourceType string + Pagination +} diff --git a/src/replication/ng/execution/execution.go b/src/replication/ng/execution/execution.go index 1b1c5a62a..e47a97251 100644 --- a/src/replication/ng/execution/execution.go +++ b/src/replication/ng/execution/execution.go @@ -15,34 +15,37 @@ package execution import ( - "github.com/goharbor/harbor/src/replication/ng/model" + "fmt" + "github.com/goharbor/harbor/src/core/utils" + "github.com/goharbor/harbor/src/replication/ng/dao" + "github.com/goharbor/harbor/src/replication/ng/dao/models" ) // Manager manages the executions type Manager interface { // Create a new execution - Create(*model.Execution) (int64, error) + Create(*models.Execution) (int64, error) // List the summaries of executions - List(...*model.ExecutionQuery) (int64, []*model.Execution, error) + List(...*models.ExecutionQuery) (int64, []*models.Execution, error) // Get the specified execution - Get(int64) (*model.Execution, error) + Get(int64) (*models.Execution, error) // Update the data of the specified execution, the "props" are the // properties of execution that need to be updated - Update(execution *model.Execution, props ...string) error + Update(execution *models.Execution, props ...string) error // Remove the execution specified by the ID Remove(int64) error // Remove all executions of one policy specified by the policy ID RemoveAll(int64) error // Create a task - CreateTask(*model.Task) (int64, error) + CreateTask(*models.Task) (int64, error) // List the tasks according to the query - ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) + ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) // Get one specified task - GetTask(int64) (*model.Task, error) + GetTask(int64) (*models.Task, error) // Update the task, the "props" are the properties of task // that need to be updated, it cannot include "status". If // you want to update the status, use "UpdateTaskStatus" instead - UpdateTask(task *model.Task, props ...string) error + UpdateTask(task *models.Task, props ...string) error // UpdateTaskStatus only updates the task status. If "statusCondition" // presents, only the tasks whose status equal to "statusCondition" // will be updated @@ -54,3 +57,129 @@ type Manager interface { // Get the log of one specific task GetTaskLog(int64) ([]byte, error) } + +// DefaultManager .. +type DefaultManager struct { +} + +// NewDefaultManager ... +func NewDefaultManager() (Manager, error) { + return &DefaultManager{}, nil +} + +// Create a new execution +func (dm *DefaultManager) Create(execution *models.Execution) (int64, error) { + return dao.AddExecution(execution) +} + +// List the summaries of executions +func (dm *DefaultManager) List(queries ...*models.ExecutionQuery) (int64, []*models.Execution, error) { + total, err := dao.GetTotalOfExecutions(queries...) + if err != nil { + return 0, nil, err + } + + executions, err := dao.GetExecutions(queries...) + if err != nil { + return 0, nil, err + } + return total, executions, nil +} + +// Get the specified execution +func (dm *DefaultManager) Get(id int64) (*models.Execution, error) { + return dao.GetExecution(id) +} + +// Update ... +func (dm *DefaultManager) Update(execution *models.Execution, props ...string) error { + n, err := dao.UpdateExecution(execution, props...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Execution not found error: %d ", execution.ID) + } + return nil +} + +// Remove the execution specified by the ID +func (dm *DefaultManager) Remove(id int64) error { + return dao.DeleteExecution(id) +} + +// RemoveAll executions of one policy specified by the policy ID +func (dm *DefaultManager) RemoveAll(policyID int64) error { + return dao.DeleteAllExecutions(policyID) +} + +// CreateTask used to create a task +func (dm *DefaultManager) CreateTask(task *models.Task) (int64, error) { + return dao.AddTask(task) +} + +// ListTasks list the tasks according to the query +func (dm *DefaultManager) ListTasks(queries ...*models.TaskQuery) (int64, []*models.Task, error) { + total, err := dao.GetTotalOfTasks(queries...) + if err != nil { + return 0, nil, err + } + + tasks, err := dao.GetTasks(queries...) + if err != nil { + return 0, nil, err + } + return total, tasks, nil +} + +// GetTask get one specified task +func (dm *DefaultManager) GetTask(id int64) (*models.Task, error) { + return dao.GetTask(id) +} + +// UpdateTask ... +func (dm *DefaultManager) UpdateTask(task *models.Task, props ...string) error { + n, err := dao.UpdateTask(task, props...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Task not found error: %d ", task.ID) + } + return nil +} + +// UpdateTaskStatus ... +func (dm *DefaultManager) UpdateTaskStatus(taskID int64, status string, statusCondition ...string) error { + n, err := dao.UpdateTaskStatus(taskID, status, statusCondition...) + if err != nil { + return err + } + if n == 0 { + return fmt.Errorf("Update task status failed %d: -> %s ", taskID, status) + } + return nil +} + +// RemoveTask remove one task specified by task ID +func (dm *DefaultManager) RemoveTask(id int64) error { + return dao.DeleteTask(id) +} + +// RemoveAllTasks of one execution specified by the execution ID +func (dm *DefaultManager) RemoveAllTasks(executionID int64) error { + return dao.DeleteAllTasks(executionID) +} + +// GetTaskLog get the log of one specific task +func (dm *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) { + task, err := dao.GetTask(taskID) + if err != nil { + return nil, err + } + if task == nil { + return nil, fmt.Errorf("Task not found %d ", taskID) + } + + return utils.GetJobServiceClient().GetJobLog(task.JobID) +} diff --git a/src/replication/ng/execution/execution_test.go b/src/replication/ng/execution/execution_test.go new file mode 100644 index 000000000..6a6370438 --- /dev/null +++ b/src/replication/ng/execution/execution_test.go @@ -0,0 +1,149 @@ +package execution + +import ( + "github.com/goharbor/harbor/src/common/dao" + "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/replication/ng/dao/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +var executionManager, _ = NewDefaultManager() + +func TestMain(m *testing.M) { + databases := []string{"postgresql"} + for _, database := range databases { + log.Infof("run test cases for database: %s", database) + result := 1 + switch database { + case "postgresql": + dao.PrepareTestForPostgresSQL() + default: + log.Fatalf("invalid database: %s", database) + } + + result = m.Run() + + if result != 0 { + os.Exit(result) + } + } + +} + +func TestMethodOfExecutionManager(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 12, + Failed: 0, + Succeed: 7, + InProgress: 5, + Stopped: 0, + Trigger: "Event", + StartTime: time.Now(), + } + + defer func() { + executionManager.RemoveAll(execution.PolicyID) + }() + + // Create + id, err := executionManager.Create(execution) + require.Nil(t, err) + + // List + query := &models.ExecutionQuery{ + Statuses: []string{"InProgress", "Failed"}, + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + count, executions, err := executionManager.List(query) + require.Nil(t, err) + assert.Equal(t, int64(1), count) + assert.Equal(t, 1, len(executions)) + + // Get + _, err = executionManager.Get(id) + require.Nil(t, err) + + // Update + executionNew := &models.Execution{ + ID: id, + Status: "Failed", + Succeed: 12, + InProgress: 0, + EndTime: time.Now(), + } + err = executionManager.Update(executionNew, models.ExecutionPropsName.Status, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.EndTime) + require.Nil(t, err) + + // Remove + require.Nil(t, executionManager.Remove(id)) +} + +func TestMethodOfTaskManager(t *testing.T) { + task := &models.Task{ + ExecutionID: 112200, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: "Initialized", + StartTime: time.Now(), + } + + defer func() { + executionManager.RemoveAllTasks(task.ExecutionID) + }() + + // CreateTask + id, err := executionManager.CreateTask(task) + require.Nil(t, err) + + // ListTasks + query := &models.TaskQuery{ + ResourceType: "resourceType1", + Pagination: models.Pagination{ + Page: 1, + Size: 10, + }, + } + count, tasks, err := executionManager.ListTasks(query) + require.Nil(t, err) + assert.Equal(t, 1, len(tasks)) + assert.Equal(t, int64(1), count) + + // GetTask + _, err = executionManager.GetTask(id) + require.Nil(t, err) + + // UpdateTask + taskNew := &models.Task{ + ID: id, + SrcResource: "srcResourceChanged", + } + err = executionManager.UpdateTask(taskNew, models.TaskPropsName.SrcResource) + require.Nil(t, err) + taskUpdate, _ := executionManager.GetTask(id) + assert.Equal(t, taskNew.SrcResource, taskUpdate.SrcResource) + + // UpdateTaskStatus + err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed) + require.NotNil(t, err) + taskUpdate, _ = executionManager.GetTask(id) + assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status) + + // Remove + require.Nil(t, executionManager.RemoveTask(id)) + + // RemoveAll + require.Nil(t, executionManager.RemoveAll(id)) +} diff --git a/src/replication/ng/flow/controller_test.go b/src/replication/ng/flow/controller_test.go index 89c4ea779..278c7953a 100644 --- a/src/replication/ng/flow/controller_test.go +++ b/src/replication/ng/flow/controller_test.go @@ -20,6 +20,7 @@ import ( "github.com/docker/distribution" "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/scheduler" "github.com/stretchr/testify/assert" @@ -85,16 +86,16 @@ func (f *fakedRegistryManager) HealthCheck() error { type fakedExecutionManager struct{} -func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { +func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) { +func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { return 0, nil, nil } -func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { +func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { return nil, nil } -func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { +func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { return nil } func (f *fakedExecutionManager) Remove(int64) error { @@ -103,16 +104,16 @@ func (f *fakedExecutionManager) Remove(int64) error { func (f *fakedExecutionManager) RemoveAll(int64) error { return nil } -func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { +func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { +func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { return 0, nil, nil } -func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { +func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { return nil, nil } -func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { +func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index 29a5cd121..8e46c7a02 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -26,6 +26,7 @@ import ( "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/replication/ng/adapter" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/goharbor/harbor/src/replication/ng/registry" ) @@ -98,9 +99,9 @@ func newFlow(policy *model.Policy, registryMgr registry.Manager, } func (f *flow) createExecution() (int64, error) { - id, err := f.executionMgr.Create(&model.Execution{ + id, err := f.executionMgr.Create(&models.Execution{ PolicyID: f.policy.ID, - Status: model.ExecutionStatusInProgress, + Status: models.ExecutionStatusInProgress, StartTime: time.Now(), }) f.executionID = id @@ -205,10 +206,10 @@ func (f *flow) preprocess() error { func (f *flow) createTasks() error { for _, item := range f.scheduleItems { - task := &model.Task{ + task := &models.Task{ ExecutionID: f.executionID, - Status: model.TaskStatusInitialized, - ResourceType: item.SrcResource.Type, + Status: models.TaskStatusInitialized, + ResourceType: string(item.SrcResource.Type), SrcResource: getResourceName(item.SrcResource), DstResource: getResourceName(item.DstResource), } @@ -240,17 +241,17 @@ func (f *flow) schedule() error { // task as failure if result.Error != nil { log.Errorf("failed to schedule task %d: %v", result.TaskID, err) - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusFailed); err != nil { + if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusFailed); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } continue } allFailed = false // if the task is submitted successfully, update the status, job ID and start time - if err = f.executionMgr.UpdateTaskStatus(result.TaskID, model.TaskStatusPending); err != nil { + if err = f.executionMgr.UpdateTaskStatus(result.TaskID, models.TaskStatusPending); err != nil { log.Errorf("failed to update task status %d: %v", result.TaskID, err) } - if err = f.executionMgr.UpdateTask(&model.Task{ + if err = f.executionMgr.UpdateTask(&models.Task{ ID: result.TaskID, JobID: result.JobID, StartTime: time.Now(), @@ -276,9 +277,9 @@ func (f *flow) markExecutionFailure(err error) { log.Errorf("the execution %d is marked as failure because of the error: %s", f.executionID, statusText) err = f.executionMgr.Update( - &model.Execution{ + &models.Execution{ ID: f.executionID, - Status: model.ExecutionStatusFailed, + Status: models.ExecutionStatusFailed, StatusText: statusText, EndTime: time.Now(), }) diff --git a/src/replication/ng/model/execution.go b/src/replication/ng/model/execution.go deleted file mode 100644 index 105671aff..000000000 --- a/src/replication/ng/model/execution.go +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright Project Harbor Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package model - -import ( - "time" - - "github.com/goharbor/harbor/src/common/models" -) - -// execution/task status/trigger const -const ( - ExecutionStatusFailed string = "Failed" - ExecutionStatusSucceed string = "Succeed" - ExecutionStatusStopped string = "Stopped" - ExecutionStatusInProgress string = "InProgress" - - ExecutionTriggerManual string = "Manual" - ExecutionTriggerEvent string = "Event" - ExecutionTriggerSchedule string = "Schedule" - - // The task has been persisted in db but not submitted to Jobservice - TaskStatusInitialized string = "Initialized" - TaskStatusPending string = "Pending" - TaskStatusInProgress string = "InProgress" - TaskStatusSucceed string = "Succeed" - TaskStatusFailed string = "Failed" - TaskStatusStopped string = "Stopped" -) - -// Execution defines an execution of the replication -type Execution struct { - ID int64 `json:"id"` - PolicyID int64 `json:"policy_id"` - Status string `json:"status"` - StatusText string `json:"status_text"` - Trigger string `json:"trigger"` - Total int `json:"total"` - Failed int `json:"failed"` - Succeed int `json:"succeed"` - Pending int `json:"pending"` - InProgress int `json:"in_progress"` - Stopped int `json:"stopped"` - Initialized int `json:"initialized"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` -} - -// Task holds the information of one replication task -type Task struct { - ID int64 `json:"id"` - ExecutionID int64 `json:"execution_id"` - ResourceType ResourceType `json:"resource_type"` - SrcResource string `json:"src_resource"` - DstResource string `json:"dst_resource"` - JobID string `json:"job_id"` - Status string `json:"status"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` -} - -// ExecutionQuery defines the query conditions for listing executions -type ExecutionQuery struct { - PolicyID int64 - Status string - Trigger string - models.Pagination -} - -// TaskQuery defines the query conditions for listing tasks -type TaskQuery struct { - ExecutionID int64 - ResourceType ResourceType - Status string - models.Pagination -} diff --git a/src/replication/ng/operation/controller.go b/src/replication/ng/operation/controller.go index 0a000f0fc..3ad2e12b2 100644 --- a/src/replication/ng/operation/controller.go +++ b/src/replication/ng/operation/controller.go @@ -15,6 +15,7 @@ package operation import ( + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/execution" "github.com/goharbor/harbor/src/replication/ng/flow" "github.com/goharbor/harbor/src/replication/ng/model" @@ -25,10 +26,11 @@ import ( type Controller interface { StartReplication(policy *model.Policy) (int64, error) StopReplication(int64) error - ListExecutions(...*model.ExecutionQuery) (int64, []*model.Execution, error) - GetExecution(int64) (*model.Execution, error) - ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) - GetTask(int64) (*model.Task, error) + ListExecutions(...*models.ExecutionQuery) (int64, []*models.Execution, error) + GetExecution(int64) (*models.Execution, error) + ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) + GetTask(int64) (*models.Task, error) + UpdateTaskStatus(id int64, status string, statusCondition ...string) error GetTaskLog(int64) ([]byte, error) } @@ -51,18 +53,21 @@ func (d *defaultController) StartReplication(policy *model.Policy) (int64, error func (d *defaultController) StopReplication(executionID int64) error { return d.flowCtl.StopReplication(executionID) } -func (d *defaultController) ListExecutions(query ...*model.ExecutionQuery) (int64, []*model.Execution, error) { +func (d *defaultController) ListExecutions(query ...*models.ExecutionQuery) (int64, []*models.Execution, error) { return d.executionMgr.List(query...) } -func (d *defaultController) GetExecution(executionID int64) (*model.Execution, error) { +func (d *defaultController) GetExecution(executionID int64) (*models.Execution, error) { return d.executionMgr.Get(executionID) } -func (d *defaultController) ListTasks(query ...*model.TaskQuery) (int64, []*model.Task, error) { +func (d *defaultController) ListTasks(query ...*models.TaskQuery) (int64, []*models.Task, error) { return d.executionMgr.ListTasks(query...) } -func (d *defaultController) GetTask(id int64) (*model.Task, error) { +func (d *defaultController) GetTask(id int64) (*models.Task, error) { return d.executionMgr.GetTask(id) } +func (d *defaultController) UpdateTaskStatus(id int64, status string, statusCondition ...string) error { + return d.executionMgr.UpdateTaskStatus(id, status, statusCondition...) +} func (d *defaultController) GetTaskLog(taskID int64) ([]byte, error) { return d.executionMgr.GetTaskLog(taskID) } diff --git a/src/replication/ng/operation/controller_test.go b/src/replication/ng/operation/controller_test.go index a68e2bda6..73f48e517 100644 --- a/src/replication/ng/operation/controller_test.go +++ b/src/replication/ng/operation/controller_test.go @@ -17,6 +17,7 @@ package operation import ( "testing" + "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,22 +34,22 @@ func (f *fakedFlowController) StopReplication(int64) error { type fakedExecutionManager struct{} -func (f *fakedExecutionManager) Create(*model.Execution) (int64, error) { +func (f *fakedExecutionManager) Create(*models.Execution) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) List(...*model.ExecutionQuery) (int64, []*model.Execution, error) { - return 1, []*model.Execution{ +func (f *fakedExecutionManager) List(...*models.ExecutionQuery) (int64, []*models.Execution, error) { + return 1, []*models.Execution{ { ID: 1, }, }, nil } -func (f *fakedExecutionManager) Get(int64) (*model.Execution, error) { - return &model.Execution{ +func (f *fakedExecutionManager) Get(int64) (*models.Execution, error) { + return &models.Execution{ ID: 1, }, nil } -func (f *fakedExecutionManager) Update(*model.Execution, ...string) error { +func (f *fakedExecutionManager) Update(*models.Execution, ...string) error { return nil } func (f *fakedExecutionManager) Remove(int64) error { @@ -57,20 +58,20 @@ func (f *fakedExecutionManager) Remove(int64) error { func (f *fakedExecutionManager) RemoveAll(int64) error { return nil } -func (f *fakedExecutionManager) CreateTask(*model.Task) (int64, error) { +func (f *fakedExecutionManager) CreateTask(*models.Task) (int64, error) { return 1, nil } -func (f *fakedExecutionManager) ListTasks(...*model.TaskQuery) (int64, []*model.Task, error) { - return 1, []*model.Task{ +func (f *fakedExecutionManager) ListTasks(...*models.TaskQuery) (int64, []*models.Task, error) { + return 1, []*models.Task{ { ID: 1, }, }, nil } -func (f *fakedExecutionManager) GetTask(int64) (*model.Task, error) { +func (f *fakedExecutionManager) GetTask(int64) (*models.Task, error) { return nil, nil } -func (f *fakedExecutionManager) UpdateTask(*model.Task, ...string) error { +func (f *fakedExecutionManager) UpdateTask(*models.Task, ...string) error { return nil } func (f *fakedExecutionManager) UpdateTaskStatus(int64, string, ...string) error { From cabef73980e4089b8cda6bc35cc669a18b83f271 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Tue, 12 Mar 2019 19:18:59 +0800 Subject: [PATCH 5/6] Add Harbor adapter for replication Implement the replication adapter for Harbor registry Signed-off-by: Wenkai Yin --- .../utils/registry/auth/tokenauthorizer.go | 2 +- src/core/api/replication_adapter_test.go | 8 +- src/core/api/replication_execution.go | 7 +- src/replication/ng/adapter/harbor/adapter.go | 188 ++++++++++++++++-- src/replication/ng/execution/execution.go | 5 +- .../ng/execution/execution_test.go | 9 +- src/replication/ng/flow/controller_test.go | 2 + src/replication/ng/flow/flow.go | 104 +++++++--- src/replication/ng/replication.go | 5 +- 9 files changed, 275 insertions(+), 55 deletions(-) diff --git a/src/common/utils/registry/auth/tokenauthorizer.go b/src/common/utils/registry/auth/tokenauthorizer.go index ccb4cca49..66959e2e5 100644 --- a/src/common/utils/registry/auth/tokenauthorizer.go +++ b/src/common/utils/registry/auth/tokenauthorizer.go @@ -278,7 +278,7 @@ func NewStandardTokenAuthorizer(client *http.Client, credential Credential, // 1. performance issue // 2. the realm field returned by registry is an IP which can not reachable // inside Harbor - if len(customizedTokenService) > 0 { + if len(customizedTokenService) > 0 && len(customizedTokenService[0]) > 0 { generator.realm = customizedTokenService[0] } diff --git a/src/core/api/replication_adapter_test.go b/src/core/api/replication_adapter_test.go index 3a9217479..e405db0a1 100644 --- a/src/core/api/replication_adapter_test.go +++ b/src/core/api/replication_adapter_test.go @@ -63,7 +63,7 @@ func fakedFactory(*model.Registry) (adapter.Adapter, error) { func TestReplicationAdapterAPIGet(t *testing.T) { err := adapter.RegisterFactory( &adapter.Info{ - Type: "harbor", + Type: "test", SupportedResourceTypes: []model.ResourceType{"image"}, }, fakedFactory) require.Nil(t, err) @@ -73,7 +73,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) { { request: &testingRequest{ method: http.MethodGet, - url: "/api/replication/adapters/harbor", + url: "/api/replication/adapters/test", }, code: http.StatusUnauthorized, }, @@ -81,7 +81,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) { { request: &testingRequest{ method: http.MethodGet, - url: "/api/replication/adapters/harbor", + url: "/api/replication/adapters/test", credential: nonSysAdmin, }, code: http.StatusForbidden, @@ -99,7 +99,7 @@ func TestReplicationAdapterAPIGet(t *testing.T) { { request: &testingRequest{ method: http.MethodGet, - url: "/api/replication/adapters/harbor", + url: "/api/replication/adapters/test", credential: sysAdmin, }, code: http.StatusOK, diff --git a/src/core/api/replication_execution.go b/src/core/api/replication_execution.go index 0652fa541..f250ba40e 100644 --- a/src/core/api/replication_execution.go +++ b/src/core/api/replication_execution.go @@ -74,8 +74,11 @@ func (r *ReplicationOperationAPI) authorized(policy *model.Policy, resource rbac // ListExecutions ... func (r *ReplicationOperationAPI) ListExecutions() { query := &models.ExecutionQuery{ - Statuses: []string{r.GetString("status")}, - Trigger: r.GetString("trigger"), + Trigger: r.GetString("trigger"), + } + + if len(r.GetString("status")) > 0 { + query.Statuses = []string{r.GetString("status")} } if len(r.GetString("policy_id")) > 0 { policyID, err := r.GetInt64("policy_id") diff --git a/src/replication/ng/adapter/harbor/adapter.go b/src/replication/ng/adapter/harbor/adapter.go index a61393415..b394486c4 100644 --- a/src/replication/ng/adapter/harbor/adapter.go +++ b/src/replication/ng/adapter/harbor/adapter.go @@ -15,45 +15,203 @@ package harbor import ( + "fmt" + "net/http" + "strconv" + + common_http "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/common/http/modifier" "github.com/goharbor/harbor/src/common/utils/log" + registry_pkg "github.com/goharbor/harbor/src/common/utils/registry" + "github.com/goharbor/harbor/src/common/utils/registry/auth" adp "github.com/goharbor/harbor/src/replication/ng/adapter" "github.com/goharbor/harbor/src/replication/ng/model" ) -const ( - harbor model.RegistryType = "Harbor" -) - func init() { // TODO add more information to the info info := &adp.Info{ - Type: harbor, + Type: model.RegistryTypeHarbor, SupportedResourceTypes: []model.ResourceType{model.ResourceTypeRepository}, } + // TODO passing coreServiceURL and tokenServiceURL + coreServiceURL := "http://core:8080" + tokenServiceURL := "" if err := adp.RegisterFactory(info, func(registry *model.Registry) (adp.Adapter, error) { - return newAdapter(registry), nil + return newAdapter(registry, coreServiceURL, tokenServiceURL), nil }); err != nil { - log.Errorf("failed to register factory for %s: %v", harbor, err) + log.Errorf("failed to register factory for %s: %v", model.RegistryTypeHarbor, err) return } - log.Infof("the factory for adapter %s registered", harbor) + log.Infof("the factory for adapter %s registered", model.RegistryTypeHarbor) } -// TODO implement the functions type adapter struct { *adp.DefaultImageRegistry + registry *model.Registry + coreServiceURL string + client *common_http.Client } -func newAdapter(registry *model.Registry) *adapter { - return &adapter{} +// The registry URL and core service URL are different when the adapter +// is created for a local Harbor. If the "coreServicrURL" is null, the +// registry URL will be used as the coreServiceURL instead +func newAdapter(registry *model.Registry, coreServiceURL string, + tokenServiceURL string) *adapter { + transport := registry_pkg.GetHTTPTransport(registry.Insecure) + modifiers := []modifier.Modifier{ + &auth.UserAgentModifier{ + UserAgent: adp.UserAgentReplicator, + }, + } + if registry.Credential != nil { + authorizer := auth.NewBasicAuthCredential( + registry.Credential.AccessKey, + registry.Credential.AccessSecret) + modifiers = append(modifiers, authorizer) + } + + url := registry.URL + if len(coreServiceURL) > 0 { + url = coreServiceURL + } + + return &adapter{ + registry: registry, + coreServiceURL: url, + client: common_http.NewClient( + &http.Client{ + Transport: transport, + }, modifiers...), + DefaultImageRegistry: adp.NewDefaultImageRegistry(registry, tokenServiceURL), + } } +// TODO implement the function func (a *adapter) ListNamespaces(*model.NamespaceQuery) ([]*model.Namespace, error) { return nil, nil } -func (a *adapter) CreateNamespace(*model.Namespace) error { - return nil +func (a *adapter) CreateNamespace(namespace *model.Namespace) error { + project := &struct { + Name string `json:"project_name"` + Metadata map[string]interface{} `json:"metadata"` + }{ + Name: namespace.Name, + } + // handle the public of the project + if meta, exist := namespace.Metadata["public"]; exist { + public := true + // if one of them is "private", the set the public as false + for _, value := range meta.(map[string]interface{}) { + b, err := strconv.ParseBool(value.(string)) + if err != nil { + return err + } + if !b { + public = false + break + } + + } + project.Metadata = map[string]interface{}{ + "public": public, + } + } + + err := a.client.Post(a.coreServiceURL+"/api/projects", project) + if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusConflict { + log.Debugf("got 409 when trying to create project %s", namespace.Name) + return nil + } + return err } -func (a *adapter) GetNamespace(string) (*model.Namespace, error) { - return nil, nil +func (a *adapter) GetNamespace(namespace string) (*model.Namespace, error) { + project, err := a.getProject(namespace) + if err != nil { + return nil, err + } + return &model.Namespace{ + Name: namespace, + Metadata: project.Metadata, + }, nil +} + +// TODO implement filter +func (a *adapter) FetchImages(namespaces []string, filters []*model.Filter) ([]*model.Resource, error) { + resources := []*model.Resource{} + for _, namespace := range namespaces { + project, err := a.getProject(namespace) + if err != nil { + return nil, err + } + repositories := []*repository{} + url := fmt.Sprintf("%s/api/repositories?project_id=%d", a.coreServiceURL, project.ID) + if err = a.client.Get(url, &repositories); err != nil { + return nil, err + } + + for _, repository := range repositories { + url := fmt.Sprintf("%s/api/repositories/%s/tags", a.coreServiceURL, repository.Name) + tags := []*tag{} + if err = a.client.Get(url, &tags); err != nil { + return nil, err + } + vtags := []string{} + for _, tag := range tags { + vtags = append(vtags, tag.Name) + } + resources = append(resources, &model.Resource{ + Type: model.ResourceTypeRepository, + Registry: a.registry, + Metadata: &model.ResourceMetadata{ + Namespace: namespace, + Name: repository.Name, + Vtags: vtags, + }, + }) + } + } + + return resources, nil +} + +type project struct { + ID int64 `json:"project_id"` + Name string `json:"name"` + Metadata map[string]interface{} `json:"metadata"` +} + +type repository struct { + Name string `json:"name"` +} + +type tag struct { + Name string `json:"name"` +} + +func (a *adapter) getProject(name string) (*project, error) { + // TODO need an API to exact match project by name + projects := []*project{} + url := fmt.Sprintf("%s/api/projects?name=%s&page=1&page_size=1000", a.coreServiceURL, name) + if err := a.client.Get(url, &projects); err != nil { + return nil, err + } + + for _, pro := range projects { + if pro.Name == name { + p := &project{ + ID: pro.ID, + Name: name, + } + if pro.Metadata != nil { + metadata := map[string]interface{}{} + for key, value := range pro.Metadata { + metadata[key] = value + } + p.Metadata = metadata + } + return p, nil + } + } + return nil, fmt.Errorf("project %s not found", name) } diff --git a/src/replication/ng/execution/execution.go b/src/replication/ng/execution/execution.go index e47a97251..8d1af5d2d 100644 --- a/src/replication/ng/execution/execution.go +++ b/src/replication/ng/execution/execution.go @@ -16,6 +16,7 @@ package execution import ( "fmt" + "github.com/goharbor/harbor/src/core/utils" "github.com/goharbor/harbor/src/replication/ng/dao" "github.com/goharbor/harbor/src/replication/ng/dao/models" @@ -63,8 +64,8 @@ type DefaultManager struct { } // NewDefaultManager ... -func NewDefaultManager() (Manager, error) { - return &DefaultManager{}, nil +func NewDefaultManager() Manager { + return &DefaultManager{} } // Create a new execution diff --git a/src/replication/ng/execution/execution_test.go b/src/replication/ng/execution/execution_test.go index 6a6370438..4a8fa0900 100644 --- a/src/replication/ng/execution/execution_test.go +++ b/src/replication/ng/execution/execution_test.go @@ -1,17 +1,18 @@ package execution import ( + "os" + "testing" + "time" + "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" - "testing" - "time" ) -var executionManager, _ = NewDefaultManager() +var executionManager = NewDefaultManager() func TestMain(m *testing.M) { databases := []string{"postgresql"} diff --git a/src/replication/ng/flow/controller_test.go b/src/replication/ng/flow/controller_test.go index 278c7953a..5b883f9b6 100644 --- a/src/replication/ng/flow/controller_test.go +++ b/src/replication/ng/flow/controller_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/docker/distribution" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/adapter" "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" @@ -204,6 +205,7 @@ func (f *fakedAdapter) PushBlob(repository, digest string, size int64, blob io.R } func TestStartReplication(t *testing.T) { + config.InitWithSettings(nil) err := adapter.RegisterFactory( &adapter.Info{ Type: "faked_registry", diff --git a/src/replication/ng/flow/flow.go b/src/replication/ng/flow/flow.go index 8e46c7a02..ec43667fb 100644 --- a/src/replication/ng/flow/flow.go +++ b/src/replication/ng/flow/flow.go @@ -25,6 +25,7 @@ import ( "github.com/goharbor/harbor/src/replication/ng/execution" "github.com/goharbor/harbor/src/common/utils/log" + "github.com/goharbor/harbor/src/core/config" "github.com/goharbor/harbor/src/replication/ng/adapter" "github.com/goharbor/harbor/src/replication/ng/dao/models" "github.com/goharbor/harbor/src/replication/ng/model" @@ -53,45 +54,71 @@ func newFlow(policy *model.Policy, registryMgr registry.Manager, scheduler: scheduler, } - // get source registry - srcRegistry, err := registryMgr.Get(policy.SrcRegistryID) + // TODO consider to put registry model in the policy directly rather than just the registry ID? + url, err := config.RegistryURL() if err != nil { - return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err) + return nil, fmt.Errorf("failed to get the registry URL: %v", err) } - if srcRegistry == nil { - return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID) + registry := &model.Registry{ + Type: model.RegistryTypeHarbor, + Name: "Local", + URL: url, + // TODO use the service account + Credential: &model.Credential{ + Type: model.CredentialTypeBasic, + AccessKey: "admin", + AccessSecret: "Harbor12345", + }, + Insecure: true, + } + + // get source registry + if policy.SrcRegistryID != 0 { + srcRegistry, err := registryMgr.Get(policy.SrcRegistryID) + if err != nil { + return nil, fmt.Errorf("failed to get registry %d: %v", policy.SrcRegistryID, err) + } + if srcRegistry == nil { + return nil, fmt.Errorf("registry %d not found", policy.SrcRegistryID) + } + f.srcRegistry = srcRegistry + } else { + f.srcRegistry = registry } - f.srcRegistry = srcRegistry // get destination registry - dstRegistry, err := registryMgr.Get(policy.DestRegistryID) - if err != nil { - return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err) + if policy.DestRegistryID != 0 { + dstRegistry, err := registryMgr.Get(policy.DestRegistryID) + if err != nil { + return nil, fmt.Errorf("failed to get registry %d: %v", policy.DestRegistryID, err) + } + if dstRegistry == nil { + return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID) + } + f.dstRegistry = dstRegistry + } else { + f.dstRegistry = registry } - if dstRegistry == nil { - return nil, fmt.Errorf("registry %d not found", policy.DestRegistryID) - } - f.dstRegistry = dstRegistry // create the source registry adapter - srcFactory, err := adapter.GetFactory(srcRegistry.Type) + srcFactory, err := adapter.GetFactory(f.srcRegistry.Type) if err != nil { - return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", srcRegistry.Type, err) + return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.srcRegistry.Type, err) } - srcAdapter, err := srcFactory(srcRegistry) + srcAdapter, err := srcFactory(f.srcRegistry) if err != nil { - return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", srcRegistry.URL, err) + return nil, fmt.Errorf("failed to create adapter for source registry %s: %v", f.srcRegistry.URL, err) } f.srcAdapter = srcAdapter // create the destination registry adapter - dstFactory, err := adapter.GetFactory(dstRegistry.Type) + dstFactory, err := adapter.GetFactory(f.dstRegistry.Type) if err != nil { - return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", dstRegistry.Type, err) + return nil, fmt.Errorf("failed to get adapter factory for registry type %s: %v", f.dstRegistry.Type, err) } - dstAdapter, err := dstFactory(dstRegistry) + dstAdapter, err := dstFactory(f.dstRegistry) if err != nil { - return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", dstRegistry.URL, err) + return nil, fmt.Errorf("failed to create adapter for destination registry %s: %v", f.dstRegistry.URL, err) } f.dstAdapter = dstAdapter @@ -150,7 +177,30 @@ func (f *flow) fetchResources() error { } func (f *flow) createNamespace() error { - // merge the metadata of all source namespaces + // Merge the metadata of all source namespaces + // eg: + // We have two source namespaces: + // { + // Name: "source01", + // Metadata: {"public": true} + // } + // and + // { + // Name: "source02", + // Metadata: {"public": false} + // } + // The name of the destination namespace is "destination", + // after merging the metadata, the destination namespace + // looks like this: + // { + // Name: "destination", + // Metadata: { + // "public": { + // "source01": true, + // "source02": false, + // }, + // }, + // } metadata := map[string]interface{}{} for _, srcNamespace := range f.policy.SrcNamespaces { namespace, err := f.srcAdapter.GetNamespace(srcNamespace) @@ -159,7 +209,13 @@ func (f *flow) createNamespace() error { return err } for key, value := range namespace.Metadata { - metadata[namespace.Name+":"+key] = value + var m map[string]interface{} + if metadata[key] == nil { + m = map[string]interface{}{} + } else { + m = metadata[key].(map[string]interface{}) + } + m[namespace.Name] = value } } @@ -282,7 +338,7 @@ func (f *flow) markExecutionFailure(err error) { Status: models.ExecutionStatusFailed, StatusText: statusText, EndTime: time.Now(), - }) + }, "Status", "StatusText", "EndTime") if err != nil { log.Errorf("failed to update the execution %d: %v", f.executionID, err) } diff --git a/src/replication/ng/replication.go b/src/replication/ng/replication.go index 03c36979e..e5e79ed31 100644 --- a/src/replication/ng/replication.go +++ b/src/replication/ng/replication.go @@ -47,9 +47,8 @@ func Init() error { RegistryMgr = registry.NewDefaultManager() // init policy manager PolicyMgr = policy.NewDefaultManager() - - // TODO init ExecutionMgr - var executionMgr execution.Manager + // init ExecutionMgr + executionMgr := execution.NewDefaultManager() // TODO init scheduler var scheduler scheduler.Scheduler From 3f64a3b337c3aeb3da5d0111b25d63e2d26ea1df Mon Sep 17 00:00:00 2001 From: peimingming Date: Fri, 15 Mar 2019 16:21:39 +0800 Subject: [PATCH 6/6] Execution updateStatus logic upgrade Signed-off-by: peimingming --- src/replication/ng/dao/execution.go | 275 ++++++++---------- src/replication/ng/dao/execution_test.go | 112 ++++--- src/replication/ng/dao/models/execution.go | 6 + .../ng/execution/execution_test.go | 4 +- 4 files changed, 215 insertions(+), 182 deletions(-) diff --git a/src/replication/ng/dao/execution.go b/src/replication/ng/dao/execution.go index 8a0e7ef2e..b6d641a57 100644 --- a/src/replication/ng/dao/execution.go +++ b/src/replication/ng/dao/execution.go @@ -51,6 +51,12 @@ func GetExecutions(query ...*models.ExecutionQuery) ([]*models.Execution, error) qs = qs.OrderBy("-StartTime") _, err := qs.All(&executions) + if err != nil || len(executions) == 0 { + return executions, err + } + for _, e := range executions { + fillExecution(e) + } return executions, err } @@ -81,9 +87,113 @@ func GetExecution(id int64) (*models.Execution, error) { if err == orm.ErrNoRows { return nil, nil } + if err != nil { + return nil, err + } + fillExecution(&t) return &t, err } +// fillExecution will fill the statistics data and status by tasks data +func fillExecution(execution *models.Execution) error { + if executionFinished(execution.Status) { + return nil + } + + o := dao.GetOrmer() + sql := `select status, count(*) as c from replication_task where execution_id = ? group by status` + queryParam := make([]interface{}, 1) + queryParam = append(queryParam, execution.ID) + + dt := []*models.TaskStat{} + count, err := o.Raw(sql, queryParam).QueryRows(&dt) + + if err != nil { + log.Errorf("Query tasks error execution %d: %v", execution.ID, err) + return err + } + + if count == 0 { + return nil + } + + total := 0 + for _, d := range dt { + status, _ := getStatus(d.Status) + updateStatusCount(execution, status, d.C) + total += d.C + } + + if execution.Total != total { + log.Errorf("execution task count inconsistent and fixed, executionID=%d, execution.total=%d, tasks.count=%d", + execution.ID, execution.Total, total) + execution.Total = total + } + resetExecutionStatus(execution) + + // if execution status changed to a final status, store to DB + if executionFinished(execution.Status) { + UpdateExecution(execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.InProgress, + models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Failed, models.ExecutionPropsName.Stopped, + models.ExecutionPropsName.EndTime, models.ExecutionPropsName.Total) + } + return nil +} + +func getStatus(status string) (string, error) { + switch status { + case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: + return models.ExecutionStatusInProgress, nil + case models.TaskStatusSucceed: + return models.ExecutionStatusSucceed, nil + case models.TaskStatusStopped: + return models.ExecutionStatusStopped, nil + case models.TaskStatusFailed: + return models.ExecutionStatusFailed, nil + } + return "", fmt.Errorf("Not support task status ") +} + +func updateStatusCount(execution *models.Execution, status string, delta int) error { + switch status { + case models.ExecutionStatusInProgress: + execution.InProgress += delta + case models.ExecutionStatusSucceed: + execution.Succeed += delta + case models.ExecutionStatusStopped: + execution.Stopped += delta + case models.ExecutionStatusFailed: + execution.Failed += delta + } + return nil +} + +func resetExecutionStatus(execution *models.Execution) error { + execution.Status = generateStatus(execution) + if executionFinished(execution.Status) { + execution.EndTime = time.Now() + } + return nil +} + +func generateStatus(execution *models.Execution) string { + if execution.InProgress > 0 { + return models.ExecutionStatusInProgress + } else if execution.Failed > 0 { + return models.ExecutionStatusFailed + } else if execution.Stopped > 0 { + return models.ExecutionStatusStopped + } + return models.ExecutionStatusSucceed +} + +func executionFinished(status string) bool { + if status == models.ExecutionStatusInProgress { + return false + } + return true +} + // DeleteExecution ... func DeleteExecution(id int64) error { o := dao.GetOrmer() @@ -110,19 +220,10 @@ func UpdateExecution(execution *models.Execution, props ...string) (int64, error // AddTask ... func AddTask(task *models.Task) (int64, error) { o := dao.GetOrmer() - sql := `insert into replication_task (execution_id, resource_type, src_resource, dst_resource, job_id, status) - values (?, ?, ?, ?, ?, ?) RETURNING id` + now := time.Now() + task.StartTime = now - args := []interface{}{} - args = append(args, task.ExecutionID, task.ResourceType, task.SrcResource, task.DstResource, task.JobID, task.Status) - - var taskID int64 - err := o.Raw(sql, args).QueryRow(&taskID) - if err != nil { - return 0, err - } - - return taskID, nil + return o.Insert(task) } // GetTask ... @@ -210,40 +311,11 @@ func UpdateTask(task *models.Task, props ...string) (int64, error) { // UpdateTaskStatus ... func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64, error) { - // can not use the globalOrm - o := orm.NewOrm() - o.Begin() - - // query the task status - var task models.Task - sql := `select * from replication_task where id = ?` - if err := o.Raw(sql, id).QueryRow(&task); err != nil { - if err == orm.ErrNoRows { - o.Rollback() - return 0, err - } - } - - // check status - satisfy := false - if len(statusCondition) == 0 { - satisfy = true - } else { - for _, stCondition := range statusCondition { - if task.Status == stCondition { - satisfy = true - break - } - } - } - if !satisfy { - o.Rollback() - return 0, fmt.Errorf("Status condition not match ") - } + o := dao.GetOrmer() // update status params := []interface{}{} - sql = `update replication_task set status = ?` + sql := `update replication_task set status = ?` params = append(params, status) if taskFinished(status) { // should update endTime sql += ` ,end_time = ?` @@ -251,49 +323,26 @@ func UpdateTaskStatus(id int64, status string, statusCondition ...string) (int64 } sql += ` where id = ?` params = append(params, id) - _, err := o.Raw(sql, params).Exec() - log.Infof("Update task %d: %s -> %s", id, task.Status, status) - if err != nil { - log.Errorf("Update task failed %d: %s -> %s", id, task.Status, status) - o.Rollback() - return 0, err - } - - // query the execution - var execution models.Execution - sql = `select * from replication_execution where id = ?` - if err := o.Raw(sql, task.ExecutionID).QueryRow(&execution); err != nil { - if err == orm.ErrNoRows { - log.Errorf("Execution not found id: %d", task.ExecutionID) - o.Rollback() - return 0, err + if len(statusCondition) > 0 { + sql += ` and status in (` + for _, stCondition := range statusCondition { + sql += ` ?,` + params = append(params, stCondition) } - } - // check execution data - execuStatus, _ := getStatus(task.Status) - count := getStatusCount(&execution, execuStatus) - if count <= 0 { - log.Errorf("Task statistics in execution inconsistent") - o.Commit() - return 1, nil + sql = sql[0 : len(sql)-1] + sql += `)` } - // update execution data - updateStatusCount(&execution, execuStatus, -1) - execuStatusUp, _ := getStatus(status) - updateStatusCount(&execution, execuStatusUp, 1) - - resetExecutionStatus(&execution) - _, err = o.Update(&execution, models.ExecutionPropsName.Status, models.ExecutionPropsName.Total, models.ExecutionPropsName.InProgress, - models.ExecutionPropsName.Failed, models.ExecutionPropsName.Succeed, models.ExecutionPropsName.Stopped, - models.ExecutionPropsName.EndTime) + log.Infof("Update task %d: -> %s", id, status) + res, err := o.Raw(sql, params).Exec() if err != nil { - log.Errorf("Update execution status failed %d: %v", execution.ID, err) - o.Rollback() return 0, err } - o.Commit() - return 1, nil + count, err := res.RowsAffected() + if err != nil { + return 0, err + } + return count, nil } func taskFinished(status string) bool { @@ -302,69 +351,3 @@ func taskFinished(status string) bool { } return false } - -func getStatus(status string) (string, error) { - switch status { - case models.TaskStatusInitialized, models.TaskStatusPending, models.TaskStatusInProgress: - return models.ExecutionStatusInProgress, nil - case models.TaskStatusSucceed: - return models.ExecutionStatusSucceed, nil - case models.TaskStatusStopped: - return models.ExecutionStatusStopped, nil - case models.TaskStatusFailed: - return models.ExecutionStatusFailed, nil - } - return "", fmt.Errorf("Not support task status ") -} - -func getStatusCount(execution *models.Execution, status string) int { - switch status { - case models.ExecutionStatusInProgress: - return execution.InProgress - case models.ExecutionStatusSucceed: - return execution.Succeed - case models.ExecutionStatusStopped: - return execution.Stopped - case models.ExecutionStatusFailed: - return execution.Failed - } - return 0 -} - -func updateStatusCount(execution *models.Execution, status string, delta int) error { - switch status { - case models.ExecutionStatusInProgress: - execution.InProgress += delta - case models.ExecutionStatusSucceed: - execution.Succeed += delta - case models.ExecutionStatusStopped: - execution.Stopped += delta - case models.ExecutionStatusFailed: - execution.Failed += delta - } - return nil -} - -func resetExecutionStatus(execution *models.Execution) error { - status := generateStatus(execution) - if status != execution.Status { - execution.Status = status - log.Debugf("Execution status changed %d: %s -> %s", execution.ID, execution.Status, status) - } - if n := getStatusCount(execution, models.ExecutionStatusInProgress); n == 0 { - // execution finished in this time - execution.EndTime = time.Now() - } - return nil -} - -func generateStatus(execution *models.Execution) string { - if execution.InProgress > 0 { - return models.ExecutionStatusInProgress - } else if execution.Failed > 0 { - return models.ExecutionStatusFailed - } else if execution.Stopped > 0 { - return models.ExecutionStatusStopped - } - return models.ExecutionStatusSucceed -} diff --git a/src/replication/ng/dao/execution_test.go b/src/replication/ng/dao/execution_test.go index a9adec642..a3f609445 100644 --- a/src/replication/ng/dao/execution_test.go +++ b/src/replication/ng/dao/execution_test.go @@ -148,6 +148,13 @@ func TestMethodOfTask(t *testing.T) { require.Nil(t, err) assert.Equal(t, int64(1), n) + // test update status + n, err = UpdateTaskStatus(id1, "Succeed") + require.Nil(t, err) + assert.Equal(t, int64(1), n) + task, _ = GetTask(id1) + assert.Equal(t, "Succeed", task.Status) + // test delete require.Nil(t, DeleteTask(id1)) task, err = GetTask(id1) @@ -162,16 +169,12 @@ func TestMethodOfTask(t *testing.T) { assert.Equal(t, int64(0), n) } -func TestUpdateJobStatus(t *testing.T) { +func TestExecutionFill(t *testing.T) { execution := &models.Execution{ PolicyID: 11209, Status: "InProgress", StatusText: "None", - Total: 12, - Failed: 0, - Succeed: 10, - InProgress: 1, - Stopped: 1, + Total: 2, Trigger: "Event", StartTime: time.Now(), } @@ -183,7 +186,56 @@ func TestUpdateJobStatus(t *testing.T) { SrcResource: "srcResource1", DstResource: "dstResource1", JobID: "jobID1", - Status: "Pending", + Status: "Succeed", + StartTime: time.Now(), + } + task2 := &models.Task{ + ID: 20192, + ExecutionID: executionID, + ResourceType: "resourceType2", + SrcResource: "srcResource2", + DstResource: "dstResource2", + JobID: "jobID2", + Status: "Stopped", + StartTime: time.Now(), + EndTime: time.Now(), + } + AddTask(task1) + AddTask(task2) + + defer func() { + DeleteAllTasks(executionID) + DeleteAllExecutions(11209) + }() + + // query and fill + exe, err := GetExecution(executionID) + require.Nil(t, err) + assert.Equal(t, "Stopped", exe.Status) + assert.Equal(t, 0, exe.InProgress) + assert.Equal(t, 1, exe.Stopped) + assert.Equal(t, 0, exe.Failed) + assert.Equal(t, 1, exe.Succeed) +} + +func TestExecutionFill2(t *testing.T) { + execution := &models.Execution{ + PolicyID: 11209, + Status: "InProgress", + StatusText: "None", + Total: 2, + Trigger: "Event", + StartTime: time.Now(), + } + executionID, _ := AddExecution(execution) + task1 := &models.Task{ + ID: 20191, + ExecutionID: executionID, + ResourceType: "resourceType1", + SrcResource: "srcResource1", + DstResource: "dstResource1", + JobID: "jobID1", + Status: models.TaskStatusInProgress, StartTime: time.Now(), } task2 := &models.Task{ @@ -198,40 +250,32 @@ func TestUpdateJobStatus(t *testing.T) { EndTime: time.Now(), } taskID1, _ := AddTask(task1) - taskID2, _ := AddTask(task2) + AddTask(task2) defer func() { DeleteAllTasks(executionID) DeleteAllExecutions(11209) }() - // update Pending->InProgress - n, err := UpdateTaskStatus(taskID1, "InProgress", "Pending") + // query and fill + exe, err := GetExecution(executionID) require.Nil(t, err) - assert.Equal(t, int64(1), n) + assert.Equal(t, models.ExecutionStatusInProgress, exe.Status) + assert.Equal(t, 1, exe.InProgress) + assert.Equal(t, 1, exe.Stopped) + assert.Equal(t, 0, exe.Failed) + assert.Equal(t, 0, exe.Succeed) - execu, err := GetExecution(executionID) + // update task status and query and fill + UpdateTaskStatus(taskID1, models.TaskStatusFailed) + exes, err := GetExecutions(&models.ExecutionQuery{ + PolicyID: 11209, + }) require.Nil(t, err) - assert.Equal(t, execution.InProgress, execu.InProgress) - assert.Equal(t, execution.Status, execu.Status) - - // update InProgress->Failed: Execution.InProgress-1, Failed+1 - n, err = UpdateTaskStatus(taskID1, "Failed") - require.Nil(t, err) - assert.Equal(t, int64(1), n) - - execu, err = GetExecution(executionID) - require.Nil(t, err) - assert.Equal(t, 1, execu.Failed) - assert.Equal(t, "Failed", execu.Status) - - // update Stopped->Pending: Execution.Stopped-1, InProgress+1 - n, err = UpdateTaskStatus(taskID2, "Pending") - require.Nil(t, err) - assert.Equal(t, int64(1), n) - - execu, err = GetExecution(executionID) - require.Nil(t, err) - assert.Equal(t, 1, execu.InProgress) - assert.Equal(t, "InProgress", execu.Status) + assert.Equal(t, 1, len(exes)) + assert.Equal(t, models.ExecutionStatusFailed, exes[0].Status) + assert.Equal(t, 0, exes[0].InProgress) + assert.Equal(t, 1, exes[0].Stopped) + assert.Equal(t, 1, exes[0].Failed) + assert.Equal(t, 0, exes[0].Succeed) } diff --git a/src/replication/ng/dao/models/execution.go b/src/replication/ng/dao/models/execution.go index d73c3e21e..dfd6fe9e9 100644 --- a/src/replication/ng/dao/models/execution.go +++ b/src/replication/ng/dao/models/execution.go @@ -144,3 +144,9 @@ type TaskQuery struct { ResourceType string Pagination } + +// TaskStat holds statistics of task by status +type TaskStat struct { + Status string `orm:"column(status)"` + C int `orm:"column(c)"` +} diff --git a/src/replication/ng/execution/execution_test.go b/src/replication/ng/execution/execution_test.go index 4a8fa0900..3ccb23d54 100644 --- a/src/replication/ng/execution/execution_test.go +++ b/src/replication/ng/execution/execution_test.go @@ -138,9 +138,9 @@ func TestMethodOfTaskManager(t *testing.T) { // UpdateTaskStatus err = executionManager.UpdateTaskStatus(id, models.TaskStatusSucceed) - require.NotNil(t, err) + require.Nil(t, err) taskUpdate, _ = executionManager.GetTask(id) - assert.Equal(t, models.TaskStatusInitialized, taskUpdate.Status) + assert.Equal(t, models.TaskStatusSucceed, taskUpdate.Status) // Remove require.Nil(t, executionManager.RemoveTask(id))