diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 16428b152..a0f0fae56 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -4593,6 +4593,120 @@ paths: $ref: '#/responses/404' '500': $ref: '#/responses/500' + /jobservice/queues: + get: + operationId: listJobQueues + summary: list job queues + description: list job queue + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + responses: + '200': + description: List job queue successfully. + schema: + type: array + items: + $ref: '#/definitions/JobQueue' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' + /jobservice/queues/{job_type}: + put: + operationId: actionPendingJobs + summary: stop and clean, pause, resume pending jobs in the queue + description: stop and clean, pause, resume pending jobs in the queue + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + - name: job_type + in: path + required: true + type: string + description: The type of the job. 'all' stands for all job types + - name: action_request + in: body + required: true + schema: + $ref: '#/definitions/ActionRequest' + responses: + '200': + description: take action to the jobs in the queue successfully. + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' + /schedules: + get: + operationId: listSchedules + description: List schedules + tags: + - schedule + parameters: + - $ref: '#/parameters/requestId' + - $ref: '#/parameters/page' + - $ref: '#/parameters/pageSize' + responses: + '200': + description: list schedule successfully. + schema: + type: array + items: + type: object + $ref: '#/definitions/ScheduleTask' + headers: + X-Total-Count: + description: The total count of available items + type: integer + Link: + description: Link to previous page and next page + type: string + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' + /schedules/{job_type}/paused: + get: + operationId: getSchedulePaused + description: Get scheduler paused status + tags: + - schedule + parameters: + - $ref: '#/parameters/requestId' + - name: job_type + in: path + required: true + type: string + description: The type of the job. 'all' stands for all job types, current only support query with all + responses: + '200': + description: Get scheduler status successfully. + schema: + type: object + $ref: '#/definitions/SchedulerStatus' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' /ping: get: operationId: getPing @@ -6238,6 +6352,8 @@ definitions: type: string format: date-time description: The creation time of the repository + x-nullable: true + x-omitempty: true update_time: type: string format: date-time @@ -9329,6 +9445,8 @@ definitions: type: string format: date-time description: The start time of the worker + x-nullable: true + x-omitempty: true args: type: string description: The args of the worker @@ -9338,4 +9456,61 @@ definitions: checkin_at: type: string format: date-time - description: The checkin time of the running job in the worker \ No newline at end of file + description: The checkin time of the worker + x-nullable: true + x-omitempty: true + ActionRequest: + type: object + description: The request to stop, pause or resume + properties: + action: + type: string + description: The action of the request, should be stop, pause or resume + enum: + - stop + - pause + - resume + JobQueue: + type: object + description: the job queue info + properties: + job_type: + type: string + description: The type of the job queue + count: + type: integer + description: The count of jobs in the job queue + latency: + type: integer + description: The latency the job queue (seconds) + paused: + type: boolean + description: The paused status of the job queue + x-omitempty: false + ScheduleTask: + type: object + description: the schedule task info + properties: + id: + type: integer + description: the id of the Schedule task + vendor_type: + type: string + description: the vendor type of the current schedule task + vendor_id: + type: integer + description: the vendor id of the current task + extra_attrs: + type: string + description: the extra attributes + creation_time: + type: string + format: date-time + SchedulerStatus: + type: object + description: the scheduler status + properties: + paused: + type: boolean + description: if the scheduler is paused + x-omitempty: false \ No newline at end of file diff --git a/make/migrations/postgresql/0100_2.7.0_schema.up.sql b/make/migrations/postgresql/0100_2.7.0_schema.up.sql index f70551519..377ab11b1 100644 --- a/make/migrations/postgresql/0100_2.7.0_schema.up.sql +++ b/make/migrations/postgresql/0100_2.7.0_schema.up.sql @@ -1 +1,9 @@ -ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS copy_by_chunk boolean; \ No newline at end of file +ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS copy_by_chunk boolean; + +CREATE TABLE IF NOT EXISTS job_queue_status ( + id SERIAL NOT NULL PRIMARY KEY, + job_type varchar(256) NOT NULL, + paused boolean NOT NULL DEFAULT false, + update_time timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE ("job_type") +); diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go index c3ff7d580..1c48de400 100644 --- a/src/controller/jobmonitor/monitor.go +++ b/src/controller/jobmonitor/monitor.go @@ -21,113 +21,83 @@ import ( "time" "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/pkg/queuestatus" "github.com/goharbor/harbor/src/lib/log" "github.com/gocraft/work" "github.com/goharbor/harbor/src/common/job" - "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" libRedis "github.com/goharbor/harbor/src/lib/redis" jm "github.com/goharbor/harbor/src/pkg/jobmonitor" - "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/task" ) -// All the jobs in the pool, or all pools -const All = "all" +const ( + all = "all" +) // Ctl the controller instance of the worker pool controller var Ctl = NewMonitorController() +var skippedJobTypes = []string{ + "DEMO", + "IMAGE_REPLICATE", + "IMAGE_SCAN_ALL", + "IMAGE_GC", +} + // MonitorController defines the worker pool operations type MonitorController interface { // ListPools lists the worker pools ListPools(ctx context.Context) ([]*jm.WorkerPool, error) // ListWorkers lists the workers in the pool ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) + // StopRunningJob stop the running job - StopRunningJob(ctx context.Context, jobID string) error + StopRunningJobs(ctx context.Context, jobID string) error + // StopPendingJobs stop the pending jobs + StopPendingJobs(ctx context.Context, jobType string) error + + // ListQueues lists job queues + ListQueues(ctx context.Context) ([]*jm.Queue, error) + // PauseJobQueues suspend the job queue by type + PauseJobQueues(ctx context.Context, jobType string) error + // ResumeJobQueues resume the job queue by type + ResumeJobQueues(ctx context.Context, jobType string) error } type monitorController struct { - poolManager jm.PoolManager - workerManager jm.WorkerManager - taskManager task.Manager - sch scheduler.Scheduler - monitorClient func() (jm.JobServiceMonitorClient, error) + poolManager jm.PoolManager + workerManager jm.WorkerManager + taskManager task.Manager + queueManager jm.QueueManager + queueStatusManager queuestatus.Manager + monitorClient func() (jm.JobServiceMonitorClient, error) + jobServiceRedisClient func() (jm.RedisClient, error) } // NewMonitorController ... func NewMonitorController() MonitorController { return &monitorController{ - poolManager: jm.NewPoolManager(), - workerManager: jm.NewWorkerManager(), - taskManager: task.NewManager(), - monitorClient: jobServiceMonitorClient, + poolManager: jm.NewPoolManager(), + workerManager: jm.NewWorkerManager(), + taskManager: task.NewManager(), + queueManager: jm.NewQueueClient(), + queueStatusManager: queuestatus.Mgr, + monitorClient: jobServiceMonitorClient, + jobServiceRedisClient: jm.JobServiceRedisClient, } } -func (w *monitorController) StopRunningJob(ctx context.Context, jobID string) error { - if strings.EqualFold(jobID, All) { - allRunningJobs, err := w.allRunningJobs(ctx) - if err != nil { - log.Errorf("failed to get all running jobs: %v", err) - return err - } - for _, jobID := range allRunningJobs { - if err := w.stopJob(ctx, jobID); err != nil { - log.Errorf("failed to stop running job %s: %v", jobID, err) - return err - } - } - return nil - } - return w.stopJob(ctx, jobID) -} - -func (w *monitorController) stopJob(ctx context.Context, jobID string) error { - tasks, err := w.taskManager.List(ctx, &q.Query{Keywords: q.KeyWords{"job_id": jobID}}) - if err != nil { - return err - } - if len(tasks) == 0 { - return errors.BadRequestError(nil).WithMessage("job %s not found", jobID) - } - if len(tasks) != 1 { - return fmt.Errorf("there are more than one task with the same job ID") - } - // use local transaction to avoid rollback batch success tasks to previous state when one fail - if ctx == nil { - log.Debug("context is nil, skip stop operation") - return nil - } - return orm.WithTransaction(func(ctx context.Context) error { - return w.taskManager.Stop(ctx, tasks[0].ID) - })(orm.SetTransactionOpNameToContext(ctx, "tx-stop-job")) -} - -func (w *monitorController) allRunningJobs(ctx context.Context) ([]string, error) { - jobIDs := make([]string, 0) - wks, err := w.ListWorkers(ctx, All) - if err != nil { - log.Errorf("failed to list workers: %v", err) - return nil, err - } - for _, wk := range wks { - jobIDs = append(jobIDs, wk.JobID) - } - return jobIDs, nil -} - func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) { cfg, err := job.GlobalClient.GetJobServiceConfig() if err != nil { return nil, err } config := cfg.RedisPoolConfig - pool, err := libRedis.GetRedisPool("JobService", config.RedisURL, &libRedis.PoolParam{ + pool, err := libRedis.GetRedisPool(jm.JobServicePool, config.RedisURL, &libRedis.PoolParam{ PoolMaxIdle: 0, PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, }) @@ -153,3 +123,235 @@ func (w *monitorController) ListPools(ctx context.Context) ([]*jm.WorkerPool, er } return w.poolManager.List(ctx, mClient) } + +func (w *monitorController) StopRunningJobs(ctx context.Context, jobID string) error { + if !strings.EqualFold(jobID, all) { + return w.stopJob(ctx, jobID) + } + allRunningJobs, err := w.allRunningJobs(ctx) + if err != nil { + log.Errorf("failed to get all running jobs: %v", err) + return err + } + for _, jobID := range allRunningJobs { + if err := w.stopJob(ctx, jobID); err != nil { + log.Errorf("failed to stop running job %s: %v", jobID, err) + return err + } + } + return nil +} + +func (w *monitorController) stopJob(ctx context.Context, jobID string) error { + tasks, err := w.taskManager.List(ctx, &q.Query{Keywords: q.KeyWords{"job_id": jobID}}) + if err != nil { + return err + } + if len(tasks) == 0 { + // the job is not found + log.Infof("job %s not found, maybe the job is already complete", jobID) + return nil + } + if len(tasks) != 1 { + return fmt.Errorf("there are more than one task with the same job ID") + } + // use local transaction to avoid rollback batch success tasks to previous state when one fail + if ctx == nil { + log.Debug("context is nil, skip stop operation") + return nil + } + return orm.WithTransaction(func(ctx context.Context) error { + return w.taskManager.Stop(ctx, tasks[0].ID) + })(orm.SetTransactionOpNameToContext(ctx, "tx-stop-job")) +} + +func (w *monitorController) allRunningJobs(ctx context.Context) ([]string, error) { + jobIDs := make([]string, 0) + wks, err := w.ListWorkers(ctx, all) + if err != nil { + log.Errorf("failed to list workers: %v", err) + return nil, err + } + for _, wk := range wks { + jobIDs = append(jobIDs, wk.JobID) + } + return jobIDs, nil +} + +func (w *monitorController) StopPendingJobs(ctx context.Context, jobType string) error { + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return err + } + if !strings.EqualFold(jobType, all) { + return w.stopPendingJob(ctx, jobType) + } + + jobTypes, err := redisClient.AllJobTypes(ctx) + if err != nil { + return err + } + for _, jobType := range jobTypes { + if err := w.stopPendingJob(ctx, jobType); err != nil { + log.Warningf("failed to stop pending jobs of type %s: %v", jobType, err) + continue + } + } + return nil +} + +func (w *monitorController) stopPendingJob(ctx context.Context, jobType string) error { + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return err + } + jobIDs, err := redisClient.StopPendingJobs(ctx, jobType) + if err != nil { + return err + } + return w.updateJobStatusInTask(ctx, jobIDs, "Stopped") +} + +func (w *monitorController) updateJobStatusInTask(ctx context.Context, jobIDs []string, status string) error { + if ctx == nil { + log.Debug("context is nil, update job status in task") + return nil + } + for _, jobID := range jobIDs { + ts, err := w.taskManager.List(ctx, q.New(q.KeyWords{"job_id": jobID})) + if err != nil { + return err + } + if len(ts) == 0 { + continue + } + ts[0].Status = status + // use local transaction to avoid rollback batch success tasks to previous state when one fail + if err := orm.WithTransaction(func(ctx context.Context) error { + return w.taskManager.Update(ctx, ts[0], "Status") + })(orm.SetTransactionOpNameToContext(ctx, "tx-update-task")); err != nil { + return err + } + } + return nil +} + +func (w *monitorController) ListQueues(ctx context.Context) ([]*jm.Queue, error) { + mClient, err := w.monitorClient() + if err != nil { + return nil, err + } + qs, err := mClient.Queues() + if err != nil { + return nil, err + } + // the original queue doesn't include the paused status, fetch it from the redis + statusMap, err := w.queueStatusManager.AllJobTypeStatus(ctx) + if err != nil { + return nil, err + } + result := make([]*jm.Queue, 0) + for _, queue := range qs { + if skippedUnusedJobType(queue.JobName) { + continue + } + result = append(result, &jm.Queue{ + JobType: queue.JobName, + Count: queue.Count, + Latency: queue.Latency, + Paused: statusMap[queue.JobName], + }) + } + return result, nil +} + +func skippedUnusedJobType(jobType string) bool { + for _, t := range skippedJobTypes { + if jobType == t { + return true + } + } + return false +} + +func (w *monitorController) PauseJobQueues(ctx context.Context, jobType string) error { + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return err + } + if !strings.EqualFold(jobType, all) { + return w.pauseQueue(ctx, jobType) + } + + jobTypes, err := redisClient.AllJobTypes(ctx) + if err != nil { + return err + } + for _, t := range jobTypes { + if err := w.pauseQueue(ctx, t); err != nil { + return err + } + } + return nil +} + +func (w *monitorController) pauseQueue(ctx context.Context, jobType string) error { + if ctx == nil { + log.Debug("context is nil, skip pause queue") + return nil + } + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return fmt.Errorf("failed to pause queue %v, error: %v", jobType, err) + } + err = redisClient.PauseJob(ctx, jobType) + if err != nil { + return fmt.Errorf("failed to pause queue %v, error: %v", jobType, err) + } + if err := orm.WithTransaction(func(ctx context.Context) error { + return w.queueStatusManager.UpdateStatus(ctx, jobType, true) + })(orm.SetTransactionOpNameToContext(ctx, "tx-update-queue-status")); err != nil { + return fmt.Errorf("failed to pause queue %v, error: %v", jobType, err) + } + return nil +} + +func (w *monitorController) ResumeJobQueues(ctx context.Context, jobType string) error { + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return err + } + if !strings.EqualFold(jobType, all) { + return w.resumeQueue(ctx, jobType) + } + jobTypes, err := redisClient.AllJobTypes(ctx) + if err != nil { + return err + } + for _, jobType := range jobTypes { + if err := w.resumeQueue(ctx, jobType); err != nil { + return err + } + } + return nil +} + +func (w *monitorController) resumeQueue(ctx context.Context, jobType string) error { + if ctx == nil { + log.Debug("context is nil, skip resume queue") + return nil + } + redisClient, err := w.jobServiceRedisClient() + if err != nil { + return fmt.Errorf("failed to resume queue %v, error: %v", jobType, err) + } + if err := redisClient.UnpauseJob(ctx, jobType); err != nil { + return fmt.Errorf("failed to resume queue %v, error: %v", jobType, err) + } + if err := orm.WithTransaction(func(ctx context.Context) error { + return w.queueStatusManager.UpdateStatus(ctx, jobType, false) + })(orm.SetTransactionOpNameToContext(ctx, "tx-update-queue-status")); err != nil { + return fmt.Errorf("failed to resume queue %v, error: %v", jobType, err) + } + return nil +} diff --git a/src/controller/jobmonitor/monitor_test.go b/src/controller/jobmonitor/monitor_test.go index 5ffb44e04..ef38b54be 100644 --- a/src/controller/jobmonitor/monitor_test.go +++ b/src/controller/jobmonitor/monitor_test.go @@ -18,6 +18,10 @@ import ( "testing" "time" + "github.com/goharbor/harbor/src/pkg/queuestatus" + "github.com/goharbor/harbor/src/pkg/scheduler" + queueStatusMock "github.com/goharbor/harbor/src/testing/pkg/queuestatus" + "github.com/gocraft/work" "github.com/stretchr/testify/suite" @@ -30,11 +34,14 @@ import ( type JobServiceMonitorTestSuite struct { suite.Suite - jmClient jobmonitor.JobServiceMonitorClient - poolManager jobmonitor.PoolManager - workerManager jobmonitor.WorkerManager - monitController MonitorController - taskManager task.Manager + jmClient jobmonitor.JobServiceMonitorClient + poolManager jobmonitor.PoolManager + workerManager jobmonitor.WorkerManager + monitController MonitorController + taskManager task.Manager + queueStatusManager queuestatus.Manager + sch scheduler.Scheduler + redisClient jobmonitor.RedisClient } func (s *JobServiceMonitorTestSuite) SetupSuite() { @@ -42,13 +49,19 @@ func (s *JobServiceMonitorTestSuite) SetupSuite() { s.poolManager = &monitorMock.PoolManager{} s.workerManager = jobmonitor.NewWorkerManager() s.taskManager = &taskMock.Manager{} + s.redisClient = &monitorMock.RedisClient{} + s.queueStatusManager = &queueStatusMock.Manager{} s.monitController = &monitorController{ - poolManager: s.poolManager, - workerManager: s.workerManager, - taskManager: s.taskManager, + poolManager: s.poolManager, + workerManager: s.workerManager, + taskManager: s.taskManager, + queueStatusManager: s.queueStatusManager, monitorClient: func() (jobmonitor.JobServiceMonitorClient, error) { return s.jmClient, nil }, + jobServiceRedisClient: func() (jobmonitor.RedisClient, error) { + return s.redisClient, nil + }, } } @@ -87,7 +100,27 @@ func (s *JobServiceMonitorTestSuite) TestStopRunningJob() { }, nil) mock.OnAnything(s.taskManager, "List").Return([]*task.Task{{ID: 1, VendorType: "GARBAGE_COLLECTION"}}, nil) mock.OnAnything(s.taskManager, "Stop").Return(nil) - err := s.monitController.StopRunningJob(nil, "1") + err := s.monitController.StopRunningJobs(nil, "1") + s.Assert().Nil(err) +} + +func (s *JobServiceMonitorTestSuite) TestListQueue() { + mock.OnAnything(s.jmClient, "Queues").Return([]*work.Queue{ + {JobName: "GARBAGE_COLLECTION", Count: 100, Latency: 10000}}, nil) + mock.OnAnything(s.queueStatusManager, "AllJobTypeStatus").Return(map[string]bool{"GARBAGE_COLLECTION": false}, nil).Once() + queues, err := s.monitController.ListQueues(nil) + s.Assert().Nil(err) + s.Assert().Equal(1, len(queues)) + s.Assert().Equal("GARBAGE_COLLECTION", queues[0].JobType) + s.Assert().False(queues[0].Paused) +} + +func (s *JobServiceMonitorTestSuite) TestPauseJob() { + mock.OnAnything(s.redisClient, "PauseJob").Return(nil).Once() + err := s.monitController.PauseJobQueues(nil, "GARBAGE_COLLECTION") + s.Assert().Nil(err) + mock.OnAnything(s.redisClient, "UnpauseJob").Return(nil).Once() + err = s.monitController.ResumeJobQueues(nil, "GARBAGE_COLLECTION") s.Assert().Nil(err) } diff --git a/src/controller/jobservice/schedule.go b/src/controller/jobservice/schedule.go index e383deb18..61db62808 100644 --- a/src/controller/jobservice/schedule.go +++ b/src/controller/jobservice/schedule.go @@ -19,12 +19,15 @@ import ( "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/q" + jm "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/queuestatus" "github.com/goharbor/harbor/src/pkg/scheduler" ) var ( // SchedulerCtl ... - SchedulerCtl = NewSchedulerCtrl() + SchedulerCtl = NewSchedulerCtrl() + jobTypeSchedule = "SCHEDULER" ) // SchedulerController interface to manage schedule @@ -35,17 +38,30 @@ type SchedulerController interface { Create(ctx context.Context, vendorType, cronType, cron, callbackFuncName string, policy interface{}, extrasParam map[string]interface{}) (int64, error) // Delete the schedule Delete(ctx context.Context, vendorType string) error + // List lists schedules + List(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) + // Count counts schedules + Count(ctx context.Context, query *q.Query) (int64, error) + // Paused get the job scheduler is paused or not + Paused(ctx context.Context) (bool, error) } type schedulerController struct { - schedulerMgr scheduler.Scheduler + schedulerMgr scheduler.Scheduler + jobServiceRedisClient func() (jm.RedisClient, error) + queueStatusMgr queuestatus.Manager } // NewSchedulerCtrl ... func NewSchedulerCtrl() SchedulerController { - return &schedulerController{schedulerMgr: scheduler.New()} + return &schedulerController{ + schedulerMgr: scheduler.New(), + jobServiceRedisClient: jm.JobServiceRedisClient, + queueStatusMgr: queuestatus.Mgr, + } } -func (s schedulerController) Get(ctx context.Context, vendorType string) (*scheduler.Schedule, error) { + +func (s *schedulerController) Get(ctx context.Context, vendorType string) (*scheduler.Schedule, error) { sch, err := s.schedulerMgr.ListSchedules(ctx, q.New(q.KeyWords{"VendorType": vendorType})) if err != nil { return nil, err @@ -59,11 +75,29 @@ func (s schedulerController) Get(ctx context.Context, vendorType string) (*sched return sch[0], nil } -func (s schedulerController) Create(ctx context.Context, vendorType, cronType, cron, callbackFuncName string, +func (s *schedulerController) Create(ctx context.Context, vendorType, cronType, cron, callbackFuncName string, policy interface{}, extrasParam map[string]interface{}) (int64, error) { return s.schedulerMgr.Schedule(ctx, vendorType, -1, cronType, cron, callbackFuncName, policy, extrasParam) } -func (s schedulerController) Delete(ctx context.Context, vendorType string) error { +func (s *schedulerController) Delete(ctx context.Context, vendorType string) error { return s.schedulerMgr.UnScheduleByVendor(ctx, vendorType, -1) } + +func (s *schedulerController) List(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) { + return s.schedulerMgr.ListSchedules(ctx, query) +} + +func (s *schedulerController) Count(ctx context.Context, query *q.Query) (int64, error) { + return s.schedulerMgr.CountSchedules(ctx, query) +} + +func (s *schedulerController) Paused(ctx context.Context) (bool, error) { + // Scheduler is a type of job type, it will create a concrete job type in the scheduler task + // if it is paused, all scheduled tasks are paused + statusMap, err := s.queueStatusMgr.AllJobTypeStatus(ctx) + if err != nil { + return false, err + } + return statusMap[jobTypeSchedule], nil +} diff --git a/src/controller/jobservice/schedule_test.go b/src/controller/jobservice/schedule_test.go index 3d3b5b7ef..85e6dd91c 100644 --- a/src/controller/jobservice/schedule_test.go +++ b/src/controller/jobservice/schedule_test.go @@ -15,26 +15,32 @@ package jobservice import ( + "context" "testing" "github.com/stretchr/testify/suite" "github.com/goharbor/harbor/src/controller/purge" + "github.com/goharbor/harbor/src/pkg/queuestatus" "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/testing/mock" + queueStatusMock "github.com/goharbor/harbor/src/testing/pkg/queuestatus" testingScheduler "github.com/goharbor/harbor/src/testing/pkg/scheduler" ) type ScheduleTestSuite struct { suite.Suite - scheduler *testingScheduler.Scheduler - ctl SchedulerController + scheduler *testingScheduler.Scheduler + ctl SchedulerController + queueStatusMgr queuestatus.Manager } func (s *ScheduleTestSuite) SetupSuite() { s.scheduler = &testingScheduler.Scheduler{} + s.queueStatusMgr = &queueStatusMock.Manager{} s.ctl = &schedulerController{ - schedulerMgr: s.scheduler, + schedulerMgr: s.scheduler, + queueStatusMgr: s.queueStatusMgr, } } @@ -60,13 +66,37 @@ func (s *ScheduleTestSuite) TestGetSchedule() { ID: 1, VendorType: purge.VendorType, }, - }, nil) + }, nil).Once() schedule, err := s.ctl.Get(nil, purge.VendorType) s.Nil(err) s.Equal(purge.VendorType, schedule.VendorType) } +func (s *ScheduleTestSuite) TestListSchedule() { + mock.OnAnything(s.scheduler, "ListSchedules").Return([]*scheduler.Schedule{ + {ID: 1, VendorType: "GARBAGE_COLLECTION", CRON: "0 0 0 * * *", ExtraAttrs: map[string]interface{}{"args": "sample args"}}}, nil).Once() + schedules, err := s.scheduler.ListSchedules(nil, nil) + s.Assert().Nil(err) + s.Assert().Equal(1, len(schedules)) + s.Assert().Equal(schedules[0].VendorType, "GARBAGE_COLLECTION") + s.Assert().Equal(schedules[0].ID, int64(1)) +} + +func (s *ScheduleTestSuite) TestSchedulerStatus() { + mock.OnAnything(s.queueStatusMgr, "AllJobTypeStatus").Return(map[string]bool{"SCHEDULER": true}, nil).Once() + result, err := s.ctl.Paused(context.Background()) + s.Assert().Nil(err) + s.Assert().True(result) +} + +func (s *ScheduleTestSuite) TestCountSchedule() { + mock.OnAnything(s.scheduler, "CountSchedules").Return(int64(1), nil).Once() + count, err := s.ctl.Count(context.Background(), nil) + s.Assert().Nil(err) + s.Assert().Equal(int64(1), count) +} + func TestScheduleTestSuite(t *testing.T) { suite.Run(t, &ScheduleTestSuite{}) } diff --git a/src/controller/retention/controller_test.go b/src/controller/retention/controller_test.go index a9421b505..07307b181 100644 --- a/src/controller/retention/controller_test.go +++ b/src/controller/retention/controller_test.go @@ -310,6 +310,10 @@ func (s *ControllerTestSuite) TestExecution() { type fakeRetentionScheduler struct { } +func (f *fakeRetentionScheduler) CountSchedules(ctx context.Context, query *q.Query) (int64, error) { + panic("implement me") +} + func (f *fakeRetentionScheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, cron string, callbackFuncName string, params interface{}, extras map[string]interface{}) (int64, error) { return 111, nil } diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index 09c2125b0..6d5e3d99b 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -54,6 +54,7 @@ import ( "github.com/goharbor/harbor/src/lib/metric" redislib "github.com/goharbor/harbor/src/lib/redis" "github.com/goharbor/harbor/src/pkg/p2p/preheat" + "github.com/goharbor/harbor/src/pkg/queuestatus" "github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/scan" "github.com/goharbor/harbor/src/pkg/scheduler" @@ -199,6 +200,8 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) UseCoreScheduler(scheduler.Sched). UseCoreExecutionManager(task.ExecMgr). UseCoreTaskManager(task.Mgr). + UseQueueStatusManager(queuestatus.Mgr). + UseMonitorRedisClient(cfg.PoolConfig.RedisPoolCfg). WithPolicyLoader(func() ([]*period.Policy, error) { conn := redisPool.Get() defer conn.Close() diff --git a/src/jobservice/sync/schedule.go b/src/jobservice/sync/schedule.go index e3267bcba..5801369c2 100644 --- a/src/jobservice/sync/schedule.go +++ b/src/jobservice/sync/schedule.go @@ -21,14 +21,18 @@ import ( o "github.com/beego/beego/orm" + "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/mgt" "github.com/goharbor/harbor/src/jobservice/period" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/queuestatus" "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/task" ) @@ -60,6 +64,10 @@ type Worker struct { coreTaskManager task.Manager // Loader for loading polices from the js store. policyLoader PolicyLoader + // queueStatusManager + queueStatusManager queuestatus.Manager + // monitorRedisClient + monitorRedisClient jobmonitor.RedisClient } // New sync worker. @@ -111,6 +119,22 @@ func (w *Worker) UseCoreTaskManager(taskManager task.Manager) *Worker { return w } +// UseQueueStatusManager refers the queue status manager. +func (w *Worker) UseQueueStatusManager(queueStatusManager queuestatus.Manager) *Worker { + w.queueStatusManager = queueStatusManager + return w +} + +// UseMonitorRedisClient refers the monitor redis client. +func (w *Worker) UseMonitorRedisClient(redisConfig *config.RedisPoolConfig) *Worker { + client, err := jobmonitor.NewRedisClient(redisConfig) + if err != nil { + log.Errorf("failed to create redis client for job monitor: %v", err) + } + w.monitorRedisClient = client + return w +} + // WithPolicyLoader determines the policy loader func. func (w *Worker) WithPolicyLoader(loader PolicyLoader) *Worker { w.policyLoader = loader @@ -165,6 +189,9 @@ func (w *Worker) Run(ctx context.Context) error { // Start sync schedules. logger.Infof("Start to sync schedules in database to jobservice: round[%d].", w.round) + // sync queue status from db to redis + w.syncQueueStatus(ctx) + // Get all the schedules from the database first. // Use the default scheduler. schedules, err := w.coreScheduler.ListSchedules(ctx, &q.Query{}) @@ -367,3 +394,27 @@ func (w *Worker) getTask(ctx context.Context, schedule *scheduler.Schedule) (*ta return tasks[0], nil } + +func (w *Worker) syncQueueStatus(ctx context.Context) { + queues, err := w.queueStatusManager.List(ctx) + if err != nil { + log.Errorf("failed to sync queue status because of %v", err) + return + } + if len(queues) == 0 { + log.Info("no queue status need to sync") + return + } + for _, queue := range queues { + // update the queue status in redis + if queue.Paused { + if err = w.monitorRedisClient.PauseJob(ctx, queue.JobType); err != nil { + log.Errorf("failed to pause job %s because of %v", queue.JobType, err) + } + } else { + if err = w.monitorRedisClient.UnpauseJob(ctx, queue.JobType); err != nil { + log.Errorf("failed to resume job %s because of %v", queue.JobType, err) + } + } + } +} diff --git a/src/jobservice/sync/schedule_test.go b/src/jobservice/sync/schedule_test.go index b534955b5..fae1d6611 100644 --- a/src/jobservice/sync/schedule_test.go +++ b/src/jobservice/sync/schedule_test.go @@ -26,9 +26,12 @@ import ( "github.com/goharbor/harbor/src/jobservice/mgt" "github.com/goharbor/harbor/src/jobservice/period" "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/queuestatus/model" "github.com/goharbor/harbor/src/pkg/scheduler" "github.com/goharbor/harbor/src/pkg/task" "github.com/goharbor/harbor/src/testing/mock" + redisClientMock "github.com/goharbor/harbor/src/testing/pkg/jobmonitor" + mockQueueStatus "github.com/goharbor/harbor/src/testing/pkg/queuestatus" ts "github.com/goharbor/harbor/src/testing/pkg/scheduler" tt "github.com/goharbor/harbor/src/testing/pkg/task" ) @@ -36,8 +39,9 @@ import ( // WorkerTestSuite is test suite for testing sync.Worker. type WorkerTestSuite struct { suite.Suite - - worker *Worker + queueStatusManager *mockQueueStatus.Manager + monitorRedisClient *redisClientMock.RedisClient + worker *Worker } // TestWorker is the entry method of WorkerTestSuite. @@ -111,7 +115,8 @@ func (suite *WorkerTestSuite) SetupSuite() { mmm := &mgt.MockManager{} mmm.On("SaveJob", mock.Anything).Return(nil) - + suite.queueStatusManager = &mockQueueStatus.Manager{} + mock.OnAnything(suite.queueStatusManager, "List").Return([]*model.JobQueueStatus{{JobType: "GARBAGE_COLLECTION", Paused: true}}, nil) suite.worker = New(3). WithContext(&env.Context{ SystemContext: sysContext, @@ -122,8 +127,12 @@ func (suite *WorkerTestSuite) SetupSuite() { UseCoreTaskManager(ttm). UseScheduler(pms). UseManager(mmm). + UseQueueStatusManager(suite.queueStatusManager). WithCoreInternalAddr("http://core:8080"). WithPolicyLoader(getPolicies) + suite.monitorRedisClient = &redisClientMock.RedisClient{} + mock.OnAnything(suite.monitorRedisClient, "PauseJob").Return(nil) + suite.worker.monitorRedisClient = suite.monitorRedisClient } // TestStart test Start(). @@ -135,5 +144,6 @@ func (suite *WorkerTestSuite) TestStart() { // TestRun test Run(). func (suite *WorkerTestSuite) TestRun() { err := suite.worker.Run(context.TODO()) + suite.NoError(err, "run worker") } diff --git a/src/pkg/jobmonitor/pool.go b/src/pkg/jobmonitor/pool.go index 335cc3e44..c13096180 100644 --- a/src/pkg/jobmonitor/pool.go +++ b/src/pkg/jobmonitor/pool.go @@ -20,11 +20,7 @@ import ( "github.com/gocraft/work" ) -// PoolManager the interface to retrieve job service monitor metrics -type PoolManager interface { - // List retrieves pools information - List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) -} +var _ JobServiceMonitorClient = (*work.Client)(nil) // JobServiceMonitorClient the interface to retrieve job service monitor metrics type JobServiceMonitorClient interface { @@ -32,6 +28,14 @@ type JobServiceMonitorClient interface { WorkerPoolHeartbeats() ([]*work.WorkerPoolHeartbeat, error) // WorkerObservations retrieves worker observations WorkerObservations() ([]*work.WorkerObservation, error) + // Queues retrieves the job queue information + Queues() ([]*work.Queue, error) +} + +// PoolManager the interface to retrieve job service monitor metrics +type PoolManager interface { + // List retrieves pools information + List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) } type poolManager struct{} diff --git a/src/pkg/jobmonitor/queue.go b/src/pkg/jobmonitor/queue.go new file mode 100644 index 000000000..806ebe57a --- /dev/null +++ b/src/pkg/jobmonitor/queue.go @@ -0,0 +1,47 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" +) + +// QueueManager defines the operation related to job service queue +type QueueManager interface { + List(ctx context.Context, monitClient JobServiceMonitorClient) ([]*Queue, error) +} + +type queueManagerImpl struct{} + +// NewQueueClient ... +func NewQueueClient() QueueManager { + return &queueManagerImpl{} +} + +func (w *queueManagerImpl) List(ctx context.Context, monitClient JobServiceMonitorClient) ([]*Queue, error) { + resultQueues := make([]*Queue, 0) + queues, err := monitClient.Queues() + if err != nil { + return nil, err + } + for _, q := range queues { + resultQueues = append(resultQueues, &Queue{ + JobType: q.JobName, + Count: q.Count, + Latency: q.Latency, + }) + } + return resultQueues, nil +} diff --git a/src/pkg/jobmonitor/redis.go b/src/pkg/jobmonitor/redis.go new file mode 100644 index 000000000..79589fd52 --- /dev/null +++ b/src/pkg/jobmonitor/redis.go @@ -0,0 +1,120 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "fmt" + "time" + + "github.com/gomodule/redigo/redis" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/jobservice/config" + "github.com/goharbor/harbor/src/lib/log" + libRedis "github.com/goharbor/harbor/src/lib/redis" +) + +// JobServicePool job service pool name +const JobServicePool = "JobService" + +// RedisClient defines the job service operations related to redis +type RedisClient interface { + // AllJobTypes returns all the job types registered in the job service + AllJobTypes(ctx context.Context) ([]string, error) + // PauseJob pause the execution of the specified type job, except the running job + PauseJob(ctx context.Context, jobName string) error + // UnpauseJob resume the execution of the specified type job + UnpauseJob(ctx context.Context, jobName string) error + // StopPendingJobs stop the pending jobs of the specified type, and remove the jobs from the waiting queue + StopPendingJobs(ctx context.Context, jobType string) (jobIDs []string, err error) +} + +type redisClientImpl struct { + redisPool *redis.Pool + namespace string +} + +// NewRedisClient create a redis client +func NewRedisClient(config *config.RedisPoolConfig) (RedisClient, error) { + pool, err := redisPool(config) + if err != nil { + return nil, err + } + return &redisClientImpl{pool, config.Namespace}, nil +} + +func redisPool(config *config.RedisPoolConfig) (*redis.Pool, error) { + return libRedis.GetRedisPool(JobServicePool, config.RedisURL, &libRedis.PoolParam{ + PoolMaxIdle: 0, + PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, + }) +} + +func (r *redisClientImpl) StopPendingJobs(ctx context.Context, jobType string) (jobIDs []string, err error) { + jobIDs = []string{} + log.Infof("job queue cleaned up %s", jobType) + redisKeyJobQueue := fmt.Sprintf("{%s}:jobs:%v", r.namespace, jobType) + conn := r.redisPool.Get() + defer conn.Close() + jobIDs, err = redis.Strings(conn.Do("LRANGE", redisKeyJobQueue, 0, -1)) + if err != nil { + return []string{}, err + } + log.Infof("updated %d tasks in pending status to stop", len(jobIDs)) + ret, err := redis.Int64(conn.Do("DEL", redisKeyJobQueue)) + if err != nil { + return []string{}, err + } + if ret < 1 { + // no job in queue removed + return []string{}, fmt.Errorf("no job in the queue removed") + } + log.Infof("deleted %d keys in waiting queue for %s", ret, jobType) + return jobIDs, nil +} + +func (r *redisClientImpl) AllJobTypes(ctx context.Context) ([]string, error) { + conn := r.redisPool.Get() + defer conn.Close() + return redis.Strings(conn.Do("SMEMBERS", fmt.Sprintf("{%s}:known_jobs", r.namespace))) +} + +func (r *redisClientImpl) PauseJob(ctx context.Context, jobName string) error { + log.Infof("pause job type:%s", jobName) + redisKeyJobPaused := fmt.Sprintf("{%s}:jobs:%s:paused", r.namespace, jobName) + conn := r.redisPool.Get() + defer conn.Close() + _, err := conn.Do("SET", redisKeyJobPaused, "1") + return err +} + +func (r *redisClientImpl) UnpauseJob(ctx context.Context, jobName string) error { + log.Infof("unpause job %s", jobName) + redisKeyJobPaused := fmt.Sprintf("{%s}:jobs:%s:paused", r.namespace, jobName) + conn := r.redisPool.Get() + defer conn.Close() + _, err := conn.Do("DEL", redisKeyJobPaused) + return err +} + +func JobServiceRedisClient() (RedisClient, error) { + cfg, err := job.GlobalClient.GetJobServiceConfig() + if err != nil { + return nil, err + } + config := cfg.RedisPoolConfig + return NewRedisClient(config) +} diff --git a/src/pkg/queuestatus/dao/dao.go b/src/pkg/queuestatus/dao/dao.go new file mode 100644 index 000000000..41fd00d6f --- /dev/null +++ b/src/pkg/queuestatus/dao/dao.go @@ -0,0 +1,90 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "context" + + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/orm" + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/queuestatus/model" +) + +// ErrQueueTypeDup ... +var ErrQueueTypeDup = errors.ConflictError(nil).WithMessage("duplicated queue type") + +func init() { + orm.RegisterModel( + new(model.JobQueueStatus), + ) +} + +// DAO the dao for queue status +type DAO interface { + // Query query queue status + Query(ctx context.Context, query *q.Query) ([]*model.JobQueueStatus, error) + // GetByJobType get queue status by JobType + GetByJobType(ctx context.Context, jobType string) (*model.JobQueueStatus, error) + // UpdateStatus update queue status + UpdateStatus(ctx context.Context, jobType string, paused bool) error + // InsertOrUpdate create a queue status or update it if it already exists + InsertOrUpdate(ctx context.Context, queue *model.JobQueueStatus) (int64, error) +} + +type dao struct { +} + +// New create queue status DAO +func New() DAO { + return &dao{} +} + +func (d *dao) Query(ctx context.Context, query *q.Query) ([]*model.JobQueueStatus, error) { + query = q.MustClone(query) + qs, err := orm.QuerySetter(ctx, &model.JobQueueStatus{}, query) + if err != nil { + return nil, err + } + var queueStatusList []*model.JobQueueStatus + if _, err := qs.All(&queueStatusList); err != nil { + return nil, err + } + return queueStatusList, nil +} + +func (d *dao) GetByJobType(ctx context.Context, jobType string) (*model.JobQueueStatus, error) { + queueList, err := d.Query(ctx, q.New(q.KeyWords{"JobType": jobType})) + if err != nil { + return nil, err + } + if len(queueList) > 0 { + return queueList[0], nil + } + return nil, nil +} + +func (d *dao) UpdateStatus(ctx context.Context, jobType string, paused bool) error { + _, err := d.InsertOrUpdate(ctx, &model.JobQueueStatus{JobType: jobType, Paused: paused}) + return err +} + +func (d *dao) InsertOrUpdate(ctx context.Context, queue *model.JobQueueStatus) (int64, error) { + o, err := orm.FromContext(ctx) + if err != nil { + return 0, err + } + return o.InsertOrUpdate(queue, "job_type") +} diff --git a/src/pkg/queuestatus/dao/dao_test.go b/src/pkg/queuestatus/dao/dao_test.go new file mode 100644 index 000000000..5d684d02c --- /dev/null +++ b/src/pkg/queuestatus/dao/dao_test.go @@ -0,0 +1,73 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dao + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/goharbor/harbor/src/lib/q" + "github.com/goharbor/harbor/src/pkg/queuestatus/model" + htesting "github.com/goharbor/harbor/src/testing" +) + +type DaoTestSuite struct { + htesting.Suite + dao DAO +} + +func (s *DaoTestSuite) SetupSuite() { + s.Suite.SetupSuite() + s.Suite.ClearTables = []string{"queue_status"} + s.dao = New() +} + +func (s *DaoTestSuite) TestCRUDQueueStatus() { + ctx := s.Context() + jobType := "GARBAGE_COLLECTION" + queueStatus := model.JobQueueStatus{ + JobType: jobType, + } + id, err := s.dao.InsertOrUpdate(ctx, &queueStatus) + s.Nil(err) + s.True(id > 0) + + id2, err := s.dao.InsertOrUpdate(ctx, &queueStatus) + s.Nil(err) + s.Equal(id, id2) + + qs, err2 := s.dao.GetByJobType(ctx, jobType) + s.Nil(err2) + s.Equal("GARBAGE_COLLECTION", qs.JobType) + s.False(qs.Paused) + + err3 := s.dao.UpdateStatus(ctx, jobType, true) + s.Nil(err3) + + qs2, err4 := s.dao.GetByJobType(ctx, jobType) + s.Nil(err4) + s.Equal(jobType, qs2.JobType) + s.True(qs2.Paused) + + qList, err := s.dao.Query(ctx, q.New(q.KeyWords{"job_type": jobType})) + s.Nil(err) + s.Equal(1, len(qList)) + +} + +func TestDaoTestSuite(t *testing.T) { + suite.Run(t, &DaoTestSuite{}) +} diff --git a/src/pkg/queuestatus/manager.go b/src/pkg/queuestatus/manager.go new file mode 100644 index 000000000..79d8c57cd --- /dev/null +++ b/src/pkg/queuestatus/manager.go @@ -0,0 +1,77 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queuestatus + +import ( + "context" + + "github.com/goharbor/harbor/src/pkg/queuestatus/dao" + "github.com/goharbor/harbor/src/pkg/queuestatus/model" +) + +var ( + // Mgr default user group manager + Mgr = newManager() +) + +// Manager the manager for queue status +type Manager interface { + // List list queue status + List(ctx context.Context) ([]*model.JobQueueStatus, error) + // AllJobTypeStatus get all job type status + AllJobTypeStatus(ctx context.Context) (map[string]bool, error) + // Get get queue status by JobType + Get(ctx context.Context, jobType string) (*model.JobQueueStatus, error) + // UpdateStatus update queue status + UpdateStatus(ctx context.Context, jobType string, paused bool) error + // CreateOrUpdate create a queue status or update it if it already exists + CreateOrUpdate(ctx context.Context, status *model.JobQueueStatus) (int64, error) +} + +type manager struct { + dao dao.DAO +} + +func newManager() Manager { + return &manager{dao: dao.New()} +} + +func (m *manager) List(ctx context.Context) ([]*model.JobQueueStatus, error) { + return m.dao.Query(ctx, nil) +} + +func (m *manager) Get(ctx context.Context, jobType string) (*model.JobQueueStatus, error) { + return m.dao.GetByJobType(ctx, jobType) +} + +func (m *manager) UpdateStatus(ctx context.Context, jobType string, paused bool) error { + return m.dao.UpdateStatus(ctx, jobType, paused) +} + +func (m manager) CreateOrUpdate(ctx context.Context, status *model.JobQueueStatus) (int64, error) { + return m.dao.InsertOrUpdate(ctx, status) +} + +func (m *manager) AllJobTypeStatus(ctx context.Context) (map[string]bool, error) { + statuses, err := m.List(ctx) + if err != nil { + return nil, err + } + result := make(map[string]bool) + for _, status := range statuses { + result[status.JobType] = status.Paused + } + return result, nil +} diff --git a/src/pkg/queuestatus/manager_test.go b/src/pkg/queuestatus/manager_test.go new file mode 100644 index 000000000..fb92daa2c --- /dev/null +++ b/src/pkg/queuestatus/manager_test.go @@ -0,0 +1,52 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queuestatus + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/goharbor/harbor/src/pkg/queuestatus/model" + htesting "github.com/goharbor/harbor/src/testing" +) + +type ManagerTestSuite struct { + htesting.Suite + mgr Manager +} + +func (s *ManagerTestSuite) SetupSuite() { + s.Suite.SetupSuite() + s.Suite.ClearTables = []string{"queue_status"} + s.mgr = newManager() +} + +func (s *ManagerTestSuite) TestAllJobTypeStatus() { + ctx := s.Context() + _, err := s.mgr.CreateOrUpdate(ctx, &model.JobQueueStatus{JobType: "GARBAGE_COLLECTION", Paused: true}) + s.Nil(err) + _, err = s.mgr.CreateOrUpdate(ctx, &model.JobQueueStatus{JobType: "REPLICATION", Paused: false}) + s.Nil(err) + resultMap, err := s.mgr.AllJobTypeStatus(ctx) + s.Nil(err) + s.Equal(2, len(resultMap)) + s.True(resultMap["GARBAGE_COLLECTION"]) + s.False(resultMap["REPLICATION"]) +} + +func TestManagerTestSuite(t *testing.T) { + suite.Run(t, &ManagerTestSuite{}) +} diff --git a/src/pkg/queuestatus/model/model.go b/src/pkg/queuestatus/model/model.go new file mode 100644 index 000000000..fa81d0e12 --- /dev/null +++ b/src/pkg/queuestatus/model/model.go @@ -0,0 +1,33 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import "time" + +// JobQueueStatusTable is the name of table in DB that holds the queue status +const JobQueueStatusTable = "job_queue_status" + +// JobQueueStatus ... +type JobQueueStatus struct { + ID int `orm:"pk;auto;column(id)" json:"id,omitempty"` + JobType string `orm:"column(job_type)" json:"job_type,omitempty"` + Paused bool `orm:"column(paused)" json:"paused,omitempty"` + UpdateTime time.Time `orm:"column(update_time);auto_now"` +} + +// TableName ... +func (u *JobQueueStatus) TableName() string { + return JobQueueStatusTable +} diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index 949d98d78..14cbf806f 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -47,6 +47,7 @@ type schedule struct { type DAO interface { Create(ctx context.Context, s *schedule) (id int64, err error) List(ctx context.Context, query *q.Query) (schedules []*schedule, err error) + Count(ctx context.Context, query *q.Query) (total int64, err error) Get(ctx context.Context, id int64) (s *schedule, err error) Delete(ctx context.Context, id int64) (err error) Update(ctx context.Context, s *schedule, props ...string) (err error) @@ -145,3 +146,12 @@ func (d *dao) UpdateRevision(ctx context.Context, id, revision int64) (int64, er "Revision": revision, }) } + +func (d *dao) Count(ctx context.Context, query *q.Query) (total int64, err error) { + query = q.MustClone(query) + qs, err := orm.QuerySetterForCount(ctx, &schedule{}, query) + if err != nil { + return 0, err + } + return qs.Count() +} diff --git a/src/pkg/scheduler/mock_dao_test.go b/src/pkg/scheduler/mock_dao_test.go index 096a7d1e1..64886e583 100644 --- a/src/pkg/scheduler/mock_dao_test.go +++ b/src/pkg/scheduler/mock_dao_test.go @@ -14,6 +14,27 @@ type mockDAO struct { mock.Mock } +// Count provides a mock function with given fields: ctx, query +func (_m *mockDAO) Count(ctx context.Context, query *q.Query) (int64, error) { + ret := _m.Called(ctx, query) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) int64); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Create provides a mock function with given fields: ctx, s func (_m *mockDAO) Create(ctx context.Context, s *schedule) (int64, error) { ret := _m.Called(ctx, s) diff --git a/src/pkg/scheduler/scheduler.go b/src/pkg/scheduler/scheduler.go index cabc94db7..bab6b43c0 100644 --- a/src/pkg/scheduler/scheduler.go +++ b/src/pkg/scheduler/scheduler.go @@ -68,6 +68,8 @@ type Scheduler interface { GetSchedule(ctx context.Context, id int64) (*Schedule, error) // ListSchedules according to the query ListSchedules(ctx context.Context, query *q.Query) ([]*Schedule, error) + // CountSchedules counts the schedules according to the query + CountSchedules(ctx context.Context, query *q.Query) (int64, error) } // New returns an instance of the default scheduler @@ -85,6 +87,10 @@ type scheduler struct { taskMgr task.Manager } +func (s *scheduler) CountSchedules(ctx context.Context, query *q.Query) (int64, error) { + return s.dao.Count(ctx, query) +} + func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID int64, cronType string, cron string, callbackFuncName string, callbackFuncParams interface{}, extraAttrs map[string]interface{}) (int64, error) { if len(vendorType) == 0 { diff --git a/src/pkg/task/mock_task_manager_test.go b/src/pkg/task/mock_task_manager_test.go index 7cf6bf625..4e0ba26e1 100644 --- a/src/pkg/task/mock_task_manager_test.go +++ b/src/pkg/task/mock_task_manager_test.go @@ -146,6 +146,27 @@ func (_m *mockTaskManager) Stop(ctx context.Context, id int64) error { return r0 } +// Update provides a mock function with given fields: ctx, task, props +func (_m *mockTaskManager) Update(ctx context.Context, task *Task, props ...string) error { + _va := make([]interface{}, len(props)) + for _i := range props { + _va[_i] = props[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, task) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *Task, ...string) error); ok { + r0 = rf(ctx, task, props...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateExtraAttrs provides a mock function with given fields: ctx, id, extraAttrs func (_m *mockTaskManager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error { ret := _m.Called(ctx, id, extraAttrs) diff --git a/src/pkg/task/task.go b/src/pkg/task/task.go index 6640d77dd..84b0afe57 100644 --- a/src/pkg/task/task.go +++ b/src/pkg/task/task.go @@ -56,6 +56,8 @@ type Manager interface { // Count counts total of tasks according to the query. // Query the "ExtraAttrs" by setting 'query.Keywords["ExtraAttrs.key"]="value"' Count(ctx context.Context, query *q.Query) (int64, error) + // Update the status of the specified task + Update(ctx context.Context, task *Task, props ...string) error } // NewManager creates an instance of the default task manager @@ -75,6 +77,13 @@ type manager struct { coreURL string } +func (m *manager) Update(ctx context.Context, task *Task, props ...string) error { + return m.dao.Update(ctx, &dao.Task{ + ID: task.ID, + Status: task.Status, + }, props...) +} + func (m *manager) Count(ctx context.Context, query *q.Query) (int64, error) { return m.dao.Count(ctx, query) } diff --git a/src/server/v2.0/handler/handler.go b/src/server/v2.0/handler/handler.go index 81a239584..f24f4c19d 100644 --- a/src/server/v2.0/handler/handler.go +++ b/src/server/v2.0/handler/handler.go @@ -68,6 +68,7 @@ func New() http.Handler { PurgeAPI: newPurgeAPI(), ScanDataExportAPI: newScanDataExportAPI(), JobserviceAPI: newJobServiceAPI(), + ScheduleAPI: newScheduleAPI(), }) if err != nil { log.Fatal(err) diff --git a/src/server/v2.0/handler/jobservice.go b/src/server/v2.0/handler/jobservice.go index 70072efeb..9475281fb 100644 --- a/src/server/v2.0/handler/jobservice.go +++ b/src/server/v2.0/handler/jobservice.go @@ -16,8 +16,15 @@ package handler import ( "context" + "encoding/json" + "fmt" + "strings" "time" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/go-openapi/runtime/middleware" "github.com/go-openapi/strfmt" @@ -63,7 +70,7 @@ func (j *jobServiceAPI) StopRunningJob(ctx context.Context, params jobservice.St if err := j.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceJobServiceMonitor); err != nil { return j.SendError(ctx, err) } - err := j.jobCtr.StopRunningJob(ctx, params.JobID) + err := j.jobCtr.StopRunningJobs(ctx, params.JobID) if err != nil { return j.SendError(ctx, err) } @@ -73,16 +80,24 @@ func (j *jobServiceAPI) StopRunningJob(ctx context.Context, params jobservice.St func toWorkerResponse(wks []*jm.Worker) []*models.Worker { workers := make([]*models.Worker, 0) for _, w := range wks { - p := &models.Worker{ - ID: w.ID, - JobName: w.JobName, - JobID: w.JobID, - PoolID: w.PoolID, - Args: w.Args, - StartAt: covertTime(w.StartedAt), - CheckinAt: covertTime(w.CheckInAt), + if len(w.JobID) == 0 { + workers = append(workers, &models.Worker{ + ID: w.ID, + PoolID: w.PoolID, + }) + } else { + startAtTime := covertTime(w.StartedAt) + checkInAtTime := covertTime(w.CheckInAt) + workers = append(workers, &models.Worker{ + ID: w.ID, + JobName: w.JobName, + JobID: w.JobID, + PoolID: w.PoolID, + Args: w.Args, + StartAt: &startAtTime, + CheckinAt: &checkInAtTime, + }) } - workers = append(workers, p) } return workers } @@ -103,6 +118,86 @@ func toWorkerPoolResponse(wps []*jm.WorkerPool) []*models.WorkerPool { } func covertTime(t int64) strfmt.DateTime { + if t == 0 { + return strfmt.NewDateTime() + } uxt := time.Unix(int64(t), 0) return strfmt.DateTime(uxt) } + +func toScheduleResponse(schs []*scheduler.Schedule) []*models.ScheduleTask { + result := make([]*models.ScheduleTask, 0) + for _, s := range schs { + extraAttr := []byte("") + if s.ExtraAttrs != nil { + extra, err := json.Marshal(s.ExtraAttrs) + if err != nil { + log.Warningf("failed to extract extra attribute, error %v", err) + } else { + extraAttr = extra + } + } + result = append(result, &models.ScheduleTask{ + ID: s.ID, + VendorType: s.VendorType, + VendorID: s.VendorID, + ExtraAttrs: string(extraAttr), + CreationTime: strfmt.DateTime(s.CreationTime), + }) + } + return result +} + +func (j *jobServiceAPI) ListJobQueues(ctx context.Context, params jobservice.ListJobQueuesParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + queues, err := j.jobCtr.ListQueues(ctx) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewListJobQueuesOK().WithPayload(toQueueResponse(queues)) +} + +func toQueueResponse(queues []*jm.Queue) []*models.JobQueue { + result := make([]*models.JobQueue, 0) + for _, q := range queues { + result = append(result, &models.JobQueue{ + JobType: q.JobType, + Count: q.Count, + Latency: q.Latency, + Paused: q.Paused, + }) + } + return result +} + +func (j *jobServiceAPI) ActionPendingJobs(ctx context.Context, params jobservice.ActionPendingJobsParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + jobType := strings.ToUpper(params.JobType) + action := strings.ToLower(params.ActionRequest.Action) + if !strings.EqualFold(action, "stop") && !strings.EqualFold(action, "resume") && !strings.EqualFold(action, "pause") { + return j.SendError(ctx, errors.BadRequestError(fmt.Errorf("the action is not supported"))) + } + if strings.EqualFold(action, "stop") { + err := j.jobCtr.StopPendingJobs(ctx, jobType) + if err != nil { + return j.SendError(ctx, err) + } + } + if strings.EqualFold(action, "pause") { + err := j.jobCtr.PauseJobQueues(ctx, jobType) + if err != nil { + return j.SendError(ctx, err) + } + } + if strings.EqualFold(action, "resume") { + err := j.jobCtr.ResumeJobQueues(ctx, jobType) + if err != nil { + return j.SendError(ctx, err) + } + } + return jobservice.NewActionPendingJobsOK() +} diff --git a/src/server/v2.0/handler/model/repository.go b/src/server/v2.0/handler/model/repository.go index 2a63a5545..d9d77e975 100644 --- a/src/server/v2.0/handler/model/repository.go +++ b/src/server/v2.0/handler/model/repository.go @@ -14,8 +14,14 @@ type RepoRecord struct { // ToSwagger converts the repository into the swagger model func (r *RepoRecord) ToSwagger() *models.Repository { + var createTime *strfmt.DateTime + if !r.CreationTime.IsZero() { + t := strfmt.DateTime(r.CreationTime) + createTime = &t + } + return &models.Repository{ - CreationTime: strfmt.DateTime(r.CreationTime), + CreationTime: createTime, Description: r.Description, ID: r.RepositoryID, Name: r.Name, diff --git a/src/server/v2.0/handler/schedule.go b/src/server/v2.0/handler/schedule.go new file mode 100644 index 000000000..80f00b500 --- /dev/null +++ b/src/server/v2.0/handler/schedule.go @@ -0,0 +1,79 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "context" + "strings" + + "github.com/go-openapi/runtime/middleware" + + "github.com/goharbor/harbor/src/common/rbac" + jobserviceCtl "github.com/goharbor/harbor/src/controller/jobservice" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/server/v2.0/models" + "github.com/goharbor/harbor/src/server/v2.0/restapi/operations/schedule" +) + +const all = "all" + +type scheduleAPI struct { + BaseAPI + jobServiceCtl jobserviceCtl.SchedulerController +} + +func newScheduleAPI() *scheduleAPI { + return &scheduleAPI{ + jobServiceCtl: jobserviceCtl.SchedulerCtl, + } +} + +func (s *scheduleAPI) GetSchedulePaused(ctx context.Context, params schedule.GetSchedulePausedParams) middleware.Responder { + if err := s.RequireAuthenticated(ctx); err != nil { + return s.SendError(ctx, err) + } + if !strings.EqualFold(params.JobType, all) { + return s.SendError(ctx, errors.BadRequestError(nil).WithMessage("job_type can only be 'all'")) + } + paused, err := s.jobServiceCtl.Paused(ctx) + if err != nil { + return s.SendError(ctx, err) + } + return schedule.NewGetSchedulePausedOK().WithPayload(&models.SchedulerStatus{ + Paused: paused, + }) +} + +func (s *scheduleAPI) ListSchedules(ctx context.Context, params schedule.ListSchedulesParams) middleware.Responder { + if err := s.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return s.SendError(ctx, err) + } + query, err := s.BuildQuery(ctx, nil, nil, params.Page, params.PageSize) + if err != nil { + return s.SendError(ctx, err) + } + count, err := s.jobServiceCtl.Count(ctx, query) + if err != nil { + return s.SendError(ctx, err) + } + schs, err := s.jobServiceCtl.List(ctx, query) + if err != nil { + return s.SendError(ctx, err) + } + return schedule.NewListSchedulesOK(). + WithPayload(toScheduleResponse(schs)). + WithXTotalCount(count). + WithLink(s.Links(ctx, params.HTTPRequest.URL, count, query.PageNumber, query.PageSize).String()) +} diff --git a/src/testing/controller/jobservice/scheduler_controller.go b/src/testing/controller/jobservice/scheduler_controller.go index dc16bfe34..893fd12f1 100644 --- a/src/testing/controller/jobservice/scheduler_controller.go +++ b/src/testing/controller/jobservice/scheduler_controller.go @@ -7,6 +7,8 @@ import ( mock "github.com/stretchr/testify/mock" + q "github.com/goharbor/harbor/src/lib/q" + scheduler "github.com/goharbor/harbor/src/pkg/scheduler" ) @@ -15,6 +17,27 @@ type SchedulerController struct { mock.Mock } +// Count provides a mock function with given fields: ctx, query +func (_m *SchedulerController) Count(ctx context.Context, query *q.Query) (int64, error) { + ret := _m.Called(ctx, query) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) int64); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Create provides a mock function with given fields: ctx, vendorType, cronType, cron, callbackFuncName, policy, extrasParam func (_m *SchedulerController) Create(ctx context.Context, vendorType string, cronType string, cron string, callbackFuncName string, policy interface{}, extrasParam map[string]interface{}) (int64, error) { ret := _m.Called(ctx, vendorType, cronType, cron, callbackFuncName, policy, extrasParam) @@ -73,6 +96,50 @@ func (_m *SchedulerController) Get(ctx context.Context, vendorType string) (*sch return r0, r1 } +// List provides a mock function with given fields: ctx, query +func (_m *SchedulerController) List(ctx context.Context, query *q.Query) ([]*scheduler.Schedule, error) { + ret := _m.Called(ctx, query) + + var r0 []*scheduler.Schedule + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) []*scheduler.Schedule); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*scheduler.Schedule) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Paused provides a mock function with given fields: ctx +func (_m *SchedulerController) Paused(ctx context.Context) (bool, error) { + ret := _m.Called(ctx) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type mockConstructorTestingTNewSchedulerController interface { mock.TestingT Cleanup(func()) diff --git a/src/testing/pkg/jobmonitor/job_service_monitor_client.go b/src/testing/pkg/jobmonitor/job_service_monitor_client.go index e2fcb126e..3f0d49d32 100644 --- a/src/testing/pkg/jobmonitor/job_service_monitor_client.go +++ b/src/testing/pkg/jobmonitor/job_service_monitor_client.go @@ -12,6 +12,29 @@ type JobServiceMonitorClient struct { mock.Mock } +// Queues provides a mock function with given fields: +func (_m *JobServiceMonitorClient) Queues() ([]*work.Queue, error) { + ret := _m.Called() + + var r0 []*work.Queue + if rf, ok := ret.Get(0).(func() []*work.Queue); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*work.Queue) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // WorkerObservations provides a mock function with given fields: func (_m *JobServiceMonitorClient) WorkerObservations() ([]*work.WorkerObservation, error) { ret := _m.Called() diff --git a/src/testing/pkg/jobmonitor/queue_manager.go b/src/testing/pkg/jobmonitor/queue_manager.go new file mode 100644 index 000000000..786534cb5 --- /dev/null +++ b/src/testing/pkg/jobmonitor/queue_manager.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + jobmonitor "github.com/goharbor/harbor/src/pkg/jobmonitor" + mock "github.com/stretchr/testify/mock" +) + +// QueueManager is an autogenerated mock type for the QueueManager type +type QueueManager struct { + mock.Mock +} + +// List provides a mock function with given fields: ctx, monitClient +func (_m *QueueManager) List(ctx context.Context, monitClient jobmonitor.JobServiceMonitorClient) ([]*jobmonitor.Queue, error) { + ret := _m.Called(ctx, monitClient) + + var r0 []*jobmonitor.Queue + if rf, ok := ret.Get(0).(func(context.Context, jobmonitor.JobServiceMonitorClient) []*jobmonitor.Queue); ok { + r0 = rf(ctx, monitClient) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*jobmonitor.Queue) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jobmonitor.JobServiceMonitorClient) error); ok { + r1 = rf(ctx, monitClient) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewQueueManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewQueueManager creates a new instance of QueueManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewQueueManager(t mockConstructorTestingTNewQueueManager) *QueueManager { + mock := &QueueManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/jobmonitor/redis_client.go b/src/testing/pkg/jobmonitor/redis_client.go new file mode 100644 index 000000000..f437b200e --- /dev/null +++ b/src/testing/pkg/jobmonitor/redis_client.go @@ -0,0 +1,103 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// RedisClient is an autogenerated mock type for the RedisClient type +type RedisClient struct { + mock.Mock +} + +// AllJobTypes provides a mock function with given fields: ctx +func (_m *RedisClient) AllJobTypes(ctx context.Context) ([]string, error) { + ret := _m.Called(ctx) + + var r0 []string + if rf, ok := ret.Get(0).(func(context.Context) []string); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// PauseJob provides a mock function with given fields: ctx, jobName +func (_m *RedisClient) PauseJob(ctx context.Context, jobName string) error { + ret := _m.Called(ctx, jobName) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, jobName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StopPendingJobs provides a mock function with given fields: ctx, jobType +func (_m *RedisClient) StopPendingJobs(ctx context.Context, jobType string) ([]string, error) { + ret := _m.Called(ctx, jobType) + + var r0 []string + if rf, ok := ret.Get(0).(func(context.Context, string) []string); ok { + r0 = rf(ctx, jobType) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, jobType) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UnpauseJob provides a mock function with given fields: ctx, jobName +func (_m *RedisClient) UnpauseJob(ctx context.Context, jobName string) error { + ret := _m.Called(ctx, jobName) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, jobName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewRedisClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewRedisClient creates a new instance of RedisClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewRedisClient(t mockConstructorTestingTNewRedisClient) *RedisClient { + mock := &RedisClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/pkg.go b/src/testing/pkg/pkg.go index 145ced63a..bdd4b975c 100644 --- a/src/testing/pkg/pkg.go +++ b/src/testing/pkg/pkg.go @@ -71,3 +71,6 @@ package pkg //go:generate mockery --case snake --dir ../../pkg/jobmonitor --name PoolManager --output ./jobmonitor --outpkg jobmonitor //go:generate mockery --case snake --dir ../../pkg/jobmonitor --name JobServiceMonitorClient --output ./jobmonitor --outpkg jobmonitor //go:generate mockery --case snake --dir ../../pkg/jobmonitor --name WorkerManager --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name QueueManager --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name RedisClient --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/queuestatus --name Manager --output ./queuestatus --outpkg queuestatus diff --git a/src/testing/pkg/queuestatus/manager.go b/src/testing/pkg/queuestatus/manager.go new file mode 100644 index 000000000..34d718f85 --- /dev/null +++ b/src/testing/pkg/queuestatus/manager.go @@ -0,0 +1,134 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package queuestatus + +import ( + context "context" + + model "github.com/goharbor/harbor/src/pkg/queuestatus/model" + mock "github.com/stretchr/testify/mock" +) + +// Manager is an autogenerated mock type for the Manager type +type Manager struct { + mock.Mock +} + +// AllJobTypeStatus provides a mock function with given fields: ctx +func (_m *Manager) AllJobTypeStatus(ctx context.Context) (map[string]bool, error) { + ret := _m.Called(ctx) + + var r0 map[string]bool + if rf, ok := ret.Get(0).(func(context.Context) map[string]bool); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]bool) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateOrUpdate provides a mock function with given fields: ctx, status +func (_m *Manager) CreateOrUpdate(ctx context.Context, status *model.JobQueueStatus) (int64, error) { + ret := _m.Called(ctx, status) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *model.JobQueueStatus) int64); ok { + r0 = rf(ctx, status) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *model.JobQueueStatus) error); ok { + r1 = rf(ctx, status) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Get provides a mock function with given fields: ctx, jobType +func (_m *Manager) Get(ctx context.Context, jobType string) (*model.JobQueueStatus, error) { + ret := _m.Called(ctx, jobType) + + var r0 *model.JobQueueStatus + if rf, ok := ret.Get(0).(func(context.Context, string) *model.JobQueueStatus); ok { + r0 = rf(ctx, jobType) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.JobQueueStatus) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, jobType) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// List provides a mock function with given fields: ctx +func (_m *Manager) List(ctx context.Context) ([]*model.JobQueueStatus, error) { + ret := _m.Called(ctx) + + var r0 []*model.JobQueueStatus + if rf, ok := ret.Get(0).(func(context.Context) []*model.JobQueueStatus); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.JobQueueStatus) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpdateStatus provides a mock function with given fields: ctx, jobType, paused +func (_m *Manager) UpdateStatus(ctx context.Context, jobType string, paused bool) error { + ret := _m.Called(ctx, jobType, paused) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, bool) error); ok { + r0 = rf(ctx, jobType, paused) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewManager(t mockConstructorTestingTNewManager) *Manager { + mock := &Manager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/scheduler/scheduler.go b/src/testing/pkg/scheduler/scheduler.go index 608eda008..3e92bfc0b 100644 --- a/src/testing/pkg/scheduler/scheduler.go +++ b/src/testing/pkg/scheduler/scheduler.go @@ -16,6 +16,27 @@ type Scheduler struct { mock.Mock } +// CountSchedules provides a mock function with given fields: ctx, query +func (_m *Scheduler) CountSchedules(ctx context.Context, query *q.Query) (int64, error) { + ret := _m.Called(ctx, query) + + var r0 int64 + if rf, ok := ret.Get(0).(func(context.Context, *q.Query) int64); ok { + r0 = rf(ctx, query) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *q.Query) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetSchedule provides a mock function with given fields: ctx, id func (_m *Scheduler) GetSchedule(ctx context.Context, id int64) (*scheduler.Schedule, error) { ret := _m.Called(ctx, id) diff --git a/src/testing/pkg/task/manager.go b/src/testing/pkg/task/manager.go index d7a8eb4c4..cbbabdecb 100644 --- a/src/testing/pkg/task/manager.go +++ b/src/testing/pkg/task/manager.go @@ -148,6 +148,27 @@ func (_m *Manager) Stop(ctx context.Context, id int64) error { return r0 } +// Update provides a mock function with given fields: ctx, _a1, props +func (_m *Manager) Update(ctx context.Context, _a1 *task.Task, props ...string) error { + _va := make([]interface{}, len(props)) + for _i := range props { + _va[_i] = props[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, _a1) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *task.Task, ...string) error); ok { + r0 = rf(ctx, _a1, props...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // UpdateExtraAttrs provides a mock function with given fields: ctx, id, extraAttrs func (_m *Manager) UpdateExtraAttrs(ctx context.Context, id int64, extraAttrs map[string]interface{}) error { ret := _m.Called(ctx, id, extraAttrs)