mirror of
https://github.com/goharbor/harbor.git
synced 2025-02-02 04:51:22 +01:00
fix go vet issues in the code
Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
parent
ad68a3f79d
commit
9bcbe2907b
1037
src/common/dao/dao_test.go
Normal file
1037
src/common/dao/dao_test.go
Normal file
File diff suppressed because it is too large
Load Diff
@ -213,7 +213,7 @@ func (dh *DefaultHandler) HandleJobLogReq(w http.ResponseWriter, req *http.Reque
|
||||
dh.log(req, http.StatusOK, "")
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(logData)
|
||||
writeDate(w, logData)
|
||||
}
|
||||
|
||||
// HandlePeriodicExecutions is implementation of method defined in interface 'Handler'
|
||||
@ -270,7 +270,7 @@ func (dh *DefaultHandler) handleJSONData(w http.ResponseWriter, req *http.Reques
|
||||
w.Header().Set(http.CanonicalHeaderKey("Accept"), "application/json")
|
||||
w.Header().Set(http.CanonicalHeaderKey("content-type"), "application/json")
|
||||
w.WriteHeader(code)
|
||||
w.Write(data)
|
||||
writeDate(w, data)
|
||||
}
|
||||
|
||||
func (dh *DefaultHandler) handleError(w http.ResponseWriter, req *http.Request, code int, err error) {
|
||||
@ -278,7 +278,7 @@ func (dh *DefaultHandler) handleError(w http.ResponseWriter, req *http.Request,
|
||||
logger.Errorf("Serve http request '%s %s' error: %d %s", req.Method, req.URL.String(), code, err.Error())
|
||||
|
||||
w.WriteHeader(code)
|
||||
w.Write([]byte(err.Error()))
|
||||
writeDate(w, []byte(err.Error()))
|
||||
}
|
||||
|
||||
func (dh *DefaultHandler) log(req *http.Request, code int, text string) {
|
||||
@ -294,7 +294,7 @@ func extractQuery(req *http.Request) *query.Parameter {
|
||||
|
||||
queries := req.URL.Query()
|
||||
// Page number
|
||||
p := queries.Get(query.QueryParamKeyPage)
|
||||
p := queries.Get(query.ParamKeyPage)
|
||||
if !utils.IsEmptyStr(p) {
|
||||
if pv, err := strconv.ParseUint(p, 10, 32); err == nil {
|
||||
if pv > 1 {
|
||||
@ -304,7 +304,7 @@ func extractQuery(req *http.Request) *query.Parameter {
|
||||
}
|
||||
|
||||
// Page number
|
||||
size := queries.Get(query.QueryParamKeyPageSize)
|
||||
size := queries.Get(query.ParamKeyPageSize)
|
||||
if !utils.IsEmptyStr(size) {
|
||||
if pz, err := strconv.ParseUint(size, 10, 32); err == nil {
|
||||
if pz > 0 {
|
||||
@ -314,7 +314,7 @@ func extractQuery(req *http.Request) *query.Parameter {
|
||||
}
|
||||
|
||||
// Extra query parameters
|
||||
nonStoppedOnly := queries.Get(query.QueryParamKeyNonStoppedOnly)
|
||||
nonStoppedOnly := queries.Get(query.ParamKeyNonStoppedOnly)
|
||||
if !utils.IsEmptyStr(nonStoppedOnly) {
|
||||
if nonStoppedOnlyV, err := strconv.ParseBool(nonStoppedOnly); err == nil {
|
||||
q.Extras.Set(query.ExtraParamKeyNonStoppedOnly, nonStoppedOnlyV)
|
||||
@ -323,3 +323,9 @@ func extractQuery(req *http.Request) *query.Parameter {
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
func writeDate(w http.ResponseWriter, bytes []byte) {
|
||||
if _, err := w.Write(bytes); err != nil {
|
||||
logger.Errorf("writer write error: %s", err)
|
||||
}
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ type APIHandlerTestSuite struct {
|
||||
|
||||
// SetupSuite prepares test suite
|
||||
func (suite *APIHandlerTestSuite) SetupSuite() {
|
||||
os.Setenv(secretKey, fakeSecret)
|
||||
_ = os.Setenv(secretKey, fakeSecret)
|
||||
|
||||
suite.client = &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
@ -65,14 +65,17 @@ func (suite *APIHandlerTestSuite) SetupSuite() {
|
||||
|
||||
suite.createServer()
|
||||
|
||||
go suite.server.Start()
|
||||
go func() {
|
||||
_ = suite.server.Start()
|
||||
}()
|
||||
|
||||
<-time.After(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
// TearDownSuite clears test suite
|
||||
func (suite *APIHandlerTestSuite) TearDownSuite() {
|
||||
os.Unsetenv(secretKey)
|
||||
suite.server.Stop()
|
||||
_ = os.Unsetenv(secretKey)
|
||||
_ = suite.server.Stop()
|
||||
suite.cancel()
|
||||
}
|
||||
|
||||
@ -83,9 +86,9 @@ func TestAPIHandlerTestSuite(t *testing.T) {
|
||||
|
||||
// TestUnAuthorizedAccess ...
|
||||
func (suite *APIHandlerTestSuite) TestUnAuthorizedAccess() {
|
||||
os.Unsetenv(secretKey)
|
||||
_ = os.Unsetenv(secretKey)
|
||||
defer func() {
|
||||
os.Setenv(secretKey, fakeSecret)
|
||||
_ = os.Setenv(secretKey, fakeSecret)
|
||||
}()
|
||||
|
||||
_, code := suite.getReq(fmt.Sprintf("%s/%s", suite.APIAddr, "jobs/fake_job"))
|
||||
@ -327,7 +330,9 @@ func (suite *APIHandlerTestSuite) postReq(url string, data []byte) ([]byte, int)
|
||||
resData []byte
|
||||
)
|
||||
|
||||
defer res.Body.Close()
|
||||
defer func() {
|
||||
_ = res.Body.Close()
|
||||
}()
|
||||
if res.ContentLength > 0 {
|
||||
resData, err = ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
@ -352,7 +357,10 @@ func (suite *APIHandlerTestSuite) getReq(url string) ([]byte, int) {
|
||||
return nil, 0
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
defer func() {
|
||||
_ = res.Body.Close()
|
||||
}()
|
||||
|
||||
data, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, 0
|
||||
|
@ -16,6 +16,7 @@ package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/errs"
|
||||
@ -68,9 +69,12 @@ func (br *BaseRouter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if req.URL.String() != fmt.Sprintf("%s/%s/stats", baseRoute, apiVersion) {
|
||||
if err := br.authenticator.DoAuth(req); err != nil {
|
||||
authErr := errs.UnauthorizedError(err)
|
||||
if authErr == nil {
|
||||
authErr = errors.Errorf("unauthorized: %s", err)
|
||||
}
|
||||
logger.Errorf("Serve http request '%s %s' failed with error: %s", req.Method, req.URL.String(), authErr.Error())
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
w.Write([]byte(authErr.Error()))
|
||||
writeDate(w, []byte(authErr.Error()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -17,12 +17,12 @@ package query
|
||||
const (
|
||||
// DefaultPageSize defines the default page size
|
||||
DefaultPageSize uint = 25
|
||||
// QueryParamKeyPage defines query param key of page number
|
||||
QueryParamKeyPage = "page_number"
|
||||
// QueryParamKeyPageSize defines query param key of page size
|
||||
QueryParamKeyPageSize = "page_size"
|
||||
// QueryParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions
|
||||
QueryParamKeyNonStoppedOnly = "non_dead_only"
|
||||
// ParamKeyPage defines query param key of page number
|
||||
ParamKeyPage = "page_number"
|
||||
// ParamKeyPageSize defines query param key of page size
|
||||
ParamKeyPageSize = "page_size"
|
||||
// ParamKeyNonStoppedOnly defines query param key of querying non stopped periodic executions
|
||||
ParamKeyNonStoppedOnly = "non_dead_only"
|
||||
// ExtraParamKeyNonStoppedOnly defines extra parameter key for querying non stopped periodic executions
|
||||
ExtraParamKeyNonStoppedOnly = "NonDeadOnly"
|
||||
)
|
||||
|
29
src/jobservice/common/query/q_test.go
Normal file
29
src/jobservice/common/query/q_test.go
Normal file
@ -0,0 +1,29 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
// QueryTestSuite tests q
|
||||
type QueryTestSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
// TestQueryTestSuite is entry of go test
|
||||
func TestQueryTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(QueryTestSuite))
|
||||
}
|
||||
|
||||
// TestExtraParams tests extra parameters
|
||||
func (suite *QueryTestSuite) TestExtraParams() {
|
||||
extras := make(ExtraParameters)
|
||||
extras.Set("a", 100)
|
||||
v, ok := extras.Get("a")
|
||||
|
||||
assert.Equal(suite.T(), true, ok)
|
||||
assert.Equal(suite.T(), 100, v.(int))
|
||||
}
|
@ -37,7 +37,7 @@ func RedisKeyLastPeriodicEnqueue(namespace string) string {
|
||||
return RedisNamespacePrefix(namespace) + "last_periodic_enqueue"
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
// ----------------------------------------------------------
|
||||
|
||||
// KeyNamespacePrefix returns the based key based on the namespace.
|
||||
func KeyNamespacePrefix(namespace string) string {
|
||||
|
@ -19,11 +19,11 @@ func HmSet(conn redis.Conn, key string, fieldAndValues ...interface{}) error {
|
||||
}
|
||||
|
||||
if utils.IsEmptyStr(key) {
|
||||
errors.New("no key specified to do HMSET")
|
||||
return errors.New("no key specified to do HMSET")
|
||||
}
|
||||
|
||||
if len(fieldAndValues) == 0 {
|
||||
errors.New("no properties specified to do HMSET")
|
||||
return errors.New("no properties specified to do HMSET")
|
||||
}
|
||||
|
||||
args := make([]interface{}, 0, len(fieldAndValues)+2)
|
||||
|
@ -25,11 +25,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
pool = tests.GiveMeRedisPool()
|
||||
namespace = tests.GiveMeTestNamespace()
|
||||
)
|
||||
|
||||
// For testing
|
||||
type simpleStatusChange struct {
|
||||
JobID string
|
||||
@ -56,15 +51,19 @@ func (suite *RdsUtilsTestSuite) SetupTest() {
|
||||
|
||||
// TearDownTest clears test cases
|
||||
func (suite *RdsUtilsTestSuite) TearDownTest() {
|
||||
suite.conn.Close()
|
||||
err := suite.conn.Close()
|
||||
assert.NoError(suite.T(), err, "close conn: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// TearDownSuite clears test suite
|
||||
func (suite *RdsUtilsTestSuite) TearDownSuite() {
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
err := tests.ClearAll(suite.namespace, conn)
|
||||
assert.NoError(suite.T(), err, "clear all: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// TestZPopMin ...
|
||||
@ -75,7 +74,7 @@ func (suite *RdsUtilsTestSuite) TestZPopMin() {
|
||||
raw1, _ := json.Marshal(s1)
|
||||
raw2, _ := json.Marshal(s2)
|
||||
|
||||
key := KeyStatusUpdateRetryQueue(namespace)
|
||||
key := KeyStatusUpdateRetryQueue(suite.namespace)
|
||||
_, err := suite.conn.Do("ZADD", key, time.Now().Unix(), raw1)
|
||||
_, err = suite.conn.Do("ZADD", key, time.Now().Unix()+5, raw2)
|
||||
require.Nil(suite.T(), err, "zadd objects error should be nil")
|
||||
@ -84,14 +83,14 @@ func (suite *RdsUtilsTestSuite) TestZPopMin() {
|
||||
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")
|
||||
|
||||
change1 := &simpleStatusChange{}
|
||||
json.Unmarshal(v.([]byte), change1)
|
||||
_ = json.Unmarshal(v.([]byte), change1)
|
||||
assert.Equal(suite.T(), "a", change1.JobID, "job ID not equal")
|
||||
|
||||
v, err = ZPopMin(suite.conn, key)
|
||||
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")
|
||||
|
||||
change2 := &simpleStatusChange{}
|
||||
json.Unmarshal(v.([]byte), change2)
|
||||
_ = json.Unmarshal(v.([]byte), change2)
|
||||
assert.Equal(suite.T(), "b", change2.JobID, "job ID not equal")
|
||||
}
|
||||
|
||||
|
@ -118,7 +118,7 @@ func TranslateRedisAddress(commaFormat string) (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
urlParts := []string{}
|
||||
urlParts := make([]string, 0)
|
||||
// section[0] should be host:port
|
||||
redisURL := fmt.Sprintf("redis://%s", sections[0])
|
||||
if _, err := url.Parse(redisURL); err != nil {
|
||||
|
@ -24,20 +24,19 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
yaml "gopkg.in/yaml.v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
const (
|
||||
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
|
||||
jobServicePort = "JOB_SERVICE_PORT"
|
||||
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
|
||||
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
|
||||
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
|
||||
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
|
||||
jobServiceRedisURL = "JOB_SERVICE_POOL_REDIS_URL"
|
||||
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
|
||||
jobServiceCoreServerEndpoint = "CORE_URL"
|
||||
jobServiceAuthSecret = "JOBSERVICE_SECRET"
|
||||
jobServiceProtocol = "JOB_SERVICE_PROTOCOL"
|
||||
jobServicePort = "JOB_SERVICE_PORT"
|
||||
jobServiceHTTPCert = "JOB_SERVICE_HTTPS_CERT"
|
||||
jobServiceHTTPKey = "JOB_SERVICE_HTTPS_KEY"
|
||||
jobServiceWorkerPoolBackend = "JOB_SERVICE_POOL_BACKEND"
|
||||
jobServiceWorkers = "JOB_SERVICE_POOL_WORKERS"
|
||||
jobServiceRedisURL = "JOB_SERVICE_POOL_REDIS_URL"
|
||||
jobServiceRedisNamespace = "JOB_SERVICE_POOL_REDIS_NAMESPACE"
|
||||
jobServiceAuthSecret = "JOBSERVICE_SECRET"
|
||||
|
||||
// JobServiceProtocolHTTPS points to the 'https' protocol
|
||||
JobServiceProtocolHTTPS = "https"
|
||||
@ -291,11 +290,11 @@ func (c *Configuration) validate() error {
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(c.PoolConfig.RedisPoolCfg.RedisURL, redisSchema) {
|
||||
return errors.New("Invalid redis URL")
|
||||
return errors.New("invalid redis URL")
|
||||
}
|
||||
|
||||
if _, err := url.Parse(c.PoolConfig.RedisPoolCfg.RedisURL); err != nil {
|
||||
return fmt.Errorf("Invalid redis URL: %s", err.Error())
|
||||
return fmt.Errorf("invalid redis URL: %s", err.Error())
|
||||
}
|
||||
|
||||
if utils.IsEmptyStr(c.PoolConfig.RedisPoolCfg.Namespace) {
|
||||
|
@ -47,11 +47,16 @@ func (suite *ConfigurationTestSuite) TestConfigLoadingSucceed() {
|
||||
|
||||
// TestConfigLoadingWithEnv ...
|
||||
func (suite *ConfigurationTestSuite) TestConfigLoadingWithEnv() {
|
||||
setENV()
|
||||
defer unsetENV()
|
||||
err := setENV()
|
||||
require.Nil(suite.T(), err, "set envs: expect nil error but got error '%s'", err)
|
||||
|
||||
defer func() {
|
||||
err := unsetENV()
|
||||
require.Nil(suite.T(), err, "unset envs: expect nil error but got error '%s'", err)
|
||||
}()
|
||||
|
||||
cfg := &Configuration{}
|
||||
err := cfg.Load("../config_test.yml", true)
|
||||
err = cfg.Load("../config_test.yml", true)
|
||||
require.Nil(suite.T(), err, "load config from yaml file, expect nil error but got error '%s'", err)
|
||||
|
||||
assert.Equal(suite.T(), "https", cfg.Protocol, "expect protocol 'https', but got '%s'", cfg.Protocol)
|
||||
@ -118,28 +123,32 @@ func (suite *ConfigurationTestSuite) TestDefaultConfig() {
|
||||
)
|
||||
}
|
||||
|
||||
func setENV() {
|
||||
os.Setenv("JOB_SERVICE_PROTOCOL", "https")
|
||||
os.Setenv("JOB_SERVICE_PORT", "8989")
|
||||
os.Setenv("JOB_SERVICE_HTTPS_CERT", "../server.crt")
|
||||
os.Setenv("JOB_SERVICE_HTTPS_KEY", "../server.key")
|
||||
os.Setenv("JOB_SERVICE_POOL_BACKEND", "redis")
|
||||
os.Setenv("JOB_SERVICE_POOL_WORKERS", "8")
|
||||
os.Setenv("JOB_SERVICE_POOL_REDIS_URL", "8.8.8.8:6379,100,password,0")
|
||||
os.Setenv("JOB_SERVICE_POOL_REDIS_NAMESPACE", "ut_namespace")
|
||||
os.Setenv("JOBSERVICE_SECRET", "js_secret")
|
||||
os.Setenv("CORE_SECRET", "core_secret")
|
||||
func setENV() error {
|
||||
err := os.Setenv("JOB_SERVICE_PROTOCOL", "https")
|
||||
err = os.Setenv("JOB_SERVICE_PORT", "8989")
|
||||
err = os.Setenv("JOB_SERVICE_HTTPS_CERT", "../server.crt")
|
||||
err = os.Setenv("JOB_SERVICE_HTTPS_KEY", "../server.key")
|
||||
err = os.Setenv("JOB_SERVICE_POOL_BACKEND", "redis")
|
||||
err = os.Setenv("JOB_SERVICE_POOL_WORKERS", "8")
|
||||
err = os.Setenv("JOB_SERVICE_POOL_REDIS_URL", "8.8.8.8:6379,100,password,0")
|
||||
err = os.Setenv("JOB_SERVICE_POOL_REDIS_NAMESPACE", "ut_namespace")
|
||||
err = os.Setenv("JOBSERVICE_SECRET", "js_secret")
|
||||
err = os.Setenv("CORE_SECRET", "core_secret")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func unsetENV() {
|
||||
os.Unsetenv("JOB_SERVICE_PROTOCOL")
|
||||
os.Unsetenv("JOB_SERVICE_PORT")
|
||||
os.Unsetenv("JOB_SERVICE_HTTPS_CERT")
|
||||
os.Unsetenv("JOB_SERVICE_HTTPS_KEY")
|
||||
os.Unsetenv("JOB_SERVICE_POOL_BACKEND")
|
||||
os.Unsetenv("JOB_SERVICE_POOL_WORKERS")
|
||||
os.Unsetenv("JOB_SERVICE_POOL_REDIS_URL")
|
||||
os.Unsetenv("JOB_SERVICE_POOL_REDIS_NAMESPACE")
|
||||
os.Unsetenv("JOBSERVICE_SECRET")
|
||||
os.Unsetenv("CORE_SECRET")
|
||||
func unsetENV() error {
|
||||
err := os.Unsetenv("JOB_SERVICE_PROTOCOL")
|
||||
err = os.Unsetenv("JOB_SERVICE_PORT")
|
||||
err = os.Unsetenv("JOB_SERVICE_HTTPS_CERT")
|
||||
err = os.Unsetenv("JOB_SERVICE_HTTPS_KEY")
|
||||
err = os.Unsetenv("JOB_SERVICE_POOL_BACKEND")
|
||||
err = os.Unsetenv("JOB_SERVICE_POOL_WORKERS")
|
||||
err = os.Unsetenv("JOB_SERVICE_POOL_REDIS_URL")
|
||||
err = os.Unsetenv("JOB_SERVICE_POOL_REDIS_NAMESPACE")
|
||||
err = os.Unsetenv("JOBSERVICE_SECRET")
|
||||
err = os.Unsetenv("CORE_SECRET")
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -14,16 +14,19 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/goharbor/harbor/src/jobservice/worker"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ControllerTestSuite tests functions of core controller
|
||||
@ -126,7 +129,7 @@ func (suite *ControllerTestSuite) TestJobActions() {
|
||||
// TestCheckStatus ...
|
||||
func (suite *ControllerTestSuite) TestCheckStatus() {
|
||||
suite.worker.On("Stats").Return(&worker.Stats{
|
||||
[]*worker.StatsData{
|
||||
Pools: []*worker.StatsData{
|
||||
{
|
||||
Status: "running",
|
||||
},
|
||||
@ -177,6 +180,61 @@ func (suite *ControllerTestSuite) TestInvalidChecks() {
|
||||
assert.NotNil(suite.T(), err, "invalid job name: error expected but got nil")
|
||||
}
|
||||
|
||||
// TestGetPeriodicExecutions tests GetPeriodicExecutions
|
||||
func (suite *ControllerTestSuite) TestGetPeriodicExecutions() {
|
||||
pool := tests.GiveMeRedisPool()
|
||||
namespace := tests.GiveMeTestNamespace()
|
||||
|
||||
jobID := utils.MakeIdentifier()
|
||||
nID := time.Now().Unix()
|
||||
mockJobStats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: jobID,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
JobKind: job.KindPeriodic,
|
||||
JobName: job.SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
NumericPID: nID,
|
||||
},
|
||||
}
|
||||
|
||||
t := job.NewBasicTrackerWithStats(context.TODO(), mockJobStats, namespace, pool, nil)
|
||||
err := t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
executionID := utils.MakeIdentifier()
|
||||
runAt := time.Now().Add(1 * time.Hour).Unix()
|
||||
executionStats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: executionID,
|
||||
Status: job.ScheduledStatus.String(),
|
||||
JobKind: job.KindScheduled,
|
||||
JobName: job.SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
RunAt: runAt,
|
||||
EnqueueTime: runAt,
|
||||
UpstreamJobID: jobID,
|
||||
},
|
||||
}
|
||||
|
||||
t2 := job.NewBasicTrackerWithStats(context.TODO(), executionStats, namespace, pool, nil)
|
||||
err = t2.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
suite.lcmCtl.On("Track", jobID).Return(t, nil)
|
||||
suite.lcmCtl.On("Track", executionID).Return(t2, nil)
|
||||
|
||||
_, total, err := suite.ctl.GetPeriodicExecutions(jobID, &query.Parameter{
|
||||
PageSize: 10,
|
||||
PageNumber: 1,
|
||||
Extras: make(query.ExtraParameters),
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
}
|
||||
|
||||
func createJobReq(kind string) *job.Request {
|
||||
params := make(job.Parameters)
|
||||
params["name"] = "testing:v1"
|
||||
|
@ -17,19 +17,21 @@ package hook
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/pkg/errors"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/rds"
|
||||
"github.com/goharbor/harbor/src/jobservice/env"
|
||||
"github.com/goharbor/harbor/src/jobservice/lcm"
|
||||
"github.com/goharbor/harbor/src/jobservice/logger"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -146,8 +148,11 @@ func (ba *basicAgent) Serve() error {
|
||||
return errors.New("nil life cycle controller of hook agent")
|
||||
}
|
||||
|
||||
ba.wg.Add(1)
|
||||
go ba.loopRetry()
|
||||
logger.Info("Hook event retrying loop is started")
|
||||
|
||||
ba.wg.Add(1)
|
||||
go ba.serve()
|
||||
logger.Info("Basic hook agent is started")
|
||||
|
||||
@ -160,7 +165,6 @@ func (ba *basicAgent) serve() {
|
||||
ba.wg.Done()
|
||||
}()
|
||||
|
||||
ba.wg.Add(1)
|
||||
for {
|
||||
select {
|
||||
case evt := <-ba.events:
|
||||
@ -219,7 +223,9 @@ func (ba *basicAgent) pushForRetry(evt *Event) error {
|
||||
}
|
||||
|
||||
conn := ba.redisPool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyHookEventRetryQueue(ba.namespace)
|
||||
args := make([]interface{}, 0)
|
||||
@ -242,8 +248,6 @@ func (ba *basicAgent) loopRetry() {
|
||||
ba.wg.Done()
|
||||
}()
|
||||
|
||||
ba.wg.Add(1)
|
||||
|
||||
token := make(chan bool, 1)
|
||||
token <- true
|
||||
|
||||
@ -262,7 +266,7 @@ func (ba *basicAgent) loopRetry() {
|
||||
case <-time.After(waitInterval):
|
||||
// Just wait, do nothing
|
||||
case <-ba.context.Done():
|
||||
/// Terminated
|
||||
// Terminated
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -300,7 +304,9 @@ func (ba *basicAgent) reSend() error {
|
||||
|
||||
func (ba *basicAgent) popMinOne() (*Event, error) {
|
||||
conn := ba.redisPool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyHookEventRetryQueue(ba.namespace)
|
||||
minOne, err := rds.ZPopMin(conn, key)
|
||||
|
@ -70,9 +70,11 @@ func (suite *HookAgentTestSuite) SetupSuite() {
|
||||
// TearDownSuite prepares test suites
|
||||
func (suite *HookAgentTestSuite) TearDownSuite() {
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestEventSending ...
|
||||
@ -90,7 +92,7 @@ func (suite *HookAgentTestSuite) TestEventSending() {
|
||||
done <- true
|
||||
}
|
||||
}()
|
||||
fmt.Fprintln(w, "ok")
|
||||
_, _ = fmt.Fprintln(w, "ok")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
@ -102,7 +104,8 @@ func (suite *HookAgentTestSuite) TestEventSending() {
|
||||
|
||||
agent := NewAgent(suite.envContext, suite.namespace, suite.pool)
|
||||
agent.Attach(suite.lcmCtl)
|
||||
agent.Serve()
|
||||
err := agent.Serve()
|
||||
require.NoError(suite.T(), err, "agent serve: nil error expected but got %s", err)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -169,7 +172,9 @@ func (suite *HookAgentTestSuite) TestRetryAndPopMin() {
|
||||
|
||||
// Mock job stats
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyJobStats(suite.namespace, "fake_job_ID")
|
||||
_, err := conn.Do("HSET", key, "status", job.SuccessStatus.String())
|
||||
|
@ -44,6 +44,7 @@ type Client interface {
|
||||
// Client is used to post the related data to the interested parties.
|
||||
type basicClient struct {
|
||||
client *http.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewClient return the ptr of the new hook client
|
||||
@ -81,6 +82,7 @@ func NewClient(ctx context.Context) Client {
|
||||
|
||||
return &basicClient{
|
||||
client: client,
|
||||
ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,7 +114,9 @@ func (bc *basicClient) SendEvent(evt *Event) error {
|
||||
return err
|
||||
}
|
||||
|
||||
defer res.Body.Close() // close connection for reuse
|
||||
defer func() {
|
||||
_ = res.Body.Close()
|
||||
}() // close connection for reuse
|
||||
|
||||
// Should be 200
|
||||
if res.StatusCode != http.StatusOK {
|
||||
|
@ -45,14 +45,14 @@ func (suite *HookClientTestSuite) SetupSuite() {
|
||||
return
|
||||
}
|
||||
|
||||
m := &Event{}
|
||||
err = json.Unmarshal(bytes, m)
|
||||
change := &job.StatusChange{}
|
||||
err = json.Unmarshal(bytes, change)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if m.Data.JobID == "job_ID_failed" {
|
||||
if change.JobID == "job_ID_failed" {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ func (c *Context) Init() error {
|
||||
// Build implements the same method in env.JobContext interface
|
||||
// This func will build the job execution context before running
|
||||
func (c *Context) Build(tracker job.Tracker) (job.Context, error) {
|
||||
if tracker == nil {
|
||||
if tracker == nil || tracker.Job() == nil {
|
||||
return nil, errors.New("nil job tracker")
|
||||
}
|
||||
|
||||
@ -101,6 +101,7 @@ func (c *Context) Build(tracker job.Tracker) (job.Context, error) {
|
||||
sysContext: c.sysContext,
|
||||
cfgMgr: c.cfgMgr,
|
||||
properties: make(map[string]interface{}),
|
||||
tracker: tracker,
|
||||
}
|
||||
|
||||
// Copy properties
|
||||
@ -112,16 +113,17 @@ func (c *Context) Build(tracker job.Tracker) (job.Context, error) {
|
||||
|
||||
// Refresh config properties
|
||||
err := c.cfgMgr.Load()
|
||||
props := c.cfgMgr.GetAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
props := c.cfgMgr.GetAll()
|
||||
for k, v := range props {
|
||||
jContext.properties[k] = v
|
||||
}
|
||||
|
||||
// Set loggers for job
|
||||
lg, err := createLoggers(c.tracker.Job().Info.JobID)
|
||||
lg, err := createLoggers(tracker.Job().Info.JobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -176,20 +178,20 @@ func createLoggers(jobID string) (logger.Interface, error) {
|
||||
lOptions := make([]logger.Option, 0)
|
||||
for _, lc := range config.DefaultConfig.JobLoggerConfigs {
|
||||
// For running job, the depth should be 5
|
||||
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameStdOutput || lc.Name == logger.LoggerNameDB {
|
||||
if lc.Name == logger.NameFile || lc.Name == logger.NameStdOutput || lc.Name == logger.NameDB {
|
||||
if lc.Settings == nil {
|
||||
lc.Settings = map[string]interface{}{}
|
||||
}
|
||||
lc.Settings["depth"] = 5
|
||||
}
|
||||
if lc.Name == logger.LoggerNameFile || lc.Name == logger.LoggerNameDB {
|
||||
if lc.Name == logger.NameFile || lc.Name == logger.NameDB {
|
||||
// Need extra param
|
||||
fSettings := map[string]interface{}{}
|
||||
for k, v := range lc.Settings {
|
||||
// Copy settings
|
||||
fSettings[k] = v
|
||||
}
|
||||
if lc.Name == logger.LoggerNameFile {
|
||||
if lc.Name == logger.NameFile {
|
||||
// Append file name param
|
||||
fSettings["filename"] = fmt.Sprintf("%s.log", jobID)
|
||||
lOptions = append(lOptions, logger.BackendOption(lc.Name, lc.Level, fSettings))
|
||||
|
126
src/jobservice/job/impl/context_test.go
Normal file
126
src/jobservice/job/impl/context_test.go
Normal file
@ -0,0 +1,126 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package impl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
comcfg "github.com/goharbor/harbor/src/common/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
// ContextImplTestSuite tests functions of context impl
|
||||
type ContextImplTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
tracker job.Tracker
|
||||
namespace string
|
||||
pool *redis.Pool
|
||||
jobID string
|
||||
}
|
||||
|
||||
// TestContextImplTestSuite is entry of go test
|
||||
func TestContextImplTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(ContextImplTestSuite))
|
||||
}
|
||||
|
||||
// SetupSuite prepares test suite
|
||||
func (suite *ContextImplTestSuite) SetupSuite() {
|
||||
config.DefaultConfig.JobLoggerConfigs = []*config.LoggerConfig{
|
||||
{
|
||||
Name: "STD_OUTPUT",
|
||||
Level: "DEBUG",
|
||||
},
|
||||
{
|
||||
Name: "FILE",
|
||||
Level: "INFO",
|
||||
Settings: map[string]interface{}{
|
||||
"base_dir": os.TempDir(),
|
||||
},
|
||||
Sweeper: &config.LogSweeperConfig{
|
||||
Duration: 1,
|
||||
Settings: map[string]interface{}{
|
||||
"work_dir": os.TempDir(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
suite.namespace = tests.GiveMeTestNamespace()
|
||||
suite.pool = tests.GiveMeRedisPool()
|
||||
|
||||
suite.jobID = utils.MakeIdentifier()
|
||||
mockStats := &job.Stats{
|
||||
Info: &job.StatsInfo{
|
||||
JobID: suite.jobID,
|
||||
JobKind: job.KindGeneric,
|
||||
JobName: job.SampleJob,
|
||||
Status: job.PendingStatus.String(),
|
||||
IsUnique: false,
|
||||
},
|
||||
}
|
||||
|
||||
suite.tracker = job.NewBasicTrackerWithStats(
|
||||
context.Background(),
|
||||
mockStats,
|
||||
suite.namespace,
|
||||
suite.pool,
|
||||
nil,
|
||||
)
|
||||
|
||||
err := suite.tracker.Save()
|
||||
require.NoError(suite.T(), err, "job stats: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
// SetupSuite clears test suite
|
||||
func (suite *ContextImplTestSuite) TearDownSuite() {
|
||||
conn := suite.pool.Get()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestContextImpl tests the context impl
|
||||
func (suite *ContextImplTestSuite) TestContextImpl() {
|
||||
cfgMem := comcfg.NewInMemoryManager()
|
||||
cfgMem.Set("read_only", "true")
|
||||
ctx := NewContext(context.Background(), cfgMem)
|
||||
jCtx, err := ctx.Build(suite.tracker)
|
||||
|
||||
require.NoErrorf(suite.T(), err, "build job context: nil error expected but got %s", err)
|
||||
v, ok := jCtx.Get("read_only")
|
||||
assert.Equal(suite.T(), true, ok)
|
||||
assert.Equal(suite.T(), v, v.(bool))
|
||||
|
||||
err = jCtx.Checkin("check in testing")
|
||||
assert.NoErrorf(suite.T(), err, "check in: nil error expected but got %s", err)
|
||||
|
||||
l := jCtx.GetLogger()
|
||||
assert.NotNil(suite.T(), l, "logger should be not nil")
|
||||
|
||||
_, ok = jCtx.OPCommand()
|
||||
assert.Equal(suite.T(), false, ok)
|
||||
}
|
@ -166,11 +166,13 @@ func (bt *basicTracker) Job() *Stats {
|
||||
// Update the properties of the job stats
|
||||
func (bt *basicTracker) Update(fieldAndValues ...interface{}) error {
|
||||
if len(fieldAndValues) == 0 {
|
||||
errors.New("no properties specified to update")
|
||||
return errors.New("no properties specified to update")
|
||||
}
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
args := []interface{}{"update_time", time.Now().Unix()} // update timestamp
|
||||
@ -183,7 +185,9 @@ func (bt *basicTracker) Update(fieldAndValues ...interface{}) error {
|
||||
func (bt *basicTracker) Status() (Status, error) {
|
||||
// Retrieve the latest status again in case get the outdated one.
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
rootKey := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
return getStatus(conn, rootKey)
|
||||
@ -207,7 +211,9 @@ func (bt *basicTracker) PeriodicExecutionDone() error {
|
||||
key := rds.KeyUpstreamJobAndExecutions(bt.namespace, bt.jobStats.Info.UpstreamJobID)
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
args := []interface{}{key, "XX", -1, bt.jobID}
|
||||
_, err := conn.Do("ZADD", args...)
|
||||
@ -242,7 +248,9 @@ func (bt *basicTracker) Executions(q *query.Parameter) ([]string, int64, error)
|
||||
}
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyUpstreamJobAndExecutions(bt.namespace, bt.jobID)
|
||||
|
||||
@ -295,7 +303,9 @@ func (bt *basicTracker) Executions(q *query.Parameter) ([]string, int64, error)
|
||||
// Expire job stats
|
||||
func (bt *basicTracker) Expire() error {
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
num, err := conn.Do("EXPIRE", key, statDataExpireTime)
|
||||
@ -372,11 +382,13 @@ func (bt *basicTracker) Succeed() error {
|
||||
// Save the stats of job tracked by this tracker
|
||||
func (bt *basicTracker) Save() (err error) {
|
||||
if bt.jobStats == nil {
|
||||
errors.New("nil job stats to save")
|
||||
return errors.New("nil job stats to save")
|
||||
}
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Alliance
|
||||
stats := bt.jobStats
|
||||
@ -526,7 +538,9 @@ func (bt *basicTracker) pushToQueueForRetry(targetStatus Status) error {
|
||||
}
|
||||
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyStatusUpdateRetryQueue(bt.namespace)
|
||||
args := []interface{}{key, "NX", time.Now().Unix(), rawJSON}
|
||||
@ -557,7 +571,9 @@ func (bt *basicTracker) retryUpdateStatus(targetStatus Status) {
|
||||
|
||||
func (bt *basicTracker) compareAndSet(targetStatus Status) error {
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
rootKey := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
|
||||
@ -581,7 +597,9 @@ func (bt *basicTracker) compareAndSet(targetStatus Status) error {
|
||||
// retrieve the stats of job tracked by this tracker from the backend data
|
||||
func (bt *basicTracker) retrieve() error {
|
||||
conn := bt.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyJobStats(bt.namespace, bt.jobID)
|
||||
vals, err := redis.Strings(conn.Do("HGETALL", key))
|
||||
|
@ -16,20 +16,23 @@ package job
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/query"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TrackerTestSuite tests functions of tracker
|
||||
type TrackerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
tracker *basicTracker
|
||||
namespace string
|
||||
pool *redis.Pool
|
||||
}
|
||||
@ -45,22 +48,14 @@ func (suite *TrackerTestSuite) SetupSuite() {
|
||||
suite.pool = tests.GiveMeRedisPool()
|
||||
}
|
||||
|
||||
// SetupTest prepares for test cases
|
||||
func (suite *TrackerTestSuite) SetupTest() {
|
||||
suite.tracker = &basicTracker{
|
||||
namespace: suite.namespace,
|
||||
context: context.Background(),
|
||||
pool: suite.pool,
|
||||
callback: func(hookURL string, change *StatusChange) error { return nil },
|
||||
}
|
||||
}
|
||||
|
||||
// TearDownSuite prepares test suites
|
||||
func (suite *TrackerTestSuite) TearDownSuite() {
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestTracker tests tracker
|
||||
@ -76,41 +71,148 @@ func (suite *TrackerTestSuite) TestTracker() {
|
||||
},
|
||||
}
|
||||
|
||||
suite.tracker.jobStats = mockJobStats
|
||||
suite.tracker.jobID = jobID
|
||||
tracker := NewBasicTrackerWithStats(
|
||||
context.TODO(),
|
||||
mockJobStats,
|
||||
suite.namespace,
|
||||
suite.pool,
|
||||
func(hookURL string, change *StatusChange) error {
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
err := suite.tracker.Save()
|
||||
err := tracker.Save()
|
||||
require.Nil(suite.T(), err, "save: nil error expected but got %s", err)
|
||||
|
||||
s, err := suite.tracker.Status()
|
||||
s, err := tracker.Status()
|
||||
assert.Nil(suite.T(), err, "get status: nil error expected but got %s", err)
|
||||
assert.Equal(suite.T(), SuccessStatus, s, "get status: expected pending but got %s", s)
|
||||
|
||||
j := suite.tracker.Job()
|
||||
j := tracker.Job()
|
||||
assert.Equal(suite.T(), jobID, j.Info.JobID, "job: expect job ID %s but got %s", jobID, j.Info.JobID)
|
||||
|
||||
err = suite.tracker.Update("web_hook_url", "http://hook.url")
|
||||
err = tracker.Update("web_hook_url", "http://hook.url")
|
||||
assert.Nil(suite.T(), err, "update: nil error expected but got %s", err)
|
||||
|
||||
err = suite.tracker.Load()
|
||||
err = tracker.Load()
|
||||
assert.Nil(suite.T(), err, "load: nil error expected but got %s", err)
|
||||
assert.Equal(
|
||||
suite.T(),
|
||||
"http://hook.url",
|
||||
suite.tracker.jobStats.Info.WebHookURL,
|
||||
tracker.Job().Info.WebHookURL,
|
||||
"web hook: expect %s but got %s",
|
||||
"http://hook.url",
|
||||
suite.tracker.jobStats.Info.WebHookURL,
|
||||
tracker.Job().Info.WebHookURL,
|
||||
)
|
||||
|
||||
err = suite.tracker.Run()
|
||||
err = tracker.Run()
|
||||
assert.Error(suite.T(), err, "run: non nil error expected but got nil")
|
||||
err = suite.tracker.CheckIn("check in")
|
||||
err = tracker.CheckIn("check in")
|
||||
assert.Nil(suite.T(), err, "check in: nil error expected but got %s", err)
|
||||
err = suite.tracker.Succeed()
|
||||
err = tracker.Succeed()
|
||||
assert.Nil(suite.T(), err, "succeed: nil error expected but got %s", err)
|
||||
err = suite.tracker.Stop()
|
||||
err = tracker.Stop()
|
||||
assert.Nil(suite.T(), err, "stop: nil error expected but got %s", err)
|
||||
err = suite.tracker.Fail()
|
||||
err = tracker.Fail()
|
||||
assert.Nil(suite.T(), err, "fail: nil error expected but got %s", err)
|
||||
|
||||
t := NewBasicTrackerWithID(
|
||||
context.TODO(),
|
||||
jobID,
|
||||
suite.namespace,
|
||||
suite.pool,
|
||||
func(hookURL string, change *StatusChange) error {
|
||||
return nil
|
||||
},
|
||||
)
|
||||
err = t.Load()
|
||||
assert.NoError(suite.T(), err)
|
||||
|
||||
err = t.Expire()
|
||||
assert.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
// TestPeriodicTracker tests tracker of periodic
|
||||
func (suite *TrackerTestSuite) TestPeriodicTracker() {
|
||||
jobID := utils.MakeIdentifier()
|
||||
nID := time.Now().Unix()
|
||||
mockJobStats := &Stats{
|
||||
Info: &StatsInfo{
|
||||
JobID: jobID,
|
||||
Status: ScheduledStatus.String(),
|
||||
JobKind: KindPeriodic,
|
||||
JobName: SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
NumericPID: nID,
|
||||
},
|
||||
}
|
||||
|
||||
t := NewBasicTrackerWithStats(context.TODO(), mockJobStats, suite.namespace, suite.pool, nil)
|
||||
err := t.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
executionID := utils.MakeIdentifier()
|
||||
runAt := time.Now().Add(1 * time.Hour).Unix()
|
||||
executionStats := &Stats{
|
||||
Info: &StatsInfo{
|
||||
JobID: executionID,
|
||||
Status: ScheduledStatus.String(),
|
||||
JobKind: KindScheduled,
|
||||
JobName: SampleJob,
|
||||
IsUnique: false,
|
||||
CronSpec: "0 0 * * * *",
|
||||
RunAt: runAt,
|
||||
EnqueueTime: runAt,
|
||||
UpstreamJobID: jobID,
|
||||
},
|
||||
}
|
||||
|
||||
t2 := NewBasicTrackerWithStats(context.TODO(), executionStats, suite.namespace, suite.pool, nil)
|
||||
err = t2.Save()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
id, err := t.NumericID()
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), nID, id)
|
||||
|
||||
_, total, err := t.Executions(&query.Parameter{
|
||||
PageNumber: 1,
|
||||
PageSize: 10,
|
||||
Extras: make(query.ExtraParameters),
|
||||
})
|
||||
require.NoError(suite.T(), err)
|
||||
assert.Equal(suite.T(), int64(1), total)
|
||||
|
||||
err = t2.PeriodicExecutionDone()
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
// TestPushForRetry tests push for retry
|
||||
func (suite *TrackerTestSuite) TestPushForRetry() {
|
||||
ID := utils.MakeIdentifier()
|
||||
runAt := time.Now().Add(1 * time.Hour).Unix()
|
||||
jobStats := &Stats{
|
||||
Info: &StatsInfo{
|
||||
JobID: ID,
|
||||
Status: ScheduledStatus.String(),
|
||||
JobKind: KindScheduled,
|
||||
JobName: SampleJob,
|
||||
IsUnique: false,
|
||||
RunAt: runAt,
|
||||
EnqueueTime: runAt,
|
||||
},
|
||||
}
|
||||
|
||||
t := &basicTracker{
|
||||
namespace: suite.namespace,
|
||||
context: context.TODO(),
|
||||
pool: suite.pool,
|
||||
jobID: ID,
|
||||
jobStats: jobStats,
|
||||
callback: nil,
|
||||
}
|
||||
|
||||
err := t.pushToQueueForRetry(RunningStatus)
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
@ -68,7 +68,9 @@ func NewController(ctx *env.Context, ns string, pool *redis.Pool, callback job.H
|
||||
|
||||
// Serve ...
|
||||
func (bc *basicController) Serve() error {
|
||||
bc.wg.Add(1)
|
||||
go bc.loopForRestoreDeadStatus()
|
||||
|
||||
logger.Info("Status restoring loop is started")
|
||||
|
||||
return nil
|
||||
@ -112,7 +114,6 @@ func (bc *basicController) loopForRestoreDeadStatus() {
|
||||
token := make(chan bool, 1)
|
||||
token <- true
|
||||
|
||||
bc.wg.Add(1)
|
||||
for {
|
||||
<-token
|
||||
|
||||
@ -157,7 +158,9 @@ func (bc *basicController) restoreDeadStatus() error {
|
||||
// popOneDead retrieves one dead status from the backend Q from lowest to highest
|
||||
func (bc *basicController) popOneDead() (*job.SimpleStatusChange, error) {
|
||||
conn := bc.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
key := rds.KeyStatusUpdateRetryQueue(bc.namespace)
|
||||
v, err := rds.ZPopMin(conn, key)
|
||||
|
@ -48,11 +48,11 @@ func (suite *LcmControllerTestSuite) SetupSuite() {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
suite.cancel = cancel
|
||||
context := &env.Context{
|
||||
envCtx := &env.Context{
|
||||
SystemContext: ctx,
|
||||
WG: new(sync.WaitGroup),
|
||||
}
|
||||
suite.ctl = NewController(context, suite.namespace, suite.pool, func(hookURL string, change *job.StatusChange) error { return nil })
|
||||
suite.ctl = NewController(envCtx, suite.namespace, suite.pool, func(hookURL string, change *job.StatusChange) error { return nil })
|
||||
}
|
||||
|
||||
// TearDownSuite clears test suite
|
||||
@ -82,7 +82,9 @@ func (suite *LcmControllerTestSuite) TestServe() {
|
||||
suite.newsStats(jobID)
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
simpleChange := &job.SimpleStatusChange{
|
||||
JobID: jobID,
|
||||
TargetStatus: job.RunningStatus.String(),
|
||||
@ -94,7 +96,8 @@ func (suite *LcmControllerTestSuite) TestServe() {
|
||||
_, err = conn.Do("ZADD", args...)
|
||||
require.Nil(suite.T(), err, "prepare mock data: nil error expected but got %s", err)
|
||||
|
||||
suite.ctl.Serve()
|
||||
err = suite.ctl.Serve()
|
||||
require.NoError(suite.T(), err, "lcm: nil error expected but got %s", err)
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
count, err := redis.Int(conn.Do("ZCARD", key))
|
||||
|
@ -49,14 +49,15 @@ func TestDBLogger(t *testing.T) {
|
||||
l.Warningf("JobLog Warningf: %s", "TestDBLogger")
|
||||
l.Errorf("JobLog Errorf: %s", "TestDBLogger")
|
||||
|
||||
l.Close()
|
||||
_ = l.Close()
|
||||
|
||||
dbGetter := getter.NewDBGetter()
|
||||
ll, err := dbGetter.Retrieve(uuid)
|
||||
require.Nil(t, err)
|
||||
log.Infof("get logger %s", ll)
|
||||
|
||||
sweeper.PrepareDBSweep()
|
||||
err = sweeper.PrepareDBSweep()
|
||||
require.NoError(t, err)
|
||||
dbSweeper := sweeper.NewDBSweeper(-1)
|
||||
count, err := dbSweeper.Sweep()
|
||||
require.Nil(t, err)
|
||||
|
@ -38,12 +38,12 @@ func GetLogger(loggerOptions ...Option) (Interface, error) {
|
||||
|
||||
// No options specified, enable std as default
|
||||
if len(loggerOptions) == 0 {
|
||||
defaultOp := BackendOption(LoggerNameStdOutput, "", nil)
|
||||
defaultOp := BackendOption(NameStdOutput, "", nil)
|
||||
defaultOp.Apply(lOptions)
|
||||
}
|
||||
|
||||
// Create backends
|
||||
loggers := []Interface{}
|
||||
loggers := make([]Interface, 0)
|
||||
for name, ops := range lOptions.values {
|
||||
if !IsKnownLogger(name) {
|
||||
return nil, fmt.Errorf("no logger registered for name '%s'", name)
|
||||
@ -105,7 +105,7 @@ func GetSweeper(context context.Context, sweeperOptions ...Option) (sweeper.Inte
|
||||
op.Apply(sOptions)
|
||||
}
|
||||
|
||||
sweepers := []sweeper.Interface{}
|
||||
sweepers := make([]sweeper.Interface, 0)
|
||||
for name, ops := range sOptions.values {
|
||||
if !HasSweeper(name) {
|
||||
return nil, fmt.Errorf("no sweeper provided for the logger %s", name)
|
||||
@ -147,7 +147,7 @@ func GetLogDataGetter(loggerOptions ...Option) (getter.Interface, error) {
|
||||
}
|
||||
|
||||
// Iterate with specified order
|
||||
keys := []string{}
|
||||
keys := make([]string, 0)
|
||||
for k := range lOptions.values {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
@ -175,14 +175,14 @@ func GetLogDataGetter(loggerOptions ...Option) (getter.Interface, error) {
|
||||
// Init the loggers and sweepers
|
||||
func Init(ctx context.Context) error {
|
||||
// For loggers
|
||||
options := []Option{}
|
||||
options := make([]Option, 0)
|
||||
// For sweepers
|
||||
sOptions := []Option{}
|
||||
sOptions := make([]Option, 0)
|
||||
|
||||
for _, lc := range config.DefaultConfig.LoggerConfigs {
|
||||
// Inject logger depth here for FILE and STD logger to avoid configuring it in the yaml
|
||||
// For logger of job service itself, the depth should be 6
|
||||
if lc.Name == LoggerNameFile || lc.Name == LoggerNameStdOutput {
|
||||
if lc.Name == NameFile || lc.Name == NameStdOutput {
|
||||
if lc.Settings == nil {
|
||||
lc.Settings = map[string]interface{}{}
|
||||
}
|
||||
@ -202,7 +202,7 @@ func Init(ctx context.Context) error {
|
||||
// Avoid data race issue
|
||||
singletons.Store(systemKeyServiceLogger, lg)
|
||||
|
||||
jOptions := []Option{}
|
||||
jOptions := make([]Option, 0)
|
||||
// Append configured sweepers in job loggers if existing
|
||||
for _, lc := range config.DefaultConfig.JobLoggerConfigs {
|
||||
jOptions = append(jOptions, BackendOption(lc.Name, lc.Level, lc.Settings))
|
||||
@ -224,12 +224,12 @@ func Init(ctx context.Context) error {
|
||||
// If sweepers configured
|
||||
if len(sOptions) > 0 {
|
||||
// Get the sweeper controller
|
||||
sweeper, err := GetSweeper(ctx, sOptions...)
|
||||
swp, err := GetSweeper(ctx, sOptions...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create logger sweeper error: %s", err)
|
||||
}
|
||||
// Start sweep loop
|
||||
_, err = sweeper.Sweep()
|
||||
_, err = swp.Sweep()
|
||||
if err != nil {
|
||||
return fmt.Errorf("start logger sweeper error: %s", err)
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ func TestGetLoggersMulti(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
ops := []Option{}
|
||||
ops := make([]Option, 0)
|
||||
ops = append(
|
||||
ops,
|
||||
BackendOption("STD_OUTPUT", "DEBUG", nil),
|
||||
|
@ -17,7 +17,7 @@ func TestFileFactory(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
if closer, ok := ff.(Closer); ok {
|
||||
closer.Close()
|
||||
_ = closer.Close()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,14 +39,16 @@ func TestDBGetter(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
l.Debug("JobLog Debug: TestDBLoggerGetter")
|
||||
l.Close()
|
||||
err = l.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
dbGetter := NewDBGetter()
|
||||
ll, err := dbGetter.Retrieve(uuid)
|
||||
require.Nil(t, err)
|
||||
log.Infof("get logger %s", ll)
|
||||
|
||||
sweeper.PrepareDBSweep()
|
||||
err = sweeper.PrepareDBSweep()
|
||||
require.NoError(t, err)
|
||||
dbSweeper := sweeper.NewDBSweeper(-1)
|
||||
count, err := dbSweeper.Sweep()
|
||||
require.Nil(t, err)
|
||||
@ -60,7 +62,8 @@ func TestDBGetterError(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
l.Debug("JobLog Debug: TestDBLoggerGetter")
|
||||
l.Close()
|
||||
err = l.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
dbGetter := NewDBGetter()
|
||||
_, err = dbGetter.Retrieve("")
|
||||
@ -68,7 +71,8 @@ func TestDBGetterError(t *testing.T) {
|
||||
_, err = dbGetter.Retrieve("not_exist_uuid")
|
||||
require.NotNil(t, err)
|
||||
|
||||
sweeper.PrepareDBSweep()
|
||||
err = sweeper.PrepareDBSweep()
|
||||
require.NoError(t, err)
|
||||
dbSweeper := sweeper.NewDBSweeper(-1)
|
||||
count, err := dbSweeper.Sweep()
|
||||
require.Nil(t, err)
|
||||
|
@ -7,12 +7,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// LoggerNameFile is unique name of the file logger.
|
||||
LoggerNameFile = "FILE"
|
||||
// LoggerNameStdOutput is the unique name of the std logger.
|
||||
LoggerNameStdOutput = "STD_OUTPUT"
|
||||
// LoggerNameDB is the unique name of the DB logger.
|
||||
LoggerNameDB = "DB"
|
||||
// NameFile is unique name of the file logger.
|
||||
NameFile = "FILE"
|
||||
// NameStdOutput is the unique name of the std logger.
|
||||
NameStdOutput = "STD_OUTPUT"
|
||||
// NameDB is the unique name of the DB logger.
|
||||
NameDB = "DB"
|
||||
)
|
||||
|
||||
// Declaration is used to declare a supported logger.
|
||||
@ -31,11 +31,11 @@ type Declaration struct {
|
||||
// log info.
|
||||
var knownLoggers = map[string]*Declaration{
|
||||
// File logger
|
||||
LoggerNameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
|
||||
NameFile: {FileFactory, FileSweeperFactory, FileGetterFactory, false},
|
||||
// STD output(both stdout and stderr) logger
|
||||
LoggerNameStdOutput: {StdFactory, nil, nil, true},
|
||||
NameStdOutput: {StdFactory, nil, nil, true},
|
||||
// DB logger
|
||||
LoggerNameDB: {DBFactory, DBSweeperFactory, DBGetterFactory, false},
|
||||
NameDB: {DBFactory, DBSweeperFactory, DBGetterFactory, false},
|
||||
}
|
||||
|
||||
// IsKnownLogger checks if the logger is supported with name.
|
||||
@ -97,11 +97,11 @@ func GetLoggerName(l Interface) string {
|
||||
|
||||
switch l.(type) {
|
||||
case *backend.DBLogger:
|
||||
name = LoggerNameDB
|
||||
name = NameDB
|
||||
case *backend.StdOutputLogger:
|
||||
name = LoggerNameStdOutput
|
||||
name = NameStdOutput
|
||||
case *backend.FileLogger:
|
||||
name = LoggerNameFile
|
||||
name = NameFile
|
||||
default:
|
||||
name = reflect.TypeOf(l).String()
|
||||
}
|
||||
|
@ -13,28 +13,28 @@ func TestKnownLoggers(t *testing.T) {
|
||||
b := IsKnownLogger("Unknown")
|
||||
require.False(t, b)
|
||||
|
||||
b = IsKnownLogger(LoggerNameFile)
|
||||
b = IsKnownLogger(NameFile)
|
||||
require.True(t, b)
|
||||
|
||||
// no getter
|
||||
b = HasGetter(LoggerNameStdOutput)
|
||||
b = HasGetter(NameStdOutput)
|
||||
require.False(t, b)
|
||||
// has getter
|
||||
b = HasGetter(LoggerNameDB)
|
||||
b = HasGetter(NameDB)
|
||||
require.True(t, b)
|
||||
|
||||
// no sweeper
|
||||
b = HasSweeper(LoggerNameStdOutput)
|
||||
b = HasSweeper(NameStdOutput)
|
||||
require.False(t, b)
|
||||
// has sweeper
|
||||
b = HasSweeper(LoggerNameDB)
|
||||
b = HasSweeper(NameDB)
|
||||
require.True(t, b)
|
||||
|
||||
// unknown logger
|
||||
l := KnownLoggers("unknown")
|
||||
require.Nil(t, l)
|
||||
// known logger
|
||||
l = KnownLoggers(LoggerNameDB)
|
||||
l = KnownLoggers(NameDB)
|
||||
require.NotNil(t, l)
|
||||
|
||||
// unknown level
|
||||
@ -52,14 +52,14 @@ func TestGetLoggerName(t *testing.T) {
|
||||
uuid := "uuid_for_unit_test"
|
||||
l, err := backend.NewDBLogger(uuid, "DEBUG", 4)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, LoggerNameDB, GetLoggerName(l))
|
||||
require.Equal(t, NameDB, GetLoggerName(l))
|
||||
|
||||
stdLog := backend.NewStdOutputLogger("DEBUG", backend.StdErr, 4)
|
||||
require.Equal(t, LoggerNameStdOutput, GetLoggerName(stdLog))
|
||||
require.Equal(t, NameStdOutput, GetLoggerName(stdLog))
|
||||
|
||||
fileLog, err := backend.NewFileLogger("DEBUG", path.Join(os.TempDir(), "TestFileLogger.log"), 4)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, LoggerNameFile, GetLoggerName(fileLog))
|
||||
require.Equal(t, NameFile, GetLoggerName(fileLog))
|
||||
|
||||
e := &Entry{}
|
||||
n := GetLoggerName(e)
|
||||
|
@ -15,9 +15,3 @@ func Retrieve(logID string) ([]byte, error) {
|
||||
|
||||
return val.(getter.Interface).Retrieve(logID)
|
||||
}
|
||||
|
||||
// HasLogGetterConfigured checks if a log data getter is there for using
|
||||
func HasLogGetterConfigured() bool {
|
||||
_, ok := singletons.Load(systemKeyLogDataGetter)
|
||||
return ok
|
||||
}
|
||||
|
50
src/jobservice/logger/log_data_handler_test.go
Normal file
50
src/jobservice/logger/log_data_handler_test.go
Normal file
@ -0,0 +1,50 @@
|
||||
// Copyright Project Harbor Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
"github.com/stretchr/testify/require"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRetrieve(t *testing.T) {
|
||||
config.DefaultConfig.JobLoggerConfigs = []*config.LoggerConfig{
|
||||
{
|
||||
Name: "STD_OUTPUT",
|
||||
Level: "DEBUG",
|
||||
},
|
||||
{
|
||||
Name: "FILE",
|
||||
Level: "INFO",
|
||||
Settings: map[string]interface{}{
|
||||
"base_dir": os.TempDir(),
|
||||
},
|
||||
Sweeper: &config.LogSweeperConfig{
|
||||
Duration: 1,
|
||||
Settings: map[string]interface{}{
|
||||
"work_dir": os.TempDir(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := Init(context.TODO())
|
||||
require.NoError(t, err)
|
||||
_, err = Retrieve("no_id")
|
||||
require.Error(t, err)
|
||||
}
|
@ -38,9 +38,11 @@ func TestDBGetter(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
l.Debug("JobLog Debug: TestDBLoggerSweeper")
|
||||
l.Close()
|
||||
err = l.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
PrepareDBSweep()
|
||||
err = PrepareDBSweep()
|
||||
require.NoError(t, err)
|
||||
dbSweeper := NewDBSweeper(-1)
|
||||
count, err := dbSweeper.Sweep()
|
||||
require.Nil(t, err)
|
||||
|
@ -18,6 +18,12 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/goharbor/harbor/src/common"
|
||||
comcfg "github.com/goharbor/harbor/src/common/config"
|
||||
"github.com/goharbor/harbor/src/jobservice/job"
|
||||
"github.com/goharbor/harbor/src/jobservice/job/impl"
|
||||
"github.com/pkg/errors"
|
||||
"os"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/common/utils"
|
||||
"github.com/goharbor/harbor/src/jobservice/config"
|
||||
@ -53,7 +59,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Set job context initializer
|
||||
/*runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) {
|
||||
runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) {
|
||||
secret := config.GetAuthSecret()
|
||||
if utils.IsEmptyStr(secret) {
|
||||
return nil, errors.New("empty auth secret")
|
||||
@ -61,14 +67,14 @@ func main() {
|
||||
coreURL := os.Getenv("CORE_URL")
|
||||
configURL := coreURL + common.CoreConfigPath
|
||||
cfgMgr := comcfg.NewRESTCfgManager(configURL, secret)
|
||||
jobCtx := impl.NewContext(ctx.SystemContext, cfgMgr)
|
||||
jobCtx := impl.NewContext(ctx, cfgMgr)
|
||||
|
||||
if err := jobCtx.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return jobCtx, nil
|
||||
})*/
|
||||
})
|
||||
|
||||
// Start
|
||||
if err := runtime.JobService.LoadAndRun(ctx, cancel); err != nil {
|
||||
|
@ -86,7 +86,9 @@ func (bs *basicScheduler) Schedule(p *Policy) (int64, error) {
|
||||
}
|
||||
|
||||
conn := bs.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Do the 1st round of enqueuing
|
||||
bs.enqueuer.scheduleNextJobs(p, conn)
|
||||
@ -140,7 +142,9 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
|
||||
}
|
||||
|
||||
conn := bs.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Get the un-scheduling policy object
|
||||
bytes, err := redis.Values(conn.Do("ZRANGEBYSCORE", rds.KeyPeriodicPolicy(bs.namespace), numericID, numericID))
|
||||
@ -241,7 +245,9 @@ func (bs *basicScheduler) UnSchedule(policyID string) error {
|
||||
// This is a try best action
|
||||
func (bs *basicScheduler) clearDirtyJobs() {
|
||||
conn := bs.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
nowEpoch := time.Now().Unix()
|
||||
scope := nowEpoch - int64(enqueuerHorizon/time.Minute)*60
|
||||
|
@ -70,9 +70,11 @@ func (suite *BasicSchedulerTestSuite) TearDownSuite() {
|
||||
suite.cancel()
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestSchedulerTestSuite is entry of go test
|
||||
@ -84,7 +86,7 @@ func TestSchedulerTestSuite(t *testing.T) {
|
||||
func (suite *BasicSchedulerTestSuite) TestScheduler() {
|
||||
go func() {
|
||||
<-time.After(1 * time.Second)
|
||||
suite.scheduler.Stop()
|
||||
_ = suite.scheduler.Stop()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
@ -99,6 +101,9 @@ func (suite *BasicSchedulerTestSuite) TestScheduler() {
|
||||
// Prepare one
|
||||
now := time.Now()
|
||||
minute := now.Minute()
|
||||
if minute+2 >= 60 {
|
||||
minute = minute - 2
|
||||
}
|
||||
coreSpec := fmt.Sprintf("30,50 %d * * * *", minute+2)
|
||||
p := &Policy{
|
||||
ID: "fake_policy",
|
||||
|
@ -150,7 +150,9 @@ func (e *enqueuer) nextTurn(isHit bool, enqErr bool) time.Duration {
|
||||
|
||||
func (e *enqueuer) enqueue() {
|
||||
conn := e.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Reset error track
|
||||
e.lastEnqueueErr = nil
|
||||
@ -267,7 +269,9 @@ func (e *enqueuer) createExecution(p *Policy, runAt int64) *job.Stats {
|
||||
|
||||
func (e *enqueuer) shouldEnqueue() bool {
|
||||
conn := e.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
// Acquired a lock before doing checking
|
||||
// If failed, directly returns false.
|
||||
|
@ -75,9 +75,11 @@ func (suite *EnqueuerTestSuite) TearDownSuite() {
|
||||
suite.cancel()
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestEnqueuer tests enqueuer
|
||||
@ -91,7 +93,9 @@ func (suite *EnqueuerTestSuite) TestEnqueuer() {
|
||||
|
||||
key := rds.RedisKeyScheduled(suite.namespace)
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
count, err := redis.Int(conn.Do("ZCARD", key))
|
||||
require.Nil(suite.T(), err, "count scheduled: nil error expected but got %s", err)
|
||||
@ -121,7 +125,9 @@ func (suite *EnqueuerTestSuite) prepare() {
|
||||
key := rds.KeyPeriodicPolicy(suite.namespace)
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_, err = conn.Do("ZADD", key, time.Now().Unix(), rawData)
|
||||
assert.Nil(suite.T(), err, "prepare policy: nil error expected but got %s", err)
|
||||
|
@ -119,7 +119,9 @@ func (ps *policyStore) serve() (err error) {
|
||||
psc := redis.PubSubConn{
|
||||
Conn: conn,
|
||||
}
|
||||
defer psc.Close()
|
||||
defer func() {
|
||||
_ = psc.Close()
|
||||
}()
|
||||
|
||||
// Subscribe channel
|
||||
err = psc.Subscribe(redis.Args{}.AddFlat(rds.KeyPeriodicNotification(ps.namespace))...)
|
||||
@ -182,7 +184,9 @@ func (ps *policyStore) serve() (err error) {
|
||||
}
|
||||
}()
|
||||
// Unsubscribe all
|
||||
psc.Unsubscribe()
|
||||
if err := psc.Unsubscribe(); err != nil {
|
||||
logger.Errorf("unsubscribe: %s", err)
|
||||
}
|
||||
// Confirm result
|
||||
// Add timeout in case unsubscribe failed
|
||||
select {
|
||||
@ -245,7 +249,9 @@ func (ps *policyStore) sync(m *message) error {
|
||||
// Load all the policies from the backend to store
|
||||
func (ps *policyStore) load() error {
|
||||
conn := ps.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
bytes, err := redis.Values(conn.Do("ZRANGE", rds.KeyPeriodicPolicy(ps.namespace), 0, -1))
|
||||
if err != nil {
|
||||
|
@ -55,9 +55,11 @@ func (suite *PolicyStoreTestSuite) TearDownSuite() {
|
||||
suite.cancel()
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestStore tests policy store serve
|
||||
@ -88,7 +90,9 @@ func (suite *PolicyStoreTestSuite) TestLoad() {
|
||||
key := rds.KeyPeriodicPolicy(suite.namespace)
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
_, err = conn.Do("ZADD", key, time.Now().Unix(), rawData)
|
||||
assert.Nil(suite.T(), err, "add data: nil error expected but got %s", err)
|
||||
|
@ -15,6 +15,7 @@ package runner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
@ -39,8 +40,9 @@ import (
|
||||
type RedisRunnerTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
lcmCtl lcm.Controller
|
||||
redisJob *RedisJob
|
||||
lcmCtl lcm.Controller
|
||||
|
||||
envContext *env.Context
|
||||
|
||||
cancel context.CancelFunc
|
||||
namespace string
|
||||
@ -57,7 +59,7 @@ func (suite *RedisRunnerTestSuite) SetupSuite() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
suite.cancel = cancel
|
||||
|
||||
envCtx := &env.Context{
|
||||
suite.envContext = &env.Context{
|
||||
SystemContext: ctx,
|
||||
WG: new(sync.WaitGroup),
|
||||
ErrorChan: make(chan error, 1),
|
||||
@ -67,7 +69,7 @@ func (suite *RedisRunnerTestSuite) SetupSuite() {
|
||||
suite.pool = tests.GiveMeRedisPool()
|
||||
|
||||
suite.lcmCtl = lcm.NewController(
|
||||
envCtx,
|
||||
suite.envContext,
|
||||
suite.namespace,
|
||||
suite.pool,
|
||||
func(hookURL string, change *job.StatusChange) error { return nil },
|
||||
@ -84,8 +86,14 @@ func (suite *RedisRunnerTestSuite) SetupSuite() {
|
||||
}
|
||||
_, err := suite.lcmCtl.New(fakeStats)
|
||||
require.NoError(suite.T(), err, "lcm new: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
suite.redisJob = NewRedisJob((*fakeParentJob)(nil), envCtx, suite.lcmCtl)
|
||||
// SetupTest prepares test cases
|
||||
func (suite *RedisRunnerTestSuite) SetupTest() {
|
||||
t, err := suite.lcmCtl.Track("FAKE-j")
|
||||
require.NoError(suite.T(), err)
|
||||
err = t.Update("status", job.PendingStatus.String()) // reset
|
||||
assert.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
// TearDownSuite clears the test suite
|
||||
@ -129,11 +137,59 @@ func (suite *RedisRunnerTestSuite) TestJobWrapper() {
|
||||
},
|
||||
}
|
||||
|
||||
err := suite.redisJob.Run(j)
|
||||
redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl)
|
||||
err := redisJob.Run(j)
|
||||
require.NoError(suite.T(), err, "redis job: nil error expected but got %s", err)
|
||||
}
|
||||
|
||||
type fakeParentJob struct{}
|
||||
// TestJobWrapperInvalidTracker tests job runner with invalid job ID
|
||||
func (suite *RedisRunnerTestSuite) TestJobWrapperInvalidTracker() {
|
||||
j := &work.Job{
|
||||
ID: "FAKE-j2",
|
||||
Name: "fakeParentJob",
|
||||
EnqueuedAt: time.Now().Add(5 * time.Minute).Unix(),
|
||||
Fails: 3,
|
||||
}
|
||||
|
||||
redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl)
|
||||
err := redisJob.Run(j)
|
||||
require.Error(suite.T(), err, "redis job: non nil error expected but got nil")
|
||||
assert.Equal(suite.T(), int64(2), j.Fails)
|
||||
}
|
||||
|
||||
// TestJobWrapperPanic tests job runner panic
|
||||
func (suite *RedisRunnerTestSuite) TestJobWrapperPanic() {
|
||||
j := &work.Job{
|
||||
ID: "FAKE-j",
|
||||
Name: "fakePanicJob",
|
||||
EnqueuedAt: time.Now().Add(5 * time.Minute).Unix(),
|
||||
}
|
||||
|
||||
redisJob := NewRedisJob((*fakePanicJob)(nil), suite.envContext, suite.lcmCtl)
|
||||
err := redisJob.Run(j)
|
||||
assert.Error(suite.T(), err)
|
||||
}
|
||||
|
||||
// TestJobWrapperStopped tests job runner stopped
|
||||
func (suite *RedisRunnerTestSuite) TestJobWrapperStopped() {
|
||||
j := &work.Job{
|
||||
ID: "FAKE-j",
|
||||
Name: "fakePanicJob",
|
||||
EnqueuedAt: time.Now().Add(5 * time.Minute).Unix(),
|
||||
}
|
||||
|
||||
t, err := suite.lcmCtl.Track("FAKE-j")
|
||||
require.NoError(suite.T(), err)
|
||||
err = t.Stop()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
redisJob := NewRedisJob((*fakeParentJob)(nil), suite.envContext, suite.lcmCtl)
|
||||
err = redisJob.Run(j)
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
type fakeParentJob struct {
|
||||
}
|
||||
|
||||
func (j *fakeParentJob) MaxFails() uint {
|
||||
return 1
|
||||
@ -148,7 +204,26 @@ func (j *fakeParentJob) Validate(params job.Parameters) error {
|
||||
}
|
||||
|
||||
func (j *fakeParentJob) Run(ctx job.Context, params job.Parameters) error {
|
||||
ctx.Checkin("start")
|
||||
_ = ctx.Checkin("start")
|
||||
ctx.OPCommand()
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakePanicJob struct {
|
||||
}
|
||||
|
||||
func (j *fakePanicJob) MaxFails() uint {
|
||||
return 1
|
||||
}
|
||||
|
||||
func (j *fakePanicJob) ShouldRetry() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (j *fakePanicJob) Validate(params job.Parameters) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *fakePanicJob) Run(ctx job.Context, params job.Parameters) error {
|
||||
panic("for testing")
|
||||
}
|
||||
|
@ -60,9 +60,11 @@ func (suite *BootStrapTestSuite) TearDownSuite() {
|
||||
|
||||
pool := tests.GiveMeRedisPool()
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(tests.GiveMeTestNamespace(), conn)
|
||||
_ = tests.ClearAll(tests.GiveMeTestNamespace(), conn)
|
||||
}
|
||||
|
||||
// TestBootStrapTestSuite is entry of go test
|
||||
|
@ -138,7 +138,9 @@ func (w *basicWorker) Start() error {
|
||||
logger.Infof("Basic worker is stopped")
|
||||
}()
|
||||
<-w.context.SystemContext.Done()
|
||||
w.scheduler.Stop()
|
||||
if err := w.scheduler.Stop(); err != nil {
|
||||
logger.Errorf("stop scheduler error: %s", err)
|
||||
}
|
||||
w.pool.Stop()
|
||||
}()
|
||||
|
||||
@ -318,7 +320,7 @@ func (w *basicWorker) StopJob(jobID string) error {
|
||||
|
||||
if job.RunningStatus.Compare(job.Status(t.Job().Info.Status)) < 0 {
|
||||
// Job has been in the final states
|
||||
return errors.Errorf("mismatch job status %s for stopping job %s", t.Job().Info.Status, jobID)
|
||||
return errors.Errorf("mismatch job status for stopping job: %s, job status %s is behind %s", jobID, t.Job().Info.Status, job.RunningStatus)
|
||||
}
|
||||
|
||||
switch t.Job().Info.JobKind {
|
||||
@ -453,7 +455,9 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
|
||||
// Ping the redis server
|
||||
func (w *basicWorker) ping() error {
|
||||
conn := w.redisPool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
var err error
|
||||
for count := 1; count <= pingRedisMaxTimes; count++ {
|
||||
|
@ -89,9 +89,11 @@ func (suite *CWorkerTestSuite) TearDownSuite() {
|
||||
suite.context.WG.Wait()
|
||||
|
||||
conn := suite.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
tests.ClearAll(suite.namespace, conn)
|
||||
_ = tests.ClearAll(suite.namespace, conn)
|
||||
}
|
||||
|
||||
// TestCWorkerTestSuite is entry fo go test
|
||||
@ -234,7 +236,7 @@ func (j *fakeJob) Validate(params job.Parameters) error {
|
||||
|
||||
func (j *fakeJob) Run(ctx job.Context, params job.Parameters) error {
|
||||
ctx.OPCommand()
|
||||
ctx.Checkin("done")
|
||||
_ = ctx.Checkin("done")
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -266,7 +268,7 @@ func (j *fakeLongRunJob) Run(ctx job.Context, params job.Parameters) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx.Checkin("done")
|
||||
_ = ctx.Checkin("done")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -64,7 +64,9 @@ func (rdd *redisDeDuplicator) MustUnique(jobName string, params job.Parameters)
|
||||
}
|
||||
|
||||
conn := rdd.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
args := []interface{}{
|
||||
uniqueKey,
|
||||
@ -98,7 +100,9 @@ func (rdd *redisDeDuplicator) DelUniqueSign(jobName string, params job.Parameter
|
||||
}
|
||||
|
||||
conn := rdd.pool.Get()
|
||||
defer conn.Close()
|
||||
defer func() {
|
||||
_ = conn.Close()
|
||||
}()
|
||||
|
||||
if _, err := conn.Do("DEL", uniqueKey); err != nil {
|
||||
return fmt.Errorf("delete unique job error: %s", err)
|
||||
|
@ -1,12 +1,26 @@
|
||||
package cworker
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
|
||||
"github.com/goharbor/harbor/src/jobservice/tests"
|
||||
)
|
||||
|
||||
func TestDeDuplicator(t *testing.T) {
|
||||
// DeDuplicatorTestSuite tests functions of DeDuplicator
|
||||
type DeDuplicatorTestSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
// TestDeDuplicatorTestSuite is entry of go test
|
||||
func TestDeDuplicatorTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(DeDuplicatorTestSuite))
|
||||
}
|
||||
|
||||
// TestDeDuplicator ...
|
||||
func (suite *DeDuplicatorTestSuite) TestDeDuplicator() {
|
||||
jobName := "fake_job"
|
||||
jobParams := map[string]interface{}{
|
||||
"image": "ubuntu:latest",
|
||||
@ -14,15 +28,12 @@ func TestDeDuplicator(t *testing.T) {
|
||||
|
||||
rdd := NewDeDuplicator(tests.GiveMeTestNamespace(), tests.GiveMeRedisPool())
|
||||
|
||||
if err := rdd.MustUnique(jobName, jobParams); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err := rdd.MustUnique(jobName, jobParams)
|
||||
require.NoError(suite.T(), err, "must unique 1st time: nil error expected but got %s", err)
|
||||
|
||||
if err := rdd.DelUniqueSign(jobName, jobParams); err == nil {
|
||||
t.Errorf("expect duplicated error but got nil error")
|
||||
}
|
||||
err = rdd.MustUnique(jobName, jobParams)
|
||||
assert.Error(suite.T(), err, "must unique 2nd time: non nil error expected but got nil")
|
||||
|
||||
if err := rdd.DelUniqueSign(jobName, jobParams); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
err = rdd.DelUniqueSign(jobName, jobParams)
|
||||
assert.NoError(suite.T(), err, "del unique: nil error expected but got %s", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user