mirror of
https://github.com/goharbor/harbor.git
synced 2024-11-27 04:35:16 +01:00
Add UT cases for package pool
This commit is contained in:
parent
ebf860367a
commit
9889d4badd
@ -49,14 +49,14 @@ func TestLaunchJobSucceed(t *testing.T) {
|
||||
|
||||
res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
obj, err := getResult(res)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if obj.Stats.JobID != "fake_ID_ok" {
|
||||
t.Errorf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID)
|
||||
t.Fatalf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID)
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
@ -70,7 +70,7 @@ func TestGetJobFailed(t *testing.T) {
|
||||
|
||||
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port))
|
||||
if e := expectFormatedError(res, err); e != nil {
|
||||
t.Error(e)
|
||||
t.Fatal(e)
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
@ -84,14 +84,14 @@ func TestGetJobSucceed(t *testing.T) {
|
||||
|
||||
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
obj, err := getResult(res)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if obj.Stats.JobName != "testing" || obj.Stats.JobID != "fake_ID_ok" {
|
||||
t.Errorf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID)
|
||||
t.Fatalf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID)
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
@ -105,21 +105,21 @@ func TestJobActionFailed(t *testing.T) {
|
||||
|
||||
actionReq, err := createJobActionReq("stop")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
|
||||
expectFormatedError(resData, err)
|
||||
|
||||
actionReq, err = createJobActionReq("cancel")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
|
||||
expectFormatedError(resData, err)
|
||||
|
||||
actionReq, err = createJobActionReq("retry")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
|
||||
expectFormatedError(resData, err)
|
||||
@ -135,29 +135,29 @@ func TestJobActionSucceed(t *testing.T) {
|
||||
|
||||
actionReq, err := createJobActionReq("stop")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
actionReq, err = createJobActionReq("cancel")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
actionReq, err = createJobActionReq("retry")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
@ -171,7 +171,7 @@ func TestCheckStatus(t *testing.T) {
|
||||
|
||||
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
poolStats := &models.JobPoolStats{
|
||||
@ -179,11 +179,11 @@ func TestCheckStatus(t *testing.T) {
|
||||
}
|
||||
err = json.Unmarshal(resData, poolStats)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if poolStats.Pools[0].WorkerPoolID != "fake_pool_ID" {
|
||||
t.Errorf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID)
|
||||
t.Fatalf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID)
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
@ -197,11 +197,11 @@ func TestGetJobLog(t *testing.T) {
|
||||
|
||||
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(resData) == 0 {
|
||||
t.Error("expect job log but got nothing")
|
||||
t.Fatal("expect job log but got nothing")
|
||||
}
|
||||
|
||||
server.Stop()
|
||||
|
@ -9,22 +9,22 @@ import (
|
||||
func TestConfigLoadingFailed(t *testing.T) {
|
||||
cfg := &Configuration{}
|
||||
if err := cfg.Load("./config.not-existing.yaml", false); err == nil {
|
||||
t.Errorf("Load config from none-existing document, expect none nil error but got '%s'\n", err)
|
||||
t.Fatalf("Load config from none-existing document, expect none nil error but got '%s'\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigLoadingSucceed(t *testing.T) {
|
||||
if err := CreateLogDir(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cfg := &Configuration{}
|
||||
if err := cfg.Load("../config_test.yml", false); err != nil {
|
||||
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
}
|
||||
|
||||
if err := RemoveLogDir(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,70 +36,70 @@ func TestConfigLoadingWithEnv(t *testing.T) {
|
||||
|
||||
cfg := &Configuration{}
|
||||
if err := cfg.Load("../config_test.yml", true); err != nil {
|
||||
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
}
|
||||
|
||||
if cfg.Protocol != "https" {
|
||||
t.Errorf("expect protocol 'https', but got '%s'\n", cfg.Protocol)
|
||||
t.Fatalf("expect protocol 'https', but got '%s'\n", cfg.Protocol)
|
||||
}
|
||||
if cfg.Port != 8989 {
|
||||
t.Errorf("expect port 8989 but got '%d'\n", cfg.Port)
|
||||
t.Fatalf("expect port 8989 but got '%d'\n", cfg.Port)
|
||||
}
|
||||
if cfg.PoolConfig.WorkerCount != 8 {
|
||||
t.Errorf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
|
||||
t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
|
||||
}
|
||||
if cfg.PoolConfig.RedisPoolCfg.Host != "localhost" {
|
||||
t.Errorf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host)
|
||||
t.Fatalf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host)
|
||||
}
|
||||
if cfg.PoolConfig.RedisPoolCfg.Port != 7379 {
|
||||
t.Errorf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port)
|
||||
t.Fatalf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port)
|
||||
}
|
||||
if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" {
|
||||
t.Errorf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
|
||||
t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
|
||||
}
|
||||
if cfg.LoggerConfig.BasePath != "/tmp" {
|
||||
t.Errorf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath)
|
||||
t.Fatalf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath)
|
||||
}
|
||||
if cfg.LoggerConfig.LogLevel != "DEBUG" {
|
||||
t.Errorf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel)
|
||||
t.Fatalf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel)
|
||||
}
|
||||
if cfg.LoggerConfig.ArchivePeriod != 5 {
|
||||
t.Errorf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod)
|
||||
t.Fatalf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod)
|
||||
}
|
||||
|
||||
unsetENV()
|
||||
if err := RemoveLogDir(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultConfig(t *testing.T) {
|
||||
if err := CreateLogDir(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := DefaultConfig.Load("../config_test.yml", true); err != nil {
|
||||
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
|
||||
}
|
||||
|
||||
if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888" {
|
||||
t.Errorf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint)
|
||||
t.Fatalf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint)
|
||||
}
|
||||
|
||||
if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" {
|
||||
t.Errorf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath)
|
||||
t.Fatalf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath)
|
||||
}
|
||||
|
||||
if lvl := GetLogLevel(); lvl != "INFO" {
|
||||
t.Errorf("expect default logger level 'INFO' but got '%s'\n", lvl)
|
||||
t.Fatalf("expect default logger level 'INFO' but got '%s'\n", lvl)
|
||||
}
|
||||
|
||||
if period := GetLogArchivePeriod(); period != 1 {
|
||||
t.Errorf("expect default log archive period 1 but got '%d'\n", period)
|
||||
t.Fatalf("expect default log archive period 1 but got '%d'\n", period)
|
||||
}
|
||||
|
||||
if err := RemoveLogDir(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,11 +17,11 @@ func TestLaunchGenericJob(t *testing.T) {
|
||||
req := createJobReq("Generic", false, false)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,11 +31,11 @@ func TestLaunchGenericJobUnique(t *testing.T) {
|
||||
req := createJobReq("Generic", true, false)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,11 +45,11 @@ func TestLaunchGenericJobWithHook(t *testing.T) {
|
||||
req := createJobReq("Generic", false, true)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,11 +59,11 @@ func TestLaunchScheduledJob(t *testing.T) {
|
||||
req := createJobReq("Scheduled", false, true)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID_Scheduled" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -73,11 +73,11 @@ func TestLaunchScheduledUniqueJob(t *testing.T) {
|
||||
req := createJobReq("Scheduled", true, false)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID_Scheduled" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,11 +87,11 @@ func TestLaunchPeriodicJob(t *testing.T) {
|
||||
req := createJobReq("Periodic", true, false)
|
||||
res, err := c.LaunchJob(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if res.Stats.JobID != "fake_ID_Periodic" {
|
||||
t.Errorf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID)
|
||||
t.Fatalf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,11 +100,11 @@ func TestGetJobStats(t *testing.T) {
|
||||
c := NewController(pool)
|
||||
stats, err := c.GetJob("fake_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if stats.Stats.Status != "running" {
|
||||
t.Errorf("expect stauts 'running' but got '%s'\n", stats.Stats.Status)
|
||||
t.Fatalf("expect stauts 'running' but got '%s'\n", stats.Stats.Status)
|
||||
}
|
||||
}
|
||||
|
||||
@ -113,15 +113,15 @@ func TestJobActions(t *testing.T) {
|
||||
c := NewController(pool)
|
||||
|
||||
if err := c.StopJob("fake_ID"); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := c.CancelJob("fake_ID"); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := c.RetryJob("fake_ID"); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,7 +134,7 @@ func TestGetJobLogData(t *testing.T) {
|
||||
t.Errorf("expect object not found error but got '%s'\n", err)
|
||||
}
|
||||
} else {
|
||||
t.Error("expect error but got nil")
|
||||
t.Fatal("expect error but got nil")
|
||||
}
|
||||
}
|
||||
|
||||
@ -144,15 +144,15 @@ func TestCheckStatus(t *testing.T) {
|
||||
|
||||
st, err := c.CheckStatus()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(st.Pools) == 0 {
|
||||
t.Error("expect status data but got zero list")
|
||||
t.Fatal("expect status data but got zero list")
|
||||
}
|
||||
|
||||
if st.Pools[0].Status != "running" {
|
||||
t.Errorf("expect status 'running' but got '%s'\n", st.Pools[0].Status)
|
||||
t.Fatalf("expect status 'running' but got '%s'\n", st.Pools[0].Status)
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,23 +170,23 @@ func TestInvalidCheck(t *testing.T) {
|
||||
}
|
||||
|
||||
if _, err := c.LaunchJob(req); err == nil {
|
||||
t.Error("error expected but got nil")
|
||||
t.Fatal("error expected but got nil")
|
||||
}
|
||||
|
||||
req.Job.Name = "fake"
|
||||
if _, err := c.LaunchJob(req); err == nil {
|
||||
t.Error("error expected but got nil")
|
||||
t.Fatal("error expected but got nil")
|
||||
}
|
||||
|
||||
req.Job.Metadata.JobKind = "Scheduled"
|
||||
if _, err := c.LaunchJob(req); err == nil {
|
||||
t.Error("error expected but got nil")
|
||||
t.Fatal("error expected but got nil")
|
||||
}
|
||||
|
||||
req.Job.Metadata.JobKind = "Periodic"
|
||||
req.Job.Metadata.Cron = "x x x x x x"
|
||||
if _, err := c.LaunchJob(req); err == nil {
|
||||
t.Error("error expected but got nil")
|
||||
t.Fatal("error expected but got nil")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -13,11 +13,11 @@ func TestSweeper(t *testing.T) {
|
||||
workDir := "/tmp/sweeper_logs"
|
||||
|
||||
if err := os.MkdirAll(workDir, 0755); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -28,9 +28,9 @@ func TestSweeper(t *testing.T) {
|
||||
<-time.After(100 * time.Millisecond)
|
||||
|
||||
if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.Remove(workDir); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ func TestHookClient(t *testing.T) {
|
||||
Status: "running",
|
||||
})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,6 +37,6 @@ func TestReportStatusFailed(t *testing.T) {
|
||||
Status: "running",
|
||||
})
|
||||
if err == nil {
|
||||
t.Error("expect error but got nil")
|
||||
t.Fatal("expect error but got nil")
|
||||
}
|
||||
}
|
||||
|
@ -57,16 +57,16 @@ func TestSetJobStatus(t *testing.T) {
|
||||
<-time.After(100 * time.Millisecond)
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if stats.Stats.Status != "running" {
|
||||
t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status)
|
||||
t.Fatalf("expect job status 'running' but got '%s'\n", stats.Stats.Status)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,20 +77,20 @@ func TestCommand(t *testing.T) {
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if cmd != CtlCommandStop {
|
||||
t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd)
|
||||
t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd)
|
||||
}
|
||||
}
|
||||
|
||||
key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -105,7 +105,7 @@ func TestDieAt(t *testing.T) {
|
||||
|
||||
dieAt := time.Now().Unix()
|
||||
if err := createDeadJob(redisPool.Get(), dieAt); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
<-time.After(200 * time.Millisecond)
|
||||
mgr.DieAt("fake_job_ID", dieAt)
|
||||
@ -113,20 +113,20 @@ func TestDieAt(t *testing.T) {
|
||||
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if stats.Stats.DieAt != dieAt {
|
||||
t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt)
|
||||
t.Fatalf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
key2 := utils.RedisKeyDead(testingNamespace)
|
||||
if err := clear(key2, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,12 +137,12 @@ func TestRegisterHook(t *testing.T) {
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,12 +158,12 @@ func TestExpireJobStats(t *testing.T) {
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,7 +185,7 @@ func TestCheckIn(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mgr.CheckIn("fake_job_ID", "checkin")
|
||||
@ -193,16 +193,16 @@ func TestCheckIn(t *testing.T) {
|
||||
|
||||
stats, err := mgr.Retrieve("fake_job_ID")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if stats.Stats.CheckIn != "checkin" {
|
||||
t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn)
|
||||
t.Fatalf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn)
|
||||
}
|
||||
|
||||
key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
|
||||
if err := clear(key, redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,11 +14,11 @@ func TestPeriodicJobPolicy(t *testing.T) {
|
||||
|
||||
data, err := p.Serialize()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := p.DeSerialize(data); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,17 +37,17 @@ func TestPeriodicJobPolicyStore(t *testing.T) {
|
||||
pl = append(pl, createPolicy(""))
|
||||
ps.addAll(pl)
|
||||
if ps.size() != 3 {
|
||||
t.Errorf("expect size 3 but got '%d'\n", ps.size())
|
||||
t.Fatalf("expect size 3 but got '%d'\n", ps.size())
|
||||
}
|
||||
|
||||
l := ps.list()
|
||||
if l == nil || len(l) != 3 {
|
||||
t.Error("expect a policy list with 3 items but got invalid list")
|
||||
t.Fatal("expect a policy list with 3 items but got invalid list")
|
||||
}
|
||||
|
||||
rp := ps.remove("fake_ID_Steven")
|
||||
if rp == nil {
|
||||
t.Error("expect none nil policy object but got nil")
|
||||
t.Fatal("expect none nil policy object but got nil")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,33 +20,33 @@ func TestScheduler(t *testing.T) {
|
||||
params["image"] = "testing:v1"
|
||||
id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if time.Now().Unix() >= runAt {
|
||||
t.Error("the running at time of scheduled job should be after now, but seems not")
|
||||
t.Fatal("the running at time of scheduled job should be after now, but seems not")
|
||||
}
|
||||
|
||||
if err := scheduler.Load(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if scheduler.pstore.size() != 1 {
|
||||
t.Errorf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size())
|
||||
t.Fatalf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size())
|
||||
}
|
||||
|
||||
if err := scheduler.UnSchedule(id); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := scheduler.Clear(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = tests.Clear(utils.KeyPeriodicPolicy(tests.GiveMeTestNamespace()), redisPool.Get())
|
||||
err = tests.Clear(utils.KeyPeriodicPolicyScore(tests.GiveMeTestNamespace()), redisPool.Get())
|
||||
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), redisPool.Get())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,16 +58,16 @@ func TestPubFunc(t *testing.T) {
|
||||
CronSpec: "5 * * * * *",
|
||||
}
|
||||
if err := scheduler.AcceptPeriodicPolicy(p); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
if scheduler.pstore.size() != 1 {
|
||||
t.Errorf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size())
|
||||
t.Fatalf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size())
|
||||
}
|
||||
if rmp := scheduler.RemovePeriodicPolicy("fake_ID"); rmp == nil {
|
||||
t.Error("expect none nil object returned after removing but got nil")
|
||||
t.Fatal("expect none nil object returned after removing but got nil")
|
||||
}
|
||||
if scheduler.pstore.size() != 0 {
|
||||
t.Errorf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size())
|
||||
t.Fatalf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,16 +15,16 @@ import (
|
||||
func TestSweeper(t *testing.T) {
|
||||
epoch := time.Now().Unix() - 1000
|
||||
if err := createFakeScheduledJob(epoch); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
ns := tests.GiveMeTestNamespace()
|
||||
sweeper := NewSweeper(ns, redisPool, work.NewClient(ns, redisPool))
|
||||
if err := sweeper.ClearOutdatedScheduledJobs(); err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
148
src/jobservice_v2/pool/message_server_test.go
Normal file
148
src/jobservice_v2/pool/message_server_test.go
Normal file
@ -0,0 +1,148 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/opm"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/models"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/period"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/utils"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/tests"
|
||||
)
|
||||
|
||||
var redisPool = tests.GiveMeRedisPool()
|
||||
|
||||
func TestPublishPolicy(t *testing.T) {
|
||||
ms, cancel := createMessageServer()
|
||||
err := ms.Subscribe(period.EventSchedulePeriodicPolicy, func(data interface{}) error {
|
||||
if _, ok := data.(*period.PeriodicJobPolicy); !ok {
|
||||
t.Fatal("expect PeriodicJobPolicy but got other thing")
|
||||
return errors.New("expect PeriodicJobPolicy but got other thing")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = ms.Subscribe(period.EventUnSchedulePeriodicPolicy, func(data interface{}) error {
|
||||
if _, ok := data.(*period.PeriodicJobPolicy); !ok {
|
||||
t.Fatal("expect PeriodicJobPolicy but got other thing")
|
||||
return errors.New("expect PeriodicJobPolicy but got other thing")
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
//wait and then publish
|
||||
<-time.After(200 * time.Millisecond)
|
||||
|
||||
p := &period.PeriodicJobPolicy{
|
||||
PolicyID: "fake_ID",
|
||||
JobName: "fake_job",
|
||||
CronSpec: "5 * * * *",
|
||||
}
|
||||
notification := &models.Message{
|
||||
Event: period.EventSchedulePeriodicPolicy,
|
||||
Data: p,
|
||||
}
|
||||
|
||||
rawJSON, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conn := redisPool.Get()
|
||||
defer conn.Close()
|
||||
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
notification.Event = period.EventUnSchedulePeriodicPolicy
|
||||
rawJSON, err = json.Marshal(notification)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//send quit signal
|
||||
<-time.After(200 * time.Millisecond)
|
||||
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
ms.Start()
|
||||
}
|
||||
|
||||
func TestPublishHook(t *testing.T) {
|
||||
ms, cancel := createMessageServer()
|
||||
err := ms.Subscribe(opm.EventRegisterStatusHook, func(data interface{}) error {
|
||||
if _, ok := data.(*opm.HookData); !ok {
|
||||
t.Fatal("expect HookData but got other thing")
|
||||
return errors.New("expect HookData but got other thing")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
|
||||
<-time.After(200 * time.Millisecond)
|
||||
hook := &opm.HookData{
|
||||
JobID: "fake_job_ID",
|
||||
HookURL: "http://localhost:9999/hook",
|
||||
}
|
||||
notification := &models.Message{
|
||||
Event: opm.EventRegisterStatusHook,
|
||||
Data: hook,
|
||||
}
|
||||
|
||||
rawJSON, err := json.Marshal(notification)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
conn := redisPool.Get()
|
||||
defer conn.Close()
|
||||
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
//send quit signal
|
||||
<-time.After(200 * time.Millisecond)
|
||||
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
ms.Start()
|
||||
}
|
||||
|
||||
func createMessageServer() (*MessageServer, context.CancelFunc) {
|
||||
ns := tests.GiveMeTestNamespace()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return NewMessageServer(ctx, ns, redisPool), cancel
|
||||
}
|
@ -3,6 +3,7 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gocraft/work"
|
||||
@ -72,6 +73,12 @@ func (rj *RedisJob) Run(j *work.Job) error {
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("Runtime error: %s", r)
|
||||
}
|
||||
}()
|
||||
|
||||
//Start to run
|
||||
rj.jobRunning(j.ID)
|
||||
//Inject data
|
||||
|
@ -323,6 +323,8 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
|
||||
return models.JobPoolStats{}, err
|
||||
}
|
||||
|
||||
fmt.Printf("hbs=%+#v\n", hbs[0])
|
||||
|
||||
//Find the heartbeat of this pool via pid
|
||||
stats := make([]*models.JobPoolStatsData, 0)
|
||||
for _, hb := range hbs {
|
||||
|
413
src/jobservice_v2/pool/redis_pool_test.go
Normal file
413
src/jobservice_v2/pool/redis_pool_test.go
Normal file
@ -0,0 +1,413 @@
|
||||
// Copyright 2018 The Harbor Authors. All rights reserved.
|
||||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/errs"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/job"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/logger"
|
||||
"github.com/vmware/harbor/src/jobservice_v2/opm"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/tests"
|
||||
|
||||
"github.com/vmware/harbor/src/jobservice_v2/env"
|
||||
)
|
||||
|
||||
var rPool = tests.GiveMeRedisPool()
|
||||
|
||||
func TestRegisterJob(t *testing.T) {
|
||||
wp, _, _ := createRedisWorkerPool()
|
||||
defer func() {
|
||||
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
jobs := make(map[string]interface{})
|
||||
jobs["fake_job_1st"] = (*fakeJob)(nil)
|
||||
jobs["fake_job_2nd"] = (*fakeJob)(nil)
|
||||
if err := wp.RegisterJobs(jobs); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if _, ok := wp.IsKnownJob("fake_job"); !ok {
|
||||
t.Error("expect known job but seems failed to register job 'fake_job'")
|
||||
}
|
||||
|
||||
params := make(map[string]interface{})
|
||||
params["name"] = "testing:v1"
|
||||
if err := wp.ValidateJobParameters((*fakeJob)(nil), params); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnqueueJob(t *testing.T) {
|
||||
wp, sysCtx, cancel := createRedisWorkerPool()
|
||||
defer func() {
|
||||
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
defer cancel()
|
||||
|
||||
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := wp.RegisterJob("fake_unique_job", (*fakeUniqueJob)(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
go wp.Start()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
params := make(map[string]interface{})
|
||||
params["name"] = "testing:v1"
|
||||
stats, err := wp.Enqueue("fake_job", params, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if stats.Stats.JobID == "" {
|
||||
t.Error("expect none nil job stats but got nil")
|
||||
}
|
||||
|
||||
runAt := time.Now().Unix() + 20
|
||||
stats, err = wp.Schedule("fake_job", params, 20, false)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if stats.Stats.RunAt > 0 && stats.Stats.RunAt < runAt {
|
||||
t.Errorf("expect returned 'RunAt' should be >= '%d' but seems not", runAt)
|
||||
}
|
||||
|
||||
stats, err = wp.Enqueue("fake_unique_job", params, true)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if stats.Stats.JobID == "" {
|
||||
t.Error("expect none nil job stats but got nil")
|
||||
}
|
||||
|
||||
cancel()
|
||||
sysCtx.WG.Wait()
|
||||
}
|
||||
|
||||
func TestEnqueuePeriodicJob(t *testing.T) {
|
||||
wp, _, cancel := createRedisWorkerPool()
|
||||
defer func() {
|
||||
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
defer cancel()
|
||||
|
||||
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
go wp.Start()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
params := make(map[string]interface{})
|
||||
params["name"] = "testing:v1"
|
||||
jobStats, err := wp.PeriodicallyEnqueue("fake_job", params, "10 * * * * *")
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
<-time.After(1 * time.Second)
|
||||
|
||||
jStats, err := wp.GetJobStats(jobStats.Stats.JobID)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if jobStats.Stats.JobName != jStats.Stats.JobName {
|
||||
t.Error("expect same job stats but got different ones")
|
||||
}
|
||||
|
||||
if err := wp.StopJob(jStats.Stats.JobID); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//cancel()
|
||||
//<-time.After(1 * time.Second)
|
||||
}
|
||||
|
||||
/*func TestCancelAndRetryJobWithHook(t *testing.T) {
|
||||
wp, _, cancel := createRedisWorkerPool()
|
||||
defer func() {
|
||||
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
defer cancel()
|
||||
|
||||
if err := wp.RegisterJob("fake_runnable_job", (*fakeRunnableJob)(nil)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go wp.Start()
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Fprintln(w, "ok")
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
params := make(map[string]interface{})
|
||||
params["name"] = "testing:v1"
|
||||
res, err := wp.Enqueue("fake_runnable_job", params, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := wp.RegisterHook(res.Stats.JobID, ts.URL); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
//make sure it's running
|
||||
timer := time.NewTimer(1 * time.Second)
|
||||
defer timer.Stop()
|
||||
|
||||
CHECK:
|
||||
<-timer.C
|
||||
if check, err := wp.GetJobStats(res.Stats.JobID); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if check.Stats.Status != job.JobStatusRunning {
|
||||
timer.Reset(1 * time.Second)
|
||||
goto CHECK
|
||||
}
|
||||
}
|
||||
|
||||
//cancel
|
||||
if err := wp.CancelJob(res.Stats.JobID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
<-time.After(5 * time.Second)
|
||||
updatedRes, err := wp.GetJobStats(res.Stats.JobID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if updatedRes.Stats.Status != job.JobStatusCancelled {
|
||||
t.Fatalf("expect job staus '%s' but got '%s'\n", job.JobStatusCancelled, updatedRes.Stats.Status)
|
||||
}
|
||||
if updatedRes.Stats.DieAt == 0 {
|
||||
t.Fatalf("expect none zero 'DieAt' but got 0 value")
|
||||
}
|
||||
|
||||
//retry
|
||||
if err := wp.RetryJob(updatedRes.Stats.JobID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}*/
|
||||
|
||||
func createRedisWorkerPool() (*GoCraftWorkPool, *env.Context, context.CancelFunc) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
envCtx := &env.Context{
|
||||
SystemContext: ctx,
|
||||
WG: new(sync.WaitGroup),
|
||||
ErrorChan: make(chan error, 1),
|
||||
JobContext: newContext(ctx),
|
||||
}
|
||||
|
||||
return NewGoCraftWorkPool(envCtx, tests.GiveMeTestNamespace(), 3, rPool), envCtx, cancel
|
||||
}
|
||||
|
||||
type fakeJob struct{}
|
||||
|
||||
func (j *fakeJob) MaxFails() uint {
|
||||
return 3
|
||||
}
|
||||
|
||||
func (j *fakeJob) ShouldRetry() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (j *fakeJob) Validate(params map[string]interface{}) error {
|
||||
if p, ok := params["name"]; ok {
|
||||
if p == "testing:v1" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("testing error")
|
||||
}
|
||||
|
||||
func (j *fakeJob) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeUniqueJob struct{}
|
||||
|
||||
func (j *fakeUniqueJob) MaxFails() uint {
|
||||
return 3
|
||||
}
|
||||
|
||||
func (j *fakeUniqueJob) ShouldRetry() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (j *fakeUniqueJob) Validate(params map[string]interface{}) error {
|
||||
if p, ok := params["name"]; ok {
|
||||
if p == "testing:v1" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("testing error")
|
||||
}
|
||||
|
||||
func (j *fakeUniqueJob) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type fakeRunnableJob struct{}
|
||||
|
||||
func (j *fakeRunnableJob) MaxFails() uint {
|
||||
return 2
|
||||
}
|
||||
|
||||
func (j *fakeRunnableJob) ShouldRetry() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (j *fakeRunnableJob) Validate(params map[string]interface{}) error {
|
||||
if p, ok := params["name"]; ok {
|
||||
if p == "testing:v1" {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("testing error")
|
||||
}
|
||||
|
||||
func (j *fakeRunnableJob) Run(ctx env.JobContext, params map[string]interface{}) error {
|
||||
tk := time.NewTicker(1 * time.Second)
|
||||
defer tk.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tk.C:
|
||||
cmd, ok := ctx.OPCommand()
|
||||
if ok {
|
||||
if cmd == opm.CtlCommandStop {
|
||||
return errs.JobStoppedError()
|
||||
}
|
||||
|
||||
return errs.JobCancelledError()
|
||||
}
|
||||
case <-ctx.SystemContext().Done():
|
||||
return nil
|
||||
case <-time.After(1 * time.Minute):
|
||||
return errors.New("fake job timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type fakeContext struct {
|
||||
//System context
|
||||
sysContext context.Context
|
||||
|
||||
//op command func
|
||||
opCommandFunc job.CheckOPCmdFunc
|
||||
|
||||
//checkin func
|
||||
checkInFunc job.CheckInFunc
|
||||
|
||||
//other required information
|
||||
properties map[string]interface{}
|
||||
}
|
||||
|
||||
func newContext(sysCtx context.Context) *fakeContext {
|
||||
return &fakeContext{
|
||||
sysContext: sysCtx,
|
||||
properties: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
//Build implements the same method in env.JobContext interface
|
||||
//This func will build the job execution context before running
|
||||
func (c *fakeContext) Build(dep env.JobData) (env.JobContext, error) {
|
||||
jContext := &fakeContext{
|
||||
sysContext: c.sysContext,
|
||||
properties: make(map[string]interface{}),
|
||||
}
|
||||
|
||||
//Copy properties
|
||||
if len(c.properties) > 0 {
|
||||
for k, v := range c.properties {
|
||||
jContext.properties[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
|
||||
if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func {
|
||||
if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok {
|
||||
jContext.opCommandFunc = funcRef
|
||||
}
|
||||
}
|
||||
}
|
||||
if jContext.opCommandFunc == nil {
|
||||
return nil, errors.New("failed to inject opCommandFunc")
|
||||
}
|
||||
|
||||
if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
|
||||
if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
|
||||
if funcRef, ok := checkInFunc.(job.CheckInFunc); ok {
|
||||
jContext.checkInFunc = funcRef
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if jContext.checkInFunc == nil {
|
||||
return nil, errors.New("failed to inject checkInFunc")
|
||||
}
|
||||
|
||||
return jContext, nil
|
||||
}
|
||||
|
||||
//Get implements the same method in env.JobContext interface
|
||||
func (c *fakeContext) Get(prop string) (interface{}, bool) {
|
||||
v, ok := c.properties[prop]
|
||||
return v, ok
|
||||
}
|
||||
|
||||
//SystemContext implements the same method in env.JobContext interface
|
||||
func (c *fakeContext) SystemContext() context.Context {
|
||||
return c.sysContext
|
||||
}
|
||||
|
||||
//Checkin is bridge func for reporting detailed status
|
||||
func (c *fakeContext) Checkin(status string) error {
|
||||
if c.checkInFunc != nil {
|
||||
c.checkInFunc(status)
|
||||
} else {
|
||||
return errors.New("nil check in function")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//OPCommand return the control operational command like stop/cancel if have
|
||||
func (c *fakeContext) OPCommand() (string, bool) {
|
||||
if c.opCommandFunc != nil {
|
||||
return c.opCommandFunc()
|
||||
}
|
||||
|
||||
return "", false
|
||||
}
|
||||
|
||||
//GetLogger returns the logger
|
||||
func (c *fakeContext) GetLogger() logger.Interface {
|
||||
return nil
|
||||
}
|
@ -25,8 +25,8 @@ const (
|
||||
func GiveMeRedisPool() *redis.Pool {
|
||||
redisHost := getRedisHost()
|
||||
redisPool := &redis.Pool{
|
||||
MaxActive: 2,
|
||||
MaxIdle: 2,
|
||||
MaxActive: 6,
|
||||
MaxIdle: 6,
|
||||
Wait: true,
|
||||
Dial: func() (redis.Conn, error) {
|
||||
return redis.Dial(
|
||||
@ -58,6 +58,28 @@ func Clear(key string, conn redis.Conn) error {
|
||||
return errors.New("failed to clear")
|
||||
}
|
||||
|
||||
//ClearAll ...
|
||||
func ClearAll(namespace string, conn redis.Conn) error {
|
||||
defer conn.Close()
|
||||
|
||||
keys, err := redis.Strings(conn.Do("KEYS", fmt.Sprintf("%s:*", namespace)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if err := conn.Send("DEL", key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return conn.Flush()
|
||||
}
|
||||
|
||||
func getRedisHost() string {
|
||||
redisHost := os.Getenv(testingRedisHost)
|
||||
if redisHost == "" {
|
||||
|
Loading…
Reference in New Issue
Block a user