mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-17 04:11:24 +01:00
add get jobs API to provide powerful job stats management
Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
parent
ffadd13ba6
commit
f2870272ce
@ -34,7 +34,10 @@ import (
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const totalHeaderKey = "Total-Count"
|
||||
const (
|
||||
totalHeaderKey = "Total-Count"
|
||||
nextCursorKey = "Next-Cursor"
|
||||
)
|
||||
|
||||
// Handler defines approaches to handle the http requests.
|
||||
type Handler interface {
|
||||
@ -56,8 +59,8 @@ type Handler interface {
|
||||
// HandleJobLogReq is used to handle the request of getting periodic executions
|
||||
HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request)
|
||||
|
||||
// HandleScheduledJobs is used to handle the request of getting pending scheduled jobs
|
||||
HandleScheduledJobs(w http.ResponseWriter, req *http.Request)
|
||||
// HandleGetJobsReq is used to handle the request of getting jobs
|
||||
HandleGetJobsReq(w http.ResponseWriter, req *http.Request)
|
||||
}
|
||||
|
||||
// DefaultHandler is the default request handler which implements the Handler interface.
|
||||
@ -244,17 +247,24 @@ func (dh *DefaultHandler) HandlePeriodicExecutions(w http.ResponseWriter, req *h
|
||||
|
||||
}
|
||||
|
||||
// HandleScheduledJobs is implementation of method defined in interface 'Handler'
|
||||
func (dh *DefaultHandler) HandleScheduledJobs(w http.ResponseWriter, req *http.Request) {
|
||||
// HandleGetJobsReq is implementation of method defined in interface 'Handler'
|
||||
func (dh *DefaultHandler) HandleGetJobsReq(w http.ResponseWriter, req *http.Request) {
|
||||
// Get query parameters
|
||||
q := extractQuery(req)
|
||||
jobs, total, err := dh.controller.ScheduledJobs(q)
|
||||
jobs, total, err := dh.controller.GetJobs(q)
|
||||
if err != nil {
|
||||
dh.handleError(w, req, http.StatusInternalServerError, errs.GetScheduledJobsError(err))
|
||||
dh.handleError(w, req, http.StatusInternalServerError, errs.GetJobsError(q, err))
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Add(totalHeaderKey, fmt.Sprintf("%d", total))
|
||||
key := nextCursorKey
|
||||
if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok {
|
||||
if kind, yes := v.(string); yes && kind == job.KindScheduled {
|
||||
key = totalHeaderKey
|
||||
}
|
||||
}
|
||||
|
||||
w.Header().Add(key, fmt.Sprintf("%d", total))
|
||||
dh.handleJSONData(w, req, http.StatusOK, jobs)
|
||||
}
|
||||
|
||||
@ -321,6 +331,20 @@ func extractQuery(req *http.Request) *query.Parameter {
|
||||
}
|
||||
}
|
||||
|
||||
// Extra job kind query param
|
||||
jobKind := queries.Get(query.ParamKeyJobKind)
|
||||
if !utils.IsEmptyStr(jobKind) {
|
||||
q.Extras.Set(query.ExtraParamKeyKind, jobKind)
|
||||
}
|
||||
|
||||
// Extra query cursor
|
||||
cursorV := queries.Get(query.ParamKeyCursor)
|
||||
if !utils.IsEmptyStr(cursorV) {
|
||||
if cursor, err := strconv.ParseInt(cursorV, 10, 32); err == nil {
|
||||
q.Extras.Set(query.ExtraParamKeyCursor, cursor)
|
||||
}
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
|
@ -277,7 +277,7 @@ func (suite *APIHandlerTestSuite) TestGetPeriodicExecutionsWithQuery() {
|
||||
}
|
||||
|
||||
// TestScheduledJobs ...
|
||||
func (suite *APIHandlerTestSuite) TestScheduledJobs() {
|
||||
func (suite *APIHandlerTestSuite) TestGetJobs() {
|
||||
q := &query.Parameter{
|
||||
PageNumber: 2,
|
||||
PageSize: 50,
|
||||
@ -285,11 +285,20 @@ func (suite *APIHandlerTestSuite) TestScheduledJobs() {
|
||||
}
|
||||
|
||||
fc := &fakeController{}
|
||||
fc.On("ScheduledJobs", q).
|
||||
Return([]*job.Stats{createJobStats("sample", "Generic", "")}, int64(1), nil)
|
||||
fc.On("GetJobs", q).
|
||||
Return([]*job.Stats{createJobStats("sample", job.KindGeneric, "")}, int64(1), nil)
|
||||
suite.controller = fc
|
||||
|
||||
_, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs/scheduled?page_number=2&page_size=50"))
|
||||
_, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50"))
|
||||
assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code)
|
||||
|
||||
q.Extras.Set(query.ExtraParamKeyKind, job.KindScheduled)
|
||||
fc = &fakeController{}
|
||||
fc.On("GetJobs", q).
|
||||
Return([]*job.Stats{createJobStats("sample", job.KindScheduled, "")}, int64(1), nil)
|
||||
suite.controller = fc
|
||||
|
||||
_, code = suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50&kind=Scheduled"))
|
||||
assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code)
|
||||
}
|
||||
|
||||
@ -397,8 +406,8 @@ func (suite *APIHandlerTestSuite) GetPeriodicExecutions(periodicJobID string, qu
|
||||
return suite.controller.GetPeriodicExecutions(periodicJobID, query)
|
||||
}
|
||||
|
||||
func (suite *APIHandlerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.controller.ScheduledJobs(query)
|
||||
func (suite *APIHandlerTestSuite) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.controller.GetJobs(query)
|
||||
}
|
||||
|
||||
type fakeController struct {
|
||||
@ -460,7 +469,7 @@ func (fc *fakeController) GetPeriodicExecutions(periodicJobID string, query *que
|
||||
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||
}
|
||||
|
||||
func (fc *fakeController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
func (fc *fakeController) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
args := fc.Called(query)
|
||||
if args.Error(2) != nil {
|
||||
return nil, args.Get(1).(int64), args.Error(2)
|
||||
|
@ -88,7 +88,7 @@ func (br *BaseRouter) registerRoutes() {
|
||||
subRouter := br.router.PathPrefix(fmt.Sprintf("%s/%s", baseRoute, apiVersion)).Subrouter()
|
||||
|
||||
subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost)
|
||||
subRouter.HandleFunc("/jobs/scheduled", br.handler.HandleScheduledJobs).Methods(http.MethodGet)
|
||||
subRouter.HandleFunc("/jobs", br.handler.HandleGetJobsReq).Methods(http.MethodGet)
|
||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet)
|
||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost)
|
||||
subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet)
|
||||
|
@ -14,6 +14,8 @@
|
||||
|
||||
package query
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
const (
|
||||
// DefaultPageSize defines the default page size
|
||||
DefaultPageSize uint = 25
|
||||
@ -23,8 +25,16 @@ const (
|
||||
ParamKeyPageSize = "page_size"
|
||||
// ParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions
|
||||
ParamKeyNonStoppedOnly = "non_dead_only"
|
||||
// ParamKeyCursor defines query param of cursor for fetching job stats with batches
|
||||
ParamKeyCursor = "cursor"
|
||||
// ParamKeyJobKind defines query param of job kind
|
||||
ParamKeyJobKind = "kind"
|
||||
// ExtraParamKeyNonStoppedOnly defines extra parameter key for querying non stopped periodic executions
|
||||
ExtraParamKeyNonStoppedOnly = "NonDeadOnly"
|
||||
// ExtraParamKeyCursor defines extra parameter key for the cursor of fetching job stats with batches
|
||||
ExtraParamKeyCursor = "Cursor"
|
||||
// ExtraParamKeyKind defines extra parameter key for the job kind
|
||||
ExtraParamKeyKind = "Kind"
|
||||
)
|
||||
|
||||
// ExtraParameters to keep non pagination query parameters
|
||||
@ -44,6 +54,16 @@ func (ep ExtraParameters) Get(key string) (interface{}, bool) {
|
||||
return v, ok
|
||||
}
|
||||
|
||||
// String returns the json string of ExtraParameters
|
||||
func (ep ExtraParameters) String() string {
|
||||
bytes, err := json.Marshal(&ep)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return string(bytes)
|
||||
}
|
||||
|
||||
// Parameter for getting executions
|
||||
type Parameter struct {
|
||||
PageNumber uint
|
||||
|
@ -1,6 +1,8 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -26,4 +28,13 @@ func (suite *QueryTestSuite) TestExtraParams() {
|
||||
|
||||
assert.Equal(suite.T(), true, ok)
|
||||
assert.Equal(suite.T(), 100, v.(int))
|
||||
|
||||
str := extras.String()
|
||||
copy := make(ExtraParameters)
|
||||
err := json.Unmarshal([]byte(str), ©)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
v, ok = copy.Get("a")
|
||||
assert.Equal(suite.T(), true, ok)
|
||||
assert.Equal(suite.T(), 100, int(v.(float64)))
|
||||
}
|
||||
|
@ -37,8 +37,6 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string {
|
||||
return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------
|
||||
|
||||
// KeyNamespacePrefix returns the based key based on the namespace.
|
||||
func KeyNamespacePrefix(namespace string) string {
|
||||
ns := strings.TrimSpace(namespace)
|
||||
|
@ -16,6 +16,7 @@ package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
@ -24,7 +25,6 @@ import (
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/lcm"
|
||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||
"github.com/robfig/cron"
|
||||
)
|
||||
@ -34,15 +34,15 @@ import (
|
||||
type basicController struct {
|
||||
// Refer the backend worker
|
||||
backendWorker worker.Interface
|
||||
// Refer the job life cycle management controller
|
||||
ctl lcm.Controller
|
||||
// Refer the job stats manager
|
||||
manager mgt.Manager
|
||||
}
|
||||
|
||||
// NewController is constructor of basicController.
|
||||
func NewController(backendWorker worker.Interface, ctl lcm.Controller) Interface {
|
||||
func NewController(backendWorker worker.Interface, mgr mgt.Manager) Interface {
|
||||
return &basicController{
|
||||
backendWorker: backendWorker,
|
||||
ctl: ctl,
|
||||
manager: mgr,
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,7 +92,7 @@ func (bc *basicController) LaunchJob(req *job.Request) (res *job.Stats, err erro
|
||||
|
||||
// Save job stats
|
||||
if err == nil {
|
||||
if _, err := bc.ctl.New(res); err != nil {
|
||||
if err := bc.manager.SaveJob(res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -106,12 +106,7 @@ func (bc *basicController) GetJob(jobID string) (*job.Stats, error) {
|
||||
return nil, errs.BadRequestError(errors.New("empty job ID"))
|
||||
}
|
||||
|
||||
t, err := bc.ctl.Track(jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t.Job(), nil
|
||||
return bc.manager.GetJob(jobID)
|
||||
}
|
||||
|
||||
// StopJob is implementation of same method in core interface.
|
||||
@ -157,32 +152,25 @@ func (bc *basicController) GetPeriodicExecutions(periodicJobID string, query *qu
|
||||
return nil, 0, errs.BadRequestError(errors.New("nil periodic job ID"))
|
||||
}
|
||||
|
||||
t, err := bc.ctl.Track(periodicJobID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
eIDs, total, err := t.Executions(query)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res := make([]*job.Stats, 0)
|
||||
for _, eID := range eIDs {
|
||||
et, err := bc.ctl.Track(eID)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res = append(res, et.Job())
|
||||
}
|
||||
|
||||
return res, total, nil
|
||||
return bc.manager.GetPeriodicExecution(periodicJobID, query)
|
||||
}
|
||||
|
||||
// ScheduledJobs returns the scheduled jobs by page
|
||||
func (bc *basicController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return bc.backendWorker.ScheduledJobs(query)
|
||||
// GetJobs returns the jobs by specified
|
||||
func (bc *basicController) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
onlyScheduledJobs := false
|
||||
if q != nil && q.Extras != nil {
|
||||
if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok {
|
||||
if job.KindScheduled == v.(string) {
|
||||
onlyScheduledJobs = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if onlyScheduledJobs {
|
||||
return bc.manager.GetScheduledJobs(q)
|
||||
}
|
||||
|
||||
return bc.manager.GetJobs(q)
|
||||
}
|
||||
|
||||
func validJobReq(req *job.Request) error {
|
||||
|
@ -14,28 +14,25 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ControllerTestSuite tests functions of core controller
|
||||
type ControllerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
lcmCtl *fakeLcmController
|
||||
worker *fakeWorker
|
||||
ctl Interface
|
||||
manager *fakeManager
|
||||
worker *fakeWorker
|
||||
ctl Interface
|
||||
|
||||
res *job.Stats
|
||||
jobID string
|
||||
@ -60,14 +57,17 @@ func (suite *ControllerTestSuite) SetupSuite() {
|
||||
// Prepare for each test case
|
||||
func (suite *ControllerTestSuite) SetupTest() {
|
||||
suite.worker = &fakeWorker{}
|
||||
suite.lcmCtl = &fakeLcmController{}
|
||||
|
||||
suite.lcmCtl.On("Track", suite.jobID).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil)
|
||||
suite.lcmCtl.On("New", suite.res).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil)
|
||||
|
||||
suite.worker.On("IsKnownJob", job.SampleJob).Return((*sample.Job)(nil), true)
|
||||
suite.worker.On("IsKnownJob", "fake").Return(nil, false)
|
||||
suite.worker.On("ValidateJobParameters", (*sample.Job)(nil), suite.params).Return(nil)
|
||||
|
||||
fakeMgr := &fakeManager{}
|
||||
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||
|
||||
suite.manager = fakeMgr
|
||||
|
||||
}
|
||||
|
||||
// TestControllerTestSuite is suite entry for 'go test'
|
||||
@ -142,20 +142,6 @@ func (suite *ControllerTestSuite) TestCheckStatus() {
|
||||
assert.Equal(suite.T(), "running", st.Pools[0].Status, "expected running pool but got %s", st.Pools[0].Status)
|
||||
}
|
||||
|
||||
// TestScheduledJobs ...
|
||||
func (suite *ControllerTestSuite) TestScheduledJobs() {
|
||||
q := &query.Parameter{
|
||||
PageSize: 20,
|
||||
PageNumber: 1,
|
||||
}
|
||||
|
||||
suite.worker.On("ScheduledJobs", q).Return([]*job.Stats{suite.res}, 1, nil)
|
||||
|
||||
_, total, err := suite.ctl.ScheduledJobs(q)
|
||||
require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err)
|
||||
assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0")
|
||||
}
|
||||
|
||||
// TestInvalidChecks ...
|
||||
func (suite *ControllerTestSuite) TestInvalidChecks() {
|
||||
req := createJobReq("kind")
|
||||
@ -180,57 +166,42 @@ func (suite *ControllerTestSuite) TestInvalidChecks() {
|
||||
assert.NotNil(suite.T(), err, "invalid job name: error expected but got nil")
|
||||
}
|
||||
|
||||
// TestScheduledJobs ...
|
||||
func (suite *ControllerTestSuite) TestGetScheduledJobs() {
|
||||
extras := make(query.ExtraParameters)
|
||||
extras.Set(query.ExtraParamKeyKind, job.KindScheduled)
|
||||
q := &query.Parameter{
|
||||
PageSize: 20,
|
||||
PageNumber: 1,
|
||||
Extras: extras,
|
||||
}
|
||||
|
||||
fakeMgr := &fakeManager{}
|
||||
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||
fakeMgr.On("GetScheduledJobs", q).Return([]*job.Stats{suite.res}, int64(1), nil)
|
||||
suite.manager = fakeMgr
|
||||
|
||||
_, total, err := suite.ctl.GetJobs(q)
|
||||
require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err)
|
||||
assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0")
|
||||
}
|
||||
|
||||
// TestGetPeriodicExecutions tests GetPeriodicExecutions
|
||||
func (suite *ControllerTestSuite) TestGetPeriodicExecutions() {
|
||||
pool := tests.GiveMeRedisPool()
|
||||
namespace := tests.GiveMeTestNamespace()
|
||||
|
||||
jobID := utils.MakeIdentifier()
|
||||
nID := time.Now().Unix()
|
||||
mockJobStats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: jobID,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
JobKind: job.KindPeriodic,
|
||||
JobName: job.SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
NumericPID: nID,
|
||||
},
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithStats(context.TODO(), mockJobStats, namespace, pool, nil)
|
||||
err := t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
executionID := utils.MakeIdentifier()
|
||||
runAt := time.Now().Add(1 * time.Hour).Unix()
|
||||
executionStats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: executionID,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
JobKind: job.KindScheduled,
|
||||
JobName: job.SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
RunAt: runAt,
|
||||
EnqueueTime: runAt,
|
||||
UpstreamJobID: jobID,
|
||||
},
|
||||
}
|
||||
|
||||
t2 := job.NewBasicTrackerWithStats(context.TODO(), executionStats, namespace, pool, nil)
|
||||
err = t2.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
suite.lcmCtl.On("Track", jobID).Return(t, nil)
|
||||
suite.lcmCtl.On("Track", executionID).Return(t2, nil)
|
||||
|
||||
_, total, err := suite.ctl.GetPeriodicExecutions(jobID, &query.Parameter{
|
||||
q := &query.Parameter{
|
||||
PageSize: 10,
|
||||
PageNumber: 1,
|
||||
Extras: make(query.ExtraParameters),
|
||||
})
|
||||
}
|
||||
|
||||
fakeMgr := &fakeManager{}
|
||||
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||
fakeMgr.On("GetPeriodicExecution", "1000", q).Return([]*job.Stats{suite.res}, int64(1), nil)
|
||||
suite.manager = fakeMgr
|
||||
|
||||
_, total, err := suite.ctl.GetPeriodicExecutions("1000", q)
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
}
|
||||
@ -253,19 +224,6 @@ func createJobReq(kind string) *job.Request {
|
||||
}
|
||||
}
|
||||
|
||||
// Implement lcm controller interface
|
||||
func (suite *ControllerTestSuite) Serve() error {
|
||||
return suite.lcmCtl.Serve()
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) New(stats *job.Stats) (job.Tracker, error) {
|
||||
return suite.lcmCtl.New(stats)
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) Track(jobID string) (job.Tracker, error) {
|
||||
return suite.lcmCtl.Track(jobID)
|
||||
}
|
||||
|
||||
// Implement worker interface
|
||||
func (suite *ControllerTestSuite) Start() error {
|
||||
return suite.worker.Start()
|
||||
@ -307,37 +265,28 @@ func (suite *ControllerTestSuite) RetryJob(jobID string) error {
|
||||
return suite.worker.RetryJob(jobID)
|
||||
}
|
||||
|
||||
func (suite *ControllerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.worker.ScheduledJobs(query)
|
||||
// Implement manager interface
|
||||
func (suite *ControllerTestSuite) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.manager.GetJobs(q)
|
||||
}
|
||||
|
||||
// Implement fake objects with mock
|
||||
type fakeLcmController struct {
|
||||
mock.Mock
|
||||
func (suite *ControllerTestSuite) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.manager.GetPeriodicExecution(pID, q)
|
||||
}
|
||||
|
||||
func (flc *fakeLcmController) Serve() error {
|
||||
return flc.Called().Error(0)
|
||||
func (suite *ControllerTestSuite) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
return suite.manager.GetScheduledJobs(q)
|
||||
}
|
||||
|
||||
func (flc *fakeLcmController) New(stats *job.Stats) (job.Tracker, error) {
|
||||
args := flc.Called(stats)
|
||||
if args.Error(1) != nil {
|
||||
return nil, args.Error(1)
|
||||
}
|
||||
|
||||
return args.Get(0).(job.Tracker), nil
|
||||
func (suite *ControllerTestSuite) GetJob(jobID string) (*job.Stats, error) {
|
||||
return suite.manager.GetJob(jobID)
|
||||
}
|
||||
|
||||
func (flc *fakeLcmController) Track(jobID string) (job.Tracker, error) {
|
||||
args := flc.Called(jobID)
|
||||
if args.Error(1) != nil {
|
||||
return nil, args.Error(1)
|
||||
}
|
||||
|
||||
return args.Get(0).(job.Tracker), nil
|
||||
func (suite *ControllerTestSuite) SaveJob(j *job.Stats) error {
|
||||
return suite.manager.SaveJob(j)
|
||||
}
|
||||
|
||||
// fake worker
|
||||
type fakeWorker struct {
|
||||
mock.Mock
|
||||
}
|
||||
@ -407,11 +356,48 @@ func (f *fakeWorker) RetryJob(jobID string) error {
|
||||
return f.Called(jobID).Error(0)
|
||||
}
|
||||
|
||||
func (f *fakeWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
args := f.Called(query)
|
||||
// fake manager
|
||||
type fakeManager struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (fm *fakeManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
args := fm.Called(q)
|
||||
if args.Error(2) != nil {
|
||||
return nil, 0, args.Error(2)
|
||||
}
|
||||
|
||||
return args.Get(0).([]*job.Stats), int64(args.Int(1)), nil
|
||||
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||
}
|
||||
|
||||
func (fm *fakeManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
args := fm.Called(pID, q)
|
||||
if args.Error(2) != nil {
|
||||
return nil, 0, args.Error(2)
|
||||
}
|
||||
|
||||
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||
}
|
||||
|
||||
func (fm *fakeManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
args := fm.Called(q)
|
||||
if args.Error(2) != nil {
|
||||
return nil, 0, args.Error(2)
|
||||
}
|
||||
|
||||
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||
}
|
||||
|
||||
func (fm *fakeManager) GetJob(jobID string) (*job.Stats, error) {
|
||||
args := fm.Called(jobID)
|
||||
if args.Error(1) != nil {
|
||||
return nil, args.Error(1)
|
||||
}
|
||||
|
||||
return args.Get(0).(*job.Stats), nil
|
||||
}
|
||||
|
||||
func (fm *fakeManager) SaveJob(j *job.Stats) error {
|
||||
args := fm.Called(j)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
@ -68,8 +68,11 @@ type Interface interface {
|
||||
// The total number is also returned.
|
||||
GetPeriodicExecutions(periodicJobID string, query *query.Parameter) ([]*job.Stats, int64, error)
|
||||
|
||||
// Get the scheduled jobs by page
|
||||
// The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib.
|
||||
// Get the jobs with queries.
|
||||
//
|
||||
// For scheduled jobs, the page number in the query will be ignored, default 20 is used.
|
||||
// This is the limitation of backend lib. The int64 is total number.
|
||||
// For other cases, query the jobs with cursor, not standard pagination. The int64 is next cursor.
|
||||
// The total number is also returned.
|
||||
ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
||||
GetJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package errs
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -49,8 +50,8 @@ const (
|
||||
ResourceConflictsErrorCode
|
||||
// BadRequestErrorCode is code for the error of bad request
|
||||
BadRequestErrorCode
|
||||
// GetScheduledJobsErrorCode is code for the error of getting scheduled jobs
|
||||
GetScheduledJobsErrorCode
|
||||
// GetJobsErrorCode is code for the error of getting scheduled jobs
|
||||
GetJobsErrorCode
|
||||
// GetPeriodicExecutionErrorCode is code for the error of getting periodic executions
|
||||
GetPeriodicExecutionErrorCode
|
||||
// StatusMismatchErrorCode is code for the error of mismatching status
|
||||
@ -137,9 +138,13 @@ func UnauthorizedError(err error) error {
|
||||
return New(UnAuthorizedErrorCode, "unauthorized", err.Error())
|
||||
}
|
||||
|
||||
// GetScheduledJobsError is error for the case of getting scheduled jobs failed
|
||||
func GetScheduledJobsError(err error) error {
|
||||
return New(GetScheduledJobsErrorCode, "failed to get scheduled jobs", err.Error())
|
||||
// GetJobsError is error for the case of getting jobs failed
|
||||
func GetJobsError(q *query.Parameter, err error) error {
|
||||
qStr := ""
|
||||
if q != nil {
|
||||
qStr = q.Extras.String()
|
||||
}
|
||||
return New(GetJobsErrorCode, fmt.Sprintf("failed to get scheduled jobs, q=%s", qStr), err.Error())
|
||||
}
|
||||
|
||||
// GetPeriodicExecutionError is error for the case of getting periodic jobs failed
|
||||
|
@ -17,7 +17,6 @@ package job
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
@ -66,15 +65,6 @@ type Tracker interface {
|
||||
// error if update failed
|
||||
Update(fieldAndValues ...interface{}) error
|
||||
|
||||
// Executions returns the executions of the job tracked by this tracker.
|
||||
// Please pay attention, this only for periodic job.
|
||||
//
|
||||
// Returns:
|
||||
// job execution IDs matched the query
|
||||
// the total number
|
||||
// error if any issues happened
|
||||
Executions(q *query.Parameter) ([]string, int64, error)
|
||||
|
||||
// NumericID returns the numeric ID of periodic job.
|
||||
// Please pay attention, this only for periodic job.
|
||||
NumericID() (int64, error)
|
||||
@ -241,65 +231,6 @@ func (bt *basicTracker) CheckIn(message string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Executions of the tracked job
|
||||
func (bt *basicTracker) Executions(q *query.Parameter) ([]string, int64, error) {
|
||||
if bt.jobStats.Info.JobKind != KindPeriodic {
|
||||
return nil, 0, errors.New("only periodic job has executions")
|
||||
}
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyUpstreamJobAndExecutions(bt.namespace, bt.jobID)
|
||||
|
||||
// Query executions by "non stopped"
|
||||
if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok {
|
||||
if v, yes := nonStoppedOnly.(bool); yes && v {
|
||||
return queryExecutions(conn, key, q)
|
||||
}
|
||||
}
|
||||
|
||||
// Pagination
|
||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||
if q != nil {
|
||||
if q.PageNumber > 0 {
|
||||
pageNumber = q.PageNumber
|
||||
}
|
||||
if q.PageSize > 0 {
|
||||
pageSize = q.PageSize
|
||||
}
|
||||
}
|
||||
|
||||
// Get total first
|
||||
total, err := redis.Int64(conn.Do("ZCARD", key))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// No items
|
||||
result := make([]string, 0)
|
||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||
return result, total, nil
|
||||
}
|
||||
|
||||
min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1
|
||||
args := []interface{}{key, min, max}
|
||||
list, err := redis.Values(conn.Do("ZREVRANGE", args...))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
for _, item := range list {
|
||||
if eID, ok := item.([]byte); ok {
|
||||
result = append(result, string(eID))
|
||||
}
|
||||
}
|
||||
|
||||
return result, total, nil
|
||||
}
|
||||
|
||||
// Expire job stats
|
||||
func (bt *basicTracker) Expire() error {
|
||||
conn := bt.pool.Get()
|
||||
@ -709,40 +640,3 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
|
||||
func setStatus(conn redis.Conn, key string, status Status) error {
|
||||
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
|
||||
}
|
||||
|
||||
// queryExecutions queries periodic executions by status
|
||||
func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) {
|
||||
total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf"))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||
if q.PageNumber > 0 {
|
||||
pageNumber = q.PageNumber
|
||||
}
|
||||
if q.PageSize > 0 {
|
||||
pageSize = q.PageSize
|
||||
}
|
||||
|
||||
results := make([]string, 0)
|
||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||
return results, total, nil
|
||||
}
|
||||
|
||||
offset := (pageNumber - 1) * pageSize
|
||||
args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize}
|
||||
|
||||
eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
for _, eID := range eIDs {
|
||||
if eIDBytes, ok := eID.([]byte); ok {
|
||||
results = append(results, string(eIDBytes))
|
||||
}
|
||||
}
|
||||
|
||||
return results, total, nil
|
||||
}
|
||||
|
@ -19,8 +19,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
@ -176,14 +174,6 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), nID, id)
|
||||
|
||||
_, total, err := t.Executions(&query.Parameter{
|
||||
PageNumber: 1,
|
||||
PageSize: 10,
|
||||
Extras: make(query.ExtraParameters),
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
|
||||
err = t2.PeriodicExecutionDone()
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
@ -18,13 +18,6 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/common"
|
||||
comcfg "github.com/goharbor/harbor/src/common/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||
"github.com/pkg/errors"
|
||||
"os"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
@ -59,7 +52,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Set job context initializer
|
||||
runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) {
|
||||
/*runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) {
|
||||
secret := config.GetAuthSecret()
|
||||
if utils.IsEmptyStr(secret) {
|
||||
return nil, errors.New("empty auth secret")
|
||||
@ -74,7 +67,7 @@ func main() {
|
||||
}
|
||||
|
||||
return jobCtx, nil
|
||||
})
|
||||
})*/
|
||||
|
||||
// Start
|
||||
if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
|
||||
|
349
src/jobservice/mgt/manager.go
Normal file
349
src/jobservice/mgt/manager.go
Normal file
@ -0,0 +1,349 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mgt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/goharbor/harbor/src/jobservice/period"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Manager defies the related operations to handle the management of job stats.
|
||||
type Manager interface {
|
||||
// Get the stats data of all kinds of jobs.
|
||||
// Data returned by pagination.
|
||||
//
|
||||
// Arguments:
|
||||
// q *query.Parameter : the query parameters
|
||||
//
|
||||
// Returns:
|
||||
// The matched job stats list
|
||||
// The total number of the jobs
|
||||
// Non nil error if any issues meet
|
||||
GetJobs(q *query.Parameter) ([]*job.Stats, int64, error)
|
||||
|
||||
// Get the executions of the specified periodic job by pagination
|
||||
//
|
||||
// Arguments:
|
||||
// pID: ID of the periodic job
|
||||
// q *query.Parameter: query parameters
|
||||
//
|
||||
// Returns:
|
||||
// The matched job stats list,
|
||||
// The total number of the executions,
|
||||
// Non nil error if any issues meet.
|
||||
GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error)
|
||||
|
||||
// Get the scheduled jobs
|
||||
//
|
||||
// Arguments:
|
||||
// q *query.Parameter: query parameters
|
||||
//
|
||||
// Returns:
|
||||
// The matched job stats list,
|
||||
// The total number of the executions,
|
||||
// Non nil error if any issues meet.
|
||||
GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error)
|
||||
|
||||
// Get the stats of the specified job
|
||||
//
|
||||
// Arguments:
|
||||
// jobID string: ID of the job
|
||||
//
|
||||
// Returns:
|
||||
// The job stats
|
||||
// Non nil error if any issues meet
|
||||
GetJob(jobID string) (*job.Stats, error)
|
||||
|
||||
// Save the job stats
|
||||
//
|
||||
// Arguments:
|
||||
// job *job.Stats: the saving job stats
|
||||
//
|
||||
// Returns:
|
||||
// Non nil error if any issues meet
|
||||
SaveJob(job *job.Stats) error
|
||||
}
|
||||
|
||||
// basicManager is the default implementation of @manager,
|
||||
// based on redis.
|
||||
type basicManager struct {
|
||||
// system context
|
||||
ctx context.Context
|
||||
// db namespace
|
||||
namespace string
|
||||
// redis conn pool
|
||||
pool *redis.Pool
|
||||
// go work client
|
||||
client *work.Client
|
||||
}
|
||||
|
||||
// NewManager news a basic manager
|
||||
func NewManager(ctx context.Context, ns string, pool *redis.Pool) Manager {
|
||||
return &basicManager{
|
||||
ctx: ctx,
|
||||
namespace: ns,
|
||||
pool: pool,
|
||||
client: work.NewClient(ns, pool),
|
||||
}
|
||||
}
|
||||
|
||||
// GetJobs is implementation of Manager.GetJobs
|
||||
// Because of the hash set used to keep the job stats, we can not support a standard pagination.
|
||||
// A cursor is used to fetch the jobs with several batches.
|
||||
func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
cursor, count := int64(0), query.DefaultPageSize
|
||||
if q != nil {
|
||||
if q.PageSize > 0 {
|
||||
count = q.PageSize
|
||||
}
|
||||
|
||||
if cur, ok := q.Extras.Get(query.ExtraParamKeyCursor); ok {
|
||||
cursor = cur.(int64)
|
||||
}
|
||||
}
|
||||
|
||||
pattern := rds.KeyJobStats(bm.namespace, "*")
|
||||
args := []interface{}{cursor, "MATCH", pattern, "COUNT", count}
|
||||
|
||||
conn := bm.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
values, err := redis.Values(conn.Do("SCAN", args...))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if len(values) != 2 {
|
||||
return nil, 0, errors.New("malform scan results")
|
||||
}
|
||||
|
||||
nextCur, err := strconv.ParseUint(string(values[0].([]byte)), 10, 8)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
list := values[1].([]interface{})
|
||||
|
||||
results := make([]*job.Stats, 0)
|
||||
for _, v := range list {
|
||||
if bytes, ok := v.([]byte); ok {
|
||||
statsKey := string(bytes)
|
||||
if i := strings.LastIndex(statsKey, ":"); i != -1 {
|
||||
jID := statsKey[i+1:]
|
||||
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil)
|
||||
if err := t.Load(); err != nil {
|
||||
logger.Errorf("retrieve stats data of job %s error: %s", jID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
results = append(results, t.Job())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return results, int64(nextCur), nil
|
||||
}
|
||||
|
||||
// GetPeriodicExecution is implementation of Manager.GetPeriodicExecution
|
||||
func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (results []*job.Stats, total int64, err error) {
|
||||
if utils.IsEmptyStr(pID) {
|
||||
return nil, 0, errors.New("nil periodic job ID")
|
||||
}
|
||||
|
||||
tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil)
|
||||
err = tracker.Load()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if tracker.Job().Info.JobKind != job.KindPeriodic {
|
||||
return nil, 0, errors.Errorf("only periodic job has executions: %s kind is received", tracker.Job().Info.JobKind)
|
||||
}
|
||||
|
||||
conn := bm.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyUpstreamJobAndExecutions(bm.namespace, pID)
|
||||
|
||||
executionIDs := make([]string, 0)
|
||||
// Query executions by "non stopped"
|
||||
if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok {
|
||||
if v, yes := nonStoppedOnly.(bool); yes && v {
|
||||
executionIDs, total, err = queryExecutions(conn, key, q)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Query all
|
||||
// Pagination
|
||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||
if q != nil {
|
||||
if q.PageNumber > 0 {
|
||||
pageNumber = q.PageNumber
|
||||
}
|
||||
if q.PageSize > 0 {
|
||||
pageSize = q.PageSize
|
||||
}
|
||||
}
|
||||
|
||||
// Get total first
|
||||
total, err = redis.Int64(conn.Do("ZCARD", key))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// No items
|
||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||
return results, total, nil
|
||||
}
|
||||
|
||||
min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1
|
||||
args := []interface{}{key, min, max}
|
||||
list, err := redis.Values(conn.Do("ZREVRANGE", args...))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
for _, item := range list {
|
||||
if eID, ok := item.([]byte); ok {
|
||||
executionIDs = append(executionIDs, string(eID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, eID := range executionIDs {
|
||||
t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil)
|
||||
if er := t.Load(); er != nil {
|
||||
logger.Errorf("track job %s error: %s", eID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
results = append(results, t.Job())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// GetScheduledJobs is implementation of Manager.GetScheduledJobs
|
||||
func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
// PageSize is not supported here
|
||||
var page uint = 1
|
||||
if q != nil && q.PageNumber > 1 {
|
||||
page = q.PageNumber
|
||||
}
|
||||
|
||||
sJobs, total, err := bm.client.ScheduledJobs(page)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res := make([]*job.Stats, 0)
|
||||
for _, sJob := range sJobs {
|
||||
jID := sJob.ID
|
||||
if len(sJob.Args) > 0 {
|
||||
if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok {
|
||||
// Periodic scheduled job
|
||||
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
|
||||
}
|
||||
}
|
||||
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil)
|
||||
err = t.Load()
|
||||
if err != nil {
|
||||
// Just log it
|
||||
logger.Errorf("mgt.basicManager: query scheduled jobs error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
res = append(res, t.Job())
|
||||
}
|
||||
|
||||
return res, total, nil
|
||||
}
|
||||
|
||||
// GetJob is implementation of Manager.GetJob
|
||||
func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) {
|
||||
if utils.IsEmptyStr(jobID) {
|
||||
return nil, errs.BadRequestError("empty job ID")
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil)
|
||||
if err := t.Load(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t.Job(), nil
|
||||
}
|
||||
|
||||
// SaveJob is implementation of Manager.SaveJob
|
||||
func (bm *basicManager) SaveJob(j *job.Stats) error {
|
||||
if j == nil {
|
||||
return errs.BadRequestError("nil saving job stats")
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil)
|
||||
return t.Save()
|
||||
}
|
||||
|
||||
// queryExecutions queries periodic executions by status
|
||||
func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) {
|
||||
total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf"))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||
if q.PageNumber > 0 {
|
||||
pageNumber = q.PageNumber
|
||||
}
|
||||
if q.PageSize > 0 {
|
||||
pageSize = q.PageSize
|
||||
}
|
||||
|
||||
results := make([]string, 0)
|
||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||
return results, total, nil
|
||||
}
|
||||
|
||||
offset := (pageNumber - 1) * pageSize
|
||||
args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize}
|
||||
|
||||
eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...))
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
for _, eID := range eIDs {
|
||||
if eIDBytes, ok := eID.([]byte); ok {
|
||||
results = append(results, string(eIDBytes))
|
||||
}
|
||||
}
|
||||
|
||||
return results, total, nil
|
||||
}
|
184
src/jobservice/mgt/manager_test.go
Normal file
184
src/jobservice/mgt/manager_test.go
Normal file
@ -0,0 +1,184 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mgt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// BasicManagerTestSuite tests the function of basic manager
|
||||
type BasicManagerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
namespace string
|
||||
pool *redis.Pool
|
||||
|
||||
manager Manager
|
||||
}
|
||||
|
||||
// SetupSuite prepares the test suite
|
||||
func (suite *BasicManagerTestSuite) SetupSuite() {
|
||||
suite.namespace = tests.GiveMeTestNamespace()
|
||||
suite.pool = tests.GiveMeRedisPool()
|
||||
suite.manager = NewManager(context.TODO(), suite.namespace, suite.pool)
|
||||
|
||||
// Mock data
|
||||
periodicJob := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: "1000",
|
||||
JobName: job.SampleJob,
|
||||
JobKind: job.KindPeriodic,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
IsUnique: false,
|
||||
CronSpec: "* */10 * * * *",
|
||||
},
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil)
|
||||
err := t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
execution := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: "1001",
|
||||
JobKind: job.KindScheduled,
|
||||
JobName: job.SampleJob,
|
||||
Status: job.PendingStatus.String(),
|
||||
IsUnique: false,
|
||||
RunAt: time.Now().Add(5 * time.Minute).Unix(),
|
||||
UpstreamJobID: "1000",
|
||||
},
|
||||
}
|
||||
t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil)
|
||||
err = t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
// TearDownSuite clears the test suite
|
||||
func (suite *BasicManagerTestSuite) TearDownSuite() {
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestBasicManagerTestSuite is entry of go test
|
||||
func TestBasicManagerTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(BasicManagerTestSuite))
|
||||
}
|
||||
|
||||
// TestBasicManagerGetJobs tests get jobs
|
||||
func (suite *BasicManagerTestSuite) TestBasicManagerGetJobs() {
|
||||
jobs, _, err := suite.manager.GetJobs(&query.Parameter{
|
||||
PageSize: 25,
|
||||
PageNumber: 1,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Condition(suite.T(), func() bool {
|
||||
return len(jobs) > 0
|
||||
})
|
||||
}
|
||||
|
||||
// TestGetPeriodicExecutions tests get periodic executions
|
||||
func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() {
|
||||
extras := make(query.ExtraParameters)
|
||||
extras.Set(query.ExtraParamKeyNonStoppedOnly, true)
|
||||
|
||||
jobs, total, err := suite.manager.GetPeriodicExecution("1000", &query.Parameter{
|
||||
PageSize: 20,
|
||||
PageNumber: 1,
|
||||
Extras: extras,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
assert.Equal(suite.T(), int64(1), int64(len(jobs)))
|
||||
|
||||
t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil)
|
||||
err = t.Load()
|
||||
require.NoError(suite.T(), err)
|
||||
err = t.PeriodicExecutionDone()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
jobs, total, err = suite.manager.GetPeriodicExecution("1000", &query.Parameter{
|
||||
PageSize: 20,
|
||||
PageNumber: 1,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
assert.Equal(suite.T(), int64(1), int64(len(jobs)))
|
||||
}
|
||||
|
||||
// TestGetScheduledJobs tests get scheduled jobs
|
||||
func (suite *BasicManagerTestSuite) TestGetScheduledJobs() {
|
||||
enqueuer := work.NewEnqueuer(suite.namespace, suite.pool)
|
||||
scheduledJob, err := enqueuer.EnqueueIn(job.SampleJob, 1000, make(map[string]interface{}))
|
||||
require.NoError(suite.T(), err)
|
||||
stats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: scheduledJob.ID,
|
||||
JobName: job.SampleJob,
|
||||
JobKind: job.KindScheduled,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
RunAt: scheduledJob.RunAt,
|
||||
},
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil)
|
||||
err = t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{
|
||||
PageNumber: 1,
|
||||
PageSize: 10,
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
assert.Equal(suite.T(), int64(1), int64(len(list)))
|
||||
}
|
||||
|
||||
// TestGetJob tests get job
|
||||
func (suite *BasicManagerTestSuite) TestGetJob() {
|
||||
j, err := suite.manager.GetJob("1001")
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), j.Info.JobID, "1001")
|
||||
}
|
||||
|
||||
// TestSaveJob tests saving job
|
||||
func (suite *BasicManagerTestSuite) TestSaveJob() {
|
||||
newJob := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: "1002",
|
||||
JobKind: job.KindPeriodic,
|
||||
JobName: job.SampleJob,
|
||||
Status: job.PendingStatus.String(),
|
||||
IsUnique: false,
|
||||
},
|
||||
}
|
||||
|
||||
err := suite.manager.SaveJob(newJob)
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
@ -17,6 +17,7 @@ package runtime
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
@ -86,7 +87,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
|
||||
var (
|
||||
backendWorker worker.Interface
|
||||
lcmCtl lcm.Controller
|
||||
manager mgt.Manager
|
||||
)
|
||||
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
||||
// Number of workers
|
||||
@ -95,6 +96,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
namespace := fmt.Sprintf("{%s}", cfg.PoolConfig.RedisPoolCfg.Namespace)
|
||||
// Get redis connection pool
|
||||
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
||||
|
||||
// Create stats manager
|
||||
manager = mgt.NewManager(ctx, namespace, redisPool)
|
||||
// Create hook agent, it's a singleton object
|
||||
hookAgent := hook.NewAgent(rootContext, namespace, redisPool)
|
||||
hookCallback := func(URL string, change *job.StatusChange) error {
|
||||
@ -114,7 +118,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
}
|
||||
|
||||
// Create job life cycle management controller
|
||||
lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
|
||||
lcmCtl := lcm.NewController(rootContext, namespace, redisPool, hookCallback)
|
||||
|
||||
// Start the backend worker
|
||||
backendWorker, err = bs.loadAndRunRedisWorkerPool(
|
||||
@ -145,7 +149,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
||||
}
|
||||
|
||||
// Initialize controller
|
||||
ctl := core.NewController(backendWorker, lcmCtl)
|
||||
ctl := core.NewController(backendWorker, manager)
|
||||
// Start the API server
|
||||
apiServer := bs.createAPIServer(ctx, cfg, ctl)
|
||||
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
@ -362,40 +361,6 @@ func (w *basicWorker) ValidateJobParameters(jobType interface{}, params job.Para
|
||||
return theJ.Validate(params)
|
||||
}
|
||||
|
||||
// ScheduledJobs returns the scheduled jobs by page
|
||||
func (w *basicWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||
var page uint = 1
|
||||
if query != nil && query.PageNumber > 1 {
|
||||
page = query.PageNumber
|
||||
}
|
||||
|
||||
sJobs, total, err := w.client.ScheduledJobs(page)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
res := make([]*job.Stats, 0)
|
||||
for _, sJob := range sJobs {
|
||||
jID := sJob.ID
|
||||
if len(sJob.Args) > 0 {
|
||||
if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok {
|
||||
// Periodic scheduled job
|
||||
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
|
||||
}
|
||||
}
|
||||
t, err := w.ctl.Track(jID)
|
||||
if err != nil {
|
||||
// Just log it
|
||||
logger.Errorf("cworker: query scheduled jobs error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
res = append(res, t.Job())
|
||||
}
|
||||
|
||||
return res, total, nil
|
||||
}
|
||||
|
||||
// RegisterJob is used to register the job to the worker.
|
||||
// j is the type of job
|
||||
func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
||||
|
@ -156,6 +156,9 @@ func (suite *CWorkerTestSuite) TestEnqueuePeriodicJob() {
|
||||
params["name"] = "testing:v1"
|
||||
|
||||
m := time.Now().Minute()
|
||||
if m+2 >= 60 {
|
||||
m = m - 2
|
||||
}
|
||||
_, err := suite.cWorker.PeriodicallyEnqueue(
|
||||
"fake_job",
|
||||
params,
|
||||
@ -204,16 +207,6 @@ func (suite *CWorkerTestSuite) TestStopJob() {
|
||||
require.NoError(suite.T(), err, "stop job: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// TestScheduledJobs tests get scheduled job
|
||||
func (suite *CWorkerTestSuite) TestScheduledJobs() {
|
||||
params := make(map[string]interface{})
|
||||
params["name"] = "testing:v1"
|
||||
|
||||
_, total, err := suite.cWorker.ScheduledJobs(nil)
|
||||
require.NoError(suite.T(), err, "get scheduled job: nil error expected but got %s", err)
|
||||
assert.EqualValues(suite.T(), int64(2), total, "expect 1 item but got 0")
|
||||
}
|
||||
|
||||
type fakeJob struct{}
|
||||
|
||||
func (j *fakeJob) MaxFails() uint {
|
||||
|
@ -15,7 +15,6 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
)
|
||||
|
||||
@ -112,16 +111,4 @@ type Interface interface {
|
||||
// Return:
|
||||
// error : error returned if meet any problems
|
||||
RetryJob(jobID string) error
|
||||
|
||||
// Get the scheduled jobs by page
|
||||
// The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib.
|
||||
// The total number is also returned.
|
||||
//
|
||||
// query *query.Parameter : query parameters
|
||||
//
|
||||
// Return:
|
||||
// []*job.Stats : list of scheduled jobs
|
||||
// int : the total number of scheduled jobs
|
||||
// error : non nil error if meet any issues
|
||||
ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user