From 87ce1c84d5c03e9bbb9a7b996f5b2905cf0709dc Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Fri, 5 Jan 2018 16:50:35 +0800 Subject: [PATCH] Manual starting replication will be rejected if there are pending/running jobs --- src/common/dao/dao_test.go | 2 +- src/common/dao/replication_job.go | 4 ++-- src/ui/api/replication.go | 19 +++++++++++++++++-- src/ui/api/replication_job.go | 2 +- src/ui/api/replication_policy.go | 17 ++++++----------- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/common/dao/dao_test.go b/src/common/dao/dao_test.go index 9be616be1..84f159608 100644 --- a/src/common/dao/dao_test.go +++ b/src/common/dao/dao_test.go @@ -1139,7 +1139,7 @@ func TestGetRepJobByPolicy(t *testing.T) { } func TestFilterRepJobs(t *testing.T) { - jobs, _, err := FilterRepJobs(policyID, "", "", nil, nil, 1000, 0) + jobs, _, err := FilterRepJobs(policyID, "", []string{}, nil, nil, 1000, 0) if err != nil { t.Errorf("Error occurred in FilterRepJobs: %v, policy ID: %d", err, policyID) return diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index 5a2639bd0..7c6d2d805 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -291,7 +291,7 @@ func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) { } // FilterRepJobs ... -func FilterRepJobs(policyID int64, repository, status string, startTime, +func FilterRepJobs(policyID int64, repository string, status []string, startTime, endTime *time.Time, limit, offset int64) ([]*models.RepJob, int64, error) { jobs := []*models.RepJob{} @@ -305,7 +305,7 @@ func FilterRepJobs(policyID int64, repository, status string, startTime, qs = qs.Filter("Repository__icontains", repository) } if len(status) != 0 { - qs = qs.Filter("Status__icontains", status) + qs = qs.Filter("Status__in", status) } if startTime != nil { qs = qs.Filter("CreationTime__gte", startTime) diff --git a/src/ui/api/replication.go b/src/ui/api/replication.go index ffe9165d7..0290949bb 100644 --- a/src/ui/api/replication.go +++ b/src/ui/api/replication.go @@ -16,13 +16,16 @@ package api import ( "fmt" + "net/http" + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/notifier" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/replication/core" "github.com/vmware/harbor/src/replication/event/notification" "github.com/vmware/harbor/src/replication/event/topic" - "github.com/vmware/harbor/src/ui/api/models" + api_models "github.com/vmware/harbor/src/ui/api/models" ) // ReplicationAPI handles API calls for replication @@ -46,7 +49,7 @@ func (r *ReplicationAPI) Prepare() { // Post trigger a replication according to the specified policy func (r *ReplicationAPI) Post() { - replication := &models.Replication{} + replication := &api_models.Replication{} r.DecodeJSONReqAndValidate(replication) policy, err := core.GlobalController.GetPolicy(replication.PolicyID) @@ -60,6 +63,18 @@ func (r *ReplicationAPI) Post() { return } + _, count, err := dao.FilterRepJobs(replication.PolicyID, "", + []string{models.JobRunning, models.JobPending}, nil, nil, 1, 0) + if err != nil { + r.HandleInternalServerError(fmt.Sprintf("failed to filter jobs of policy %d: %v", + replication.PolicyID, err)) + return + } + if count > 0 { + r.RenderError(http.StatusPreconditionFailed, "policy has running/pending jobs, new replication can not be triggered") + return + } + if err = startReplication(replication.PolicyID); err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err)) return diff --git a/src/ui/api/replication_job.go b/src/ui/api/replication_job.go index 670ebacda..b9ddf1609 100644 --- a/src/ui/api/replication_job.go +++ b/src/ui/api/replication_job.go @@ -108,7 +108,7 @@ func (ra *RepJobAPI) List() { page, pageSize := ra.GetPaginationParams() - jobs, total, err := dao.FilterRepJobs(policyID, repository, status, + jobs, total, err := dao.FilterRepJobs(policyID, repository, []string{status}, startTime, endTime, pageSize, pageSize*(page-1)) if err != nil { log.Errorf("failed to filter jobs according policy ID %d, repository %s, status %s, start time %v, end time %v: %v", diff --git a/src/ui/api/replication_policy.go b/src/ui/api/replication_policy.go index 647fe6e58..6a843fe6a 100644 --- a/src/ui/api/replication_policy.go +++ b/src/ui/api/replication_policy.go @@ -243,19 +243,14 @@ func (pa *RepPolicyAPI) Delete() { pa.CustomAbort(http.StatusNotFound, http.StatusText(http.StatusNotFound)) } - // TODO - jobs, err := dao.GetRepJobByPolicy(id) + _, count, err := dao.FilterRepJobs(id, "", + []string{models.JobRunning, models.JobRetrying, models.JobPending}, nil, nil, 1, 0) if err != nil { - log.Errorf("failed to get jobs of policy %d: %v", id, err) + log.Errorf("failed to filter jobs of policy %d: %v", id, err) pa.CustomAbort(http.StatusInternalServerError, "") } - - for _, job := range jobs { - if job.Status == models.JobRunning || - job.Status == models.JobRetrying || - job.Status == models.JobPending { - pa.CustomAbort(http.StatusPreconditionFailed, "policy has running/retrying/pending jobs, can not be deleted") - } + if count > 0 { + pa.CustomAbort(http.StatusPreconditionFailed, "policy has running/retrying/pending jobs, can not be deleted") } if err = core.GlobalController.RemovePolicy(id); err != nil { @@ -302,7 +297,7 @@ func convertFromRepPolicy(projectMgr promgr.ProjectManager, policy rep_models.Re } // TODO call the method from replication controller - _, errJobCount, err := dao.FilterRepJobs(policy.ID, "", "error", nil, nil, 0, 0) + _, errJobCount, err := dao.FilterRepJobs(policy.ID, "", []string{"error"}, nil, nil, 0, 0) if err != nil { return nil, err }