Merge pull request #6074 from goharbor/enhance_job_service

Enhance job service to support multiple executions recording
This commit is contained in:
Steven Zou 2018-10-23 09:50:05 +08:00 committed by GitHub
commit 0c901a475f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
71 changed files with 2013 additions and 276 deletions

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api package api

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api package api
@ -14,6 +26,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/core" "github.com/goharbor/harbor/src/jobservice/core"
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models" "github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/jobservice/opm"
) )
@ -50,42 +63,36 @@ func NewDefaultHandler(ctl core.Interface) *DefaultHandler {
// HandleLaunchJobReq is implementation of method defined in interface 'Handler' // HandleLaunchJobReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Request) { func (dh *DefaultHandler) HandleLaunchJobReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) { if !dh.preCheck(w, req) {
return return
} }
data, err := ioutil.ReadAll(req.Body) data, err := ioutil.ReadAll(req.Body)
if err != nil { if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.ReadRequestBodyError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.ReadRequestBodyError(err))
return return
} }
// unmarshal data // unmarshal data
jobReq := models.JobRequest{} jobReq := models.JobRequest{}
if err = json.Unmarshal(data, &jobReq); err != nil { if err = json.Unmarshal(data, &jobReq); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.HandleJSONDataError(err))
return return
} }
// Pass request to the controller for the follow-up. // Pass request to the controller for the follow-up.
jobStats, err := dh.controller.LaunchJob(jobReq) jobStats, err := dh.controller.LaunchJob(jobReq)
if err != nil { if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.LaunchJobError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.LaunchJobError(err))
return return
} }
data, ok := dh.handleJSONData(w, jobStats) dh.handleJSONData(w, req, http.StatusAccepted, jobStats)
if !ok {
return
}
w.WriteHeader(http.StatusAccepted)
w.Write(data)
} }
// HandleGetJobReq is implementation of method defined in interface 'Handler' // HandleGetJobReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Request) { func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) { if !dh.preCheck(w, req) {
return return
} }
@ -100,22 +107,16 @@ func (dh *DefaultHandler) HandleGetJobReq(w http.ResponseWriter, req *http.Reque
code = http.StatusNotFound code = http.StatusNotFound
backErr = err backErr = err
} }
dh.handleError(w, code, backErr) dh.handleError(w, req, code, backErr)
return return
} }
data, ok := dh.handleJSONData(w, jobStats) dh.handleJSONData(w, req, http.StatusOK, jobStats)
if !ok {
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
} }
// HandleJobActionReq is implementation of method defined in interface 'Handler' // HandleJobActionReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Request) { func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) { if !dh.preCheck(w, req) {
return return
} }
@ -124,14 +125,14 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
data, err := ioutil.ReadAll(req.Body) data, err := ioutil.ReadAll(req.Body)
if err != nil { if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.ReadRequestBodyError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.ReadRequestBodyError(err))
return return
} }
// unmarshal data // unmarshal data
jobActionReq := models.JobActionRequest{} jobActionReq := models.JobActionRequest{}
if err = json.Unmarshal(data, &jobActionReq); err != nil { if err = json.Unmarshal(data, &jobActionReq); err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.HandleJSONDataError(err))
return return
} }
@ -144,7 +145,7 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
code = http.StatusNotFound code = http.StatusNotFound
backErr = err backErr = err
} }
dh.handleError(w, code, backErr) dh.handleError(w, req, code, backErr)
return return
} }
case opm.CtlCommandCancel: case opm.CtlCommandCancel:
@ -155,7 +156,7 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
code = http.StatusNotFound code = http.StatusNotFound
backErr = err backErr = err
} }
dh.handleError(w, code, backErr) dh.handleError(w, req, code, backErr)
return return
} }
case opm.CtlCommandRetry: case opm.CtlCommandRetry:
@ -166,41 +167,37 @@ func (dh *DefaultHandler) HandleJobActionReq(w http.ResponseWriter, req *http.Re
code = http.StatusNotFound code = http.StatusNotFound
backErr = err backErr = err
} }
dh.handleError(w, code, backErr) dh.handleError(w, req, code, backErr)
return return
} }
default: default:
dh.handleError(w, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID))) dh.handleError(w, req, http.StatusNotImplemented, errs.UnknownActionNameError(fmt.Errorf("%s", jobID)))
return return
} }
dh.log(req, http.StatusNoContent, string(data))
w.WriteHeader(http.StatusNoContent) // only header, no content returned w.WriteHeader(http.StatusNoContent) // only header, no content returned
} }
// HandleCheckStatusReq is implementation of method defined in interface 'Handler' // HandleCheckStatusReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) { func (dh *DefaultHandler) HandleCheckStatusReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) { if !dh.preCheck(w, req) {
return return
} }
stats, err := dh.controller.CheckStatus() stats, err := dh.controller.CheckStatus()
if err != nil { if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.CheckStatsError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.CheckStatsError(err))
return return
} }
data, ok := dh.handleJSONData(w, stats) dh.handleJSONData(w, req, http.StatusOK, stats)
if !ok {
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
} }
// HandleJobLogReq is implementation of method defined in interface 'Handler' // HandleJobLogReq is implementation of method defined in interface 'Handler'
func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Request) { func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Request) {
if !dh.preCheck(w) { if !dh.preCheck(w, req) {
return return
} }
@ -208,7 +205,7 @@ func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Reque
jobID := vars["job_id"] jobID := vars["job_id"]
if strings.Contains(jobID, "..") || strings.ContainsRune(jobID, os.PathSeparator) { if strings.Contains(jobID, "..") || strings.ContainsRune(jobID, os.PathSeparator) {
dh.handleError(w, http.StatusBadRequest, fmt.Errorf("Invalid Job ID: %s", jobID)) dh.handleError(w, req, http.StatusBadRequest, fmt.Errorf("Invalid Job ID: %s", jobID))
return return
} }
@ -220,34 +217,48 @@ func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Reque
code = http.StatusNotFound code = http.StatusNotFound
backErr = err backErr = err
} }
dh.handleError(w, code, backErr) dh.handleError(w, req, code, backErr)
return return
} }
dh.log(req, http.StatusOK, "")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(logData) w.Write(logData)
} }
func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, object interface{}) ([]byte, bool) { func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, req *http.Request, code int, object interface{}) {
data, err := json.Marshal(object) data, err := json.Marshal(object)
if err != nil { if err != nil {
dh.handleError(w, http.StatusInternalServerError, errs.HandleJSONDataError(err)) dh.handleError(w, req, http.StatusInternalServerError, errs.HandleJSONDataError(err))
return nil, false return
} }
return data, true logger.Debugf("Serve http request '%s %s': %d %s", req.Method, req.URL.String(), code, data)
w.Header().Set(http.CanonicalHeaderKey("Accept"), "application/json")
w.Header().Set(http.CanonicalHeaderKey("content-type"), "application/json")
w.WriteHeader(code)
w.Write(data)
} }
func (dh *DefaultHandler) handleError(w http.ResponseWriter, code int, err error) { func (dh *DefaultHandler) handleError(w http.ResponseWriter, req *http.Request, code int, err error) {
// Log all errors
logger.Errorf("Serve http request '%s %s' error: %d %s", req.Method, req.URL.String(), code, err.Error())
w.WriteHeader(code) w.WriteHeader(code)
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
} }
func (dh *DefaultHandler) preCheck(w http.ResponseWriter) bool { func (dh *DefaultHandler) preCheck(w http.ResponseWriter, req *http.Request) bool {
if dh.controller == nil { if dh.controller == nil {
dh.handleError(w, http.StatusInternalServerError, errs.MissingBackendHandlerError(fmt.Errorf("nil controller"))) dh.handleError(w, req, http.StatusInternalServerError, errs.MissingBackendHandlerError(fmt.Errorf("nil controller")))
return false return false
} }
return true return true
} }
func (dh *DefaultHandler) log(req *http.Request, code int, text string) {
logger.Debugf("Serve http request '%s %s': %d %s", req.Method, req.URL.String(), code, text)
}

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api package api
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api package api
@ -7,6 +19,7 @@ import (
"net/http" "net/http"
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@ -53,6 +66,7 @@ func (br *BaseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Do auth // Do auth
if err := br.authenticator.DoAuth(req); err != nil { if err := br.authenticator.DoAuth(req); err != nil {
authErr := errs.UnauthorizedError(err) authErr := errs.UnauthorizedError(err)
logger.Errorf("Serve http request '%s %s' failed with error: %s", req.Method, req.URL.String(), authErr.Error())
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(authErr.Error())) w.Write([]byte(authErr.Error()))
return return

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api package api

View File

@ -19,15 +19,15 @@ worker_pool:
redis_pool: redis_pool:
#redis://[arbitrary_username:password@]ipaddress:port/database_index #redis://[arbitrary_username:password@]ipaddress:port/database_index
#or ipaddress:port[,weight,password,database_index] #or ipaddress:port[,weight,password,database_index]
redis_url: "redis:6379" redis_url: "localhost:6379"
namespace: "harbor_job_service" namespace: "harbor_job_service"
#Logger for job #Logger for job
logger: logger:
path: "/Users/szou/tmp/job_logs" path: "~/tmp/job_logs"
level: "INFO" level: "DEBUG"
archive_period: 1 #days archive_period: 1 #days
#Admin server endpoint #Admin server endpoint
admin_server: "http://10.160.178.186:9010/" admin_server: "http://adminserver:9010/"

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package config provides functions to handle the configurations of job service. // Package config provides functions to handle the configurations of job service.
package config package config

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config package config
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core package core

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core package core
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package core provides the main job operation interface and components. // Package core provides the main job operation interface and components.
package core package core

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package env package env
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package env package env
@ -6,6 +18,7 @@ import (
"context" "context"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
) )
// JobContext is combination of BaseContext and other job specified resources. // JobContext is combination of BaseContext and other job specified resources.
@ -53,6 +66,9 @@ type JobContext interface {
// Return the logger // Return the logger
GetLogger() logger.Interface GetLogger() logger.Interface
// Launch sub jobs
LaunchJob(req models.JobRequest) (models.JobStats, error)
} }
// JobData defines job context dependencies. // JobData defines job context dependencies.

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package errs define some system errors with specified types. // Package errs define some system errors with specified types.
package errs package errs

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl package impl
@ -19,6 +31,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
jlogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger" jlogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
jmodel "github.com/goharbor/harbor/src/jobservice/models"
) )
const ( const (
@ -39,6 +52,9 @@ type Context struct {
// checkin func // checkin func
checkInFunc job.CheckInFunc checkInFunc job.CheckInFunc
// launch job
launchJobFunc job.LaunchJobFunc
// other required information // other required information
properties map[string]interface{} properties map[string]interface{}
@ -138,6 +154,18 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
return nil, errors.New("failed to inject checkInFunc") return nil, errors.New("failed to inject checkInFunc")
} }
if launchJobFunc, ok := dep.ExtraData["launchJobFunc"]; ok {
if reflect.TypeOf(launchJobFunc).Kind() == reflect.Func {
if funcRef, ok := launchJobFunc.(job.LaunchJobFunc); ok {
jContext.launchJobFunc = funcRef
}
}
}
if jContext.launchJobFunc == nil {
return nil, errors.New("failed to inject launchJobFunc")
}
return jContext, nil return jContext, nil
} }
@ -177,6 +205,15 @@ func (c *Context) GetLogger() logger.Interface {
return c.logger return c.logger
} }
// LaunchJob launches sub jobs
func (c *Context) LaunchJob(req jmodel.JobRequest) (jmodel.JobStats, error) {
if c.launchJobFunc == nil {
return jmodel.JobStats{}, errors.New("nil launch job function")
}
return c.launchJobFunc(req)
}
func getDBFromConfig(cfg map[string]interface{}) *models.Database { func getDBFromConfig(cfg map[string]interface{}) *models.Database {
database := &models.Database{} database := &models.Database{}
database.Type = cfg[common.DatabaseType].(string) database.Type = cfg[common.DatabaseType].(string)

View File

@ -0,0 +1,163 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl
import (
"context"
"errors"
"fmt"
"reflect"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
jlogger "github.com/goharbor/harbor/src/jobservice/job/impl/logger"
"github.com/goharbor/harbor/src/jobservice/logger"
jmodel "github.com/goharbor/harbor/src/jobservice/models"
)
// DefaultContext provides a basic job context
type DefaultContext struct {
// System context
sysContext context.Context
// Logger for job
logger logger.Interface
// op command func
opCommandFunc job.CheckOPCmdFunc
// checkin func
checkInFunc job.CheckInFunc
// launch job
launchJobFunc job.LaunchJobFunc
// other required information
properties map[string]interface{}
}
// NewDefaultContext is constructor of building DefaultContext
func NewDefaultContext(sysCtx context.Context) env.JobContext {
return &DefaultContext{
sysContext: sysCtx,
properties: make(map[string]interface{}),
}
}
// Build implements the same method in env.JobContext interface
// This func will build the job execution context before running
func (c *DefaultContext) Build(dep env.JobData) (env.JobContext, error) {
jContext := &DefaultContext{
sysContext: c.sysContext,
properties: make(map[string]interface{}),
}
// Copy properties
if len(c.properties) > 0 {
for k, v := range c.properties {
jContext.properties[k] = v
}
}
// Init logger here
logPath := fmt.Sprintf("%s/%s.log", config.GetLogBasePath(), dep.ID)
jContext.logger = jlogger.New(logPath, config.GetLogLevel())
if jContext.logger == nil {
return nil, errors.New("failed to initialize job logger")
}
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func {
if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok {
jContext.opCommandFunc = funcRef
}
}
}
if jContext.opCommandFunc == nil {
return nil, errors.New("failed to inject opCommandFunc")
}
if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
if funcRef, ok := checkInFunc.(job.CheckInFunc); ok {
jContext.checkInFunc = funcRef
}
}
}
if jContext.checkInFunc == nil {
return nil, errors.New("failed to inject checkInFunc")
}
if launchJobFunc, ok := dep.ExtraData["launchJobFunc"]; ok {
if reflect.TypeOf(launchJobFunc).Kind() == reflect.Func {
if funcRef, ok := launchJobFunc.(job.LaunchJobFunc); ok {
jContext.launchJobFunc = funcRef
}
}
}
if jContext.launchJobFunc == nil {
return nil, errors.New("failed to inject launchJobFunc")
}
return jContext, nil
}
// Get implements the same method in env.JobContext interface
func (c *DefaultContext) Get(prop string) (interface{}, bool) {
v, ok := c.properties[prop]
return v, ok
}
// SystemContext implements the same method in env.JobContext interface
func (c *DefaultContext) SystemContext() context.Context {
return c.sysContext
}
// Checkin is bridge func for reporting detailed status
func (c *DefaultContext) Checkin(status string) error {
if c.checkInFunc != nil {
c.checkInFunc(status)
} else {
return errors.New("nil check in function")
}
return nil
}
// OPCommand return the control operational command like stop/cancel if have
func (c *DefaultContext) OPCommand() (string, bool) {
if c.opCommandFunc != nil {
return c.opCommandFunc()
}
return "", false
}
// GetLogger returns the logger
func (c *DefaultContext) GetLogger() logger.Interface {
return c.logger
}
// LaunchJob launches sub jobs
func (c *DefaultContext) LaunchJob(req jmodel.JobRequest) (jmodel.JobStats, error) {
if c.launchJobFunc == nil {
return jmodel.JobStats{}, errors.New("nil launch job function")
}
return c.launchJobFunc(req)
}

View File

@ -0,0 +1,100 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl
import (
"context"
"fmt"
"os"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/models"
)
func TestDefaultContext(t *testing.T) {
defaultContext := NewDefaultContext(context.Background())
jobData := env.JobData{
ID: "fake_id",
Name: "DEMO",
Args: make(map[string]interface{}),
ExtraData: make(map[string]interface{}),
}
var opCmdFund job.CheckOPCmdFunc = func() (string, bool) {
return "stop", true
}
var checkInFunc job.CheckInFunc = func(msg string) {
fmt.Println(msg)
}
var launchJobFunc job.LaunchJobFunc = func(req models.JobRequest) (models.JobStats, error) {
return models.JobStats{
Stats: &models.JobStatData{
JobID: "fake_sub_job_id",
Status: "pending",
JobName: "DEMO",
JobKind: job.JobKindGeneric,
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
},
}, nil
}
jobData.ExtraData["opCommandFunc"] = opCmdFund
jobData.ExtraData["checkInFunc"] = checkInFunc
jobData.ExtraData["launchJobFunc"] = launchJobFunc
oldLogConfig := config.DefaultConfig.LoggerConfig
defer func() {
config.DefaultConfig.LoggerConfig = oldLogConfig
}()
config.DefaultConfig.LoggerConfig = &config.LoggerConfig{
LogLevel: "debug",
ArchivePeriod: 1,
BasePath: os.TempDir(),
}
newJobContext, err := defaultContext.Build(jobData)
if err != nil {
t.Fatal(err)
}
cmd, ok := newJobContext.OPCommand()
if !ok || cmd != "stop" {
t.Fatalf("expect op command 'stop' but got %s", cmd)
}
if err := newJobContext.Checkin("hello"); err != nil {
t.Fatal(err)
}
stats, err := newJobContext.LaunchJob(models.JobRequest{})
if err != nil {
t.Fatal(err)
}
if stats.Stats.JobID != "fake_sub_job_id" {
t.Fatalf("expect job id 'fake_sub_job_id' but got %s", stats.Stats.JobID)
}
ctx := newJobContext.SystemContext()
if ctx == nil {
t.Fatal("got nil system context")
}
}

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl package impl
@ -8,13 +20,12 @@ import (
"strings" "strings"
"time" "time"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm"
) )
// DemoJob is the job to demostrate the job interface. // DemoJob is the job to demostrate the job interface.
@ -53,25 +64,17 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
defer func() { defer func() {
logger.Info("I'm finished, exit!") logger.Info("I'm finished, exit!")
fmt.Println("I'm finished, exit!")
}() }()
fmt.Println("I'm running") fmt.Println("I'm running")
logger.Info("=======Replication job running=======")
logger.Infof("params: %#v\n", params) logger.Infof("params: %#v\n", params)
logger.Infof("context: %#v\n", ctx) logger.Infof("context: %#v\n", ctx)
if v, ok := ctx.Get("email_from"); ok { if v, ok := ctx.Get("email_from"); ok {
fmt.Printf("Get prop form context: email_from=%s\n", v) fmt.Printf("Get prop form context: email_from=%s\n", v)
} }
if u, err := dao.GetUser(models.User{}); err == nil { /*if u, err := dao.GetUser(models.User{}); err == nil {
fmt.Printf("u=%#+v\n", u) fmt.Printf("u=%#+v\n", u)
}
/*if 1 != 0 {
return errors.New("I suicide")
}*/ }*/
// runtime error
// var runtime_err error = nil
// fmt.Println(runtime_err.Error())
logger.Info("check in 30%") logger.Info("check in 30%")
ctx.Checkin("30%") ctx.Checkin("30%")
@ -84,9 +87,8 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// HOLD ON FOR A WHILE // HOLD ON FOR A WHILE
logger.Error("Holding for 20 sec") logger.Error("Holding for 5 sec")
<-time.After(15 * time.Second) <-time.After(5 * time.Second)
// logger.Fatal("I'm back, check if I'm stopped/cancelled")
if cmd, ok := ctx.OPCommand(); ok { if cmd, ok := ctx.OPCommand(); ok {
logger.Infof("cmd=%s\n", cmd) logger.Infof("cmd=%s\n", cmd)
@ -100,6 +102,29 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error
return errs.JobStoppedError() return errs.JobStoppedError()
} }
fmt.Println("Launch sub job")
jobParams := make(map[string]interface{})
jobParams["image"] = "demo:1.7"
subDemoJob := models.JobRequest{
Job: &models.JobData{
Name: "DEMO",
Parameters: jobParams,
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
},
}
for i := 0; i < 5; i++ {
subJob, err := ctx.LaunchJob(subDemoJob)
if err != nil {
fmt.Printf("Create sub job failed with error: %s\n", err)
logger.Error(err)
}
fmt.Printf("Sub job: %v", subJob)
}
fmt.Println("I'm close to end") fmt.Println("I'm close to end")
return nil return nil

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package impl package impl

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,16 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,16 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,16 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication package replication
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils package utils
import ( import (

View File

@ -1,8 +1,23 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job package job
import "github.com/goharbor/harbor/src/jobservice/env" import (
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/models"
)
// CheckOPCmdFunc is the function to check if the related operation commands // CheckOPCmdFunc is the function to check if the related operation commands
// like STOP or CANCEL is fired for the specified job. If yes, return the // like STOP or CANCEL is fired for the specified job. If yes, return the
@ -12,6 +27,9 @@ type CheckOPCmdFunc func() (string, bool)
// CheckInFunc is designed for job to report more detailed progress info // CheckInFunc is designed for job to report more detailed progress info
type CheckInFunc func(message string) type CheckInFunc func(message string)
// LaunchJobFunc is designed to launch sub jobs in the job
type LaunchJobFunc func(req models.JobRequest) (models.JobStats, error)
// Interface defines the related injection and run entry methods. // Interface defines the related injection and run entry methods.
type Interface interface { type Interface interface {
// Declare how many times the job can be retried if failed. // Declare how many times the job can be retried if failed.

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job package job

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package job package job

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logger package logger
import ( import (

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main package main
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package models package models
@ -33,20 +45,23 @@ type JobStats struct {
// JobStatData keeps the stats of job // JobStatData keeps the stats of job
type JobStatData struct { type JobStatData struct {
JobID string `json:"id"` JobID string `json:"id"`
Status string `json:"status"` Status string `json:"status"`
JobName string `json:"name"` JobName string `json:"name"`
JobKind string `json:"kind"` JobKind string `json:"kind"`
IsUnique bool `json:"unique"` IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"` RefLink string `json:"ref_link,omitempty"`
CronSpec string `json:"cron_spec,omitempty"` CronSpec string `json:"cron_spec,omitempty"`
EnqueueTime int64 `json:"enqueue_time"` EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"` UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"` RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"` CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"` CheckInAt int64 `json:"check_in_at,omitempty"`
DieAt int64 `json:"die_at,omitempty"` DieAt int64 `json:"die_at,omitempty"`
HookStatus string `json:"hook_status,omitempty"` HookStatus string `json:"hook_status,omitempty"`
Executions []string `json:"executions,omitempty"` // For the jobs like periodic jobs, which may execute multiple times
UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing
IsMultipleExecutions bool `json:"multiple_executions"` // Indicate if the job has subsequent executions
} }
// JobPoolStats represents the healthy and status of all the running worker pools. // JobPoolStats represents the healthy and status of all the running worker pools.

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm
import "testing" import "testing"

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm
@ -26,6 +38,15 @@ type JobStatsManager interface {
// error : error if meet any problems // error : error if meet any problems
Retrieve(jobID string) (models.JobStats, error) Retrieve(jobID string) (models.JobStats, error)
// Update the properties of the job stats
//
// jobID string : ID of the being retried job
// fieldAndValues ...interface{} : One or more properties being updated
//
// Returns:
// error if update failed
Update(jobID string, fieldAndValues ...interface{}) error
// SetJobStatus will mark the status of job to the specified one // SetJobStatus will mark the status of job to the specified one
// Async method to retry // Async method to retry
SetJobStatus(jobID string, status string) SetJobStatus(jobID string, status string)
@ -79,4 +100,22 @@ type JobStatsManager interface {
// Returns: // Returns:
// error if meet any problems // error if meet any problems
ExpirePeriodicJobStats(jobID string) error ExpirePeriodicJobStats(jobID string) error
// Persist the links between upstream job and the executions.
//
// upstreamJobID string: ID of the upstream job
// executions ...string: IDs of the execution jobs
//
// Returns:
// error if meet any issues
AttachExecution(upstreamJobID string, executions ...string) error
// Get all the executions (IDs) fro the specified upstream Job.
//
// upstreamJobID string: ID of the upstream job
//
// Returns:
// the ID list of the executions if no error occurred
// or a non-nil error is returned
GetExecutions(upstreamJobID string) ([]string, error)
} }

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm
@ -23,13 +35,16 @@ import (
) )
const ( const (
processBufferSize = 1024 processBufferSize = 1024
opSaveStats = "save_job_stats" opSaveStats = "save_job_stats"
opUpdateStatus = "update_job_status" opUpdateStatus = "update_job_status"
opCheckIn = "check_in" opCheckIn = "check_in"
opDieAt = "mark_die_at" opDieAt = "mark_die_at"
opReportStatus = "report_status" opReportStatus = "report_status"
maxFails = 3 opPersistExecutions = "persist_executions"
opUpdateStats = "update_job_stats"
maxFails = 3
jobStatsDataExpireTime = 60 * 60 * 24 * 7 // one week
// CtlCommandStop : command stop // CtlCommandStop : command stop
CtlCommandStop = "stop" CtlCommandStop = "stop"
@ -43,9 +58,18 @@ const (
) )
type queueItem struct { type queueItem struct {
op string Op string
fails uint Fails uint
data interface{} Data interface{}
}
func (qi *queueItem) string() string {
data, err := json.Marshal(qi)
if err != nil {
return fmt.Sprintf("%v", qi)
}
return string(data)
} }
// RedisJobStatsManager implements JobStatsManager based on redis. // RedisJobStatsManager implements JobStatsManager based on redis.
@ -62,7 +86,7 @@ type RedisJobStatsManager struct {
} }
// NewRedisJobStatsManager is constructor of RedisJobStatsManager // NewRedisJobStatsManager is constructor of RedisJobStatsManager
func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager { func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) JobStatsManager {
isRunning := &atomic.Value{} isRunning := &atomic.Value{}
isRunning.Store(false) isRunning.Store(false)
@ -110,8 +134,8 @@ func (rjs *RedisJobStatsManager) Shutdown() {
// Async method // Async method
func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) { func (rjs *RedisJobStatsManager) Save(jobStats models.JobStats) {
item := &queueItem{ item := &queueItem{
op: opSaveStats, Op: opSaveStats,
data: jobStats, Data: jobStats,
} }
rjs.processChan <- item rjs.processChan <- item
@ -124,7 +148,21 @@ func (rjs *RedisJobStatsManager) Retrieve(jobID string) (models.JobStats, error)
return models.JobStats{}, errors.New("empty job ID") return models.JobStats{}, errors.New("empty job ID")
} }
return rjs.getJobStats(jobID) res, err := rjs.getJobStats(jobID)
if err != nil {
return models.JobStats{}, err
}
if res.Stats.IsMultipleExecutions {
executions, err := rjs.GetExecutions(jobID)
if err != nil {
return models.JobStats{}, err
}
res.Stats.Executions = executions
}
return res, nil
} }
// SetJobStatus is implementation of same method in JobStatsManager interface. // SetJobStatus is implementation of same method in JobStatsManager interface.
@ -135,8 +173,8 @@ func (rjs *RedisJobStatsManager) SetJobStatus(jobID string, status string) {
} }
item := &queueItem{ item := &queueItem{
op: opUpdateStatus, Op: opUpdateStatus,
data: []string{jobID, status}, Data: []string{jobID, status},
} }
rjs.processChan <- item rjs.processChan <- item
@ -161,13 +199,13 @@ func (rjs *RedisJobStatsManager) loop() {
go func(item *queueItem) { go func(item *queueItem) {
clearHookCache := false clearHookCache := false
if err := rjs.process(item); err != nil { if err := rjs.process(item); err != nil {
item.fails++ item.Fails++
if item.fails < maxFails { if item.Fails < maxFails {
logger.Warningf("Failed to process '%s' request with error: %s\n", item.op, err) logger.Warningf("Failed to process '%s' request with error: %s\n", item.Op, err)
// Retry after a random interval // Retry after a random interval
go func() { go func() {
timer := time.NewTimer(time.Duration(backoff(item.fails)) * time.Second) timer := time.NewTimer(time.Duration(backoff(item.Fails)) * time.Second)
defer timer.Stop() defer timer.Stop()
select { select {
@ -178,20 +216,22 @@ func (rjs *RedisJobStatsManager) loop() {
} }
}() }()
} else { } else {
logger.Errorf("Failed to process '%s' request with error: %s (%d times tried)\n", item.op, err, maxFails) logger.Errorf("Failed to process '%s' request with error: %s (%d times tried)\n", item.Op, err, maxFails)
if item.op == opReportStatus { if item.Op == opReportStatus {
clearHookCache = true clearHookCache = true
} }
} }
} else { } else {
if item.op == opReportStatus { logger.Debugf("Operation is successfully processed: %s", item.string())
if item.Op == opReportStatus {
clearHookCache = true clearHookCache = true
} }
} }
if clearHookCache { if clearHookCache {
// Clear cache to save memory if job status is success or stopped. // Clear cache to save memory if job status is success or stopped.
data := item.data.([]string) data := item.Data.([]string)
status := data[2] status := data[2]
if status == job.JobStatusSuccess || status == job.JobStatusStopped { if status == job.JobStatusSuccess || status == job.JobStatusStopped {
rjs.hookStore.Remove(data[0]) rjs.hookStore.Remove(data[0])
@ -233,8 +273,8 @@ func (rjs *RedisJobStatsManager) CheckIn(jobID string, message string) {
} }
item := &queueItem{ item := &queueItem{
op: opCheckIn, Op: opCheckIn,
data: []string{jobID, message}, Data: []string{jobID, message},
} }
rjs.processChan <- item rjs.processChan <- item
@ -264,8 +304,8 @@ func (rjs *RedisJobStatsManager) DieAt(jobID string, dieAt int64) {
} }
item := &queueItem{ item := &queueItem{
op: opDieAt, Op: opDieAt,
data: []interface{}{jobID, dieAt}, Data: []interface{}{jobID, dieAt},
} }
rjs.processChan <- item rjs.processChan <- item
@ -298,12 +338,78 @@ func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error {
// The periodic job (policy) is stopped/unscheduled and then // The periodic job (policy) is stopped/unscheduled and then
// the stats of periodic job now can be expired // the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID) key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 // 1 day _, err := conn.Do("EXPIRE", key, jobStatsDataExpireTime)
_, err := conn.Do("EXPIRE", key, expireTime)
return err return err
} }
// AttachExecution persist the links between upstream jobs and the related executions (jobs).
func (rjs *RedisJobStatsManager) AttachExecution(upstreamJobID string, executions ...string) error {
if len(upstreamJobID) == 0 {
return errors.New("empty upstream job ID is not allowed")
}
if len(executions) == 0 {
return errors.New("no executions existing to persist")
}
// Send to process channel
item := &queueItem{
Op: opPersistExecutions,
Data: []interface{}{upstreamJobID, executions},
}
rjs.processChan <- item
return nil
}
// GetExecutions returns the existing executions (IDs) for the specified job.
func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string) ([]string, error) {
if len(upstreamJobID) == 0 {
return nil, errors.New("no upstream ID specified")
}
conn := rjs.redisPool.Get()
defer conn.Close()
key := utils.KeyUpstreamJobAndExecutions(rjs.namespace, upstreamJobID)
ids, err := redis.Strings(conn.Do("ZRANGE", key, 0, -1))
if err != nil {
if err == redis.ErrNil {
return []string{}, nil
}
return nil, err
}
return ids, nil
}
// Update the properties of job stats
func (rjs *RedisJobStatsManager) Update(jobID string, fieldAndValues ...interface{}) error {
if len(jobID) == 0 {
return errors.New("no updating job")
}
if len(fieldAndValues) == 0 || len(fieldAndValues)%2 != 0 {
return errors.New("filed and its value should be pair")
}
data := []interface{}{}
data = append(data, jobID)
data = append(data, fieldAndValues...)
item := &queueItem{
Op: opUpdateStats,
Data: data,
}
rjs.processChan <- item
return nil
}
func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, checkIn string) { func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, checkIn string) {
// Let it run in a separate goroutine to avoid waiting more time // Let it run in a separate goroutine to avoid waiting more time
go func() { go func() {
@ -325,8 +431,8 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status,
} }
item := &queueItem{ item := &queueItem{
op: opReportStatus, Op: opReportStatus,
data: []string{jobID, hookURL, status, checkIn}, Data: []string{jobID, hookURL, status, checkIn},
} }
rjs.processChan <- item rjs.processChan <- item
@ -345,7 +451,7 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
jobStats, err := rjs.getJobStats(jobID) jobStats, err := rjs.getJobStats(jobID)
if err != nil { if err != nil {
// Just logged // Just logged
logger.Warningf("Retrieving stats of job %s for hook reporting failed with error: %s", jobID, err) logger.Errorf("Retrieving stats of job %s for hook reporting failed with error: %s", jobID, err)
} else { } else {
// Override status/check in message // Override status/check in message
// Just double confirmation // Just double confirmation
@ -357,33 +463,40 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus) return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
} }
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error { func (rjs *RedisJobStatsManager) updateJobStats(jobID string, fieldAndValues ...interface{}) error {
conn := rjs.redisPool.Get() conn := rjs.redisPool.Get()
defer conn.Close() defer conn.Close()
key := utils.KeyJobStats(rjs.namespace, jobID) key := utils.KeyJobStats(rjs.namespace, jobID)
args := make([]interface{}, 0, 6) args := make([]interface{}, 0, len(fieldAndValues)+1)
args = append(args, key, "status", status, "update_time", time.Now().Unix())
if status == job.JobStatusSuccess { args = append(args, key)
// make sure the 'die_at' is reset in case it's a retrying job args = append(args, fieldAndValues...)
args = append(args, "die_at", 0) args = append(args, "update_time", time.Now().Unix())
}
_, err := conn.Do("HMSET", args...) _, err := conn.Do("HMSET", args...)
return err return err
} }
func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error {
args := make([]interface{}, 0, 4)
args = append(args, "status", status)
if status == job.JobStatusSuccess {
// make sure the 'die_at' is reset in case it's a retrying job
args = append(args, "die_at", 0)
}
return rjs.updateJobStats(jobID, args...)
}
func (rjs *RedisJobStatsManager) checkIn(jobID string, message string) error { func (rjs *RedisJobStatsManager) checkIn(jobID string, message string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
now := time.Now().Unix() now := time.Now().Unix()
key := utils.KeyJobStats(rjs.namespace, jobID) args := make([]interface{}, 0, 4)
args := make([]interface{}, 0, 7) args = append(args, "check_in", message, "check_in_at", now)
args = append(args, key, "check_in", message, "check_in_at", now, "update_time", now)
_, err := conn.Do("HMSET", args...)
return err return rjs.updateJobStats(jobID, args...)
} }
func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error { func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error {
@ -401,11 +514,9 @@ func (rjs *RedisJobStatsManager) dieAt(jobID string, baseTime int64) error {
if j, err := utils.DeSerializeJob(jws.JobBytes); err == nil { if j, err := utils.DeSerializeJob(jws.JobBytes); err == nil {
if j.ID == jobID { if j.ID == jobID {
// Found // Found
statsKey := utils.KeyJobStats(rjs.namespace, jobID) args := make([]interface{}, 0, 6)
args := make([]interface{}, 0, 7) args = append(args, "die_at", jws.Score)
args = append(args, statsKey, "die_at", jws.Score, "update_time", time.Now().Unix()) return rjs.updateJobStats(jobID, args...)
_, err := conn.Do("HMSET", args...)
return err
} }
} }
} }
@ -479,6 +590,16 @@ func (rjs *RedisJobStatsManager) getJobStats(jobID string) (models.JobStats, err
case "die_at": case "die_at":
v, _ := strconv.ParseInt(value, 10, 64) v, _ := strconv.ParseInt(value, 10, 64)
res.Stats.DieAt = v res.Stats.DieAt = v
case "upstream_job_id":
res.Stats.UpstreamJobID = value
break
case "multiple_executions":
v, err := strconv.ParseBool(value)
if err != nil {
v = false
}
res.Stats.IsMultipleExecutions = v
break
default: default:
break break
} }
@ -509,6 +630,7 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
"update_time", jobStats.Stats.UpdateTime, "update_time", jobStats.Stats.UpdateTime,
"run_at", jobStats.Stats.RunAt, "run_at", jobStats.Stats.RunAt,
"cron_spec", jobStats.Stats.CronSpec, "cron_spec", jobStats.Stats.CronSpec,
"multiple_executions", jobStats.Stats.IsMultipleExecutions,
) )
if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) { if jobStats.Stats.CheckInAt > 0 && !utils.IsEmptyStr(jobStats.Stats.CheckIn) {
args = append(args, args = append(args,
@ -520,11 +642,15 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
args = append(args, "die_at", jobStats.Stats.DieAt) args = append(args, "die_at", jobStats.Stats.DieAt)
} }
if len(jobStats.Stats.UpstreamJobID) > 0 {
args = append(args, "upstream_job_id", jobStats.Stats.UpstreamJobID)
}
conn.Send("HMSET", args...) conn.Send("HMSET", args...)
// If job kind is periodic job, expire time should not be set // If job kind is periodic job, expire time should not be set
// If job kind is scheduled job, expire time should be runAt+1day // If job kind is scheduled job, expire time should be runAt+1day
if jobStats.Stats.JobKind != job.JobKindPeriodic { if jobStats.Stats.JobKind != job.JobKindPeriodic {
var expireTime int64 = 60 * 60 * 24 var expireTime int64 = jobStatsDataExpireTime
if jobStats.Stats.JobKind == job.JobKindScheduled { if jobStats.Stats.JobKind == job.JobKindScheduled {
nowTime := time.Now().Unix() nowTime := time.Now().Unix()
future := jobStats.Stats.RunAt - nowTime future := jobStats.Stats.RunAt - nowTime
@ -539,23 +665,60 @@ func (rjs *RedisJobStatsManager) saveJobStats(jobStats models.JobStats) error {
return conn.Flush() return conn.Flush()
} }
func (rjs *RedisJobStatsManager) saveExecutions(upstreamJobID string, executions []string) error {
key := utils.KeyUpstreamJobAndExecutions(rjs.namespace, upstreamJobID)
conn := rjs.redisPool.Get()
defer conn.Close()
err := conn.Send("MULTI")
if err != nil {
return err
}
args := []interface{}{key}
baseScore := time.Now().Unix()
for index, execution := range executions {
args = append(args, baseScore+int64(index), execution)
}
if err := conn.Send("ZADD", args...); err != nil {
return err
}
// add expire time
if err := conn.Send("EXPIRE", key, jobStatsDataExpireTime); err != nil {
return err
}
_, err = conn.Do("EXEC")
return err
}
func (rjs *RedisJobStatsManager) process(item *queueItem) error { func (rjs *RedisJobStatsManager) process(item *queueItem) error {
switch item.op { switch item.Op {
case opSaveStats: case opSaveStats:
jobStats := item.data.(models.JobStats) jobStats := item.Data.(models.JobStats)
return rjs.saveJobStats(jobStats) return rjs.saveJobStats(jobStats)
case opUpdateStatus: case opUpdateStatus:
data := item.data.([]string) data := item.Data.([]string)
return rjs.updateJobStatus(data[0], data[1]) return rjs.updateJobStatus(data[0], data[1])
case opCheckIn: case opCheckIn:
data := item.data.([]string) data := item.Data.([]string)
return rjs.checkIn(data[0], data[1]) return rjs.checkIn(data[0], data[1])
case opDieAt: case opDieAt:
data := item.data.([]interface{}) data := item.Data.([]interface{})
return rjs.dieAt(data[0].(string), data[1].(int64)) return rjs.dieAt(data[0].(string), data[1].(int64))
case opReportStatus: case opReportStatus:
data := item.data.([]string) data := item.Data.([]string)
return rjs.reportStatus(data[0], data[1], data[2], data[3]) return rjs.reportStatus(data[0], data[1], data[2], data[3])
case opPersistExecutions:
data := item.Data.([]interface{})
return rjs.saveExecutions(data[0].(string), data[1].([]string))
case opUpdateStats:
data := item.Data.([]interface{})
return rjs.updateJobStats(data[0].(string), data[1:]...)
default: default:
break break
} }

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opm package opm
import ( import (
@ -10,6 +22,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
"strings"
"testing" "testing"
"time" "time"
@ -220,6 +233,52 @@ func TestCheckIn(t *testing.T) {
} }
} }
func TestExecutionRelated(t *testing.T) {
mgr := createStatsManager(redisPool)
mgr.Start()
defer mgr.Shutdown()
<-time.After(200 * time.Millisecond)
if err := mgr.AttachExecution("upstream_id", "id1", "id2", "id3"); err != nil {
t.Fatal(err)
}
// Wait for data is stable
<-time.After(200 * time.Millisecond)
ids, err := mgr.GetExecutions("upstream_id")
if err != nil {
t.Fatal(err)
}
if strings.Join(ids, "/") != "id1/id2/id3" {
t.Fatalf("expect 'id1/id2/id3' but got %s", strings.Join(ids, " / "))
}
}
func TestUpdateJobStats(t *testing.T) {
mgr := createStatsManager(redisPool)
mgr.Start()
defer mgr.Shutdown()
<-time.After(200 * time.Millisecond)
// make sure data existing
testingStats := createFakeStats()
mgr.Save(testingStats)
<-time.After(200 * time.Millisecond)
mgr.Update("fake_job_ID", "status", "Error")
<-time.After(200 * time.Millisecond)
updatedStats, err := mgr.Retrieve("fake_job_ID")
if err != nil {
t.Fatal(err)
}
if updatedStats.Stats.Status != "Error" {
t.Fatalf("expect status to be '%s' but got '%s'", "Error", updatedStats.Stats.Status)
}
}
func getRedisHost() string { func getRedisHost() string {
redisHost := os.Getenv(testingRedisHost) redisHost := os.Getenv(testingRedisHost)
if redisHost == "" { if redisHost == "" {

View File

@ -1,14 +1,29 @@
// Refer github.com/gocraft/work // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
import ( import (
"fmt"
"math/rand" "math/rand"
"time" "time"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/utils" "github.com/goharbor/harbor/src/jobservice/utils"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
"github.com/robfig/cron" "github.com/robfig/cron"
@ -20,31 +35,20 @@ const (
) )
type periodicEnqueuer struct { type periodicEnqueuer struct {
namespace string namespace string
pool *redis.Pool pool *redis.Pool
policyStore *periodicJobPolicyStore policyStore *periodicJobPolicyStore
scheduledPeriodicJobs []*scheduledPeriodicJob stopChan chan struct{}
stopChan chan struct{} doneStoppingChan chan struct{}
doneStoppingChan chan struct{} statsManager opm.JobStatsManager
} }
type periodicJob struct { func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore, statsManager opm.JobStatsManager) *periodicEnqueuer {
jobName string
spec string
schedule cron.Schedule
}
type scheduledPeriodicJob struct {
scheduledAt time.Time
scheduledAtEpoch int64
*periodicJob
}
func newPeriodicEnqueuer(namespace string, pool *redis.Pool, policyStore *periodicJobPolicyStore) *periodicEnqueuer {
return &periodicEnqueuer{ return &periodicEnqueuer{
namespace: namespace, namespace: namespace,
pool: pool, pool: pool,
policyStore: policyStore, policyStore: policyStore,
statsManager: statsManager,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}), doneStoppingChan: make(chan struct{}),
} }
@ -93,7 +97,7 @@ func (pe *periodicEnqueuer) loop() {
} }
func (pe *periodicEnqueuer) enqueue() error { func (pe *periodicEnqueuer) enqueue() error {
now := utils.NowEpochSeconds() now := time.Now().Unix()
nowTime := time.Unix(now, 0) nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon) horizon := nowTime.Add(periodicEnqueuerHorizon)
@ -105,20 +109,24 @@ func (pe *periodicEnqueuer) enqueue() error {
if err != nil { if err != nil {
// The cron spec should be already checked at top components. // The cron spec should be already checked at top components.
// Just in cases, if error occurred, ignore it // Just in cases, if error occurred, ignore it
logger.Errorf("[Ignore] Invalid corn spec in periodic policy %s %s: %s", pl.JobName, pl.PolicyID, err)
continue continue
} }
pj := &periodicJob{
jobName: pl.JobName,
spec: pl.CronSpec,
schedule: schedule,
}
for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
epoch := t.Unix()
job := &work.Job{
Name: pj.jobName,
ID: pl.PolicyID, // Same with the id of the policy it's being scheduled for
// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history. executions := []string{}
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
scheduledExecutionID := utils.MakeIdentifier()
executions = append(executions, scheduledExecutionID)
// Create an execution (job) based on the periodic job template (policy)
job := &work.Job{
Name: pl.JobName,
ID: scheduledExecutionID,
// This is technically wrong, but this lets the bytes be identical for the same periodic job instance.
// If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own
// history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
EnqueuedAt: epoch, EnqueuedAt: epoch,
Args: pl.JobParameters, // Pass parameters to scheduled job here Args: pl.JobParameters, // Pass parameters to scheduled job here
} }
@ -133,7 +141,17 @@ func (pe *periodicEnqueuer) enqueue() error {
return err return err
} }
logger.Infof("Schedule job %s for policy %s at %d\n", pj.jobName, pl.PolicyID, epoch) logger.Infof("Schedule job %s:%s for policy %s at %d\n", job.Name, job.ID, pl.PolicyID, epoch)
// Try to save the stats of new scheduled execution (job).
pe.createExecution(pl.PolicyID, pl.JobName, scheduledExecutionID, epoch)
}
// Link the upstream job (policy) with the created executions
if len(executions) > 0 {
if err := pe.statsManager.AttachExecution(pl.PolicyID, executions...); err != nil {
// Just logged it
logger.Errorf("Link upstream job with executions failed: %s", err)
}
} }
// Directly use redis conn to update the periodic job (policy) status // Directly use redis conn to update the periodic job (policy) status
// Do not care the result // Do not care the result
@ -145,6 +163,24 @@ func (pe *periodicEnqueuer) enqueue() error {
return err return err
} }
func (pe *periodicEnqueuer) createExecution(upstreamJobID, upstreamJobName, executionID string, runAt int64) {
execution := models.JobStats{
Stats: &models.JobStatData{
JobID: executionID,
JobName: upstreamJobName,
Status: job.JobStatusPending,
JobKind: job.JobKindScheduled,
EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", executionID),
RunAt: runAt,
UpstreamJobID: upstreamJobID,
},
}
pe.statsManager.Save(execution)
}
func (pe *periodicEnqueuer) shouldEnqueue() bool { func (pe *periodicEnqueuer) shouldEnqueue() bool {
conn := pe.pool.Get() conn := pe.pool.Get()
defer conn.Close() defer conn.Close()
@ -157,5 +193,5 @@ func (pe *periodicEnqueuer) shouldEnqueue() bool {
return true return true
} }
return lastEnqueue < (utils.NowEpochSeconds() - int64(periodicEnqueuerSleep/time.Minute)) return lastEnqueue < (time.Now().Unix() - int64(periodicEnqueuerSleep/time.Minute))
} }

View File

@ -1,11 +1,26 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
import ( import (
"context"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/goharbor/harbor/src/jobservice/utils" "github.com/goharbor/harbor/src/jobservice/utils"
) )
@ -16,7 +31,7 @@ func TestPeriodicEnqueuerStartStop(t *testing.T) {
lock: new(sync.RWMutex), lock: new(sync.RWMutex),
policies: make(map[string]*PeriodicJobPolicy), policies: make(map[string]*PeriodicJobPolicy),
} }
enqueuer := newPeriodicEnqueuer(ns, redisPool, ps) enqueuer := newPeriodicEnqueuer(ns, redisPool, ps, nil)
enqueuer.start() enqueuer.start()
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
enqueuer.stop() enqueuer.stop()
@ -36,7 +51,11 @@ func TestEnqueue(t *testing.T) {
} }
ps.add(pl) ps.add(pl)
enqueuer := newPeriodicEnqueuer(ns, redisPool, ps) statsManager := opm.NewRedisJobStatsManager(context.Background(), ns, redisPool)
statsManager.Start()
defer statsManager.Shutdown()
enqueuer := newPeriodicEnqueuer(ns, redisPool, ps, statsManager)
if err := enqueuer.enqueue(); err != nil { if err := enqueuer.enqueue(); err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
@ -10,6 +22,7 @@ import (
"time" "time"
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/robfig/cron" "github.com/robfig/cron"
@ -37,12 +50,12 @@ type RedisPeriodicScheduler struct {
} }
// NewRedisPeriodicScheduler is constructor of RedisPeriodicScheduler // NewRedisPeriodicScheduler is constructor of RedisPeriodicScheduler
func NewRedisPeriodicScheduler(ctx *env.Context, namespace string, redisPool *redis.Pool) *RedisPeriodicScheduler { func NewRedisPeriodicScheduler(ctx *env.Context, namespace string, redisPool *redis.Pool, statsManager opm.JobStatsManager) *RedisPeriodicScheduler {
pstore := &periodicJobPolicyStore{ pstore := &periodicJobPolicyStore{
lock: new(sync.RWMutex), lock: new(sync.RWMutex),
policies: make(map[string]*PeriodicJobPolicy), policies: make(map[string]*PeriodicJobPolicy),
} }
enqueuer := newPeriodicEnqueuer(namespace, redisPool, pstore) enqueuer := newPeriodicEnqueuer(namespace, redisPool, pstore, statsManager)
return &RedisPeriodicScheduler{ return &RedisPeriodicScheduler{
context: ctx, context: ctx,
@ -261,6 +274,8 @@ func (rps *RedisPeriodicScheduler) Load() error {
} }
allPeriodicPolicies = append(allPeriodicPolicies, policy) allPeriodicPolicies = append(allPeriodicPolicies, policy)
logger.Infof("Load periodic job policy %s for job %s: %s", policy.PolicyID, policy.JobName, policy.CronSpec)
} }
if len(allPeriodicPolicies) > 0 { if len(allPeriodicPolicies) > 0 {

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
import ( import (
@ -7,6 +19,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
"github.com/goharbor/harbor/src/jobservice/utils" "github.com/goharbor/harbor/src/jobservice/utils"
@ -15,7 +29,11 @@ import (
var redisPool = tests.GiveMeRedisPool() var redisPool = tests.GiveMeRedisPool()
func TestScheduler(t *testing.T) { func TestScheduler(t *testing.T) {
scheduler := myPeriodicScheduler() statsManager := opm.NewRedisJobStatsManager(context.Background(), tests.GiveMeTestNamespace(), redisPool)
statsManager.Start()
defer statsManager.Shutdown()
scheduler := myPeriodicScheduler(statsManager)
params := make(map[string]interface{}) params := make(map[string]interface{})
params["image"] = "testing:v1" params["image"] = "testing:v1"
id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *") id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *")
@ -51,7 +69,11 @@ func TestScheduler(t *testing.T) {
} }
func TestPubFunc(t *testing.T) { func TestPubFunc(t *testing.T) {
scheduler := myPeriodicScheduler() statsManager := opm.NewRedisJobStatsManager(context.Background(), tests.GiveMeTestNamespace(), redisPool)
statsManager.Start()
defer statsManager.Shutdown()
scheduler := myPeriodicScheduler(statsManager)
p := &PeriodicJobPolicy{ p := &PeriodicJobPolicy{
PolicyID: "fake_ID", PolicyID: "fake_ID",
JobName: "fake_job", JobName: "fake_job",
@ -71,7 +93,7 @@ func TestPubFunc(t *testing.T) {
} }
} }
func myPeriodicScheduler() *RedisPeriodicScheduler { func myPeriodicScheduler(statsManager opm.JobStatsManager) *RedisPeriodicScheduler {
sysCtx := context.Background() sysCtx := context.Background()
ctx := &env.Context{ ctx := &env.Context{
SystemContext: sysCtx, SystemContext: sysCtx,
@ -79,5 +101,5 @@ func myPeriodicScheduler() *RedisPeriodicScheduler {
ErrorChan: make(chan error, 1), ErrorChan: make(chan error, 1),
} }
return NewRedisPeriodicScheduler(ctx, tests.GiveMeTestNamespace(), redisPool) return NewRedisPeriodicScheduler(ctx, tests.GiveMeTestNamespace(), redisPool, statsManager)
} }

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package period package period
import ( import (

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool

View File

@ -1,3 +1,15 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool
import ( import (

View File

@ -1,17 +1,35 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool
import ( import (
"errors"
"fmt" "fmt"
"runtime"
"time" "time"
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/gocraft/work" "github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/utils"
) )
// RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool. // RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis pool.
@ -66,6 +84,12 @@ func (rj *RedisJob) Run(j *work.Job) error {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
err = fmt.Errorf("Runtime error: %s", r) err = fmt.Errorf("Runtime error: %s", r)
// Log the stack
buf := make([]byte, 1<<16)
size := runtime.Stack(buf, false)
logger.Errorf("Runtime error happened when executing job %s:%s: %s", j.Name, j.ID, buf[0:size])
// record runtime error status // record runtime error status
rj.jobFailed(j.ID) rj.jobFailed(j.ID)
} }
@ -89,6 +113,7 @@ func (rj *RedisJob) Run(j *work.Job) error {
// Start to run // Start to run
rj.jobRunning(j.ID) rj.jobRunning(j.ID)
// Inject data // Inject data
err = runningJob.Run(execContext, j.Args) err = runningJob.Run(execContext, j.Args)
@ -163,6 +188,49 @@ func (rj *RedisJob) buildContext(j *work.Job) (env.JobContext, error) {
jData.ExtraData["checkInFunc"] = checkInFuncFactory(j.ID) jData.ExtraData["checkInFunc"] = checkInFuncFactory(j.ID)
launchJobFuncFactory := func(jobID string) job.LaunchJobFunc {
funcIntf := rj.context.SystemContext.Value(utils.CtlKeyOfLaunchJobFunc)
return func(jobReq models.JobRequest) (models.JobStats, error) {
launchJobFunc, ok := funcIntf.(job.LaunchJobFunc)
if !ok {
return models.JobStats{}, errors.New("no launch job func provided")
}
jobName := ""
if jobReq.Job != nil {
jobName = jobReq.Job.Name
}
if j.Name == jobName {
return models.JobStats{}, errors.New("infinite job creating loop may exist")
}
res, err := launchJobFunc(jobReq)
if err != nil {
return models.JobStats{}, err
}
if err := rj.statsManager.Update(jobID, "multiple_executions", true); err != nil {
logger.Error(err)
}
if err := rj.statsManager.Update(res.Stats.JobID, "upstream_job_id", jobID); err != nil {
logger.Error(err)
}
rj.statsManager.AttachExecution(jobID, res.Stats.JobID)
logger.Infof("Launch sub job %s:%s for upstream job %s", res.Stats.JobName, res.Stats.JobID, jobID)
return res, nil
}
}
jData.ExtraData["launchJobFunc"] = launchJobFuncFactory(j.ID)
// Use default context
if rj.context.JobContext == nil {
rj.context.JobContext = impl.NewDefaultContext(rj.context.SystemContext)
}
return rj.context.JobContext.Build(jData) return rj.context.JobContext.Build(jData)
} }

View File

@ -0,0 +1,100 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/utils"
"github.com/gocraft/work"
"github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/goharbor/harbor/src/jobservice/env"
)
func TestJobWrapper(t *testing.T) {
ctx := context.Background()
mgr := opm.NewRedisJobStatsManager(ctx, tests.GiveMeTestNamespace(), rPool)
mgr.Start()
defer mgr.Shutdown()
<-time.After(200 * time.Millisecond)
var launchJobFunc job.LaunchJobFunc = func(req models.JobRequest) (models.JobStats, error) {
return models.JobStats{}, nil
}
ctx = context.WithValue(ctx, utils.CtlKeyOfLaunchJobFunc, launchJobFunc)
envContext := &env.Context{
SystemContext: ctx,
WG: &sync.WaitGroup{},
ErrorChan: make(chan error, 1), // with 1 buffer
}
wrapper := NewRedisJob((*fakeParentJob)(nil), envContext, mgr)
j := &work.Job{
ID: "FAKE",
Name: "DEMO",
EnqueuedAt: time.Now().Add(5 * time.Minute).Unix(),
}
oldLogConfig := config.DefaultConfig.LoggerConfig
defer func() {
config.DefaultConfig.LoggerConfig = oldLogConfig
}()
config.DefaultConfig.LoggerConfig = &config.LoggerConfig{
LogLevel: "debug",
ArchivePeriod: 1,
BasePath: os.TempDir(),
}
if err := wrapper.Run(j); err != nil {
t.Fatal(err)
}
}
type fakeParentJob struct{}
func (j *fakeParentJob) MaxFails() uint {
return 1
}
func (j *fakeParentJob) ShouldRetry() bool {
return false
}
func (j *fakeParentJob) Validate(params map[string]interface{}) error {
return nil
}
func (j *fakeParentJob) Run(ctx env.JobContext, params map[string]interface{}) error {
ctx.Checkin("start")
ctx.OPCommand()
ctx.LaunchJob(models.JobRequest{
Job: &models.JobData{
Name: "SUB_JOB",
Metadata: &models.JobMetadata{
JobKind: job.JobKindGeneric,
},
},
})
return nil
}

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool
@ -6,6 +18,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"reflect"
"time" "time"
"github.com/gocraft/work" "github.com/gocraft/work"
@ -62,9 +75,9 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re
pool := work.NewWorkerPool(RedisPoolContext{}, workerCount, namespace, redisPool) pool := work.NewWorkerPool(RedisPoolContext{}, workerCount, namespace, redisPool)
enqueuer := work.NewEnqueuer(namespace, redisPool) enqueuer := work.NewEnqueuer(namespace, redisPool)
client := work.NewClient(namespace, redisPool) client := work.NewClient(namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool)
sweeper := period.NewSweeper(namespace, redisPool, client)
statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, namespace, redisPool) statsMgr := opm.NewRedisJobStatsManager(ctx.SystemContext, namespace, redisPool)
scheduler := period.NewRedisPeriodicScheduler(ctx, namespace, redisPool, statsMgr)
sweeper := period.NewSweeper(namespace, redisPool, client)
msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool) msgServer := NewMessageServer(ctx.SystemContext, namespace, redisPool)
return &GoCraftWorkPool{ return &GoCraftWorkPool{
namespace: namespace, namespace: namespace,
@ -210,6 +223,19 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
return errors.New("job must implement the job.Interface") return errors.New("job must implement the job.Interface")
} }
// 1:1 constraint
if jInList, ok := gcwp.knownJobs[name]; ok {
return fmt.Errorf("Job name %s has been already registered with %s", name, reflect.TypeOf(jInList).String())
}
// Same job implementation can be only registered with one name
for jName, jInList := range gcwp.knownJobs {
jobImpl := reflect.TypeOf(j).String()
if reflect.TypeOf(jInList).String() == jobImpl {
return fmt.Errorf("Job %s has been already registered with name %s", jobImpl, jName)
}
}
redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager) redisJob := NewRedisJob(j, gcwp.context, gcwp.statsManager)
// Get more info from j // Get more info from j
@ -223,6 +249,8 @@ func (gcwp *GoCraftWorkPool) RegisterJob(name string, j interface{}) error {
) )
gcwp.knownJobs[name] = j // keep the name of registered jobs as known jobs for future validation gcwp.knownJobs[name] = j // keep the name of registered jobs as known jobs for future validation
logger.Infof("Register job %s with name %s", reflect.TypeOf(j).String(), name)
return nil return nil
} }
@ -314,15 +342,16 @@ func (gcwp *GoCraftWorkPool) PeriodicallyEnqueue(jobName string, params models.P
res := models.JobStats{ res := models.JobStats{
Stats: &models.JobStatData{ Stats: &models.JobStatData{
JobID: id, JobID: id,
JobName: jobName, JobName: jobName,
Status: job.JobStatusPending, Status: job.JobStatusPending,
JobKind: job.JobKindPeriodic, JobKind: job.JobKindPeriodic,
CronSpec: cronSetting, CronSpec: cronSetting,
EnqueueTime: time.Now().Unix(), EnqueueTime: time.Now().Unix(),
UpdateTime: time.Now().Unix(), UpdateTime: time.Now().Unix(),
RefLink: fmt.Sprintf("/api/v1/jobs/%s", id), RefLink: fmt.Sprintf("/api/v1/jobs/%s", id),
RunAt: nextRun, RunAt: nextRun,
IsMultipleExecutions: true, // True for periodic job
}, },
} }
@ -412,6 +441,9 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
if err := gcwp.scheduler.UnSchedule(jobID); err != nil { if err := gcwp.scheduler.UnSchedule(jobID); err != nil {
return err return err
} }
logger.Infof("Periodic job policy %s is removed", jobID)
// secondly we need try to delete the job instances scheduled for this periodic job, a try best action // secondly we need try to delete the job instances scheduled for this periodic job, a try best action
gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) // ignore error as we have logged gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) // ignore error as we have logged
// thirdly expire the job stats of this periodic job if exists // thirdly expire the job stats of this periodic job if exists
@ -527,7 +559,7 @@ func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string
return err return err
} }
now := utils.NowEpochSeconds() now := time.Now().Unix()
nowTime := time.Unix(now, 0) nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon) horizon := nowTime.Add(periodicEnqueuerHorizon)
@ -537,7 +569,9 @@ func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string
epoch := t.Unix() epoch := t.Unix()
if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil { if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil {
// only logged // only logged
logger.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err) logger.Warningf("Delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
} else {
logger.Infof("Delete scheduled job for periodic job policy %s: runat = %d", policyID, epoch)
} }
} }

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool
import ( import (
@ -12,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/errs"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/opm" "github.com/goharbor/harbor/src/jobservice/opm"
"github.com/goharbor/harbor/src/jobservice/tests" "github.com/goharbor/harbor/src/jobservice/tests"
@ -33,15 +46,16 @@ func TestRegisterJob(t *testing.T) {
t.Error(err) t.Error(err)
} }
jobs := make(map[string]interface{}) if _, ok := wp.IsKnownJob("fake_job"); !ok {
jobs["fake_job_1st"] = (*fakeJob)(nil) t.Error("expected known job but registering 'fake_job' appears to have failed")
jobs["fake_job_2nd"] = (*fakeJob)(nil)
if err := wp.RegisterJobs(jobs); err != nil {
t.Error(err)
} }
if _, ok := wp.IsKnownJob("fake_job"); !ok { delete(wp.knownJobs, "fake_job")
t.Error("expect known job but seems failed to register job 'fake_job'")
jobs := make(map[string]interface{})
jobs["fake_job_1st"] = (*fakeJob)(nil)
if err := wp.RegisterJobs(jobs); err != nil {
t.Error(err)
} }
params := make(map[string]interface{}) params := make(map[string]interface{})
@ -143,6 +157,122 @@ func TestEnqueuePeriodicJob(t *testing.T) {
// <-time.After(1 * time.Second) // <-time.After(1 * time.Second)
} }
func TestPoolStats(t *testing.T) {
wp, _, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
defer cancel()
go wp.Start()
time.Sleep(1 * time.Second)
_, err := wp.Stats()
if err != nil {
t.Fatal(err)
}
}
func TestStopJob(t *testing.T) {
wp, _, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
defer cancel()
if err := wp.RegisterJob("fake_long_run_job", (*fakeRunnableJob)(nil)); err != nil {
t.Error(err)
}
go wp.Start()
time.Sleep(1 * time.Second)
// Stop generic job
params := make(map[string]interface{})
params["name"] = "testing:v1"
genericJob, err := wp.Enqueue("fake_long_run_job", params, false)
if err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
stats, err := wp.GetJobStats(genericJob.Stats.JobID)
if err != nil {
t.Fatal(err)
}
if stats.Stats.Status != job.JobStatusRunning {
t.Fatalf("expect job running but got %s", stats.Stats.Status)
}
if err := wp.StopJob(genericJob.Stats.JobID); err != nil {
t.Fatal(err)
}
// Stop scheduled job
scheduledJob, err := wp.Schedule("fake_long_run_job", params, 120, false)
if err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
if err := wp.StopJob(scheduledJob.Stats.JobID); err != nil {
t.Fatal(err)
}
}
func TestCancelJob(t *testing.T) {
wp, _, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
defer cancel()
if err := wp.RegisterJob("fake_long_run_job", (*fakeRunnableJob)(nil)); err != nil {
t.Error(err)
}
go wp.Start()
time.Sleep(1 * time.Second)
// Cancel job
params := make(map[string]interface{})
params["name"] = "testing:v1"
genericJob, err := wp.Enqueue("fake_long_run_job", params, false)
if err != nil {
t.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
stats, err := wp.GetJobStats(genericJob.Stats.JobID)
if err != nil {
t.Fatal(err)
}
if stats.Stats.Status != job.JobStatusRunning {
t.Fatalf("expect job running but got %s", stats.Stats.Status)
}
if err := wp.CancelJob(genericJob.Stats.JobID); err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
stats, err = wp.GetJobStats(genericJob.Stats.JobID)
if err != nil {
t.Fatal(err)
}
if stats.Stats.Status != job.JobStatusCancelled {
t.Fatalf("expect job cancelled but got %s", stats.Stats.Status)
}
if err := wp.RetryJob(genericJob.Stats.JobID); err != nil {
t.Fatal(err)
}
}
/*func TestCancelAndRetryJobWithHook(t *testing.T) { /*func TestCancelAndRetryJobWithHook(t *testing.T) {
wp, _, cancel := createRedisWorkerPool() wp, _, cancel := createRedisWorkerPool()
defer func() { defer func() {
@ -292,7 +422,7 @@ func (j *fakeRunnableJob) Validate(params map[string]interface{}) error {
} }
func (j *fakeRunnableJob) Run(ctx env.JobContext, params map[string]interface{}) error { func (j *fakeRunnableJob) Run(ctx env.JobContext, params map[string]interface{}) error {
tk := time.NewTicker(1 * time.Second) tk := time.NewTicker(200 * time.Millisecond)
defer tk.Stop() defer tk.Stop()
for { for {
@ -324,6 +454,9 @@ type fakeContext struct {
// checkin func // checkin func
checkInFunc job.CheckInFunc checkInFunc job.CheckInFunc
// launch job
launchJobFunc job.LaunchJobFunc
// other required information // other required information
properties map[string]interface{} properties map[string]interface{}
} }
@ -373,6 +506,18 @@ func (c *fakeContext) Build(dep env.JobData) (env.JobContext, error) {
return nil, errors.New("failed to inject checkInFunc") return nil, errors.New("failed to inject checkInFunc")
} }
if launchJobFunc, ok := dep.ExtraData["launchJobFunc"]; ok {
if reflect.TypeOf(launchJobFunc).Kind() == reflect.Func {
if funcRef, ok := launchJobFunc.(job.LaunchJobFunc); ok {
jContext.launchJobFunc = funcRef
}
}
}
if jContext.launchJobFunc == nil {
return nil, errors.New("failed to inject launchJobFunc")
}
return jContext, nil return jContext, nil
} }
@ -411,3 +556,12 @@ func (c *fakeContext) OPCommand() (string, bool) {
func (c *fakeContext) GetLogger() logger.Interface { func (c *fakeContext) GetLogger() logger.Interface {
return nil return nil
} }
// LaunchJob launches sub jobs
func (c *fakeContext) LaunchJob(req models.JobRequest) (models.JobStats, error) {
if c.launchJobFunc == nil {
return models.JobStats{}, errors.New("nil launch job function")
}
return c.launchJobFunc(req)
}

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool package pool

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package runtime package runtime
@ -16,12 +28,15 @@ import (
"github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/config"
"github.com/goharbor/harbor/src/jobservice/core" "github.com/goharbor/harbor/src/jobservice/core"
"github.com/goharbor/harbor/src/jobservice/env" "github.com/goharbor/harbor/src/jobservice/env"
jsjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/job/impl" "github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc" "github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication" "github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/scan" "github.com/goharbor/harbor/src/jobservice/job/impl/scan"
"github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/models"
"github.com/goharbor/harbor/src/jobservice/pool" "github.com/goharbor/harbor/src/jobservice/pool"
"github.com/goharbor/harbor/src/jobservice/utils"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
) )
@ -85,6 +100,12 @@ func (bs *Bootstrap) LoadAndRun() {
// Initialize controller // Initialize controller
ctl := core.NewController(backendPool) ctl := core.NewController(backendPool)
// Keep the job launch func in the system context
var launchJobFunc jsjob.LaunchJobFunc = func(req models.JobRequest) (models.JobStats, error) {
return ctl.LaunchJob(req)
}
rootContext.SystemContext = context.WithValue(rootContext.SystemContext, utils.CtlKeyOfLaunchJobFunc, launchJobFunc)
// Start the API server // Start the API server
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl) apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol) logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tests provide test utilities // Package tests provide test utilities
package tests package tests
@ -83,7 +95,7 @@ func ClearAll(namespace string, conn redis.Conn) error {
func getRedisHost() string { func getRedisHost() string {
redisHost := os.Getenv(testingRedisHost) redisHost := os.Getenv(testingRedisHost)
if redisHost == "" { if redisHost == "" {
redisHost = "10.160.178.186" // for local test redisHost = "127.0.0.1" // for local test
} }
return redisHost return redisHost

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils package utils
import ( import (
@ -5,7 +19,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"time"
"github.com/gocraft/work" "github.com/gocraft/work"
) )
@ -48,21 +61,6 @@ func RedisKeyDead(namespace string) string {
return RedisNamespacePrefix(namespace) + "dead" return RedisNamespacePrefix(namespace) + "dead"
} }
var nowMock int64
// NowEpochSeconds ...
func NowEpochSeconds() int64 {
if nowMock != 0 {
return nowMock
}
return time.Now().Unix()
}
// SetNowEpochSecondsMock ...
func SetNowEpochSecondsMock(t int64) {
nowMock = t
}
// SerializeJob encodes work.Job to json data. // SerializeJob encodes work.Job to json data.
func SerializeJob(job *work.Job) ([]byte, error) { func SerializeJob(job *work.Job) ([]byte, error) {
return json.Marshal(job) return json.Marshal(job)

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils package utils
@ -61,7 +73,12 @@ func KeyJobStats(namespace string, jobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID) return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "job_stats", jobID)
} }
// KeyJobCtlCommands give the key for publishing ctl commands like 'stop' etc. // KeyJobCtlCommands returns the key for publishing ctl commands like 'stop' etc.
func KeyJobCtlCommands(namespace string, jobID string) string { func KeyJobCtlCommands(namespace string, jobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "ctl_commands", jobID) return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "ctl_commands", jobID)
} }
// KeyUpstreamJobAndExecutions returns the key for persisting executions.
func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "executions", upstreamJobID)
}

View File

@ -1,4 +1,16 @@
// Copyright Project Harbor Authors. All rights reserved. // Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package utils provides reusable and sharable utilities for other packages and components. // Package utils provides reusable and sharable utilities for other packages and components.
package utils package utils
@ -14,6 +26,14 @@ import (
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
) )
// CtlContextKey is used to keep controller reference in the system context
type CtlContextKey string
const (
// CtlKeyOfLaunchJobFunc is context key to keep the ctl launch job func
CtlKeyOfLaunchJobFunc CtlContextKey = "controller_launch_job_func"
)
// IsEmptyStr check if the specified str is empty (len ==0) after triming prefix and suffix spaces. // IsEmptyStr check if the specified str is empty (len ==0) after triming prefix and suffix spaces.
func IsEmptyStr(str string) bool { func IsEmptyStr(str string) bool {
return len(strings.TrimSpace(str)) == 0 return len(strings.TrimSpace(str)) == 0