diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index d72a4796c..9aa3634bd 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -312,7 +312,7 @@ func GetRepJobByPolicy(policyID int64) ([]*models.RepJob, error) { // FilterRepJobs ... func FilterRepJobs(policyID int64, repository string, status []string, startTime, - endTime *time.Time, limit, offset int64) ([]*models.RepJob, int64, error) { + endTime *time.Time, limit, offset int64, operations ...string) ([]*models.RepJob, int64, error) { jobs := []*models.RepJob{} @@ -333,6 +333,9 @@ func FilterRepJobs(policyID int64, repository string, status []string, startTime if endTime != nil { qs = qs.Filter("CreationTime__lte", endTime) } + if len(operations) != 0 { + qs = qs.Filter("Operation__in", operations) + } total, err := qs.Count() if err != nil { diff --git a/src/common/models/job.go b/src/common/models/job.go index 11ea3d00f..ef5a9a7e3 100644 --- a/src/common/models/job.go +++ b/src/common/models/job.go @@ -31,4 +31,6 @@ const ( JobRetrying string = "retrying" //JobContinue is the status returned by statehandler to tell statemachine to move to next possible state based on trasition table. JobContinue string = "_continue" + // JobScheduled ... + JobScheduled string = "scheduled" ) diff --git a/src/common/models/replication_job.go b/src/common/models/replication_job.go index f81afa7b2..8843c7470 100644 --- a/src/common/models/replication_job.go +++ b/src/common/models/replication_job.go @@ -26,6 +26,8 @@ const ( RepOpTransfer string = "transfer" //RepOpDelete represents the operation of a job to remove repository from a remote registry/harbor instance. RepOpDelete string = "delete" + //RepOpSchedule represents the operation of a job to schedule the real replication process + RepOpSchedule string = "schedule" //UISecretCookie is the cookie name to contain the UI secret UISecretCookie string = "secret" //RepTargetTable is the table name for replication targets diff --git a/src/jobservice/job/impl/replication/replicate.go b/src/jobservice/job/impl/replication/replicate.go index afd4f15fa..5e7bb27ef 100644 --- a/src/jobservice/job/impl/replication/replicate.go +++ b/src/jobservice/job/impl/replication/replicate.go @@ -81,7 +81,7 @@ func (r *Replicator) replicate() error { r.logger.Errorf("failed to send the replication request to %s: %v", r.url, err) return err } - r.logger.Infof("the replication request has been sent to %s successfully", r.url) + r.logger.Info("the replication request has been sent successfully") return nil } diff --git a/src/replication/core/controller.go b/src/replication/core/controller.go index a7b22597a..3db5671d2 100644 --- a/src/replication/core/controller.go +++ b/src/replication/core/controller.go @@ -96,22 +96,6 @@ func (ctl *DefaultController) Init() error { return nil } - policies, err := ctl.policyManager.GetPolicies(models.QueryParameter{}) - if err != nil { - return err - } - if policies != nil && len(policies.Policies) > 0 { - for _, policy := range policies.Policies { - if policy.Trigger == nil || policy.Trigger.Kind != replication.TriggerKindSchedule { - continue - } - - if err := ctl.triggerManager.SetupTrigger(policy); err != nil { - log.Errorf("failed to setup trigger for policy %v: %v", policy, err) - } - } - } - //Initialize sourcer ctl.sourcer.Init() diff --git a/src/replication/trigger/manager_test.go b/src/replication/trigger/manager_test.go index d02d9ee28..02500e9a2 100644 --- a/src/replication/trigger/manager_test.go +++ b/src/replication/trigger/manager_test.go @@ -19,6 +19,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/utils/test" "github.com/vmware/harbor/src/replication" "github.com/vmware/harbor/src/replication/models" ) @@ -66,30 +68,26 @@ func TestCreateTrigger(t *testing.T) { } func TestSetupTrigger(t *testing.T) { + dao.DefaultDatabaseWatchItemDAO = &test.FakeWatchItemDAO{} + mgr := NewManager(1) err := mgr.SetupTrigger(&models.ReplicationPolicy{ Trigger: &models.Trigger{ - Kind: replication.TriggerKindSchedule, - ScheduleParam: &models.ScheduleParam{ - Type: replication.TriggerScheduleDaily, - Offtime: 1, - }, + Kind: replication.TriggerKindImmediate, }, }) assert.Nil(t, err) } func TestUnsetTrigger(t *testing.T) { + dao.DefaultDatabaseWatchItemDAO = &test.FakeWatchItemDAO{} + mgr := NewManager(1) err := mgr.UnsetTrigger(&models.ReplicationPolicy{ Trigger: &models.Trigger{ - Kind: replication.TriggerKindSchedule, - ScheduleParam: &models.ScheduleParam{ - Type: replication.TriggerScheduleDaily, - Offtime: 1, - }, + Kind: replication.TriggerKindImmediate, }, }) assert.Nil(t, err) diff --git a/src/replication/trigger/schedule.go b/src/replication/trigger/schedule.go index 5d5b24aad..eeefbd159 100644 --- a/src/replication/trigger/schedule.go +++ b/src/replication/trigger/schedule.go @@ -2,12 +2,17 @@ package trigger import ( "fmt" - "time" + "net/http" - "github.com/vmware/harbor/src/common/scheduler" - "github.com/vmware/harbor/src/common/scheduler/policy" - replication_task "github.com/vmware/harbor/src/common/scheduler/task/replication" + "github.com/vmware/harbor/src/common/dao" + common_http "github.com/vmware/harbor/src/common/http" + "github.com/vmware/harbor/src/common/job" + job_models "github.com/vmware/harbor/src/common/job/models" + "github.com/vmware/harbor/src/common/models" + "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/replication" + "github.com/vmware/harbor/src/ui/config" + "github.com/vmware/harbor/src/ui/utils" ) //ScheduleTrigger will schedule a alternate policy to provide 'daily' and 'weekly' trigger ways. @@ -29,30 +34,79 @@ func (st *ScheduleTrigger) Kind() string { //Setup is the implementation of same method defined in Trigger interface func (st *ScheduleTrigger) Setup() error { - config := &policy.AlternatePolicyConfiguration{} + metadata := &job_models.JobMetadata{ + JobKind: job.JobKindPeriodic, + } switch st.params.Type { case replication.TriggerScheduleDaily: - config.Duration = 24 * 3600 * time.Second - config.OffsetTime = st.params.Offtime + h, m, s := parseOfftime(st.params.Offtime) + metadata.Cron = fmt.Sprintf("%d %d %d * * *", s, m, h) case replication.TriggerScheduleWeekly: - config.Duration = 7 * 24 * 3600 * time.Second - config.OffsetTime = st.params.Offtime - config.Weekday = st.params.Weekday + h, m, s := parseOfftime(st.params.Offtime) + metadata.Cron = fmt.Sprintf("%d %d %d * * %d", s, m, h, st.params.Weekday%7) default: return fmt.Errorf("unsupported schedual trigger type: %s", st.params.Type) } - schedulePolicy := policy.NewAlternatePolicy(assembleName(st.params.PolicyID), config) - attachTask := replication_task.NewTask(st.params.PolicyID) - schedulePolicy.AttachTasks(attachTask) - return scheduler.DefaultScheduler.Schedule(schedulePolicy) + id, err := dao.AddRepJob(models.RepJob{ + Repository: "N/A", + PolicyID: st.params.PolicyID, + Operation: models.RepOpSchedule, + }) + if err != nil { + return err + } + uuid, err := utils.GetJobServiceClient().SubmitJob(&job_models.JobData{ + Name: job.ImageReplicate, + Parameters: map[string]interface{}{ + "policy_id": st.params.PolicyID, + "url": config.InternalUIURL(), + "insecure": true, + }, + Metadata: metadata, + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", + config.InternalUIURL(), id), + }) + if err != nil { + // clean up the job record in database + if e := dao.DeleteRepJob(id); e != nil { + log.Errorf("failed to delete job %d: %v", id, e) + } + return err + } + return dao.SetRepJobUUID(id, uuid) } //Unset is the implementation of same method defined in Trigger interface func (st *ScheduleTrigger) Unset() error { - return scheduler.DefaultScheduler.UnSchedule(assembleName(st.params.PolicyID)) + jobs, _, err := dao.FilterRepJobs(st.params.PolicyID, "", nil, nil, nil, 0, 0, models.RepOpSchedule) + if err != nil { + return err + } + if len(jobs) != 1 { + log.Warningf("only one job should be found, but found %d now", len(jobs)) + } + + for _, j := range jobs { + if err = utils.GetJobServiceClient().PostAction(j.UUID, job.JobActionStop); err != nil { + // if the job specified by UUID is not found in jobservice, delete the job + // record from database + if e, ok := err.(*common_http.Error); !ok || e.Code != http.StatusNotFound { + return err + } + } + if err = dao.DeleteRepJob(j.ID); err != nil { + return err + } + } + return nil } -func assembleName(policyID int64) string { - return fmt.Sprintf("replication_policy_%d", policyID) +func parseOfftime(offtime int64) (hour, minite, second int) { + offtime = offtime % (3600 * 24) + hour = int(offtime / 3600) + offtime = offtime % 3600 + minite = int(offtime / 60) + second = int(offtime % 60) + return } diff --git a/src/replication/trigger/schedule_test.go b/src/replication/trigger/schedule_test.go index 5fecd934b..13dcc36b6 100644 --- a/src/replication/trigger/schedule_test.go +++ b/src/replication/trigger/schedule_test.go @@ -16,48 +16,35 @@ package trigger import ( "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/vmware/harbor/src/common/scheduler" "github.com/vmware/harbor/src/replication" ) -func TestAssembleName(t *testing.T) { - assert.Equal(t, "replication_policy_1", assembleName(1)) -} - func TestKindOfScheduleTrigger(t *testing.T) { trigger := NewScheduleTrigger(ScheduleParam{}) assert.Equal(t, replication.TriggerKindSchedule, trigger.Kind()) } -func TestSetupAndUnSetOfScheduleTrigger(t *testing.T) { - // invalid schedule param - trigger := NewScheduleTrigger(ScheduleParam{}) - assert.NotNil(t, trigger.Setup()) +func TestParseOfftime(t *testing.T) { + cases := []struct { + offtime int64 + hour int + minite int + second int + }{ + {0, 0, 0, 0}, + {1, 0, 0, 1}, + {60, 0, 1, 0}, + {3600, 1, 0, 0}, + {3661, 1, 1, 1}, + {3600*24 + 60, 0, 1, 0}, + } - // valid schedule param - var policyID int64 = 1 - trigger = NewScheduleTrigger(ScheduleParam{ - BasicParam: BasicParam{ - PolicyID: policyID, - }, - Type: replication.TriggerScheduleWeekly, - Weekday: (int8(time.Now().Weekday()) + 1) % 7, - Offtime: 0, - }) - - count := scheduler.DefaultScheduler.PolicyCount() - require.Nil(t, scheduler.DefaultScheduler.GetPolicy(assembleName(policyID))) - - require.Nil(t, trigger.Setup()) - - assert.Equal(t, count+1, scheduler.DefaultScheduler.PolicyCount()) - assert.NotNil(t, scheduler.DefaultScheduler.GetPolicy(assembleName(policyID))) - - require.Nil(t, trigger.Unset()) - assert.Equal(t, count, scheduler.DefaultScheduler.PolicyCount()) - assert.Nil(t, scheduler.DefaultScheduler.GetPolicy(assembleName(policyID))) + for _, c := range cases { + h, m, s := parseOfftime(c.offtime) + assert.Equal(t, c.hour, h) + assert.Equal(t, c.minite, m) + assert.Equal(t, c.second, s) + } } diff --git a/src/ui/api/replication.go b/src/ui/api/replication.go index be89fcca1..c920520bf 100644 --- a/src/ui/api/replication.go +++ b/src/ui/api/replication.go @@ -64,7 +64,8 @@ func (r *ReplicationAPI) Post() { } _, count, err := dao.FilterRepJobs(replication.PolicyID, "", - []string{models.JobRunning, models.JobPending}, nil, nil, 1, 0) + []string{models.JobRunning, models.JobPending}, nil, nil, 1, 0, + models.RepOpTransfer, models.RepOpDelete) if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to filter jobs of policy %d: %v", replication.PolicyID, err)) diff --git a/src/ui/service/notifications/jobs/handler.go b/src/ui/service/notifications/jobs/handler.go index 87f126f0c..ad2cb682a 100644 --- a/src/ui/service/notifications/jobs/handler.go +++ b/src/ui/service/notifications/jobs/handler.go @@ -32,6 +32,7 @@ var statusMap = map[string]string{ job.JobServiceStatusCancelled: models.JobCanceled, job.JobServiceStatusError: models.JobError, job.JobServiceStatusSuccess: models.JobFinished, + job.JobServiceStatusScheduled: models.JobScheduled, } // Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice. @@ -58,9 +59,9 @@ func (h *Handler) Prepare() { h.Abort("200") return } - log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status) status, ok := statusMap[data.Status] if !ok { + log.Debugf("drop the job status update event: job id-%d, status-%s", id, status) h.Abort("200") return } @@ -69,6 +70,7 @@ func (h *Handler) Prepare() { // HandleScan handles the webhook of scan job func (h *Handler) HandleScan() { + log.Debugf("received san job status update event: job-%d, status-%s", h.id, h.status) if err := dao.UpdateScanJobStatus(h.id, h.status); err != nil { log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status) h.HandleInternalServerError(err.Error()) @@ -78,6 +80,7 @@ func (h *Handler) HandleScan() { //HandleReplication handles the webhook of replication job func (h *Handler) HandleReplication() { + log.Debugf("received replication job status update event: job-%d, status-%s", h.id, h.status) if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil { log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status) h.HandleInternalServerError(err.Error())