From f2870272ce166bc0ae495e55e5433f1e1763a9d3 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Sun, 28 Apr 2019 22:36:13 +0800 Subject: [PATCH 1/2] add get jobs API to provide powerful job stats management Signed-off-by: Steven Zou --- src/jobservice/api/handler.go | 40 +- src/jobservice/api/handler_test.go | 23 +- src/jobservice/api/router.go | 2 +- src/jobservice/common/query/q.go | 20 + src/jobservice/common/query/q_test.go | 11 + src/jobservice/common/rds/keys.go | 2 - src/jobservice/core/controller.go | 60 ++- src/jobservice/core/controller_test.go | 200 +++++----- src/jobservice/core/interface.go | 9 +- src/jobservice/errs/errors.go | 15 +- src/jobservice/job/tracker.go | 106 ------ src/jobservice/job/tracker_test.go | 10 - src/jobservice/main.go | 11 +- src/jobservice/mgt/manager.go | 349 ++++++++++++++++++ src/jobservice/mgt/manager_test.go | 184 +++++++++ src/jobservice/runtime/bootstrap.go | 10 +- src/jobservice/worker/cworker/c_worker.go | 35 -- .../worker/cworker/c_worker_test.go | 13 +- src/jobservice/worker/interface.go | 13 - 19 files changed, 758 insertions(+), 355 deletions(-) create mode 100644 src/jobservice/mgt/manager.go create mode 100644 src/jobservice/mgt/manager_test.go diff --git a/src/jobservice/api/handler.go b/src/jobservice/api/handler.go index 1bbc6aa2c..69afda977 100644 --- a/src/jobservice/api/handler.go +++ b/src/jobservice/api/handler.go @@ -34,7 +34,10 @@ import ( "strconv" ) -const totalHeaderKey = "Total-Count" +const ( + totalHeaderKey = "Total-Count" + nextCursorKey = "Next-Cursor" +) // Handler defines approaches to handle the http requests. type Handler interface { @@ -56,8 +59,8 @@ type Handler interface { // HandleJobLogReq is used to handle the request of getting periodic executions HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request) - // HandleScheduledJobs is used to handle the request of getting pending scheduled jobs - HandleScheduledJobs(w http.ResponseWriter, req *http.Request) + // HandleGetJobsReq is used to handle the request of getting jobs + HandleGetJobsReq(w http.ResponseWriter, req *http.Request) } // DefaultHandler is the default request handler which implements the Handler interface. @@ -244,17 +247,24 @@ func (dh *DefaultHandler) HandlePeriodicExecutions(w http.ResponseWriter, req *h } -// HandleScheduledJobs is implementation of method defined in interface 'Handler' -func (dh *DefaultHandler) HandleScheduledJobs(w http.ResponseWriter, req *http.Request) { +// HandleGetJobsReq is implementation of method defined in interface 'Handler' +func (dh *DefaultHandler) HandleGetJobsReq(w http.ResponseWriter, req *http.Request) { // Get query parameters q := extractQuery(req) - jobs, total, err := dh.controller.ScheduledJobs(q) + jobs, total, err := dh.controller.GetJobs(q) if err != nil { - dh.handleError(w, req, http.StatusInternalServerError, errs.GetScheduledJobsError(err)) + dh.handleError(w, req, http.StatusInternalServerError, errs.GetJobsError(q, err)) return } - w.Header().Add(totalHeaderKey, fmt.Sprintf("%d", total)) + key := nextCursorKey + if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok { + if kind, yes := v.(string); yes && kind == job.KindScheduled { + key = totalHeaderKey + } + } + + w.Header().Add(key, fmt.Sprintf("%d", total)) dh.handleJSONData(w, req, http.StatusOK, jobs) } @@ -321,6 +331,20 @@ func extractQuery(req *http.Request) *query.Parameter { } } + // Extra job kind query param + jobKind := queries.Get(query.ParamKeyJobKind) + if !utils.IsEmptyStr(jobKind) { + q.Extras.Set(query.ExtraParamKeyKind, jobKind) + } + + // Extra query cursor + cursorV := queries.Get(query.ParamKeyCursor) + if !utils.IsEmptyStr(cursorV) { + if cursor, err := strconv.ParseInt(cursorV, 10, 32); err == nil { + q.Extras.Set(query.ExtraParamKeyCursor, cursor) + } + } + return q } diff --git a/src/jobservice/api/handler_test.go b/src/jobservice/api/handler_test.go index ffe281e1b..f0adc3a47 100644 --- a/src/jobservice/api/handler_test.go +++ b/src/jobservice/api/handler_test.go @@ -277,7 +277,7 @@ func (suite *APIHandlerTestSuite) TestGetPeriodicExecutionsWithQuery() { } // TestScheduledJobs ... -func (suite *APIHandlerTestSuite) TestScheduledJobs() { +func (suite *APIHandlerTestSuite) TestGetJobs() { q := &query.Parameter{ PageNumber: 2, PageSize: 50, @@ -285,11 +285,20 @@ func (suite *APIHandlerTestSuite) TestScheduledJobs() { } fc := &fakeController{} - fc.On("ScheduledJobs", q). - Return([]*job.Stats{createJobStats("sample", "Generic", "")}, int64(1), nil) + fc.On("GetJobs", q). + Return([]*job.Stats{createJobStats("sample", job.KindGeneric, "")}, int64(1), nil) suite.controller = fc - _, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs/scheduled?page_number=2&page_size=50")) + _, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50")) + assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code) + + q.Extras.Set(query.ExtraParamKeyKind, job.KindScheduled) + fc = &fakeController{} + fc.On("GetJobs", q). + Return([]*job.Stats{createJobStats("sample", job.KindScheduled, "")}, int64(1), nil) + suite.controller = fc + + _, code = suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50&kind=Scheduled")) assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code) } @@ -397,8 +406,8 @@ func (suite *APIHandlerTestSuite) GetPeriodicExecutions(periodicJobID string, qu return suite.controller.GetPeriodicExecutions(periodicJobID, query) } -func (suite *APIHandlerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { - return suite.controller.ScheduledJobs(query) +func (suite *APIHandlerTestSuite) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) { + return suite.controller.GetJobs(query) } type fakeController struct { @@ -460,7 +469,7 @@ func (fc *fakeController) GetPeriodicExecutions(periodicJobID string, query *que return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil } -func (fc *fakeController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { +func (fc *fakeController) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) { args := fc.Called(query) if args.Error(2) != nil { return nil, args.Get(1).(int64), args.Error(2) diff --git a/src/jobservice/api/router.go b/src/jobservice/api/router.go index ea92061d4..35b836704 100644 --- a/src/jobservice/api/router.go +++ b/src/jobservice/api/router.go @@ -88,7 +88,7 @@ func (br *BaseRouter) registerRoutes() { subRouter := br.router.PathPrefix(fmt.Sprintf("%s/%s", baseRoute, apiVersion)).Subrouter() subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost) - subRouter.HandleFunc("/jobs/scheduled", br.handler.HandleScheduledJobs).Methods(http.MethodGet) + subRouter.HandleFunc("/jobs", br.handler.HandleGetJobsReq).Methods(http.MethodGet) subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet) subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost) subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet) diff --git a/src/jobservice/common/query/q.go b/src/jobservice/common/query/q.go index b546f527a..3e123365c 100644 --- a/src/jobservice/common/query/q.go +++ b/src/jobservice/common/query/q.go @@ -14,6 +14,8 @@ package query +import "encoding/json" + const ( // DefaultPageSize defines the default page size DefaultPageSize uint = 25 @@ -23,8 +25,16 @@ const ( ParamKeyPageSize = "page_size" // ParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions ParamKeyNonStoppedOnly = "non_dead_only" + // ParamKeyCursor defines query param of cursor for fetching job stats with batches + ParamKeyCursor = "cursor" + // ParamKeyJobKind defines query param of job kind + ParamKeyJobKind = "kind" // ExtraParamKeyNonStoppedOnly defines extra parameter key for querying non stopped periodic executions ExtraParamKeyNonStoppedOnly = "NonDeadOnly" + // ExtraParamKeyCursor defines extra parameter key for the cursor of fetching job stats with batches + ExtraParamKeyCursor = "Cursor" + // ExtraParamKeyKind defines extra parameter key for the job kind + ExtraParamKeyKind = "Kind" ) // ExtraParameters to keep non pagination query parameters @@ -44,6 +54,16 @@ func (ep ExtraParameters) Get(key string) (interface{}, bool) { return v, ok } +// String returns the json string of ExtraParameters +func (ep ExtraParameters) String() string { + bytes, err := json.Marshal(&ep) + if err != nil { + return "" + } + + return string(bytes) +} + // Parameter for getting executions type Parameter struct { PageNumber uint diff --git a/src/jobservice/common/query/q_test.go b/src/jobservice/common/query/q_test.go index 379e8af06..4b345df4b 100644 --- a/src/jobservice/common/query/q_test.go +++ b/src/jobservice/common/query/q_test.go @@ -1,6 +1,8 @@ package query import ( + "encoding/json" + "github.com/stretchr/testify/require" "testing" "github.com/stretchr/testify/assert" @@ -26,4 +28,13 @@ func (suite *QueryTestSuite) TestExtraParams() { assert.Equal(suite.T(), true, ok) assert.Equal(suite.T(), 100, v.(int)) + + str := extras.String() + copy := make(ExtraParameters) + err := json.Unmarshal([]byte(str), ©) + require.NoError(suite.T(), err) + + v, ok = copy.Get("a") + assert.Equal(suite.T(), true, ok) + assert.Equal(suite.T(), 100, int(v.(float64))) } diff --git a/src/jobservice/common/rds/keys.go b/src/jobservice/common/rds/keys.go index c8ee83f85..db6ae90e0 100644 --- a/src/jobservice/common/rds/keys.go +++ b/src/jobservice/common/rds/keys.go @@ -37,8 +37,6 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string { return RedisNamespacePrefix(namespace) + "last_periodic_enqueue" } -// ---------------------------------------------------------- - // KeyNamespacePrefix returns the based key based on the namespace. func KeyNamespacePrefix(namespace string) string { ns := strings.TrimSpace(namespace) diff --git a/src/jobservice/core/controller.go b/src/jobservice/core/controller.go index 07273b39f..ea018e53a 100644 --- a/src/jobservice/core/controller.go +++ b/src/jobservice/core/controller.go @@ -16,6 +16,7 @@ package core import ( "fmt" + "github.com/goharbor/harbor/src/jobservice/mgt" "github.com/pkg/errors" "github.com/goharbor/harbor/src/jobservice/logger" @@ -24,7 +25,6 @@ import ( "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/jobservice/lcm" "github.com/goharbor/harbor/src/jobservice/worker" "github.com/robfig/cron" ) @@ -34,15 +34,15 @@ import ( type basicController struct { // Refer the backend worker backendWorker worker.Interface - // Refer the job life cycle management controller - ctl lcm.Controller + // Refer the job stats manager + manager mgt.Manager } // NewController is constructor of basicController. -func NewController(backendWorker worker.Interface, ctl lcm.Controller) Interface { +func NewController(backendWorker worker.Interface, mgr mgt.Manager) Interface { return &basicController{ backendWorker: backendWorker, - ctl: ctl, + manager: mgr, } } @@ -92,7 +92,7 @@ func (bc *basicController) LaunchJob(req *job.Request) (res *job.Stats, err erro // Save job stats if err == nil { - if _, err := bc.ctl.New(res); err != nil { + if err := bc.manager.SaveJob(res); err != nil { return nil, err } } @@ -106,12 +106,7 @@ func (bc *basicController) GetJob(jobID string) (*job.Stats, error) { return nil, errs.BadRequestError(errors.New("empty job ID")) } - t, err := bc.ctl.Track(jobID) - if err != nil { - return nil, err - } - - return t.Job(), nil + return bc.manager.GetJob(jobID) } // StopJob is implementation of same method in core interface. @@ -157,32 +152,25 @@ func (bc *basicController) GetPeriodicExecutions(periodicJobID string, query *qu return nil, 0, errs.BadRequestError(errors.New("nil periodic job ID")) } - t, err := bc.ctl.Track(periodicJobID) - if err != nil { - return nil, 0, err - } - - eIDs, total, err := t.Executions(query) - if err != nil { - return nil, 0, err - } - - res := make([]*job.Stats, 0) - for _, eID := range eIDs { - et, err := bc.ctl.Track(eID) - if err != nil { - return nil, 0, err - } - - res = append(res, et.Job()) - } - - return res, total, nil + return bc.manager.GetPeriodicExecution(periodicJobID, query) } -// ScheduledJobs returns the scheduled jobs by page -func (bc *basicController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { - return bc.backendWorker.ScheduledJobs(query) +// GetJobs returns the jobs by specified +func (bc *basicController) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + onlyScheduledJobs := false + if q != nil && q.Extras != nil { + if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok { + if job.KindScheduled == v.(string) { + onlyScheduledJobs = true + } + } + } + + if onlyScheduledJobs { + return bc.manager.GetScheduledJobs(q) + } + + return bc.manager.GetJobs(q) } func validJobReq(req *job.Request) error { diff --git a/src/jobservice/core/controller_test.go b/src/jobservice/core/controller_test.go index 4501d36ea..d8bc11bb9 100644 --- a/src/jobservice/core/controller_test.go +++ b/src/jobservice/core/controller_test.go @@ -14,28 +14,25 @@ package core import ( - "context" "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job/impl/sample" - "github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "testing" - "time" ) // ControllerTestSuite tests functions of core controller type ControllerTestSuite struct { suite.Suite - lcmCtl *fakeLcmController - worker *fakeWorker - ctl Interface + manager *fakeManager + worker *fakeWorker + ctl Interface res *job.Stats jobID string @@ -60,14 +57,17 @@ func (suite *ControllerTestSuite) SetupSuite() { // Prepare for each test case func (suite *ControllerTestSuite) SetupTest() { suite.worker = &fakeWorker{} - suite.lcmCtl = &fakeLcmController{} - - suite.lcmCtl.On("Track", suite.jobID).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil) - suite.lcmCtl.On("New", suite.res).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil) suite.worker.On("IsKnownJob", job.SampleJob).Return((*sample.Job)(nil), true) suite.worker.On("IsKnownJob", "fake").Return(nil, false) suite.worker.On("ValidateJobParameters", (*sample.Job)(nil), suite.params).Return(nil) + + fakeMgr := &fakeManager{} + fakeMgr.On("SaveJob", suite.res).Return(nil) + fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil) + + suite.manager = fakeMgr + } // TestControllerTestSuite is suite entry for 'go test' @@ -142,20 +142,6 @@ func (suite *ControllerTestSuite) TestCheckStatus() { assert.Equal(suite.T(), "running", st.Pools[0].Status, "expected running pool but got %s", st.Pools[0].Status) } -// TestScheduledJobs ... -func (suite *ControllerTestSuite) TestScheduledJobs() { - q := &query.Parameter{ - PageSize: 20, - PageNumber: 1, - } - - suite.worker.On("ScheduledJobs", q).Return([]*job.Stats{suite.res}, 1, nil) - - _, total, err := suite.ctl.ScheduledJobs(q) - require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err) - assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0") -} - // TestInvalidChecks ... func (suite *ControllerTestSuite) TestInvalidChecks() { req := createJobReq("kind") @@ -180,57 +166,42 @@ func (suite *ControllerTestSuite) TestInvalidChecks() { assert.NotNil(suite.T(), err, "invalid job name: error expected but got nil") } +// TestScheduledJobs ... +func (suite *ControllerTestSuite) TestGetScheduledJobs() { + extras := make(query.ExtraParameters) + extras.Set(query.ExtraParamKeyKind, job.KindScheduled) + q := &query.Parameter{ + PageSize: 20, + PageNumber: 1, + Extras: extras, + } + + fakeMgr := &fakeManager{} + fakeMgr.On("SaveJob", suite.res).Return(nil) + fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil) + fakeMgr.On("GetScheduledJobs", q).Return([]*job.Stats{suite.res}, int64(1), nil) + suite.manager = fakeMgr + + _, total, err := suite.ctl.GetJobs(q) + require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err) + assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0") +} + // TestGetPeriodicExecutions tests GetPeriodicExecutions func (suite *ControllerTestSuite) TestGetPeriodicExecutions() { - pool := tests.GiveMeRedisPool() - namespace := tests.GiveMeTestNamespace() - - jobID := utils.MakeIdentifier() - nID := time.Now().Unix() - mockJobStats := &job.Stats{ - Info: &job.StatsInfo{ - JobID: jobID, - Status: job.ScheduledStatus.String(), - JobKind: job.KindPeriodic, - JobName: job.SampleJob, - IsUnique: false, - CronSpec: "0 0 * * * *", - NumericPID: nID, - }, - } - - t := job.NewBasicTrackerWithStats(context.TODO(), mockJobStats, namespace, pool, nil) - err := t.Save() - require.NoError(suite.T(), err) - - executionID := utils.MakeIdentifier() - runAt := time.Now().Add(1 * time.Hour).Unix() - executionStats := &job.Stats{ - Info: &job.StatsInfo{ - JobID: executionID, - Status: job.ScheduledStatus.String(), - JobKind: job.KindScheduled, - JobName: job.SampleJob, - IsUnique: false, - CronSpec: "0 0 * * * *", - RunAt: runAt, - EnqueueTime: runAt, - UpstreamJobID: jobID, - }, - } - - t2 := job.NewBasicTrackerWithStats(context.TODO(), executionStats, namespace, pool, nil) - err = t2.Save() - require.NoError(suite.T(), err) - - suite.lcmCtl.On("Track", jobID).Return(t, nil) - suite.lcmCtl.On("Track", executionID).Return(t2, nil) - - _, total, err := suite.ctl.GetPeriodicExecutions(jobID, &query.Parameter{ + q := &query.Parameter{ PageSize: 10, PageNumber: 1, Extras: make(query.ExtraParameters), - }) + } + + fakeMgr := &fakeManager{} + fakeMgr.On("SaveJob", suite.res).Return(nil) + fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil) + fakeMgr.On("GetPeriodicExecution", "1000", q).Return([]*job.Stats{suite.res}, int64(1), nil) + suite.manager = fakeMgr + + _, total, err := suite.ctl.GetPeriodicExecutions("1000", q) require.NoError(suite.T(), err) assert.Equal(suite.T(), int64(1), total) } @@ -253,19 +224,6 @@ func createJobReq(kind string) *job.Request { } } -// Implement lcm controller interface -func (suite *ControllerTestSuite) Serve() error { - return suite.lcmCtl.Serve() -} - -func (suite *ControllerTestSuite) New(stats *job.Stats) (job.Tracker, error) { - return suite.lcmCtl.New(stats) -} - -func (suite *ControllerTestSuite) Track(jobID string) (job.Tracker, error) { - return suite.lcmCtl.Track(jobID) -} - // Implement worker interface func (suite *ControllerTestSuite) Start() error { return suite.worker.Start() @@ -307,37 +265,28 @@ func (suite *ControllerTestSuite) RetryJob(jobID string) error { return suite.worker.RetryJob(jobID) } -func (suite *ControllerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { - return suite.worker.ScheduledJobs(query) +// Implement manager interface +func (suite *ControllerTestSuite) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + return suite.manager.GetJobs(q) } -// Implement fake objects with mock -type fakeLcmController struct { - mock.Mock +func (suite *ControllerTestSuite) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) { + return suite.manager.GetPeriodicExecution(pID, q) } -func (flc *fakeLcmController) Serve() error { - return flc.Called().Error(0) +func (suite *ControllerTestSuite) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + return suite.manager.GetScheduledJobs(q) } -func (flc *fakeLcmController) New(stats *job.Stats) (job.Tracker, error) { - args := flc.Called(stats) - if args.Error(1) != nil { - return nil, args.Error(1) - } - - return args.Get(0).(job.Tracker), nil +func (suite *ControllerTestSuite) GetJob(jobID string) (*job.Stats, error) { + return suite.manager.GetJob(jobID) } -func (flc *fakeLcmController) Track(jobID string) (job.Tracker, error) { - args := flc.Called(jobID) - if args.Error(1) != nil { - return nil, args.Error(1) - } - - return args.Get(0).(job.Tracker), nil +func (suite *ControllerTestSuite) SaveJob(j *job.Stats) error { + return suite.manager.SaveJob(j) } +// fake worker type fakeWorker struct { mock.Mock } @@ -407,11 +356,48 @@ func (f *fakeWorker) RetryJob(jobID string) error { return f.Called(jobID).Error(0) } -func (f *fakeWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { - args := f.Called(query) +// fake manager +type fakeManager struct { + mock.Mock +} + +func (fm *fakeManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + args := fm.Called(q) if args.Error(2) != nil { return nil, 0, args.Error(2) } - return args.Get(0).([]*job.Stats), int64(args.Int(1)), nil + return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil +} + +func (fm *fakeManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) { + args := fm.Called(pID, q) + if args.Error(2) != nil { + return nil, 0, args.Error(2) + } + + return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil +} + +func (fm *fakeManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + args := fm.Called(q) + if args.Error(2) != nil { + return nil, 0, args.Error(2) + } + + return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil +} + +func (fm *fakeManager) GetJob(jobID string) (*job.Stats, error) { + args := fm.Called(jobID) + if args.Error(1) != nil { + return nil, args.Error(1) + } + + return args.Get(0).(*job.Stats), nil +} + +func (fm *fakeManager) SaveJob(j *job.Stats) error { + args := fm.Called(j) + return args.Error(0) } diff --git a/src/jobservice/core/interface.go b/src/jobservice/core/interface.go index 176a87c57..788257f4e 100644 --- a/src/jobservice/core/interface.go +++ b/src/jobservice/core/interface.go @@ -68,8 +68,11 @@ type Interface interface { // The total number is also returned. GetPeriodicExecutions(periodicJobID string, query *query.Parameter) ([]*job.Stats, int64, error) - // Get the scheduled jobs by page - // The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib. + // Get the jobs with queries. + // + // For scheduled jobs, the page number in the query will be ignored, default 20 is used. + // This is the limitation of backend lib. The int64 is total number. + // For other cases, query the jobs with cursor, not standard pagination. The int64 is next cursor. // The total number is also returned. - ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) + GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) } diff --git a/src/jobservice/errs/errors.go b/src/jobservice/errs/errors.go index b937c5d07..7e5abcb36 100644 --- a/src/jobservice/errs/errors.go +++ b/src/jobservice/errs/errors.go @@ -18,6 +18,7 @@ package errs import ( "encoding/json" "fmt" + "github.com/goharbor/harbor/src/jobservice/common/query" ) const ( @@ -49,8 +50,8 @@ const ( ResourceConflictsErrorCode // BadRequestErrorCode is code for the error of bad request BadRequestErrorCode - // GetScheduledJobsErrorCode is code for the error of getting scheduled jobs - GetScheduledJobsErrorCode + // GetJobsErrorCode is code for the error of getting scheduled jobs + GetJobsErrorCode // GetPeriodicExecutionErrorCode is code for the error of getting periodic executions GetPeriodicExecutionErrorCode // StatusMismatchErrorCode is code for the error of mismatching status @@ -137,9 +138,13 @@ func UnauthorizedError(err error) error { return New(UnAuthorizedErrorCode, "unauthorized", err.Error()) } -// GetScheduledJobsError is error for the case of getting scheduled jobs failed -func GetScheduledJobsError(err error) error { - return New(GetScheduledJobsErrorCode, "failed to get scheduled jobs", err.Error()) +// GetJobsError is error for the case of getting jobs failed +func GetJobsError(q *query.Parameter, err error) error { + qStr := "" + if q != nil { + qStr = q.Extras.String() + } + return New(GetJobsErrorCode, fmt.Sprintf("failed to get scheduled jobs, q=%s", qStr), err.Error()) } // GetPeriodicExecutionError is error for the case of getting periodic jobs failed diff --git a/src/jobservice/job/tracker.go b/src/jobservice/job/tracker.go index e493fec7a..0e70d338a 100644 --- a/src/jobservice/job/tracker.go +++ b/src/jobservice/job/tracker.go @@ -17,7 +17,6 @@ package job import ( "context" "encoding/json" - "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/rds" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/errs" @@ -66,15 +65,6 @@ type Tracker interface { // error if update failed Update(fieldAndValues ...interface{}) error - // Executions returns the executions of the job tracked by this tracker. - // Please pay attention, this only for periodic job. - // - // Returns: - // job execution IDs matched the query - // the total number - // error if any issues happened - Executions(q *query.Parameter) ([]string, int64, error) - // NumericID returns the numeric ID of periodic job. // Please pay attention, this only for periodic job. NumericID() (int64, error) @@ -241,65 +231,6 @@ func (bt *basicTracker) CheckIn(message string) error { return err } -// Executions of the tracked job -func (bt *basicTracker) Executions(q *query.Parameter) ([]string, int64, error) { - if bt.jobStats.Info.JobKind != KindPeriodic { - return nil, 0, errors.New("only periodic job has executions") - } - - conn := bt.pool.Get() - defer func() { - _ = conn.Close() - }() - - key := rds.KeyUpstreamJobAndExecutions(bt.namespace, bt.jobID) - - // Query executions by "non stopped" - if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok { - if v, yes := nonStoppedOnly.(bool); yes && v { - return queryExecutions(conn, key, q) - } - } - - // Pagination - var pageNumber, pageSize uint = 1, query.DefaultPageSize - if q != nil { - if q.PageNumber > 0 { - pageNumber = q.PageNumber - } - if q.PageSize > 0 { - pageSize = q.PageSize - } - } - - // Get total first - total, err := redis.Int64(conn.Do("ZCARD", key)) - if err != nil { - return nil, 0, err - } - - // No items - result := make([]string, 0) - if total == 0 || (int64)((pageNumber-1)*pageSize) >= total { - return result, total, nil - } - - min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1 - args := []interface{}{key, min, max} - list, err := redis.Values(conn.Do("ZREVRANGE", args...)) - if err != nil { - return nil, 0, err - } - - for _, item := range list { - if eID, ok := item.([]byte); ok { - result = append(result, string(eID)) - } - } - - return result, total, nil -} - // Expire job stats func (bt *basicTracker) Expire() error { conn := bt.pool.Get() @@ -709,40 +640,3 @@ func getStatus(conn redis.Conn, key string) (Status, error) { func setStatus(conn redis.Conn, key string, status Status) error { return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix()) } - -// queryExecutions queries periodic executions by status -func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) { - total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf")) - if err != nil { - return nil, 0, err - } - - var pageNumber, pageSize uint = 1, query.DefaultPageSize - if q.PageNumber > 0 { - pageNumber = q.PageNumber - } - if q.PageSize > 0 { - pageSize = q.PageSize - } - - results := make([]string, 0) - if total == 0 || (int64)((pageNumber-1)*pageSize) >= total { - return results, total, nil - } - - offset := (pageNumber - 1) * pageSize - args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize} - - eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...)) - if err != nil { - return nil, 0, err - } - - for _, eID := range eIDs { - if eIDBytes, ok := eID.([]byte); ok { - results = append(results, string(eIDBytes)) - } - } - - return results, total, nil -} diff --git a/src/jobservice/job/tracker_test.go b/src/jobservice/job/tracker_test.go index a67ca5f1c..7c08812c1 100644 --- a/src/jobservice/job/tracker_test.go +++ b/src/jobservice/job/tracker_test.go @@ -19,8 +19,6 @@ import ( "testing" "time" - "github.com/goharbor/harbor/src/jobservice/common/query" - "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/tests" "github.com/gomodule/redigo/redis" @@ -176,14 +174,6 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() { require.NoError(suite.T(), err) assert.Equal(suite.T(), nID, id) - _, total, err := t.Executions(&query.Parameter{ - PageNumber: 1, - PageSize: 10, - Extras: make(query.ExtraParameters), - }) - require.NoError(suite.T(), err) - assert.Equal(suite.T(), int64(1), total) - err = t2.PeriodicExecutionDone() require.NoError(suite.T(), err) } diff --git a/src/jobservice/main.go b/src/jobservice/main.go index 8b97b5806..50d4923de 100644 --- a/src/jobservice/main.go +++ b/src/jobservice/main.go @@ -18,13 +18,6 @@ import ( "context" "flag" "fmt" - "github.com/goharbor/harbor/src/common" - comcfg "github.com/goharbor/harbor/src/common/config" - "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/jobservice/job/impl" - "github.com/pkg/errors" - "os" - "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/logger" @@ -59,7 +52,7 @@ func main() { } // Set job context initializer - runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) { + /*runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) { secret := config.GetAuthSecret() if utils.IsEmptyStr(secret) { return nil, errors.New("empty auth secret") @@ -74,7 +67,7 @@ func main() { } return jobCtx, nil - }) + })*/ // Start if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil { diff --git a/src/jobservice/mgt/manager.go b/src/jobservice/mgt/manager.go new file mode 100644 index 000000000..e31947ce1 --- /dev/null +++ b/src/jobservice/mgt/manager.go @@ -0,0 +1,349 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mgt + +import ( + "context" + "fmt" + "github.com/gocraft/work" + "github.com/goharbor/harbor/src/jobservice/common/query" + "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/common/utils" + "github.com/goharbor/harbor/src/jobservice/errs" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" + "github.com/goharbor/harbor/src/jobservice/period" + "github.com/gomodule/redigo/redis" + "github.com/pkg/errors" + "strconv" + "strings" +) + +// Manager defies the related operations to handle the management of job stats. +type Manager interface { + // Get the stats data of all kinds of jobs. + // Data returned by pagination. + // + // Arguments: + // q *query.Parameter : the query parameters + // + // Returns: + // The matched job stats list + // The total number of the jobs + // Non nil error if any issues meet + GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) + + // Get the executions of the specified periodic job by pagination + // + // Arguments: + // pID: ID of the periodic job + // q *query.Parameter: query parameters + // + // Returns: + // The matched job stats list, + // The total number of the executions, + // Non nil error if any issues meet. + GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) + + // Get the scheduled jobs + // + // Arguments: + // q *query.Parameter: query parameters + // + // Returns: + // The matched job stats list, + // The total number of the executions, + // Non nil error if any issues meet. + GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) + + // Get the stats of the specified job + // + // Arguments: + // jobID string: ID of the job + // + // Returns: + // The job stats + // Non nil error if any issues meet + GetJob(jobID string) (*job.Stats, error) + + // Save the job stats + // + // Arguments: + // job *job.Stats: the saving job stats + // + // Returns: + // Non nil error if any issues meet + SaveJob(job *job.Stats) error +} + +// basicManager is the default implementation of @manager, +// based on redis. +type basicManager struct { + // system context + ctx context.Context + // db namespace + namespace string + // redis conn pool + pool *redis.Pool + // go work client + client *work.Client +} + +// NewManager news a basic manager +func NewManager(ctx context.Context, ns string, pool *redis.Pool) Manager { + return &basicManager{ + ctx: ctx, + namespace: ns, + pool: pool, + client: work.NewClient(ns, pool), + } +} + +// GetJobs is implementation of Manager.GetJobs +// Because of the hash set used to keep the job stats, we can not support a standard pagination. +// A cursor is used to fetch the jobs with several batches. +func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + cursor, count := int64(0), query.DefaultPageSize + if q != nil { + if q.PageSize > 0 { + count = q.PageSize + } + + if cur, ok := q.Extras.Get(query.ExtraParamKeyCursor); ok { + cursor = cur.(int64) + } + } + + pattern := rds.KeyJobStats(bm.namespace, "*") + args := []interface{}{cursor, "MATCH", pattern, "COUNT", count} + + conn := bm.pool.Get() + defer func() { + _ = conn.Close() + }() + + values, err := redis.Values(conn.Do("SCAN", args...)) + if err != nil { + return nil, 0, err + } + if len(values) != 2 { + return nil, 0, errors.New("malform scan results") + } + + nextCur, err := strconv.ParseUint(string(values[0].([]byte)), 10, 8) + if err != nil { + return nil, 0, err + } + list := values[1].([]interface{}) + + results := make([]*job.Stats, 0) + for _, v := range list { + if bytes, ok := v.([]byte); ok { + statsKey := string(bytes) + if i := strings.LastIndex(statsKey, ":"); i != -1 { + jID := statsKey[i+1:] + t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) + if err := t.Load(); err != nil { + logger.Errorf("retrieve stats data of job %s error: %s", jID, err) + continue + } + + results = append(results, t.Job()) + } + } + } + + return results, int64(nextCur), nil +} + +// GetPeriodicExecution is implementation of Manager.GetPeriodicExecution +func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (results []*job.Stats, total int64, err error) { + if utils.IsEmptyStr(pID) { + return nil, 0, errors.New("nil periodic job ID") + } + + tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil) + err = tracker.Load() + if err != nil { + return nil, 0, err + } + + if tracker.Job().Info.JobKind != job.KindPeriodic { + return nil, 0, errors.Errorf("only periodic job has executions: %s kind is received", tracker.Job().Info.JobKind) + } + + conn := bm.pool.Get() + defer func() { + _ = conn.Close() + }() + + key := rds.KeyUpstreamJobAndExecutions(bm.namespace, pID) + + executionIDs := make([]string, 0) + // Query executions by "non stopped" + if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok { + if v, yes := nonStoppedOnly.(bool); yes && v { + executionIDs, total, err = queryExecutions(conn, key, q) + if err != nil { + return nil, 0, err + } + } + } else { + // Query all + // Pagination + var pageNumber, pageSize uint = 1, query.DefaultPageSize + if q != nil { + if q.PageNumber > 0 { + pageNumber = q.PageNumber + } + if q.PageSize > 0 { + pageSize = q.PageSize + } + } + + // Get total first + total, err = redis.Int64(conn.Do("ZCARD", key)) + if err != nil { + return nil, 0, err + } + + // No items + if total == 0 || (int64)((pageNumber-1)*pageSize) >= total { + return results, total, nil + } + + min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1 + args := []interface{}{key, min, max} + list, err := redis.Values(conn.Do("ZREVRANGE", args...)) + if err != nil { + return nil, 0, err + } + + for _, item := range list { + if eID, ok := item.([]byte); ok { + executionIDs = append(executionIDs, string(eID)) + } + } + } + + for _, eID := range executionIDs { + t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil) + if er := t.Load(); er != nil { + logger.Errorf("track job %s error: %s", eID, err) + continue + } + + results = append(results, t.Job()) + } + + return +} + +// GetScheduledJobs is implementation of Manager.GetScheduledJobs +func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) { + // PageSize is not supported here + var page uint = 1 + if q != nil && q.PageNumber > 1 { + page = q.PageNumber + } + + sJobs, total, err := bm.client.ScheduledJobs(page) + if err != nil { + return nil, 0, err + } + + res := make([]*job.Stats, 0) + for _, sJob := range sJobs { + jID := sJob.ID + if len(sJob.Args) > 0 { + if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok { + // Periodic scheduled job + jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt) + } + } + t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil) + err = t.Load() + if err != nil { + // Just log it + logger.Errorf("mgt.basicManager: query scheduled jobs error: %s", err) + continue + } + + res = append(res, t.Job()) + } + + return res, total, nil +} + +// GetJob is implementation of Manager.GetJob +func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) { + if utils.IsEmptyStr(jobID) { + return nil, errs.BadRequestError("empty job ID") + } + + t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil) + if err := t.Load(); err != nil { + return nil, err + } + + return t.Job(), nil +} + +// SaveJob is implementation of Manager.SaveJob +func (bm *basicManager) SaveJob(j *job.Stats) error { + if j == nil { + return errs.BadRequestError("nil saving job stats") + } + + t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil) + return t.Save() +} + +// queryExecutions queries periodic executions by status +func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) { + total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf")) + if err != nil { + return nil, 0, err + } + + var pageNumber, pageSize uint = 1, query.DefaultPageSize + if q.PageNumber > 0 { + pageNumber = q.PageNumber + } + if q.PageSize > 0 { + pageSize = q.PageSize + } + + results := make([]string, 0) + if total == 0 || (int64)((pageNumber-1)*pageSize) >= total { + return results, total, nil + } + + offset := (pageNumber - 1) * pageSize + args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize} + + eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...)) + if err != nil { + return nil, 0, err + } + + for _, eID := range eIDs { + if eIDBytes, ok := eID.([]byte); ok { + results = append(results, string(eIDBytes)) + } + } + + return results, total, nil +} diff --git a/src/jobservice/mgt/manager_test.go b/src/jobservice/mgt/manager_test.go new file mode 100644 index 000000000..dabad18df --- /dev/null +++ b/src/jobservice/mgt/manager_test.go @@ -0,0 +1,184 @@ +// Copyright Project Harbor Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mgt + +import ( + "context" + "github.com/gocraft/work" + "github.com/goharbor/harbor/src/jobservice/common/query" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/tests" + "github.com/gomodule/redigo/redis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +// BasicManagerTestSuite tests the function of basic manager +type BasicManagerTestSuite struct { + suite.Suite + + namespace string + pool *redis.Pool + + manager Manager +} + +// SetupSuite prepares the test suite +func (suite *BasicManagerTestSuite) SetupSuite() { + suite.namespace = tests.GiveMeTestNamespace() + suite.pool = tests.GiveMeRedisPool() + suite.manager = NewManager(context.TODO(), suite.namespace, suite.pool) + + // Mock data + periodicJob := &job.Stats{ + Info: &job.StatsInfo{ + JobID: "1000", + JobName: job.SampleJob, + JobKind: job.KindPeriodic, + Status: job.ScheduledStatus.String(), + IsUnique: false, + CronSpec: "* */10 * * * *", + }, + } + + t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil) + err := t.Save() + require.NoError(suite.T(), err) + + execution := &job.Stats{ + Info: &job.StatsInfo{ + JobID: "1001", + JobKind: job.KindScheduled, + JobName: job.SampleJob, + Status: job.PendingStatus.String(), + IsUnique: false, + RunAt: time.Now().Add(5 * time.Minute).Unix(), + UpstreamJobID: "1000", + }, + } + t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil) + err = t.Save() + require.NoError(suite.T(), err) +} + +// TearDownSuite clears the test suite +func (suite *BasicManagerTestSuite) TearDownSuite() { + conn := suite.pool.Get() + defer func() { + _ = conn.Close() + }() + + _ = tests.ClearAll(suite.namespace, conn) +} + +// TestBasicManagerTestSuite is entry of go test +func TestBasicManagerTestSuite(t *testing.T) { + suite.Run(t, new(BasicManagerTestSuite)) +} + +// TestBasicManagerGetJobs tests get jobs +func (suite *BasicManagerTestSuite) TestBasicManagerGetJobs() { + jobs, _, err := suite.manager.GetJobs(&query.Parameter{ + PageSize: 25, + PageNumber: 1, + }) + require.NoError(suite.T(), err) + assert.Condition(suite.T(), func() bool { + return len(jobs) > 0 + }) +} + +// TestGetPeriodicExecutions tests get periodic executions +func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() { + extras := make(query.ExtraParameters) + extras.Set(query.ExtraParamKeyNonStoppedOnly, true) + + jobs, total, err := suite.manager.GetPeriodicExecution("1000", &query.Parameter{ + PageSize: 20, + PageNumber: 1, + Extras: extras, + }) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), total) + assert.Equal(suite.T(), int64(1), int64(len(jobs))) + + t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil) + err = t.Load() + require.NoError(suite.T(), err) + err = t.PeriodicExecutionDone() + require.NoError(suite.T(), err) + + jobs, total, err = suite.manager.GetPeriodicExecution("1000", &query.Parameter{ + PageSize: 20, + PageNumber: 1, + }) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), total) + assert.Equal(suite.T(), int64(1), int64(len(jobs))) +} + +// TestGetScheduledJobs tests get scheduled jobs +func (suite *BasicManagerTestSuite) TestGetScheduledJobs() { + enqueuer := work.NewEnqueuer(suite.namespace, suite.pool) + scheduledJob, err := enqueuer.EnqueueIn(job.SampleJob, 1000, make(map[string]interface{})) + require.NoError(suite.T(), err) + stats := &job.Stats{ + Info: &job.StatsInfo{ + JobID: scheduledJob.ID, + JobName: job.SampleJob, + JobKind: job.KindScheduled, + Status: job.ScheduledStatus.String(), + RunAt: scheduledJob.RunAt, + }, + } + + t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil) + err = t.Save() + require.NoError(suite.T(), err) + + list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{ + PageNumber: 1, + PageSize: 10, + }) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), total) + assert.Equal(suite.T(), int64(1), int64(len(list))) +} + +// TestGetJob tests get job +func (suite *BasicManagerTestSuite) TestGetJob() { + j, err := suite.manager.GetJob("1001") + require.NoError(suite.T(), err) + assert.Equal(suite.T(), j.Info.JobID, "1001") +} + +// TestSaveJob tests saving job +func (suite *BasicManagerTestSuite) TestSaveJob() { + newJob := &job.Stats{ + Info: &job.StatsInfo{ + JobID: "1002", + JobKind: job.KindPeriodic, + JobName: job.SampleJob, + Status: job.PendingStatus.String(), + IsUnique: false, + }, + } + + err := suite.manager.SaveJob(newJob) + require.NoError(suite.T(), err) +} diff --git a/src/jobservice/runtime/bootstrap.go b/src/jobservice/runtime/bootstrap.go index a12d5d1a9..6e722bcd8 100644 --- a/src/jobservice/runtime/bootstrap.go +++ b/src/jobservice/runtime/bootstrap.go @@ -17,6 +17,7 @@ package runtime import ( "context" "fmt" + "github.com/goharbor/harbor/src/jobservice/mgt" "os" "os/signal" "sync" @@ -86,7 +87,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) var ( backendWorker worker.Interface - lcmCtl lcm.Controller + manager mgt.Manager ) if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis { // Number of workers @@ -95,6 +96,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) namespace := fmt.Sprintf("{%s}", cfg.PoolConfig.RedisPoolCfg.Namespace) // Get redis connection pool redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL) + + // Create stats manager + manager = mgt.NewManager(ctx, namespace, redisPool) // Create hook agent, it's a singleton object hookAgent := hook.NewAgent(rootContext, namespace, redisPool) hookCallback := func(URL string, change *job.StatusChange) error { @@ -114,7 +118,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) } // Create job life cycle management controller - lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback) + lcmCtl := lcm.NewController(rootContext, namespace, redisPool, hookCallback) // Start the backend worker backendWorker, err = bs.loadAndRunRedisWorkerPool( @@ -145,7 +149,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc) } // Initialize controller - ctl := core.NewController(backendWorker, lcmCtl) + ctl := core.NewController(backendWorker, manager) // Start the API server apiServer := bs.createAPIServer(ctx, cfg, ctl) diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index 2ba3b8163..e9cafe0c1 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -20,7 +20,6 @@ import ( "time" "github.com/gocraft/work" - "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/job" @@ -362,40 +361,6 @@ func (w *basicWorker) ValidateJobParameters(jobType interface{}, params job.Para return theJ.Validate(params) } -// ScheduledJobs returns the scheduled jobs by page -func (w *basicWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) { - var page uint = 1 - if query != nil && query.PageNumber > 1 { - page = query.PageNumber - } - - sJobs, total, err := w.client.ScheduledJobs(page) - if err != nil { - return nil, 0, err - } - - res := make([]*job.Stats, 0) - for _, sJob := range sJobs { - jID := sJob.ID - if len(sJob.Args) > 0 { - if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok { - // Periodic scheduled job - jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt) - } - } - t, err := w.ctl.Track(jID) - if err != nil { - // Just log it - logger.Errorf("cworker: query scheduled jobs error: %s", err) - continue - } - - res = append(res, t.Job()) - } - - return res, total, nil -} - // RegisterJob is used to register the job to the worker. // j is the type of job func (w *basicWorker) registerJob(name string, j interface{}) (err error) { diff --git a/src/jobservice/worker/cworker/c_worker_test.go b/src/jobservice/worker/cworker/c_worker_test.go index a2f9cc298..6f0b6ae1b 100644 --- a/src/jobservice/worker/cworker/c_worker_test.go +++ b/src/jobservice/worker/cworker/c_worker_test.go @@ -156,6 +156,9 @@ func (suite *CWorkerTestSuite) TestEnqueuePeriodicJob() { params["name"] = "testing:v1" m := time.Now().Minute() + if m+2 >= 60 { + m = m - 2 + } _, err := suite.cWorker.PeriodicallyEnqueue( "fake_job", params, @@ -204,16 +207,6 @@ func (suite *CWorkerTestSuite) TestStopJob() { require.NoError(suite.T(), err, "stop job: nil error expected but got %s", err) } -// TestScheduledJobs tests get scheduled job -func (suite *CWorkerTestSuite) TestScheduledJobs() { - params := make(map[string]interface{}) - params["name"] = "testing:v1" - - _, total, err := suite.cWorker.ScheduledJobs(nil) - require.NoError(suite.T(), err, "get scheduled job: nil error expected but got %s", err) - assert.EqualValues(suite.T(), int64(2), total, "expect 1 item but got 0") -} - type fakeJob struct{} func (j *fakeJob) MaxFails() uint { diff --git a/src/jobservice/worker/interface.go b/src/jobservice/worker/interface.go index 7c63926f1..776b2a0f2 100644 --- a/src/jobservice/worker/interface.go +++ b/src/jobservice/worker/interface.go @@ -15,7 +15,6 @@ package worker import ( - "github.com/goharbor/harbor/src/jobservice/common/query" "github.com/goharbor/harbor/src/jobservice/job" ) @@ -112,16 +111,4 @@ type Interface interface { // Return: // error : error returned if meet any problems RetryJob(jobID string) error - - // Get the scheduled jobs by page - // The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib. - // The total number is also returned. - // - // query *query.Parameter : query parameters - // - // Return: - // []*job.Stats : list of scheduled jobs - // int : the total number of scheduled jobs - // error : non nil error if meet any issues - ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) } From 2562146faf5af663bf5c670645f587c21fe42e9a Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Mon, 29 Apr 2019 18:47:29 +0800 Subject: [PATCH 2/2] adjust the order of robot test case of API_DB Signed-off-by: Steven Zou --- src/jobservice/main.go | 11 +++++++++-- tests/robot-cases/Group0-BAT/API_DB.robot | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/jobservice/main.go b/src/jobservice/main.go index 50d4923de..a5a1706f3 100644 --- a/src/jobservice/main.go +++ b/src/jobservice/main.go @@ -16,10 +16,17 @@ package main import ( "context" + "errors" "flag" "fmt" + "os" + + "github.com/goharbor/harbor/src/common" + comcfg "github.com/goharbor/harbor/src/common/config" "github.com/goharbor/harbor/src/jobservice/common/utils" "github.com/goharbor/harbor/src/jobservice/config" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/runtime" ) @@ -52,7 +59,7 @@ func main() { } // Set job context initializer - /*runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) { + runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) { secret := config.GetAuthSecret() if utils.IsEmptyStr(secret) { return nil, errors.New("empty auth secret") @@ -67,7 +74,7 @@ func main() { } return jobCtx, nil - })*/ + }) // Start if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil { diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot index baf555c1a..cd657f56f 100644 --- a/tests/robot-cases/Group0-BAT/API_DB.robot +++ b/tests/robot-cases/Group0-BAT/API_DB.robot @@ -17,6 +17,8 @@ ${SERVER_API_ENDPOINT} ${SERVER_URL}/api &{SERVER_CONFIG} endpoint=${SERVER_API_ENDPOINT} verify_ssl=False *** Test Cases *** +Test Case - Garbage Collection + Harbor API Test ./tests/apitests/python/test_garbage_collection.py Test Case - Add Private Project Member and Check User Can See It Harbor API Test ./tests/apitests/python/test_add_member_to_private_project.py Test Case - Delete a Repository of a Certain Project Created by Normal User @@ -33,8 +35,6 @@ Test Case - Manage Project Member Harbor API Test ./tests/apitests/python/test_manage_project_member.py Test Case - Project Level Policy Content Trust Harbor API Test ./tests/apitests/python/test_project_level_policy_content_trust.py -Test Case - Garbage Collection - Harbor API Test ./tests/apitests/python/test_garbage_collection.py Test Case - User View Logs Harbor API Test ./tests/apitests/python/test_user_view_logs.py Test Case - Scan All Images