From 39ca918ffea882ed17a71ac94e0f956688e2c6e5 Mon Sep 17 00:00:00 2001 From: "stonezdj(Daojun Zhang)" Date: Thu, 3 Nov 2022 10:47:40 +0800 Subject: [PATCH] Add jobservice monitoring api list pool, worker and stop running task (#17658) Add REST API to list job pool, worker, stop running task Add jobservice handler to retrieve configuration Add RBAC for jobservice monitoring dashboard Add REST API to list pool, worker and stop running task Signed-off-by: stonezdj Signed-off-by: stonezdj --- api/v2.0/swagger.yaml | 132 ++++++++++++++- src/common/job/client.go | 31 ++++ src/common/rbac/const.go | 1 + src/common/rbac/system/policies.go | 4 + src/controller/jobmonitor/monitor.go | 155 ++++++++++++++++++ src/controller/jobmonitor/monitor_test.go | 96 +++++++++++ src/jobservice/api/handler.go | 18 +- src/jobservice/api/router.go | 1 + src/jobservice/job/models.go | 7 + src/pkg/jobmonitor/model.go | 46 ++++++ src/pkg/jobmonitor/pool.go | 62 +++++++ src/pkg/jobmonitor/worker.go | 87 ++++++++++ src/pkg/task/mock_jobservice_client_test.go | 23 +++ src/server/v2.0/handler/handler.go | 1 + src/server/v2.0/handler/jobservice.go | 108 ++++++++++++ src/testing/job/mock_client.go | 5 + .../jobmonitor/job_service_monitor_client.go | 74 +++++++++ src/testing/pkg/jobmonitor/pool_manager.go | 53 ++++++ src/testing/pkg/jobmonitor/worker_manager.go | 53 ++++++ src/testing/pkg/pkg.go | 3 + 20 files changed, 958 insertions(+), 2 deletions(-) create mode 100644 src/controller/jobmonitor/monitor.go create mode 100644 src/controller/jobmonitor/monitor_test.go create mode 100644 src/pkg/jobmonitor/model.go create mode 100644 src/pkg/jobmonitor/pool.go create mode 100644 src/pkg/jobmonitor/worker.go create mode 100644 src/server/v2.0/handler/jobservice.go create mode 100644 src/testing/pkg/jobmonitor/job_service_monitor_client.go create mode 100644 src/testing/pkg/jobmonitor/pool_manager.go create mode 100644 src/testing/pkg/jobmonitor/worker_manager.go diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index a655653d2..c92f9f854 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -4517,6 +4517,82 @@ paths: $ref: '#/responses/403' '500': $ref: '#/responses/500' + /jobservice/pools: + get: + operationId: getWorkerPools + summary: Get worker pools + description: Get worker pools + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + responses: + '200': + description: Get worker pools successfully. + schema: + type: array + items: + $ref: '#/definitions/WorkerPool' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '500': + $ref: '#/responses/500' + /jobservice/pools/{pool_id}/workers: + get: + operationId: getWorkers + summary: Get workers + description: Get workers in current pool + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + - name: pool_id + in: path + required: true + type: string + description: The name of the pool. 'all' stands for all pools + responses: + '200': + description: Get workers successfully. + schema: + type: array + items: + $ref: '#/definitions/Worker' + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' + /jobservice/jobs/{job_id}: + put: + operationId: stopRunningJob + summary: Stop running job + description: Stop running job + tags: + - jobservice + parameters: + - $ref: '#/parameters/requestId' + - name: job_id + in: path + required: true + type: string + description: The id of the job. + responses: + '200': + description: Stop worker successfully. + '401': + $ref: '#/responses/401' + '403': + $ref: '#/responses/403' + '404': + $ref: '#/responses/404' + '500': + $ref: '#/responses/500' /ping: get: operationId: getPing @@ -9253,4 +9329,58 @@ definitions: type: array items: $ref: '#/definitions/ScanDataExportExecution' - description: The list of scan data export executions \ No newline at end of file + description: The list of scan data export executions + WorkerPool: + type: object + description: the worker pool of job service + properties: + pid: + type: integer + description: the process id of jobservice + worker_pool_id: + type: string + description: the id of the worker pool + start_at: + type: string + format: date-time + description: The start time of the work pool + heartbeat_at: + type: string + format: date-time + description: The heartbeat time of the work pool + concurrency: + type: integer + description: The concurrency of the work pool + host: + type: string + description: The host of the work pool + Worker: + type: object + description: worker in the pool + properties: + id: + type: string + description: the id of the worker + pool_id: + type: string + description: the id of the worker pool + job_name: + type: string + description: the name of the running job in the worker + job_id: + type: string + description: the id of the running job in the worker + start_at: + type: string + format: date-time + description: The start time of the worker + args: + type: string + description: The args of the worker + check_in: + type: string + description: the checkin of the running job in the worker + checkin_at: + type: string + format: date-time + description: The checkin time of the running job in the worker \ No newline at end of file diff --git a/src/common/job/client.go b/src/common/job/client.go index 6b8719f71..d7f30d347 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -34,6 +34,8 @@ type Client interface { PostAction(uuid, action string) error GetExecutions(uuid string) ([]job.Stats, error) // TODO Redirect joblog when we see there's memory issue. + // GetJobServiceConfig retrieves the job config + GetJobServiceConfig() (*job.Config, error) } // StatusBehindError represents the error got when trying to stop a success/failed job @@ -212,6 +214,35 @@ func (d *DefaultClient) PostAction(uuid, action string) error { return nil } +// GetJobServiceConfig retrieves the job service configuration +func (d *DefaultClient) GetJobServiceConfig() (*job.Config, error) { + url := d.endpoint + "/api/v1/config" + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := d.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, &commonhttp.Error{ + Code: resp.StatusCode, + Message: string(data), + } + } + var config job.Config + err = json.Unmarshal(data, &config) + if err != nil { + return nil, err + } + return &config, nil +} func isStatusBehindError(err error) (string, bool) { if err == nil { return "", false diff --git a/src/common/rbac/const.go b/src/common/rbac/const.go index e73fc8fdc..f52a40c96 100755 --- a/src/common/rbac/const.go +++ b/src/common/rbac/const.go @@ -77,4 +77,5 @@ const ( ResourceSystemVolumes = Resource("system-volumes") ResourcePurgeAuditLog = Resource("purge-audit") ResourceExportCVE = Resource("export-cve") + ResourceJobServiceMonitor = Resource("jobservice-monitor") ) diff --git a/src/common/rbac/system/policies.go b/src/common/rbac/system/policies.go index 611ccf5e2..0f8e89f02 100644 --- a/src/common/rbac/system/policies.go +++ b/src/common/rbac/system/policies.go @@ -66,5 +66,9 @@ var ( {Resource: rbac.ResourceLdapUser, Action: rbac.ActionList}, {Resource: rbac.ResourceConfiguration, Action: rbac.ActionRead}, {Resource: rbac.ResourceConfiguration, Action: rbac.ActionUpdate}, + + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionRead}, + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionList}, + {Resource: rbac.ResourceJobServiceMonitor, Action: rbac.ActionStop}, } ) diff --git a/src/controller/jobmonitor/monitor.go b/src/controller/jobmonitor/monitor.go new file mode 100644 index 000000000..c3ff7d580 --- /dev/null +++ b/src/controller/jobmonitor/monitor.go @@ -0,0 +1,155 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/goharbor/harbor/src/lib/orm" + + "github.com/goharbor/harbor/src/lib/log" + + "github.com/gocraft/work" + + "github.com/goharbor/harbor/src/common/job" + "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/q" + libRedis "github.com/goharbor/harbor/src/lib/redis" + jm "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/scheduler" + "github.com/goharbor/harbor/src/pkg/task" +) + +// All the jobs in the pool, or all pools +const All = "all" + +// Ctl the controller instance of the worker pool controller +var Ctl = NewMonitorController() + +// MonitorController defines the worker pool operations +type MonitorController interface { + // ListPools lists the worker pools + ListPools(ctx context.Context) ([]*jm.WorkerPool, error) + // ListWorkers lists the workers in the pool + ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) + // StopRunningJob stop the running job + StopRunningJob(ctx context.Context, jobID string) error +} + +type monitorController struct { + poolManager jm.PoolManager + workerManager jm.WorkerManager + taskManager task.Manager + sch scheduler.Scheduler + monitorClient func() (jm.JobServiceMonitorClient, error) +} + +// NewMonitorController ... +func NewMonitorController() MonitorController { + return &monitorController{ + poolManager: jm.NewPoolManager(), + workerManager: jm.NewWorkerManager(), + taskManager: task.NewManager(), + monitorClient: jobServiceMonitorClient, + } +} + +func (w *monitorController) StopRunningJob(ctx context.Context, jobID string) error { + if strings.EqualFold(jobID, All) { + allRunningJobs, err := w.allRunningJobs(ctx) + if err != nil { + log.Errorf("failed to get all running jobs: %v", err) + return err + } + for _, jobID := range allRunningJobs { + if err := w.stopJob(ctx, jobID); err != nil { + log.Errorf("failed to stop running job %s: %v", jobID, err) + return err + } + } + return nil + } + return w.stopJob(ctx, jobID) +} + +func (w *monitorController) stopJob(ctx context.Context, jobID string) error { + tasks, err := w.taskManager.List(ctx, &q.Query{Keywords: q.KeyWords{"job_id": jobID}}) + if err != nil { + return err + } + if len(tasks) == 0 { + return errors.BadRequestError(nil).WithMessage("job %s not found", jobID) + } + if len(tasks) != 1 { + return fmt.Errorf("there are more than one task with the same job ID") + } + // use local transaction to avoid rollback batch success tasks to previous state when one fail + if ctx == nil { + log.Debug("context is nil, skip stop operation") + return nil + } + return orm.WithTransaction(func(ctx context.Context) error { + return w.taskManager.Stop(ctx, tasks[0].ID) + })(orm.SetTransactionOpNameToContext(ctx, "tx-stop-job")) +} + +func (w *monitorController) allRunningJobs(ctx context.Context) ([]string, error) { + jobIDs := make([]string, 0) + wks, err := w.ListWorkers(ctx, All) + if err != nil { + log.Errorf("failed to list workers: %v", err) + return nil, err + } + for _, wk := range wks { + jobIDs = append(jobIDs, wk.JobID) + } + return jobIDs, nil +} + +func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) { + cfg, err := job.GlobalClient.GetJobServiceConfig() + if err != nil { + return nil, err + } + config := cfg.RedisPoolConfig + pool, err := libRedis.GetRedisPool("JobService", config.RedisURL, &libRedis.PoolParam{ + PoolMaxIdle: 0, + PoolIdleTimeout: time.Duration(config.IdleTimeoutSecond) * time.Second, + }) + if err != nil { + log.Errorf("failed to get redis pool: %v", err) + return nil, err + } + return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil +} + +func (w *monitorController) ListWorkers(ctx context.Context, poolID string) ([]*jm.Worker, error) { + mClient, err := w.monitorClient() + if err != nil { + return nil, err + } + return w.workerManager.List(ctx, mClient, poolID) +} + +func (w *monitorController) ListPools(ctx context.Context) ([]*jm.WorkerPool, error) { + mClient, err := w.monitorClient() + if err != nil { + return nil, err + } + return w.poolManager.List(ctx, mClient) +} diff --git a/src/controller/jobmonitor/monitor_test.go b/src/controller/jobmonitor/monitor_test.go new file mode 100644 index 000000000..5ffb44e04 --- /dev/null +++ b/src/controller/jobmonitor/monitor_test.go @@ -0,0 +1,96 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "testing" + "time" + + "github.com/gocraft/work" + "github.com/stretchr/testify/suite" + + "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/pkg/task" + "github.com/goharbor/harbor/src/testing/mock" + monitorMock "github.com/goharbor/harbor/src/testing/pkg/jobmonitor" + taskMock "github.com/goharbor/harbor/src/testing/pkg/task" +) + +type JobServiceMonitorTestSuite struct { + suite.Suite + jmClient jobmonitor.JobServiceMonitorClient + poolManager jobmonitor.PoolManager + workerManager jobmonitor.WorkerManager + monitController MonitorController + taskManager task.Manager +} + +func (s *JobServiceMonitorTestSuite) SetupSuite() { + s.jmClient = &monitorMock.JobServiceMonitorClient{} + s.poolManager = &monitorMock.PoolManager{} + s.workerManager = jobmonitor.NewWorkerManager() + s.taskManager = &taskMock.Manager{} + s.monitController = &monitorController{ + poolManager: s.poolManager, + workerManager: s.workerManager, + taskManager: s.taskManager, + monitorClient: func() (jobmonitor.JobServiceMonitorClient, error) { + return s.jmClient, nil + }, + } +} + +func (s *JobServiceMonitorTestSuite) TearDownSuite() { +} + +func (s *JobServiceMonitorTestSuite) TestListPool() { + mock.OnAnything(s.poolManager, "List").Return([]*jobmonitor.WorkerPool{ + { + ID: "1", PID: 1, StartAt: time.Now().Unix(), Concurrency: 10, + }, + }, nil) + pools, err := s.poolManager.List(nil, s.jmClient) + s.Assert().Nil(err) + s.Assert().Equal(1, len(pools)) +} + +func (s *JobServiceMonitorTestSuite) TestListWorker() { + mock.OnAnything(s.jmClient, "WorkerObservations").Return([]*work.WorkerObservation{ + {WorkerID: "abc", IsBusy: true, JobName: "test", JobID: "1", ArgsJSON: "{\"sample\":\"sample args\"}"}, + }, nil) + mock.OnAnything(s.jmClient, "WorkerPoolHeartbeats").Return([]*work.WorkerPoolHeartbeat{ + {WorkerPoolID: "1", Pid: 1, StartedAt: time.Now().Unix(), Concurrency: 10, WorkerIDs: []string{"abc"}}, + }, nil) + workers, err := s.monitController.ListWorkers(nil, "1") + s.Assert().Nil(err) + s.Assert().Equal(1, len(workers)) +} + +func (s *JobServiceMonitorTestSuite) TestStopRunningJob() { + mock.OnAnything(s.jmClient, "WorkerObservations").Return([]*work.WorkerObservation{ + {WorkerID: "abc", IsBusy: true, JobName: "test", JobID: "1", ArgsJSON: "{\"sample\":\"sample args\"}"}, + }, nil) + mock.OnAnything(s.jmClient, "WorkerPoolHeartbeats").Return([]*work.WorkerPoolHeartbeat{ + {WorkerPoolID: "1", Pid: 1, StartedAt: time.Now().Unix(), Concurrency: 10, WorkerIDs: []string{"abc"}}, + }, nil) + mock.OnAnything(s.taskManager, "List").Return([]*task.Task{{ID: 1, VendorType: "GARBAGE_COLLECTION"}}, nil) + mock.OnAnything(s.taskManager, "Stop").Return(nil) + err := s.monitController.StopRunningJob(nil, "1") + s.Assert().Nil(err) +} + +func TestJobServiceMonitorTestSuite(t *testing.T) { + suite.Run(t, &JobServiceMonitorTestSuite{}) +} diff --git a/src/jobservice/api/handler.go b/src/jobservice/api/handler.go index d9007be31..3222b3083 100644 --- a/src/jobservice/api/handler.go +++ b/src/jobservice/api/handler.go @@ -27,6 +27,7 @@ import ( "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/core" "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/job" @@ -56,11 +57,14 @@ type Handler interface { // HandleJobLogReq is used to handle the request of getting job logs HandleJobLogReq(w http.ResponseWriter, req *http.Request) - // HandleJobLogReq is used to handle the request of getting periodic executions + // HandlePeriodicExecutions is used to handle the request of getting periodic executions HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request) // HandleGetJobsReq is used to handle the request of getting jobs HandleGetJobsReq(w http.ResponseWriter, req *http.Request) + + // HandleGetConfigReq is used to handle the request of getting configure + HandleGetConfigReq(w http.ResponseWriter, req *http.Request) } // DefaultHandler is the default request handler which implements the Handler interface. @@ -294,6 +298,18 @@ func (dh *DefaultHandler) log(req *http.Request, code int, text string) { logger.Debugf("Serve http request '%s %s': %d %s", req.Method, req.URL.String(), code, text) } +// HandleGetConfigReq return the config of the job service +func (dh *DefaultHandler) HandleGetConfigReq(w http.ResponseWriter, req *http.Request) { + if config.DefaultConfig == nil || config.DefaultConfig.PoolConfig == nil || config.DefaultConfig.PoolConfig.RedisPoolCfg == nil { + logger.Errorf("Failed to get config, config is nil") + dh.handleError(w, req, http.StatusInternalServerError, errs.HandleJSONDataError(fmt.Errorf("no configuration"))) + return + } + dh.handleJSONData(w, req, http.StatusOK, &job.Config{ + RedisPoolConfig: config.DefaultConfig.PoolConfig.RedisPoolCfg, + }) +} + func extractQuery(req *http.Request) *query.Parameter { q := &query.Parameter{ PageNumber: 1, diff --git a/src/jobservice/api/router.go b/src/jobservice/api/router.go index a34e2bb48..0e3c7988c 100644 --- a/src/jobservice/api/router.go +++ b/src/jobservice/api/router.go @@ -100,5 +100,6 @@ func (br *BaseRouter) registerRoutes() { subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost) subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet) subRouter.HandleFunc("/stats", br.handler.HandleCheckStatusReq).Methods(http.MethodGet) + subRouter.HandleFunc("/config", br.handler.HandleGetConfigReq).Methods(http.MethodGet) subRouter.HandleFunc("/jobs/{job_id}/executions", br.handler.HandlePeriodicExecutions).Methods(http.MethodGet) } diff --git a/src/jobservice/job/models.go b/src/jobservice/job/models.go index 1a521de43..2cd2eb0bc 100644 --- a/src/jobservice/job/models.go +++ b/src/jobservice/job/models.go @@ -17,6 +17,8 @@ package job import ( "encoding/json" + "github.com/goharbor/harbor/src/jobservice/config" + "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/lib/errors" ) @@ -153,3 +155,8 @@ func (st *Stats) Validate() error { return nil } + +// Config job service config +type Config struct { + RedisPoolConfig *config.RedisPoolConfig `json:"redis_pool_config"` +} diff --git a/src/pkg/jobmonitor/model.go b/src/pkg/jobmonitor/model.go new file mode 100644 index 000000000..ba3acdef0 --- /dev/null +++ b/src/pkg/jobmonitor/model.go @@ -0,0 +1,46 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +// WorkerPool job service worker pool +type WorkerPool struct { + ID string `json:"pool_id"` + PID int `json:"pid"` + StartAt int64 `json:"start_at"` + HeartbeatAt int64 `json:"heartbeat_at"` + Concurrency int `json:"concurrency"` + Host string `json:"host"` +} + +// Worker job service worker +type Worker struct { + ID string `json:"id"` + PoolID string `json:"pool_id"` + IsBusy bool `json:"is_busy"` + JobName string `json:"job_name"` + JobID string `json:"job_id"` + StartedAt int64 `json:"start_at"` + Args string `json:"args"` + CheckIn string `json:"check_in"` + CheckInAt int64 `json:"check_in_at"` +} + +// Queue the job queue +type Queue struct { + JobType string + Count int64 + Latency int64 + Paused bool +} diff --git a/src/pkg/jobmonitor/pool.go b/src/pkg/jobmonitor/pool.go new file mode 100644 index 000000000..335cc3e44 --- /dev/null +++ b/src/pkg/jobmonitor/pool.go @@ -0,0 +1,62 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + + "github.com/gocraft/work" +) + +// PoolManager the interface to retrieve job service monitor metrics +type PoolManager interface { + // List retrieves pools information + List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) +} + +// JobServiceMonitorClient the interface to retrieve job service monitor metrics +type JobServiceMonitorClient interface { + // WorkerPoolHeartbeats retrieves worker pool heartbeats + WorkerPoolHeartbeats() ([]*work.WorkerPoolHeartbeat, error) + // WorkerObservations retrieves worker observations + WorkerObservations() ([]*work.WorkerObservation, error) +} + +type poolManager struct{} + +// NewPoolManager create a PoolManager with namespace and redis Pool +func NewPoolManager() PoolManager { + return &poolManager{} +} + +func (p poolManager) List(ctx context.Context, monitorClient JobServiceMonitorClient) ([]*WorkerPool, error) { + workerPool := make([]*WorkerPool, 0) + wh, err := monitorClient.WorkerPoolHeartbeats() + if err != nil { + return workerPool, err + } + for _, w := range wh { + wp := &WorkerPool{ + ID: w.WorkerPoolID, + PID: w.Pid, + StartAt: w.StartedAt, + Concurrency: int(w.Concurrency), + Host: w.Host, + HeartbeatAt: w.HeartbeatAt, + } + workerPool = append(workerPool, wp) + } + return workerPool, nil +} diff --git a/src/pkg/jobmonitor/worker.go b/src/pkg/jobmonitor/worker.go new file mode 100644 index 000000000..debd4fab8 --- /dev/null +++ b/src/pkg/jobmonitor/worker.go @@ -0,0 +1,87 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobmonitor + +import ( + "context" + "strings" + + "github.com/gocraft/work" + + "github.com/goharbor/harbor/src/pkg/task" +) + +const all = "all" + +// WorkerManager ... +type WorkerManager interface { + // List lists all workers in the specified pool + List(ctx context.Context, monitClient JobServiceMonitorClient, poolID string) ([]*Worker, error) +} + +type workerManagerImpl struct { + taskMgr task.Manager +} + +// NewWorkerManager ... +func NewWorkerManager() WorkerManager { + return &workerManagerImpl{taskMgr: task.NewManager()} +} + +func (w *workerManagerImpl) List(ctx context.Context, monitClient JobServiceMonitorClient, poolID string) ([]*Worker, error) { + wphs, err := monitClient.WorkerPoolHeartbeats() + if err != nil { + return nil, err + } + workerPoolMap := make(map[string]string) + for _, wph := range wphs { + for _, id := range wph.WorkerIDs { + workerPoolMap[id] = wph.WorkerPoolID + } + } + + workers, err := monitClient.WorkerObservations() + if err != nil { + return nil, err + } + if strings.EqualFold(poolID, all) { + return convertToWorker(workers, workerPoolMap), nil + } + // filter workers by pool id + filteredWorkers := make([]*work.WorkerObservation, 0) + for _, w := range workers { + if workerPoolMap[w.WorkerID] == poolID { + filteredWorkers = append(filteredWorkers, w) + } + } + return convertToWorker(filteredWorkers, workerPoolMap), nil +} + +func convertToWorker(workers []*work.WorkerObservation, workerPoolMap map[string]string) []*Worker { + wks := make([]*Worker, 0) + for _, w := range workers { + wks = append(wks, &Worker{ + ID: w.WorkerID, + PoolID: workerPoolMap[w.WorkerID], + IsBusy: w.IsBusy, + JobName: w.JobName, + JobID: w.JobID, + StartedAt: w.StartedAt, + CheckIn: w.Checkin, + CheckInAt: w.CheckinAt, + }) + } + return wks +} diff --git a/src/pkg/task/mock_jobservice_client_test.go b/src/pkg/task/mock_jobservice_client_test.go index b1f8a6ce3..27b0b9059 100644 --- a/src/pkg/task/mock_jobservice_client_test.go +++ b/src/pkg/task/mock_jobservice_client_test.go @@ -60,6 +60,29 @@ func (_m *mockJobserviceClient) GetJobLog(uuid string) ([]byte, error) { return r0, r1 } +// GetJobServiceConfig provides a mock function with given fields: +func (_m *mockJobserviceClient) GetJobServiceConfig() (*job.Config, error) { + ret := _m.Called() + + var r0 *job.Config + if rf, ok := ret.Get(0).(func() *job.Config); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.Config) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // PostAction provides a mock function with given fields: uuid, action func (_m *mockJobserviceClient) PostAction(uuid string, action string) error { ret := _m.Called(uuid, action) diff --git a/src/server/v2.0/handler/handler.go b/src/server/v2.0/handler/handler.go index 4fe0a7fc6..81a239584 100644 --- a/src/server/v2.0/handler/handler.go +++ b/src/server/v2.0/handler/handler.go @@ -67,6 +67,7 @@ func New() http.Handler { ProjectMetadataAPI: newProjectMetadaAPI(), PurgeAPI: newPurgeAPI(), ScanDataExportAPI: newScanDataExportAPI(), + JobserviceAPI: newJobServiceAPI(), }) if err != nil { log.Fatal(err) diff --git a/src/server/v2.0/handler/jobservice.go b/src/server/v2.0/handler/jobservice.go new file mode 100644 index 000000000..70072efeb --- /dev/null +++ b/src/server/v2.0/handler/jobservice.go @@ -0,0 +1,108 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handler + +import ( + "context" + "time" + + "github.com/go-openapi/runtime/middleware" + "github.com/go-openapi/strfmt" + + "github.com/goharbor/harbor/src/common/rbac" + "github.com/goharbor/harbor/src/controller/jobmonitor" + jm "github.com/goharbor/harbor/src/pkg/jobmonitor" + "github.com/goharbor/harbor/src/server/v2.0/models" + "github.com/goharbor/harbor/src/server/v2.0/restapi/operations/jobservice" +) + +type jobServiceAPI struct { + BaseAPI + jobCtr jobmonitor.MonitorController +} + +func newJobServiceAPI() *jobServiceAPI { + return &jobServiceAPI{jobCtr: jobmonitor.Ctl} +} + +func (j *jobServiceAPI) GetWorkerPools(ctx context.Context, params jobservice.GetWorkerPoolsParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + workPools, err := j.jobCtr.ListPools(ctx) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewGetWorkerPoolsOK().WithPayload(toWorkerPoolResponse(workPools)) +} + +func (j *jobServiceAPI) GetWorkers(ctx context.Context, params jobservice.GetWorkersParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionList, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + workers, err := j.jobCtr.ListWorkers(ctx, params.PoolID) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewGetWorkersOK().WithPayload(toWorkerResponse(workers)) +} + +func (j *jobServiceAPI) StopRunningJob(ctx context.Context, params jobservice.StopRunningJobParams) middleware.Responder { + if err := j.RequireSystemAccess(ctx, rbac.ActionStop, rbac.ResourceJobServiceMonitor); err != nil { + return j.SendError(ctx, err) + } + err := j.jobCtr.StopRunningJob(ctx, params.JobID) + if err != nil { + return j.SendError(ctx, err) + } + return jobservice.NewStopRunningJobOK() +} + +func toWorkerResponse(wks []*jm.Worker) []*models.Worker { + workers := make([]*models.Worker, 0) + for _, w := range wks { + p := &models.Worker{ + ID: w.ID, + JobName: w.JobName, + JobID: w.JobID, + PoolID: w.PoolID, + Args: w.Args, + StartAt: covertTime(w.StartedAt), + CheckinAt: covertTime(w.CheckInAt), + } + workers = append(workers, p) + } + return workers +} + +func toWorkerPoolResponse(wps []*jm.WorkerPool) []*models.WorkerPool { + pools := make([]*models.WorkerPool, 0) + for _, wp := range wps { + p := &models.WorkerPool{ + Pid: int64(wp.PID), + HeartbeatAt: covertTime(wp.HeartbeatAt), + Concurrency: int64(wp.Concurrency), + WorkerPoolID: wp.ID, + StartAt: covertTime(wp.StartAt), + } + pools = append(pools, p) + } + return pools +} + +func covertTime(t int64) strfmt.DateTime { + uxt := time.Unix(int64(t), 0) + return strfmt.DateTime(uxt) +} diff --git a/src/testing/job/mock_client.go b/src/testing/job/mock_client.go index efda03fc7..69e001b0a 100644 --- a/src/testing/job/mock_client.go +++ b/src/testing/job/mock_client.go @@ -14,6 +14,11 @@ type MockJobClient struct { JobUUID []string } +// GetJobServiceConfig ... +func (mjc *MockJobClient) GetJobServiceConfig() (*job.Config, error) { + panic("implement me") +} + // GetJobLog ... func (mjc *MockJobClient) GetJobLog(uuid string) ([]byte, error) { if uuid == "500" { diff --git a/src/testing/pkg/jobmonitor/job_service_monitor_client.go b/src/testing/pkg/jobmonitor/job_service_monitor_client.go new file mode 100644 index 000000000..e2fcb126e --- /dev/null +++ b/src/testing/pkg/jobmonitor/job_service_monitor_client.go @@ -0,0 +1,74 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + work "github.com/gocraft/work" + mock "github.com/stretchr/testify/mock" +) + +// JobServiceMonitorClient is an autogenerated mock type for the JobServiceMonitorClient type +type JobServiceMonitorClient struct { + mock.Mock +} + +// WorkerObservations provides a mock function with given fields: +func (_m *JobServiceMonitorClient) WorkerObservations() ([]*work.WorkerObservation, error) { + ret := _m.Called() + + var r0 []*work.WorkerObservation + if rf, ok := ret.Get(0).(func() []*work.WorkerObservation); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*work.WorkerObservation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// WorkerPoolHeartbeats provides a mock function with given fields: +func (_m *JobServiceMonitorClient) WorkerPoolHeartbeats() ([]*work.WorkerPoolHeartbeat, error) { + ret := _m.Called() + + var r0 []*work.WorkerPoolHeartbeat + if rf, ok := ret.Get(0).(func() []*work.WorkerPoolHeartbeat); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*work.WorkerPoolHeartbeat) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewJobServiceMonitorClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewJobServiceMonitorClient creates a new instance of JobServiceMonitorClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewJobServiceMonitorClient(t mockConstructorTestingTNewJobServiceMonitorClient) *JobServiceMonitorClient { + mock := &JobServiceMonitorClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/jobmonitor/pool_manager.go b/src/testing/pkg/jobmonitor/pool_manager.go new file mode 100644 index 000000000..e4380c22c --- /dev/null +++ b/src/testing/pkg/jobmonitor/pool_manager.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + jobmonitor "github.com/goharbor/harbor/src/pkg/jobmonitor" + mock "github.com/stretchr/testify/mock" +) + +// PoolManager is an autogenerated mock type for the PoolManager type +type PoolManager struct { + mock.Mock +} + +// List provides a mock function with given fields: ctx, monitorClient +func (_m *PoolManager) List(ctx context.Context, monitorClient jobmonitor.JobServiceMonitorClient) ([]*jobmonitor.WorkerPool, error) { + ret := _m.Called(ctx, monitorClient) + + var r0 []*jobmonitor.WorkerPool + if rf, ok := ret.Get(0).(func(context.Context, jobmonitor.JobServiceMonitorClient) []*jobmonitor.WorkerPool); ok { + r0 = rf(ctx, monitorClient) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*jobmonitor.WorkerPool) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jobmonitor.JobServiceMonitorClient) error); ok { + r1 = rf(ctx, monitorClient) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewPoolManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewPoolManager creates a new instance of PoolManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPoolManager(t mockConstructorTestingTNewPoolManager) *PoolManager { + mock := &PoolManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/jobmonitor/worker_manager.go b/src/testing/pkg/jobmonitor/worker_manager.go new file mode 100644 index 000000000..1a66b9eb4 --- /dev/null +++ b/src/testing/pkg/jobmonitor/worker_manager.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package jobmonitor + +import ( + context "context" + + jobmonitor "github.com/goharbor/harbor/src/pkg/jobmonitor" + mock "github.com/stretchr/testify/mock" +) + +// WorkerManager is an autogenerated mock type for the WorkerManager type +type WorkerManager struct { + mock.Mock +} + +// List provides a mock function with given fields: ctx, monitClient, poolID +func (_m *WorkerManager) List(ctx context.Context, monitClient jobmonitor.JobServiceMonitorClient, poolID string) ([]*jobmonitor.Worker, error) { + ret := _m.Called(ctx, monitClient, poolID) + + var r0 []*jobmonitor.Worker + if rf, ok := ret.Get(0).(func(context.Context, jobmonitor.JobServiceMonitorClient, string) []*jobmonitor.Worker); ok { + r0 = rf(ctx, monitClient, poolID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*jobmonitor.Worker) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jobmonitor.JobServiceMonitorClient, string) error); ok { + r1 = rf(ctx, monitClient, poolID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewWorkerManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewWorkerManager creates a new instance of WorkerManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewWorkerManager(t mockConstructorTestingTNewWorkerManager) *WorkerManager { + mock := &WorkerManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/src/testing/pkg/pkg.go b/src/testing/pkg/pkg.go index 448847d6a..145ced63a 100644 --- a/src/testing/pkg/pkg.go +++ b/src/testing/pkg/pkg.go @@ -68,3 +68,6 @@ package pkg //go:generate mockery --case snake --dir ../../pkg/registry --name Client --output ./registry --outpkg registry --filename fake_registry_client.go //go:generate mockery --case snake --dir ../../pkg/member --name Manager --output ./member --outpkg member --filename fake_member_manager.go //go:generate mockery --case snake --dir ../../pkg/usergroup --name Manager --output ./usergroup --outpkg usergroup --filename fake_usergroup_manager.go +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name PoolManager --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name JobServiceMonitorClient --output ./jobmonitor --outpkg jobmonitor +//go:generate mockery --case snake --dir ../../pkg/jobmonitor --name WorkerManager --output ./jobmonitor --outpkg jobmonitor