mirror of
https://github.com/goharbor/harbor.git
synced 2024-09-28 13:27:31 +02:00
Merge pull request #7585 from steven-zou/add_get_jobs_API
add get jobs API to provide powerful job stats management
This commit is contained in:
commit
b664b90b86
@ -34,7 +34,10 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
const totalHeaderKey = "Total-Count"
|
const (
|
||||||
|
totalHeaderKey = "Total-Count"
|
||||||
|
nextCursorKey = "Next-Cursor"
|
||||||
|
)
|
||||||
|
|
||||||
// Handler defines approaches to handle the http requests.
|
// Handler defines approaches to handle the http requests.
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
@ -56,8 +59,8 @@ type Handler interface {
|
|||||||
// HandleJobLogReq is used to handle the request of getting periodic executions
|
// HandleJobLogReq is used to handle the request of getting periodic executions
|
||||||
HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request)
|
HandlePeriodicExecutions(w http.ResponseWriter, req *http.Request)
|
||||||
|
|
||||||
// HandleScheduledJobs is used to handle the request of getting pending scheduled jobs
|
// HandleGetJobsReq is used to handle the request of getting jobs
|
||||||
HandleScheduledJobs(w http.ResponseWriter, req *http.Request)
|
HandleGetJobsReq(w http.ResponseWriter, req *http.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultHandler is the default request handler which implements the Handler interface.
|
// DefaultHandler is the default request handler which implements the Handler interface.
|
||||||
@ -244,17 +247,24 @@ func (dh *DefaultHandler) HandlePeriodicExecutions(w http.ResponseWriter, req *h
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleScheduledJobs is implementation of method defined in interface 'Handler'
|
// HandleGetJobsReq is implementation of method defined in interface 'Handler'
|
||||||
func (dh *DefaultHandler) HandleScheduledJobs(w http.ResponseWriter, req *http.Request) {
|
func (dh *DefaultHandler) HandleGetJobsReq(w http.ResponseWriter, req *http.Request) {
|
||||||
// Get query parameters
|
// Get query parameters
|
||||||
q := extractQuery(req)
|
q := extractQuery(req)
|
||||||
jobs, total, err := dh.controller.ScheduledJobs(q)
|
jobs, total, err := dh.controller.GetJobs(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
dh.handleError(w, req, http.StatusInternalServerError, errs.GetScheduledJobsError(err))
|
dh.handleError(w, req, http.StatusInternalServerError, errs.GetJobsError(q, err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Add(totalHeaderKey, fmt.Sprintf("%d", total))
|
key := nextCursorKey
|
||||||
|
if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok {
|
||||||
|
if kind, yes := v.(string); yes && kind == job.KindScheduled {
|
||||||
|
key = totalHeaderKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Add(key, fmt.Sprintf("%d", total))
|
||||||
dh.handleJSONData(w, req, http.StatusOK, jobs)
|
dh.handleJSONData(w, req, http.StatusOK, jobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -321,6 +331,20 @@ func extractQuery(req *http.Request) *query.Parameter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extra job kind query param
|
||||||
|
jobKind := queries.Get(query.ParamKeyJobKind)
|
||||||
|
if !utils.IsEmptyStr(jobKind) {
|
||||||
|
q.Extras.Set(query.ExtraParamKeyKind, jobKind)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extra query cursor
|
||||||
|
cursorV := queries.Get(query.ParamKeyCursor)
|
||||||
|
if !utils.IsEmptyStr(cursorV) {
|
||||||
|
if cursor, err := strconv.ParseInt(cursorV, 10, 32); err == nil {
|
||||||
|
q.Extras.Set(query.ExtraParamKeyCursor, cursor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return q
|
return q
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ func (suite *APIHandlerTestSuite) TestGetPeriodicExecutionsWithQuery() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestScheduledJobs ...
|
// TestScheduledJobs ...
|
||||||
func (suite *APIHandlerTestSuite) TestScheduledJobs() {
|
func (suite *APIHandlerTestSuite) TestGetJobs() {
|
||||||
q := &query.Parameter{
|
q := &query.Parameter{
|
||||||
PageNumber: 2,
|
PageNumber: 2,
|
||||||
PageSize: 50,
|
PageSize: 50,
|
||||||
@ -285,11 +285,20 @@ func (suite *APIHandlerTestSuite) TestScheduledJobs() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fc := &fakeController{}
|
fc := &fakeController{}
|
||||||
fc.On("ScheduledJobs", q).
|
fc.On("GetJobs", q).
|
||||||
Return([]*job.Stats{createJobStats("sample", "Generic", "")}, int64(1), nil)
|
Return([]*job.Stats{createJobStats("sample", job.KindGeneric, "")}, int64(1), nil)
|
||||||
suite.controller = fc
|
suite.controller = fc
|
||||||
|
|
||||||
_, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs/scheduled?page_number=2&page_size=50"))
|
_, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50"))
|
||||||
|
assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code)
|
||||||
|
|
||||||
|
q.Extras.Set(query.ExtraParamKeyKind, job.KindScheduled)
|
||||||
|
fc = &fakeController{}
|
||||||
|
fc.On("GetJobs", q).
|
||||||
|
Return([]*job.Stats{createJobStats("sample", job.KindScheduled, "")}, int64(1), nil)
|
||||||
|
suite.controller = fc
|
||||||
|
|
||||||
|
_, code = suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs?page_number=2&page_size=50&kind=Scheduled"))
|
||||||
assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code)
|
assert.Equal(suite.T(), 200, code, "expected 200 ok but got %d", code)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -397,8 +406,8 @@ func (suite *APIHandlerTestSuite) GetPeriodicExecutions(periodicJobID string, qu
|
|||||||
return suite.controller.GetPeriodicExecutions(periodicJobID, query)
|
return suite.controller.GetPeriodicExecutions(periodicJobID, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *APIHandlerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
func (suite *APIHandlerTestSuite) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
return suite.controller.ScheduledJobs(query)
|
return suite.controller.GetJobs(query)
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeController struct {
|
type fakeController struct {
|
||||||
@ -460,7 +469,7 @@ func (fc *fakeController) GetPeriodicExecutions(periodicJobID string, query *que
|
|||||||
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fc *fakeController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
func (fc *fakeController) GetJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
args := fc.Called(query)
|
args := fc.Called(query)
|
||||||
if args.Error(2) != nil {
|
if args.Error(2) != nil {
|
||||||
return nil, args.Get(1).(int64), args.Error(2)
|
return nil, args.Get(1).(int64), args.Error(2)
|
||||||
|
@ -88,7 +88,7 @@ func (br *BaseRouter) registerRoutes() {
|
|||||||
subRouter := br.router.PathPrefix(fmt.Sprintf("%s/%s", baseRoute, apiVersion)).Subrouter()
|
subRouter := br.router.PathPrefix(fmt.Sprintf("%s/%s", baseRoute, apiVersion)).Subrouter()
|
||||||
|
|
||||||
subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost)
|
subRouter.HandleFunc("/jobs", br.handler.HandleLaunchJobReq).Methods(http.MethodPost)
|
||||||
subRouter.HandleFunc("/jobs/scheduled", br.handler.HandleScheduledJobs).Methods(http.MethodGet)
|
subRouter.HandleFunc("/jobs", br.handler.HandleGetJobsReq).Methods(http.MethodGet)
|
||||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet)
|
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleGetJobReq).Methods(http.MethodGet)
|
||||||
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost)
|
subRouter.HandleFunc("/jobs/{job_id}", br.handler.HandleJobActionReq).Methods(http.MethodPost)
|
||||||
subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet)
|
subRouter.HandleFunc("/jobs/{job_id}/log", br.handler.HandleJobLogReq).Methods(http.MethodGet)
|
||||||
|
@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
package query
|
package query
|
||||||
|
|
||||||
|
import "encoding/json"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// DefaultPageSize defines the default page size
|
// DefaultPageSize defines the default page size
|
||||||
DefaultPageSize uint = 25
|
DefaultPageSize uint = 25
|
||||||
@ -23,8 +25,16 @@ const (
|
|||||||
ParamKeyPageSize = "page_size"
|
ParamKeyPageSize = "page_size"
|
||||||
// ParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions
|
// ParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions
|
||||||
ParamKeyNonStoppedOnly = "non_dead_only"
|
ParamKeyNonStoppedOnly = "non_dead_only"
|
||||||
|
// ParamKeyCursor defines query param of cursor for fetching job stats with batches
|
||||||
|
ParamKeyCursor = "cursor"
|
||||||
|
// ParamKeyJobKind defines query param of job kind
|
||||||
|
ParamKeyJobKind = "kind"
|
||||||
// ExtraParamKeyNonStoppedOnly defines extra parameter key for querying non stopped periodic executions
|
// ExtraParamKeyNonStoppedOnly defines extra parameter key for querying non stopped periodic executions
|
||||||
ExtraParamKeyNonStoppedOnly = "NonDeadOnly"
|
ExtraParamKeyNonStoppedOnly = "NonDeadOnly"
|
||||||
|
// ExtraParamKeyCursor defines extra parameter key for the cursor of fetching job stats with batches
|
||||||
|
ExtraParamKeyCursor = "Cursor"
|
||||||
|
// ExtraParamKeyKind defines extra parameter key for the job kind
|
||||||
|
ExtraParamKeyKind = "Kind"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExtraParameters to keep non pagination query parameters
|
// ExtraParameters to keep non pagination query parameters
|
||||||
@ -44,6 +54,16 @@ func (ep ExtraParameters) Get(key string) (interface{}, bool) {
|
|||||||
return v, ok
|
return v, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String returns the json string of ExtraParameters
|
||||||
|
func (ep ExtraParameters) String() string {
|
||||||
|
bytes, err := json.Marshal(&ep)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
// Parameter for getting executions
|
// Parameter for getting executions
|
||||||
type Parameter struct {
|
type Parameter struct {
|
||||||
PageNumber uint
|
PageNumber uint
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package query
|
package query
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -26,4 +28,13 @@ func (suite *QueryTestSuite) TestExtraParams() {
|
|||||||
|
|
||||||
assert.Equal(suite.T(), true, ok)
|
assert.Equal(suite.T(), true, ok)
|
||||||
assert.Equal(suite.T(), 100, v.(int))
|
assert.Equal(suite.T(), 100, v.(int))
|
||||||
|
|
||||||
|
str := extras.String()
|
||||||
|
copy := make(ExtraParameters)
|
||||||
|
err := json.Unmarshal([]byte(str), ©)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
v, ok = copy.Get("a")
|
||||||
|
assert.Equal(suite.T(), true, ok)
|
||||||
|
assert.Equal(suite.T(), 100, int(v.(float64)))
|
||||||
}
|
}
|
||||||
|
@ -37,8 +37,6 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string {
|
|||||||
return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
|
return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------
|
|
||||||
|
|
||||||
// KeyNamespacePrefix returns the based key based on the namespace.
|
// KeyNamespacePrefix returns the based key based on the namespace.
|
||||||
func KeyNamespacePrefix(namespace string) string {
|
func KeyNamespacePrefix(namespace string) string {
|
||||||
ns := strings.TrimSpace(namespace)
|
ns := strings.TrimSpace(namespace)
|
||||||
|
@ -16,6 +16,7 @@ package core
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
@ -24,7 +25,6 @@ import (
|
|||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/jobservice/lcm"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||||
"github.com/robfig/cron"
|
"github.com/robfig/cron"
|
||||||
)
|
)
|
||||||
@ -34,15 +34,15 @@ import (
|
|||||||
type basicController struct {
|
type basicController struct {
|
||||||
// Refer the backend worker
|
// Refer the backend worker
|
||||||
backendWorker worker.Interface
|
backendWorker worker.Interface
|
||||||
// Refer the job life cycle management controller
|
// Refer the job stats manager
|
||||||
ctl lcm.Controller
|
manager mgt.Manager
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewController is constructor of basicController.
|
// NewController is constructor of basicController.
|
||||||
func NewController(backendWorker worker.Interface, ctl lcm.Controller) Interface {
|
func NewController(backendWorker worker.Interface, mgr mgt.Manager) Interface {
|
||||||
return &basicController{
|
return &basicController{
|
||||||
backendWorker: backendWorker,
|
backendWorker: backendWorker,
|
||||||
ctl: ctl,
|
manager: mgr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ func (bc *basicController) LaunchJob(req *job.Request) (res *job.Stats, err erro
|
|||||||
|
|
||||||
// Save job stats
|
// Save job stats
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if _, err := bc.ctl.New(res); err != nil {
|
if err := bc.manager.SaveJob(res); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -106,12 +106,7 @@ func (bc *basicController) GetJob(jobID string) (*job.Stats, error) {
|
|||||||
return nil, errs.BadRequestError(errors.New("empty job ID"))
|
return nil, errs.BadRequestError(errors.New("empty job ID"))
|
||||||
}
|
}
|
||||||
|
|
||||||
t, err := bc.ctl.Track(jobID)
|
return bc.manager.GetJob(jobID)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return t.Job(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopJob is implementation of same method in core interface.
|
// StopJob is implementation of same method in core interface.
|
||||||
@ -157,32 +152,25 @@ func (bc *basicController) GetPeriodicExecutions(periodicJobID string, query *qu
|
|||||||
return nil, 0, errs.BadRequestError(errors.New("nil periodic job ID"))
|
return nil, 0, errs.BadRequestError(errors.New("nil periodic job ID"))
|
||||||
}
|
}
|
||||||
|
|
||||||
t, err := bc.ctl.Track(periodicJobID)
|
return bc.manager.GetPeriodicExecution(periodicJobID, query)
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
eIDs, total, err := t.Executions(query)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make([]*job.Stats, 0)
|
|
||||||
for _, eID := range eIDs {
|
|
||||||
et, err := bc.ctl.Track(eID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res = append(res, et.Job())
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, total, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduledJobs returns the scheduled jobs by page
|
// GetJobs returns the jobs by specified
|
||||||
func (bc *basicController) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
func (bc *basicController) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
return bc.backendWorker.ScheduledJobs(query)
|
onlyScheduledJobs := false
|
||||||
|
if q != nil && q.Extras != nil {
|
||||||
|
if v, ok := q.Extras.Get(query.ExtraParamKeyKind); ok {
|
||||||
|
if job.KindScheduled == v.(string) {
|
||||||
|
onlyScheduledJobs = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if onlyScheduledJobs {
|
||||||
|
return bc.manager.GetScheduledJobs(q)
|
||||||
|
}
|
||||||
|
|
||||||
|
return bc.manager.GetJobs(q)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validJobReq(req *job.Request) error {
|
func validJobReq(req *job.Request) error {
|
||||||
|
@ -14,28 +14,25 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
|
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
|
||||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ControllerTestSuite tests functions of core controller
|
// ControllerTestSuite tests functions of core controller
|
||||||
type ControllerTestSuite struct {
|
type ControllerTestSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
|
|
||||||
lcmCtl *fakeLcmController
|
manager *fakeManager
|
||||||
worker *fakeWorker
|
worker *fakeWorker
|
||||||
ctl Interface
|
ctl Interface
|
||||||
|
|
||||||
res *job.Stats
|
res *job.Stats
|
||||||
jobID string
|
jobID string
|
||||||
@ -60,14 +57,17 @@ func (suite *ControllerTestSuite) SetupSuite() {
|
|||||||
// Prepare for each test case
|
// Prepare for each test case
|
||||||
func (suite *ControllerTestSuite) SetupTest() {
|
func (suite *ControllerTestSuite) SetupTest() {
|
||||||
suite.worker = &fakeWorker{}
|
suite.worker = &fakeWorker{}
|
||||||
suite.lcmCtl = &fakeLcmController{}
|
|
||||||
|
|
||||||
suite.lcmCtl.On("Track", suite.jobID).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil)
|
|
||||||
suite.lcmCtl.On("New", suite.res).Return(job.NewBasicTrackerWithStats(nil, suite.res, "ns", nil, nil), nil)
|
|
||||||
|
|
||||||
suite.worker.On("IsKnownJob", job.SampleJob).Return((*sample.Job)(nil), true)
|
suite.worker.On("IsKnownJob", job.SampleJob).Return((*sample.Job)(nil), true)
|
||||||
suite.worker.On("IsKnownJob", "fake").Return(nil, false)
|
suite.worker.On("IsKnownJob", "fake").Return(nil, false)
|
||||||
suite.worker.On("ValidateJobParameters", (*sample.Job)(nil), suite.params).Return(nil)
|
suite.worker.On("ValidateJobParameters", (*sample.Job)(nil), suite.params).Return(nil)
|
||||||
|
|
||||||
|
fakeMgr := &fakeManager{}
|
||||||
|
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||||
|
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||||
|
|
||||||
|
suite.manager = fakeMgr
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestControllerTestSuite is suite entry for 'go test'
|
// TestControllerTestSuite is suite entry for 'go test'
|
||||||
@ -142,20 +142,6 @@ func (suite *ControllerTestSuite) TestCheckStatus() {
|
|||||||
assert.Equal(suite.T(), "running", st.Pools[0].Status, "expected running pool but got %s", st.Pools[0].Status)
|
assert.Equal(suite.T(), "running", st.Pools[0].Status, "expected running pool but got %s", st.Pools[0].Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestScheduledJobs ...
|
|
||||||
func (suite *ControllerTestSuite) TestScheduledJobs() {
|
|
||||||
q := &query.Parameter{
|
|
||||||
PageSize: 20,
|
|
||||||
PageNumber: 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.worker.On("ScheduledJobs", q).Return([]*job.Stats{suite.res}, 1, nil)
|
|
||||||
|
|
||||||
_, total, err := suite.ctl.ScheduledJobs(q)
|
|
||||||
require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err)
|
|
||||||
assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestInvalidChecks ...
|
// TestInvalidChecks ...
|
||||||
func (suite *ControllerTestSuite) TestInvalidChecks() {
|
func (suite *ControllerTestSuite) TestInvalidChecks() {
|
||||||
req := createJobReq("kind")
|
req := createJobReq("kind")
|
||||||
@ -180,57 +166,42 @@ func (suite *ControllerTestSuite) TestInvalidChecks() {
|
|||||||
assert.NotNil(suite.T(), err, "invalid job name: error expected but got nil")
|
assert.NotNil(suite.T(), err, "invalid job name: error expected but got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestScheduledJobs ...
|
||||||
|
func (suite *ControllerTestSuite) TestGetScheduledJobs() {
|
||||||
|
extras := make(query.ExtraParameters)
|
||||||
|
extras.Set(query.ExtraParamKeyKind, job.KindScheduled)
|
||||||
|
q := &query.Parameter{
|
||||||
|
PageSize: 20,
|
||||||
|
PageNumber: 1,
|
||||||
|
Extras: extras,
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeMgr := &fakeManager{}
|
||||||
|
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||||
|
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||||
|
fakeMgr.On("GetScheduledJobs", q).Return([]*job.Stats{suite.res}, int64(1), nil)
|
||||||
|
suite.manager = fakeMgr
|
||||||
|
|
||||||
|
_, total, err := suite.ctl.GetJobs(q)
|
||||||
|
require.Nil(suite.T(), err, "scheduled jobs: nil error expected but got %s", err)
|
||||||
|
assert.Equal(suite.T(), int64(1), total, "expected 1 item but got 0")
|
||||||
|
}
|
||||||
|
|
||||||
// TestGetPeriodicExecutions tests GetPeriodicExecutions
|
// TestGetPeriodicExecutions tests GetPeriodicExecutions
|
||||||
func (suite *ControllerTestSuite) TestGetPeriodicExecutions() {
|
func (suite *ControllerTestSuite) TestGetPeriodicExecutions() {
|
||||||
pool := tests.GiveMeRedisPool()
|
q := &query.Parameter{
|
||||||
namespace := tests.GiveMeTestNamespace()
|
|
||||||
|
|
||||||
jobID := utils.MakeIdentifier()
|
|
||||||
nID := time.Now().Unix()
|
|
||||||
mockJobStats := &job.Stats{
|
|
||||||
Info: &job.StatsInfo{
|
|
||||||
JobID: jobID,
|
|
||||||
Status: job.ScheduledStatus.String(),
|
|
||||||
JobKind: job.KindPeriodic,
|
|
||||||
JobName: job.SampleJob,
|
|
||||||
IsUnique: false,
|
|
||||||
CronSpec: "0 0 * * * *",
|
|
||||||
NumericPID: nID,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
t := job.NewBasicTrackerWithStats(context.TODO(), mockJobStats, namespace, pool, nil)
|
|
||||||
err := t.Save()
|
|
||||||
require.NoError(suite.T(), err)
|
|
||||||
|
|
||||||
executionID := utils.MakeIdentifier()
|
|
||||||
runAt := time.Now().Add(1 * time.Hour).Unix()
|
|
||||||
executionStats := &job.Stats{
|
|
||||||
Info: &job.StatsInfo{
|
|
||||||
JobID: executionID,
|
|
||||||
Status: job.ScheduledStatus.String(),
|
|
||||||
JobKind: job.KindScheduled,
|
|
||||||
JobName: job.SampleJob,
|
|
||||||
IsUnique: false,
|
|
||||||
CronSpec: "0 0 * * * *",
|
|
||||||
RunAt: runAt,
|
|
||||||
EnqueueTime: runAt,
|
|
||||||
UpstreamJobID: jobID,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
t2 := job.NewBasicTrackerWithStats(context.TODO(), executionStats, namespace, pool, nil)
|
|
||||||
err = t2.Save()
|
|
||||||
require.NoError(suite.T(), err)
|
|
||||||
|
|
||||||
suite.lcmCtl.On("Track", jobID).Return(t, nil)
|
|
||||||
suite.lcmCtl.On("Track", executionID).Return(t2, nil)
|
|
||||||
|
|
||||||
_, total, err := suite.ctl.GetPeriodicExecutions(jobID, &query.Parameter{
|
|
||||||
PageSize: 10,
|
PageSize: 10,
|
||||||
PageNumber: 1,
|
PageNumber: 1,
|
||||||
Extras: make(query.ExtraParameters),
|
Extras: make(query.ExtraParameters),
|
||||||
})
|
}
|
||||||
|
|
||||||
|
fakeMgr := &fakeManager{}
|
||||||
|
fakeMgr.On("SaveJob", suite.res).Return(nil)
|
||||||
|
fakeMgr.On("GetJob", suite.jobID).Return(suite.res, nil)
|
||||||
|
fakeMgr.On("GetPeriodicExecution", "1000", q).Return([]*job.Stats{suite.res}, int64(1), nil)
|
||||||
|
suite.manager = fakeMgr
|
||||||
|
|
||||||
|
_, total, err := suite.ctl.GetPeriodicExecutions("1000", q)
|
||||||
require.NoError(suite.T(), err)
|
require.NoError(suite.T(), err)
|
||||||
assert.Equal(suite.T(), int64(1), total)
|
assert.Equal(suite.T(), int64(1), total)
|
||||||
}
|
}
|
||||||
@ -253,19 +224,6 @@ func createJobReq(kind string) *job.Request {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement lcm controller interface
|
|
||||||
func (suite *ControllerTestSuite) Serve() error {
|
|
||||||
return suite.lcmCtl.Serve()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *ControllerTestSuite) New(stats *job.Stats) (job.Tracker, error) {
|
|
||||||
return suite.lcmCtl.New(stats)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *ControllerTestSuite) Track(jobID string) (job.Tracker, error) {
|
|
||||||
return suite.lcmCtl.Track(jobID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implement worker interface
|
// Implement worker interface
|
||||||
func (suite *ControllerTestSuite) Start() error {
|
func (suite *ControllerTestSuite) Start() error {
|
||||||
return suite.worker.Start()
|
return suite.worker.Start()
|
||||||
@ -307,37 +265,28 @@ func (suite *ControllerTestSuite) RetryJob(jobID string) error {
|
|||||||
return suite.worker.RetryJob(jobID)
|
return suite.worker.RetryJob(jobID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ControllerTestSuite) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
// Implement manager interface
|
||||||
return suite.worker.ScheduledJobs(query)
|
func (suite *ControllerTestSuite) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
return suite.manager.GetJobs(q)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implement fake objects with mock
|
func (suite *ControllerTestSuite) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
type fakeLcmController struct {
|
return suite.manager.GetPeriodicExecution(pID, q)
|
||||||
mock.Mock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (flc *fakeLcmController) Serve() error {
|
func (suite *ControllerTestSuite) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
return flc.Called().Error(0)
|
return suite.manager.GetScheduledJobs(q)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (flc *fakeLcmController) New(stats *job.Stats) (job.Tracker, error) {
|
func (suite *ControllerTestSuite) GetJob(jobID string) (*job.Stats, error) {
|
||||||
args := flc.Called(stats)
|
return suite.manager.GetJob(jobID)
|
||||||
if args.Error(1) != nil {
|
|
||||||
return nil, args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return args.Get(0).(job.Tracker), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (flc *fakeLcmController) Track(jobID string) (job.Tracker, error) {
|
func (suite *ControllerTestSuite) SaveJob(j *job.Stats) error {
|
||||||
args := flc.Called(jobID)
|
return suite.manager.SaveJob(j)
|
||||||
if args.Error(1) != nil {
|
|
||||||
return nil, args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return args.Get(0).(job.Tracker), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fake worker
|
||||||
type fakeWorker struct {
|
type fakeWorker struct {
|
||||||
mock.Mock
|
mock.Mock
|
||||||
}
|
}
|
||||||
@ -407,11 +356,48 @@ func (f *fakeWorker) RetryJob(jobID string) error {
|
|||||||
return f.Called(jobID).Error(0)
|
return f.Called(jobID).Error(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *fakeWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
// fake manager
|
||||||
args := f.Called(query)
|
type fakeManager struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *fakeManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
args := fm.Called(q)
|
||||||
if args.Error(2) != nil {
|
if args.Error(2) != nil {
|
||||||
return nil, 0, args.Error(2)
|
return nil, 0, args.Error(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
return args.Get(0).([]*job.Stats), int64(args.Int(1)), nil
|
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *fakeManager) GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
args := fm.Called(pID, q)
|
||||||
|
if args.Error(2) != nil {
|
||||||
|
return nil, 0, args.Error(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *fakeManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
args := fm.Called(q)
|
||||||
|
if args.Error(2) != nil {
|
||||||
|
return nil, 0, args.Error(2)
|
||||||
|
}
|
||||||
|
|
||||||
|
return args.Get(0).([]*job.Stats), args.Get(1).(int64), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *fakeManager) GetJob(jobID string) (*job.Stats, error) {
|
||||||
|
args := fm.Called(jobID)
|
||||||
|
if args.Error(1) != nil {
|
||||||
|
return nil, args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return args.Get(0).(*job.Stats), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fm *fakeManager) SaveJob(j *job.Stats) error {
|
||||||
|
args := fm.Called(j)
|
||||||
|
return args.Error(0)
|
||||||
}
|
}
|
||||||
|
@ -68,8 +68,11 @@ type Interface interface {
|
|||||||
// The total number is also returned.
|
// The total number is also returned.
|
||||||
GetPeriodicExecutions(periodicJobID string, query *query.Parameter) ([]*job.Stats, int64, error)
|
GetPeriodicExecutions(periodicJobID string, query *query.Parameter) ([]*job.Stats, int64, error)
|
||||||
|
|
||||||
// Get the scheduled jobs by page
|
// Get the jobs with queries.
|
||||||
// The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib.
|
//
|
||||||
|
// For scheduled jobs, the page number in the query will be ignored, default 20 is used.
|
||||||
|
// This is the limitation of backend lib. The int64 is total number.
|
||||||
|
// For other cases, query the jobs with cursor, not standard pagination. The int64 is next cursor.
|
||||||
// The total number is also returned.
|
// The total number is also returned.
|
||||||
ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
GetJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package errs
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -49,8 +50,8 @@ const (
|
|||||||
ResourceConflictsErrorCode
|
ResourceConflictsErrorCode
|
||||||
// BadRequestErrorCode is code for the error of bad request
|
// BadRequestErrorCode is code for the error of bad request
|
||||||
BadRequestErrorCode
|
BadRequestErrorCode
|
||||||
// GetScheduledJobsErrorCode is code for the error of getting scheduled jobs
|
// GetJobsErrorCode is code for the error of getting scheduled jobs
|
||||||
GetScheduledJobsErrorCode
|
GetJobsErrorCode
|
||||||
// GetPeriodicExecutionErrorCode is code for the error of getting periodic executions
|
// GetPeriodicExecutionErrorCode is code for the error of getting periodic executions
|
||||||
GetPeriodicExecutionErrorCode
|
GetPeriodicExecutionErrorCode
|
||||||
// StatusMismatchErrorCode is code for the error of mismatching status
|
// StatusMismatchErrorCode is code for the error of mismatching status
|
||||||
@ -137,9 +138,13 @@ func UnauthorizedError(err error) error {
|
|||||||
return New(UnAuthorizedErrorCode, "unauthorized", err.Error())
|
return New(UnAuthorizedErrorCode, "unauthorized", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetScheduledJobsError is error for the case of getting scheduled jobs failed
|
// GetJobsError is error for the case of getting jobs failed
|
||||||
func GetScheduledJobsError(err error) error {
|
func GetJobsError(q *query.Parameter, err error) error {
|
||||||
return New(GetScheduledJobsErrorCode, "failed to get scheduled jobs", err.Error())
|
qStr := ""
|
||||||
|
if q != nil {
|
||||||
|
qStr = q.Extras.String()
|
||||||
|
}
|
||||||
|
return New(GetJobsErrorCode, fmt.Sprintf("failed to get scheduled jobs, q=%s", qStr), err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPeriodicExecutionError is error for the case of getting periodic jobs failed
|
// GetPeriodicExecutionError is error for the case of getting periodic jobs failed
|
||||||
|
@ -17,7 +17,6 @@ package job
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||||
@ -66,15 +65,6 @@ type Tracker interface {
|
|||||||
// error if update failed
|
// error if update failed
|
||||||
Update(fieldAndValues ...interface{}) error
|
Update(fieldAndValues ...interface{}) error
|
||||||
|
|
||||||
// Executions returns the executions of the job tracked by this tracker.
|
|
||||||
// Please pay attention, this only for periodic job.
|
|
||||||
//
|
|
||||||
// Returns:
|
|
||||||
// job execution IDs matched the query
|
|
||||||
// the total number
|
|
||||||
// error if any issues happened
|
|
||||||
Executions(q *query.Parameter) ([]string, int64, error)
|
|
||||||
|
|
||||||
// NumericID returns the numeric ID of periodic job.
|
// NumericID returns the numeric ID of periodic job.
|
||||||
// Please pay attention, this only for periodic job.
|
// Please pay attention, this only for periodic job.
|
||||||
NumericID() (int64, error)
|
NumericID() (int64, error)
|
||||||
@ -241,65 +231,6 @@ func (bt *basicTracker) CheckIn(message string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Executions of the tracked job
|
|
||||||
func (bt *basicTracker) Executions(q *query.Parameter) ([]string, int64, error) {
|
|
||||||
if bt.jobStats.Info.JobKind != KindPeriodic {
|
|
||||||
return nil, 0, errors.New("only periodic job has executions")
|
|
||||||
}
|
|
||||||
|
|
||||||
conn := bt.pool.Get()
|
|
||||||
defer func() {
|
|
||||||
_ = conn.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
key := rds.KeyUpstreamJobAndExecutions(bt.namespace, bt.jobID)
|
|
||||||
|
|
||||||
// Query executions by "non stopped"
|
|
||||||
if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok {
|
|
||||||
if v, yes := nonStoppedOnly.(bool); yes && v {
|
|
||||||
return queryExecutions(conn, key, q)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pagination
|
|
||||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
|
||||||
if q != nil {
|
|
||||||
if q.PageNumber > 0 {
|
|
||||||
pageNumber = q.PageNumber
|
|
||||||
}
|
|
||||||
if q.PageSize > 0 {
|
|
||||||
pageSize = q.PageSize
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get total first
|
|
||||||
total, err := redis.Int64(conn.Do("ZCARD", key))
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// No items
|
|
||||||
result := make([]string, 0)
|
|
||||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
|
||||||
return result, total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1
|
|
||||||
args := []interface{}{key, min, max}
|
|
||||||
list, err := redis.Values(conn.Do("ZREVRANGE", args...))
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, item := range list {
|
|
||||||
if eID, ok := item.([]byte); ok {
|
|
||||||
result = append(result, string(eID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Expire job stats
|
// Expire job stats
|
||||||
func (bt *basicTracker) Expire() error {
|
func (bt *basicTracker) Expire() error {
|
||||||
conn := bt.pool.Get()
|
conn := bt.pool.Get()
|
||||||
@ -709,40 +640,3 @@ func getStatus(conn redis.Conn, key string) (Status, error) {
|
|||||||
func setStatus(conn redis.Conn, key string, status Status) error {
|
func setStatus(conn redis.Conn, key string, status Status) error {
|
||||||
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
|
return rds.HmSet(conn, key, "status", status.String(), "update_time", time.Now().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
// queryExecutions queries periodic executions by status
|
|
||||||
func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) {
|
|
||||||
total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
|
||||||
if q.PageNumber > 0 {
|
|
||||||
pageNumber = q.PageNumber
|
|
||||||
}
|
|
||||||
if q.PageSize > 0 {
|
|
||||||
pageSize = q.PageSize
|
|
||||||
}
|
|
||||||
|
|
||||||
results := make([]string, 0)
|
|
||||||
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
|
||||||
return results, total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
offset := (pageNumber - 1) * pageSize
|
|
||||||
args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize}
|
|
||||||
|
|
||||||
eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...))
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, eID := range eIDs {
|
|
||||||
if eIDBytes, ok := eID.([]byte); ok {
|
|
||||||
results = append(results, string(eIDBytes))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return results, total, nil
|
|
||||||
}
|
|
||||||
|
@ -19,8 +19,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
|
||||||
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||||
"github.com/gomodule/redigo/redis"
|
"github.com/gomodule/redigo/redis"
|
||||||
@ -176,14 +174,6 @@ func (suite *TrackerTestSuite) TestPeriodicTracker() {
|
|||||||
require.NoError(suite.T(), err)
|
require.NoError(suite.T(), err)
|
||||||
assert.Equal(suite.T(), nID, id)
|
assert.Equal(suite.T(), nID, id)
|
||||||
|
|
||||||
_, total, err := t.Executions(&query.Parameter{
|
|
||||||
PageNumber: 1,
|
|
||||||
PageSize: 10,
|
|
||||||
Extras: make(query.ExtraParameters),
|
|
||||||
})
|
|
||||||
require.NoError(suite.T(), err)
|
|
||||||
assert.Equal(suite.T(), int64(1), total)
|
|
||||||
|
|
||||||
err = t2.PeriodicExecutionDone()
|
err = t2.PeriodicExecutionDone()
|
||||||
require.NoError(suite.T(), err)
|
require.NoError(suite.T(), err)
|
||||||
}
|
}
|
||||||
|
@ -16,17 +16,17 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/goharbor/harbor/src/common"
|
|
||||||
comcfg "github.com/goharbor/harbor/src/common/config"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/goharbor/harbor/src/common"
|
||||||
|
comcfg "github.com/goharbor/harbor/src/common/config"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/config"
|
"github.com/goharbor/harbor/src/jobservice/config"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
"github.com/goharbor/harbor/src/jobservice/runtime"
|
"github.com/goharbor/harbor/src/jobservice/runtime"
|
||||||
)
|
)
|
||||||
|
349
src/jobservice/mgt/manager.go
Normal file
349
src/jobservice/mgt/manager.go
Normal file
@ -0,0 +1,349 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package mgt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/gocraft/work"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/period"
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager defies the related operations to handle the management of job stats.
|
||||||
|
type Manager interface {
|
||||||
|
// Get the stats data of all kinds of jobs.
|
||||||
|
// Data returned by pagination.
|
||||||
|
//
|
||||||
|
// Arguments:
|
||||||
|
// q *query.Parameter : the query parameters
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// The matched job stats list
|
||||||
|
// The total number of the jobs
|
||||||
|
// Non nil error if any issues meet
|
||||||
|
GetJobs(q *query.Parameter) ([]*job.Stats, int64, error)
|
||||||
|
|
||||||
|
// Get the executions of the specified periodic job by pagination
|
||||||
|
//
|
||||||
|
// Arguments:
|
||||||
|
// pID: ID of the periodic job
|
||||||
|
// q *query.Parameter: query parameters
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// The matched job stats list,
|
||||||
|
// The total number of the executions,
|
||||||
|
// Non nil error if any issues meet.
|
||||||
|
GetPeriodicExecution(pID string, q *query.Parameter) ([]*job.Stats, int64, error)
|
||||||
|
|
||||||
|
// Get the scheduled jobs
|
||||||
|
//
|
||||||
|
// Arguments:
|
||||||
|
// q *query.Parameter: query parameters
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// The matched job stats list,
|
||||||
|
// The total number of the executions,
|
||||||
|
// Non nil error if any issues meet.
|
||||||
|
GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error)
|
||||||
|
|
||||||
|
// Get the stats of the specified job
|
||||||
|
//
|
||||||
|
// Arguments:
|
||||||
|
// jobID string: ID of the job
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// The job stats
|
||||||
|
// Non nil error if any issues meet
|
||||||
|
GetJob(jobID string) (*job.Stats, error)
|
||||||
|
|
||||||
|
// Save the job stats
|
||||||
|
//
|
||||||
|
// Arguments:
|
||||||
|
// job *job.Stats: the saving job stats
|
||||||
|
//
|
||||||
|
// Returns:
|
||||||
|
// Non nil error if any issues meet
|
||||||
|
SaveJob(job *job.Stats) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// basicManager is the default implementation of @manager,
|
||||||
|
// based on redis.
|
||||||
|
type basicManager struct {
|
||||||
|
// system context
|
||||||
|
ctx context.Context
|
||||||
|
// db namespace
|
||||||
|
namespace string
|
||||||
|
// redis conn pool
|
||||||
|
pool *redis.Pool
|
||||||
|
// go work client
|
||||||
|
client *work.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager news a basic manager
|
||||||
|
func NewManager(ctx context.Context, ns string, pool *redis.Pool) Manager {
|
||||||
|
return &basicManager{
|
||||||
|
ctx: ctx,
|
||||||
|
namespace: ns,
|
||||||
|
pool: pool,
|
||||||
|
client: work.NewClient(ns, pool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJobs is implementation of Manager.GetJobs
|
||||||
|
// Because of the hash set used to keep the job stats, we can not support a standard pagination.
|
||||||
|
// A cursor is used to fetch the jobs with several batches.
|
||||||
|
func (bm *basicManager) GetJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
cursor, count := int64(0), query.DefaultPageSize
|
||||||
|
if q != nil {
|
||||||
|
if q.PageSize > 0 {
|
||||||
|
count = q.PageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if cur, ok := q.Extras.Get(query.ExtraParamKeyCursor); ok {
|
||||||
|
cursor = cur.(int64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pattern := rds.KeyJobStats(bm.namespace, "*")
|
||||||
|
args := []interface{}{cursor, "MATCH", pattern, "COUNT", count}
|
||||||
|
|
||||||
|
conn := bm.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
values, err := redis.Values(conn.Do("SCAN", args...))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
if len(values) != 2 {
|
||||||
|
return nil, 0, errors.New("malform scan results")
|
||||||
|
}
|
||||||
|
|
||||||
|
nextCur, err := strconv.ParseUint(string(values[0].([]byte)), 10, 8)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
list := values[1].([]interface{})
|
||||||
|
|
||||||
|
results := make([]*job.Stats, 0)
|
||||||
|
for _, v := range list {
|
||||||
|
if bytes, ok := v.([]byte); ok {
|
||||||
|
statsKey := string(bytes)
|
||||||
|
if i := strings.LastIndex(statsKey, ":"); i != -1 {
|
||||||
|
jID := statsKey[i+1:]
|
||||||
|
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil)
|
||||||
|
if err := t.Load(); err != nil {
|
||||||
|
logger.Errorf("retrieve stats data of job %s error: %s", jID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
results = append(results, t.Job())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, int64(nextCur), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPeriodicExecution is implementation of Manager.GetPeriodicExecution
|
||||||
|
func (bm *basicManager) GetPeriodicExecution(pID string, q *query.Parameter) (results []*job.Stats, total int64, err error) {
|
||||||
|
if utils.IsEmptyStr(pID) {
|
||||||
|
return nil, 0, errors.New("nil periodic job ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
tracker := job.NewBasicTrackerWithID(bm.ctx, pID, bm.namespace, bm.pool, nil)
|
||||||
|
err = tracker.Load()
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if tracker.Job().Info.JobKind != job.KindPeriodic {
|
||||||
|
return nil, 0, errors.Errorf("only periodic job has executions: %s kind is received", tracker.Job().Info.JobKind)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn := bm.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
key := rds.KeyUpstreamJobAndExecutions(bm.namespace, pID)
|
||||||
|
|
||||||
|
executionIDs := make([]string, 0)
|
||||||
|
// Query executions by "non stopped"
|
||||||
|
if nonStoppedOnly, ok := q.Extras.Get(query.ExtraParamKeyNonStoppedOnly); ok {
|
||||||
|
if v, yes := nonStoppedOnly.(bool); yes && v {
|
||||||
|
executionIDs, total, err = queryExecutions(conn, key, q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Query all
|
||||||
|
// Pagination
|
||||||
|
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||||
|
if q != nil {
|
||||||
|
if q.PageNumber > 0 {
|
||||||
|
pageNumber = q.PageNumber
|
||||||
|
}
|
||||||
|
if q.PageSize > 0 {
|
||||||
|
pageSize = q.PageSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get total first
|
||||||
|
total, err = redis.Int64(conn.Do("ZCARD", key))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// No items
|
||||||
|
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||||
|
return results, total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
min, max := (pageNumber-1)*pageSize, pageNumber*pageSize-1
|
||||||
|
args := []interface{}{key, min, max}
|
||||||
|
list, err := redis.Values(conn.Do("ZREVRANGE", args...))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range list {
|
||||||
|
if eID, ok := item.([]byte); ok {
|
||||||
|
executionIDs = append(executionIDs, string(eID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, eID := range executionIDs {
|
||||||
|
t := job.NewBasicTrackerWithID(bm.ctx, eID, bm.namespace, bm.pool, nil)
|
||||||
|
if er := t.Load(); er != nil {
|
||||||
|
logger.Errorf("track job %s error: %s", eID, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
results = append(results, t.Job())
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetScheduledJobs is implementation of Manager.GetScheduledJobs
|
||||||
|
func (bm *basicManager) GetScheduledJobs(q *query.Parameter) ([]*job.Stats, int64, error) {
|
||||||
|
// PageSize is not supported here
|
||||||
|
var page uint = 1
|
||||||
|
if q != nil && q.PageNumber > 1 {
|
||||||
|
page = q.PageNumber
|
||||||
|
}
|
||||||
|
|
||||||
|
sJobs, total, err := bm.client.ScheduledJobs(page)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res := make([]*job.Stats, 0)
|
||||||
|
for _, sJob := range sJobs {
|
||||||
|
jID := sJob.ID
|
||||||
|
if len(sJob.Args) > 0 {
|
||||||
|
if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok {
|
||||||
|
// Periodic scheduled job
|
||||||
|
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t := job.NewBasicTrackerWithID(bm.ctx, jID, bm.namespace, bm.pool, nil)
|
||||||
|
err = t.Load()
|
||||||
|
if err != nil {
|
||||||
|
// Just log it
|
||||||
|
logger.Errorf("mgt.basicManager: query scheduled jobs error: %s", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res = append(res, t.Job())
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJob is implementation of Manager.GetJob
|
||||||
|
func (bm *basicManager) GetJob(jobID string) (*job.Stats, error) {
|
||||||
|
if utils.IsEmptyStr(jobID) {
|
||||||
|
return nil, errs.BadRequestError("empty job ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
t := job.NewBasicTrackerWithID(bm.ctx, jobID, bm.namespace, bm.pool, nil)
|
||||||
|
if err := t.Load(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return t.Job(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SaveJob is implementation of Manager.SaveJob
|
||||||
|
func (bm *basicManager) SaveJob(j *job.Stats) error {
|
||||||
|
if j == nil {
|
||||||
|
return errs.BadRequestError("nil saving job stats")
|
||||||
|
}
|
||||||
|
|
||||||
|
t := job.NewBasicTrackerWithStats(bm.ctx, j, bm.namespace, bm.pool, nil)
|
||||||
|
return t.Save()
|
||||||
|
}
|
||||||
|
|
||||||
|
// queryExecutions queries periodic executions by status
|
||||||
|
func queryExecutions(conn redis.Conn, dataKey string, q *query.Parameter) ([]string, int64, error) {
|
||||||
|
total, err := redis.Int64(conn.Do("ZCOUNT", dataKey, 0, "+inf"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var pageNumber, pageSize uint = 1, query.DefaultPageSize
|
||||||
|
if q.PageNumber > 0 {
|
||||||
|
pageNumber = q.PageNumber
|
||||||
|
}
|
||||||
|
if q.PageSize > 0 {
|
||||||
|
pageSize = q.PageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make([]string, 0)
|
||||||
|
if total == 0 || (int64)((pageNumber-1)*pageSize) >= total {
|
||||||
|
return results, total, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := (pageNumber - 1) * pageSize
|
||||||
|
args := []interface{}{dataKey, "+inf", 0, "LIMIT", offset, pageSize}
|
||||||
|
|
||||||
|
eIDs, err := redis.Values(conn.Do("ZREVRANGEBYSCORE", args...))
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, eID := range eIDs {
|
||||||
|
if eIDBytes, ok := eID.([]byte); ok {
|
||||||
|
results = append(results, string(eIDBytes))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, total, nil
|
||||||
|
}
|
184
src/jobservice/mgt/manager_test.go
Normal file
184
src/jobservice/mgt/manager_test.go
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
// Copyright Project Harbor Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package mgt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/gocraft/work"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BasicManagerTestSuite tests the function of basic manager
|
||||||
|
type BasicManagerTestSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
|
||||||
|
namespace string
|
||||||
|
pool *redis.Pool
|
||||||
|
|
||||||
|
manager Manager
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetupSuite prepares the test suite
|
||||||
|
func (suite *BasicManagerTestSuite) SetupSuite() {
|
||||||
|
suite.namespace = tests.GiveMeTestNamespace()
|
||||||
|
suite.pool = tests.GiveMeRedisPool()
|
||||||
|
suite.manager = NewManager(context.TODO(), suite.namespace, suite.pool)
|
||||||
|
|
||||||
|
// Mock data
|
||||||
|
periodicJob := &job.Stats{
|
||||||
|
Info: &job.StatsInfo{
|
||||||
|
JobID: "1000",
|
||||||
|
JobName: job.SampleJob,
|
||||||
|
JobKind: job.KindPeriodic,
|
||||||
|
Status: job.ScheduledStatus.String(),
|
||||||
|
IsUnique: false,
|
||||||
|
CronSpec: "* */10 * * * *",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t := job.NewBasicTrackerWithStats(context.TODO(), periodicJob, suite.namespace, suite.pool, nil)
|
||||||
|
err := t.Save()
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
execution := &job.Stats{
|
||||||
|
Info: &job.StatsInfo{
|
||||||
|
JobID: "1001",
|
||||||
|
JobKind: job.KindScheduled,
|
||||||
|
JobName: job.SampleJob,
|
||||||
|
Status: job.PendingStatus.String(),
|
||||||
|
IsUnique: false,
|
||||||
|
RunAt: time.Now().Add(5 * time.Minute).Unix(),
|
||||||
|
UpstreamJobID: "1000",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
t = job.NewBasicTrackerWithStats(context.TODO(), execution, suite.namespace, suite.pool, nil)
|
||||||
|
err = t.Save()
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TearDownSuite clears the test suite
|
||||||
|
func (suite *BasicManagerTestSuite) TearDownSuite() {
|
||||||
|
conn := suite.pool.Get()
|
||||||
|
defer func() {
|
||||||
|
_ = conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
_ = tests.ClearAll(suite.namespace, conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBasicManagerTestSuite is entry of go test
|
||||||
|
func TestBasicManagerTestSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(BasicManagerTestSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBasicManagerGetJobs tests get jobs
|
||||||
|
func (suite *BasicManagerTestSuite) TestBasicManagerGetJobs() {
|
||||||
|
jobs, _, err := suite.manager.GetJobs(&query.Parameter{
|
||||||
|
PageSize: 25,
|
||||||
|
PageNumber: 1,
|
||||||
|
})
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Condition(suite.T(), func() bool {
|
||||||
|
return len(jobs) > 0
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetPeriodicExecutions tests get periodic executions
|
||||||
|
func (suite *BasicManagerTestSuite) TestGetPeriodicExecutions() {
|
||||||
|
extras := make(query.ExtraParameters)
|
||||||
|
extras.Set(query.ExtraParamKeyNonStoppedOnly, true)
|
||||||
|
|
||||||
|
jobs, total, err := suite.manager.GetPeriodicExecution("1000", &query.Parameter{
|
||||||
|
PageSize: 20,
|
||||||
|
PageNumber: 1,
|
||||||
|
Extras: extras,
|
||||||
|
})
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), int64(1), total)
|
||||||
|
assert.Equal(suite.T(), int64(1), int64(len(jobs)))
|
||||||
|
|
||||||
|
t := job.NewBasicTrackerWithID(context.TODO(), "1001", suite.namespace, suite.pool, nil)
|
||||||
|
err = t.Load()
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
err = t.PeriodicExecutionDone()
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
jobs, total, err = suite.manager.GetPeriodicExecution("1000", &query.Parameter{
|
||||||
|
PageSize: 20,
|
||||||
|
PageNumber: 1,
|
||||||
|
})
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), int64(1), total)
|
||||||
|
assert.Equal(suite.T(), int64(1), int64(len(jobs)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetScheduledJobs tests get scheduled jobs
|
||||||
|
func (suite *BasicManagerTestSuite) TestGetScheduledJobs() {
|
||||||
|
enqueuer := work.NewEnqueuer(suite.namespace, suite.pool)
|
||||||
|
scheduledJob, err := enqueuer.EnqueueIn(job.SampleJob, 1000, make(map[string]interface{}))
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
stats := &job.Stats{
|
||||||
|
Info: &job.StatsInfo{
|
||||||
|
JobID: scheduledJob.ID,
|
||||||
|
JobName: job.SampleJob,
|
||||||
|
JobKind: job.KindScheduled,
|
||||||
|
Status: job.ScheduledStatus.String(),
|
||||||
|
RunAt: scheduledJob.RunAt,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
t := job.NewBasicTrackerWithStats(context.TODO(), stats, suite.namespace, suite.pool, nil)
|
||||||
|
err = t.Save()
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
list, total, err := suite.manager.GetScheduledJobs(&query.Parameter{
|
||||||
|
PageNumber: 1,
|
||||||
|
PageSize: 10,
|
||||||
|
})
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), int64(1), total)
|
||||||
|
assert.Equal(suite.T(), int64(1), int64(len(list)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetJob tests get job
|
||||||
|
func (suite *BasicManagerTestSuite) TestGetJob() {
|
||||||
|
j, err := suite.manager.GetJob("1001")
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), j.Info.JobID, "1001")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSaveJob tests saving job
|
||||||
|
func (suite *BasicManagerTestSuite) TestSaveJob() {
|
||||||
|
newJob := &job.Stats{
|
||||||
|
Info: &job.StatsInfo{
|
||||||
|
JobID: "1002",
|
||||||
|
JobKind: job.KindPeriodic,
|
||||||
|
JobName: job.SampleJob,
|
||||||
|
Status: job.PendingStatus.String(),
|
||||||
|
IsUnique: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := suite.manager.SaveJob(newJob)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
}
|
@ -17,6 +17,7 @@ package runtime
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/goharbor/harbor/src/jobservice/mgt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
@ -86,7 +87,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
backendWorker worker.Interface
|
backendWorker worker.Interface
|
||||||
lcmCtl lcm.Controller
|
manager mgt.Manager
|
||||||
)
|
)
|
||||||
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
if cfg.PoolConfig.Backend == config.JobServicePoolBackendRedis {
|
||||||
// Number of workers
|
// Number of workers
|
||||||
@ -95,6 +96,9 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
|||||||
namespace := fmt.Sprintf("{%s}", cfg.PoolConfig.RedisPoolCfg.Namespace)
|
namespace := fmt.Sprintf("{%s}", cfg.PoolConfig.RedisPoolCfg.Namespace)
|
||||||
// Get redis connection pool
|
// Get redis connection pool
|
||||||
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
redisPool := bs.getRedisPool(cfg.PoolConfig.RedisPoolCfg.RedisURL)
|
||||||
|
|
||||||
|
// Create stats manager
|
||||||
|
manager = mgt.NewManager(ctx, namespace, redisPool)
|
||||||
// Create hook agent, it's a singleton object
|
// Create hook agent, it's a singleton object
|
||||||
hookAgent := hook.NewAgent(rootContext, namespace, redisPool)
|
hookAgent := hook.NewAgent(rootContext, namespace, redisPool)
|
||||||
hookCallback := func(URL string, change *job.StatusChange) error {
|
hookCallback := func(URL string, change *job.StatusChange) error {
|
||||||
@ -114,7 +118,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create job life cycle management controller
|
// Create job life cycle management controller
|
||||||
lcmCtl = lcm.NewController(rootContext, namespace, redisPool, hookCallback)
|
lcmCtl := lcm.NewController(rootContext, namespace, redisPool, hookCallback)
|
||||||
|
|
||||||
// Start the backend worker
|
// Start the backend worker
|
||||||
backendWorker, err = bs.loadAndRunRedisWorkerPool(
|
backendWorker, err = bs.loadAndRunRedisWorkerPool(
|
||||||
@ -145,7 +149,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize controller
|
// Initialize controller
|
||||||
ctl := core.NewController(backendWorker, lcmCtl)
|
ctl := core.NewController(backendWorker, manager)
|
||||||
// Start the API server
|
// Start the API server
|
||||||
apiServer := bs.createAPIServer(ctx, cfg, ctl)
|
apiServer := bs.createAPIServer(ctx, cfg, ctl)
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gocraft/work"
|
"github.com/gocraft/work"
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||||
"github.com/goharbor/harbor/src/jobservice/env"
|
"github.com/goharbor/harbor/src/jobservice/env"
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
@ -362,40 +361,6 @@ func (w *basicWorker) ValidateJobParameters(jobType interface{}, params job.Para
|
|||||||
return theJ.Validate(params)
|
return theJ.Validate(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduledJobs returns the scheduled jobs by page
|
|
||||||
func (w *basicWorker) ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error) {
|
|
||||||
var page uint = 1
|
|
||||||
if query != nil && query.PageNumber > 1 {
|
|
||||||
page = query.PageNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
sJobs, total, err := w.client.ScheduledJobs(page)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
res := make([]*job.Stats, 0)
|
|
||||||
for _, sJob := range sJobs {
|
|
||||||
jID := sJob.ID
|
|
||||||
if len(sJob.Args) > 0 {
|
|
||||||
if _, ok := sJob.Args[period.PeriodicExecutionMark]; ok {
|
|
||||||
// Periodic scheduled job
|
|
||||||
jID = fmt.Sprintf("%s@%d", sJob.ID, sJob.RunAt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t, err := w.ctl.Track(jID)
|
|
||||||
if err != nil {
|
|
||||||
// Just log it
|
|
||||||
logger.Errorf("cworker: query scheduled jobs error: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
res = append(res, t.Job())
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, total, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterJob is used to register the job to the worker.
|
// RegisterJob is used to register the job to the worker.
|
||||||
// j is the type of job
|
// j is the type of job
|
||||||
func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
||||||
|
@ -156,6 +156,9 @@ func (suite *CWorkerTestSuite) TestEnqueuePeriodicJob() {
|
|||||||
params["name"] = "testing:v1"
|
params["name"] = "testing:v1"
|
||||||
|
|
||||||
m := time.Now().Minute()
|
m := time.Now().Minute()
|
||||||
|
if m+2 >= 60 {
|
||||||
|
m = m - 2
|
||||||
|
}
|
||||||
_, err := suite.cWorker.PeriodicallyEnqueue(
|
_, err := suite.cWorker.PeriodicallyEnqueue(
|
||||||
"fake_job",
|
"fake_job",
|
||||||
params,
|
params,
|
||||||
@ -204,16 +207,6 @@ func (suite *CWorkerTestSuite) TestStopJob() {
|
|||||||
require.NoError(suite.T(), err, "stop job: nil error expected but got %s", err)
|
require.NoError(suite.T(), err, "stop job: nil error expected but got %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestScheduledJobs tests get scheduled job
|
|
||||||
func (suite *CWorkerTestSuite) TestScheduledJobs() {
|
|
||||||
params := make(map[string]interface{})
|
|
||||||
params["name"] = "testing:v1"
|
|
||||||
|
|
||||||
_, total, err := suite.cWorker.ScheduledJobs(nil)
|
|
||||||
require.NoError(suite.T(), err, "get scheduled job: nil error expected but got %s", err)
|
|
||||||
assert.EqualValues(suite.T(), int64(2), total, "expect 1 item but got 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
type fakeJob struct{}
|
type fakeJob struct{}
|
||||||
|
|
||||||
func (j *fakeJob) MaxFails() uint {
|
func (j *fakeJob) MaxFails() uint {
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
package worker
|
package worker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
|
||||||
"github.com/goharbor/harbor/src/jobservice/job"
|
"github.com/goharbor/harbor/src/jobservice/job"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -112,16 +111,4 @@ type Interface interface {
|
|||||||
// Return:
|
// Return:
|
||||||
// error : error returned if meet any problems
|
// error : error returned if meet any problems
|
||||||
RetryJob(jobID string) error
|
RetryJob(jobID string) error
|
||||||
|
|
||||||
// Get the scheduled jobs by page
|
|
||||||
// The page number in the query will be ignored, default 20 is used. This is the limitation of backend lib.
|
|
||||||
// The total number is also returned.
|
|
||||||
//
|
|
||||||
// query *query.Parameter : query parameters
|
|
||||||
//
|
|
||||||
// Return:
|
|
||||||
// []*job.Stats : list of scheduled jobs
|
|
||||||
// int : the total number of scheduled jobs
|
|
||||||
// error : non nil error if meet any issues
|
|
||||||
ScheduledJobs(query *query.Parameter) ([]*job.Stats, int64, error)
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@ ${SERVER_API_ENDPOINT} ${SERVER_URL}/api
|
|||||||
&{SERVER_CONFIG} endpoint=${SERVER_API_ENDPOINT} verify_ssl=False
|
&{SERVER_CONFIG} endpoint=${SERVER_API_ENDPOINT} verify_ssl=False
|
||||||
|
|
||||||
*** Test Cases ***
|
*** Test Cases ***
|
||||||
|
Test Case - Garbage Collection
|
||||||
|
Harbor API Test ./tests/apitests/python/test_garbage_collection.py
|
||||||
Test Case - Add Private Project Member and Check User Can See It
|
Test Case - Add Private Project Member and Check User Can See It
|
||||||
Harbor API Test ./tests/apitests/python/test_add_member_to_private_project.py
|
Harbor API Test ./tests/apitests/python/test_add_member_to_private_project.py
|
||||||
Test Case - Delete a Repository of a Certain Project Created by Normal User
|
Test Case - Delete a Repository of a Certain Project Created by Normal User
|
||||||
@ -33,8 +35,6 @@ Test Case - Manage Project Member
|
|||||||
Harbor API Test ./tests/apitests/python/test_manage_project_member.py
|
Harbor API Test ./tests/apitests/python/test_manage_project_member.py
|
||||||
Test Case - Project Level Policy Content Trust
|
Test Case - Project Level Policy Content Trust
|
||||||
Harbor API Test ./tests/apitests/python/test_project_level_policy_content_trust.py
|
Harbor API Test ./tests/apitests/python/test_project_level_policy_content_trust.py
|
||||||
Test Case - Garbage Collection
|
|
||||||
Harbor API Test ./tests/apitests/python/test_garbage_collection.py
|
|
||||||
Test Case - User View Logs
|
Test Case - User View Logs
|
||||||
Harbor API Test ./tests/apitests/python/test_user_view_logs.py
|
Harbor API Test ./tests/apitests/python/test_user_view_logs.py
|
||||||
Test Case - Scan All Images
|
Test Case - Scan All Images
|
||||||
|
Loading…
Reference in New Issue
Block a user