Merge pull request #4536 from vmware/add_ut_for_pool

Add UT cases for package pool
This commit is contained in:
Steven Zou 2018-03-29 23:16:21 +08:00 committed by GitHub
commit 0e7822278e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 707 additions and 115 deletions

View File

@ -49,14 +49,14 @@ func TestLaunchJobSucceed(t *testing.T) {
res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true)) res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true))
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
obj, err := getResult(res) obj, err := getResult(res)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if obj.Stats.JobID != "fake_ID_ok" { if obj.Stats.JobID != "fake_ID_ok" {
t.Errorf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID) t.Fatalf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID)
} }
server.Stop() server.Stop()
@ -70,7 +70,7 @@ func TestGetJobFailed(t *testing.T) {
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port)) res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port))
if e := expectFormatedError(res, err); e != nil { if e := expectFormatedError(res, err); e != nil {
t.Error(e) t.Fatal(e)
} }
server.Stop() server.Stop()
@ -84,14 +84,14 @@ func TestGetJobSucceed(t *testing.T) {
res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port)) res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port))
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
obj, err := getResult(res) obj, err := getResult(res)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if obj.Stats.JobName != "testing" || obj.Stats.JobID != "fake_ID_ok" { if obj.Stats.JobName != "testing" || obj.Stats.JobID != "fake_ID_ok" {
t.Errorf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID) t.Fatalf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID)
} }
server.Stop() server.Stop()
@ -105,21 +105,21 @@ func TestJobActionFailed(t *testing.T) {
actionReq, err := createJobActionReq("stop") actionReq, err := createJobActionReq("stop")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
expectFormatedError(resData, err) expectFormatedError(resData, err)
actionReq, err = createJobActionReq("cancel") actionReq, err = createJobActionReq("cancel")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
expectFormatedError(resData, err) expectFormatedError(resData, err)
actionReq, err = createJobActionReq("retry") actionReq, err = createJobActionReq("retry")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq)
expectFormatedError(resData, err) expectFormatedError(resData, err)
@ -135,29 +135,29 @@ func TestJobActionSucceed(t *testing.T) {
actionReq, err := createJobActionReq("stop") actionReq, err := createJobActionReq("stop")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
actionReq, err = createJobActionReq("cancel") actionReq, err = createJobActionReq("cancel")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
actionReq, err = createJobActionReq("retry") actionReq, err = createJobActionReq("retry")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
_, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
server.Stop() server.Stop()
@ -171,7 +171,7 @@ func TestCheckStatus(t *testing.T) {
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port)) resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port))
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
poolStats := &models.JobPoolStats{ poolStats := &models.JobPoolStats{
@ -179,11 +179,11 @@ func TestCheckStatus(t *testing.T) {
} }
err = json.Unmarshal(resData, poolStats) err = json.Unmarshal(resData, poolStats)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if poolStats.Pools[0].WorkerPoolID != "fake_pool_ID" { if poolStats.Pools[0].WorkerPoolID != "fake_pool_ID" {
t.Errorf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID) t.Fatalf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID)
} }
server.Stop() server.Stop()
@ -197,11 +197,11 @@ func TestGetJobLog(t *testing.T) {
resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port)) resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port))
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if len(resData) == 0 { if len(resData) == 0 {
t.Error("expect job log but got nothing") t.Fatal("expect job log but got nothing")
} }
server.Stop() server.Stop()

View File

@ -9,22 +9,22 @@ import (
func TestConfigLoadingFailed(t *testing.T) { func TestConfigLoadingFailed(t *testing.T) {
cfg := &Configuration{} cfg := &Configuration{}
if err := cfg.Load("./config.not-existing.yaml", false); err == nil { if err := cfg.Load("./config.not-existing.yaml", false); err == nil {
t.Errorf("Load config from none-existing document, expect none nil error but got '%s'\n", err) t.Fatalf("Load config from none-existing document, expect none nil error but got '%s'\n", err)
} }
} }
func TestConfigLoadingSucceed(t *testing.T) { func TestConfigLoadingSucceed(t *testing.T) {
if err := CreateLogDir(); err != nil { if err := CreateLogDir(); err != nil {
t.Error(err) t.Fatal(err)
} }
cfg := &Configuration{} cfg := &Configuration{}
if err := cfg.Load("../config_test.yml", false); err != nil { if err := cfg.Load("../config_test.yml", false); err != nil {
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
} }
if err := RemoveLogDir(); err != nil { if err := RemoveLogDir(); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -36,70 +36,70 @@ func TestConfigLoadingWithEnv(t *testing.T) {
cfg := &Configuration{} cfg := &Configuration{}
if err := cfg.Load("../config_test.yml", true); err != nil { if err := cfg.Load("../config_test.yml", true); err != nil {
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
} }
if cfg.Protocol != "https" { if cfg.Protocol != "https" {
t.Errorf("expect protocol 'https', but got '%s'\n", cfg.Protocol) t.Fatalf("expect protocol 'https', but got '%s'\n", cfg.Protocol)
} }
if cfg.Port != 8989 { if cfg.Port != 8989 {
t.Errorf("expect port 8989 but got '%d'\n", cfg.Port) t.Fatalf("expect port 8989 but got '%d'\n", cfg.Port)
} }
if cfg.PoolConfig.WorkerCount != 8 { if cfg.PoolConfig.WorkerCount != 8 {
t.Errorf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount) t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount)
} }
if cfg.PoolConfig.RedisPoolCfg.Host != "localhost" { if cfg.PoolConfig.RedisPoolCfg.Host != "localhost" {
t.Errorf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host) t.Fatalf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host)
} }
if cfg.PoolConfig.RedisPoolCfg.Port != 7379 { if cfg.PoolConfig.RedisPoolCfg.Port != 7379 {
t.Errorf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port) t.Fatalf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port)
} }
if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" { if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" {
t.Errorf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace) t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace)
} }
if cfg.LoggerConfig.BasePath != "/tmp" { if cfg.LoggerConfig.BasePath != "/tmp" {
t.Errorf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath) t.Fatalf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath)
} }
if cfg.LoggerConfig.LogLevel != "DEBUG" { if cfg.LoggerConfig.LogLevel != "DEBUG" {
t.Errorf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel) t.Fatalf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel)
} }
if cfg.LoggerConfig.ArchivePeriod != 5 { if cfg.LoggerConfig.ArchivePeriod != 5 {
t.Errorf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod) t.Fatalf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod)
} }
unsetENV() unsetENV()
if err := RemoveLogDir(); err != nil { if err := RemoveLogDir(); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
func TestDefaultConfig(t *testing.T) { func TestDefaultConfig(t *testing.T) {
if err := CreateLogDir(); err != nil { if err := CreateLogDir(); err != nil {
t.Error(err) t.Fatal(err)
} }
if err := DefaultConfig.Load("../config_test.yml", true); err != nil { if err := DefaultConfig.Load("../config_test.yml", true); err != nil {
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err)
} }
if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888" { if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888" {
t.Errorf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint) t.Fatalf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint)
} }
if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" { if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" {
t.Errorf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath) t.Fatalf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath)
} }
if lvl := GetLogLevel(); lvl != "INFO" { if lvl := GetLogLevel(); lvl != "INFO" {
t.Errorf("expect default logger level 'INFO' but got '%s'\n", lvl) t.Fatalf("expect default logger level 'INFO' but got '%s'\n", lvl)
} }
if period := GetLogArchivePeriod(); period != 1 { if period := GetLogArchivePeriod(); period != 1 {
t.Errorf("expect default log archive period 1 but got '%d'\n", period) t.Fatalf("expect default log archive period 1 but got '%d'\n", period)
} }
if err := RemoveLogDir(); err != nil { if err := RemoveLogDir(); err != nil {
t.Error(err) t.Fatal(err)
} }
} }

View File

@ -17,11 +17,11 @@ func TestLaunchGenericJob(t *testing.T) {
req := createJobReq("Generic", false, false) req := createJobReq("Generic", false, false)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID" { if res.Stats.JobID != "fake_ID" {
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -31,11 +31,11 @@ func TestLaunchGenericJobUnique(t *testing.T) {
req := createJobReq("Generic", true, false) req := createJobReq("Generic", true, false)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID" { if res.Stats.JobID != "fake_ID" {
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -45,11 +45,11 @@ func TestLaunchGenericJobWithHook(t *testing.T) {
req := createJobReq("Generic", false, true) req := createJobReq("Generic", false, true)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID" { if res.Stats.JobID != "fake_ID" {
t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -59,11 +59,11 @@ func TestLaunchScheduledJob(t *testing.T) {
req := createJobReq("Scheduled", false, true) req := createJobReq("Scheduled", false, true)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID_Scheduled" { if res.Stats.JobID != "fake_ID_Scheduled" {
t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -73,11 +73,11 @@ func TestLaunchScheduledUniqueJob(t *testing.T) {
req := createJobReq("Scheduled", true, false) req := createJobReq("Scheduled", true, false)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID_Scheduled" { if res.Stats.JobID != "fake_ID_Scheduled" {
t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -87,11 +87,11 @@ func TestLaunchPeriodicJob(t *testing.T) {
req := createJobReq("Periodic", true, false) req := createJobReq("Periodic", true, false)
res, err := c.LaunchJob(req) res, err := c.LaunchJob(req)
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if res.Stats.JobID != "fake_ID_Periodic" { if res.Stats.JobID != "fake_ID_Periodic" {
t.Errorf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID) t.Fatalf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID)
} }
} }
@ -100,11 +100,11 @@ func TestGetJobStats(t *testing.T) {
c := NewController(pool) c := NewController(pool)
stats, err := c.GetJob("fake_ID") stats, err := c.GetJob("fake_ID")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if stats.Stats.Status != "running" { if stats.Stats.Status != "running" {
t.Errorf("expect stauts 'running' but got '%s'\n", stats.Stats.Status) t.Fatalf("expect stauts 'running' but got '%s'\n", stats.Stats.Status)
} }
} }
@ -113,15 +113,15 @@ func TestJobActions(t *testing.T) {
c := NewController(pool) c := NewController(pool)
if err := c.StopJob("fake_ID"); err != nil { if err := c.StopJob("fake_ID"); err != nil {
t.Error(err) t.Fatal(err)
} }
if err := c.CancelJob("fake_ID"); err != nil { if err := c.CancelJob("fake_ID"); err != nil {
t.Error(err) t.Fatal(err)
} }
if err := c.RetryJob("fake_ID"); err != nil { if err := c.RetryJob("fake_ID"); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -134,7 +134,7 @@ func TestGetJobLogData(t *testing.T) {
t.Errorf("expect object not found error but got '%s'\n", err) t.Errorf("expect object not found error but got '%s'\n", err)
} }
} else { } else {
t.Error("expect error but got nil") t.Fatal("expect error but got nil")
} }
} }
@ -144,15 +144,15 @@ func TestCheckStatus(t *testing.T) {
st, err := c.CheckStatus() st, err := c.CheckStatus()
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if len(st.Pools) == 0 { if len(st.Pools) == 0 {
t.Error("expect status data but got zero list") t.Fatal("expect status data but got zero list")
} }
if st.Pools[0].Status != "running" { if st.Pools[0].Status != "running" {
t.Errorf("expect status 'running' but got '%s'\n", st.Pools[0].Status) t.Fatalf("expect status 'running' but got '%s'\n", st.Pools[0].Status)
} }
} }
@ -170,23 +170,23 @@ func TestInvalidCheck(t *testing.T) {
} }
if _, err := c.LaunchJob(req); err == nil { if _, err := c.LaunchJob(req); err == nil {
t.Error("error expected but got nil") t.Fatal("error expected but got nil")
} }
req.Job.Name = "fake" req.Job.Name = "fake"
if _, err := c.LaunchJob(req); err == nil { if _, err := c.LaunchJob(req); err == nil {
t.Error("error expected but got nil") t.Fatal("error expected but got nil")
} }
req.Job.Metadata.JobKind = "Scheduled" req.Job.Metadata.JobKind = "Scheduled"
if _, err := c.LaunchJob(req); err == nil { if _, err := c.LaunchJob(req); err == nil {
t.Error("error expected but got nil") t.Fatal("error expected but got nil")
} }
req.Job.Metadata.JobKind = "Periodic" req.Job.Metadata.JobKind = "Periodic"
req.Job.Metadata.Cron = "x x x x x x" req.Job.Metadata.Cron = "x x x x x x"
if _, err := c.LaunchJob(req); err == nil { if _, err := c.LaunchJob(req); err == nil {
t.Error("error expected but got nil") t.Fatal("error expected but got nil")
} }
} }

View File

@ -13,11 +13,11 @@ func TestSweeper(t *testing.T) {
workDir := "/tmp/sweeper_logs" workDir := "/tmp/sweeper_logs"
if err := os.MkdirAll(workDir, 0755); err != nil { if err := os.MkdirAll(workDir, 0755); err != nil {
t.Error(err) t.Fatal(err)
} }
_, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir)) _, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir))
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -28,9 +28,9 @@ func TestSweeper(t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil { if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil {
t.Error(err) t.Fatal(err)
} }
if err := os.Remove(workDir); err != nil { if err := os.Remove(workDir); err != nil {
t.Error(err) t.Fatal(err)
} }
} }

View File

@ -21,7 +21,7 @@ func TestHookClient(t *testing.T) {
Status: "running", Status: "running",
}) })
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -37,6 +37,6 @@ func TestReportStatusFailed(t *testing.T) {
Status: "running", Status: "running",
}) })
if err == nil { if err == nil {
t.Error("expect error but got nil") t.Fatal("expect error but got nil")
} }
} }

View File

@ -57,16 +57,16 @@ func TestSetJobStatus(t *testing.T) {
<-time.After(100 * time.Millisecond) <-time.After(100 * time.Millisecond)
stats, err := mgr.Retrieve("fake_job_ID") stats, err := mgr.Retrieve("fake_job_ID")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if stats.Stats.Status != "running" { if stats.Stats.Status != "running" {
t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status) t.Fatalf("expect job status 'running' but got '%s'\n", stats.Stats.Status)
} }
key := utils.KeyJobStats(testingNamespace, "fake_job_ID") key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -77,20 +77,20 @@ func TestCommand(t *testing.T) {
<-time.After(200 * time.Millisecond) <-time.After(200 * time.Millisecond)
if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil { if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil {
t.Error(err) t.Fatal(err)
} }
if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil { if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil {
t.Error(err) t.Fatal(err)
} else { } else {
if cmd != CtlCommandStop { if cmd != CtlCommandStop {
t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd) t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd)
} }
} }
key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID") key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -105,7 +105,7 @@ func TestDieAt(t *testing.T) {
dieAt := time.Now().Unix() dieAt := time.Now().Unix()
if err := createDeadJob(redisPool.Get(), dieAt); err != nil { if err := createDeadJob(redisPool.Get(), dieAt); err != nil {
t.Error(err) t.Fatal(err)
} }
<-time.After(200 * time.Millisecond) <-time.After(200 * time.Millisecond)
mgr.DieAt("fake_job_ID", dieAt) mgr.DieAt("fake_job_ID", dieAt)
@ -113,20 +113,20 @@ func TestDieAt(t *testing.T) {
stats, err := mgr.Retrieve("fake_job_ID") stats, err := mgr.Retrieve("fake_job_ID")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if stats.Stats.DieAt != dieAt { if stats.Stats.DieAt != dieAt {
t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt) t.Fatalf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt)
} }
key := utils.KeyJobStats(testingNamespace, "fake_job_ID") key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
key2 := utils.RedisKeyDead(testingNamespace) key2 := utils.RedisKeyDead(testingNamespace)
if err := clear(key2, redisPool.Get()); err != nil { if err := clear(key2, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -137,12 +137,12 @@ func TestRegisterHook(t *testing.T) {
<-time.After(200 * time.Millisecond) <-time.After(200 * time.Millisecond)
if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil { if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil {
t.Error(err) t.Fatal(err)
} }
key := utils.KeyJobStats(testingNamespace, "fake_job_ID") key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -158,12 +158,12 @@ func TestExpireJobStats(t *testing.T) {
<-time.After(200 * time.Millisecond) <-time.After(200 * time.Millisecond)
if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil { if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil {
t.Error(err) t.Fatal(err)
} }
key := utils.KeyJobStats(testingNamespace, "fake_job_ID") key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -185,7 +185,7 @@ func TestCheckIn(t *testing.T) {
defer ts.Close() defer ts.Close()
if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil { if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil {
t.Error(err) t.Fatal(err)
} }
mgr.CheckIn("fake_job_ID", "checkin") mgr.CheckIn("fake_job_ID", "checkin")
@ -193,16 +193,16 @@ func TestCheckIn(t *testing.T) {
stats, err := mgr.Retrieve("fake_job_ID") stats, err := mgr.Retrieve("fake_job_ID")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if stats.Stats.CheckIn != "checkin" { if stats.Stats.CheckIn != "checkin" {
t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn) t.Fatalf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn)
} }
key := utils.KeyJobStats(testingNamespace, "fake_job_ID") key := utils.KeyJobStats(testingNamespace, "fake_job_ID")
if err := clear(key, redisPool.Get()); err != nil { if err := clear(key, redisPool.Get()); err != nil {
t.Error(err) t.Fatal(err)
} }
} }

View File

@ -14,11 +14,11 @@ func TestPeriodicJobPolicy(t *testing.T) {
data, err := p.Serialize() data, err := p.Serialize()
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if err := p.DeSerialize(data); err != nil { if err := p.DeSerialize(data); err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -37,17 +37,17 @@ func TestPeriodicJobPolicyStore(t *testing.T) {
pl = append(pl, createPolicy("")) pl = append(pl, createPolicy(""))
ps.addAll(pl) ps.addAll(pl)
if ps.size() != 3 { if ps.size() != 3 {
t.Errorf("expect size 3 but got '%d'\n", ps.size()) t.Fatalf("expect size 3 but got '%d'\n", ps.size())
} }
l := ps.list() l := ps.list()
if l == nil || len(l) != 3 { if l == nil || len(l) != 3 {
t.Error("expect a policy list with 3 items but got invalid list") t.Fatal("expect a policy list with 3 items but got invalid list")
} }
rp := ps.remove("fake_ID_Steven") rp := ps.remove("fake_ID_Steven")
if rp == nil { if rp == nil {
t.Error("expect none nil policy object but got nil") t.Fatal("expect none nil policy object but got nil")
} }
} }

View File

@ -20,33 +20,33 @@ func TestScheduler(t *testing.T) {
params["image"] = "testing:v1" params["image"] = "testing:v1"
id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *") id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *")
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
if time.Now().Unix() >= runAt { if time.Now().Unix() >= runAt {
t.Error("the running at time of scheduled job should be after now, but seems not") t.Fatal("the running at time of scheduled job should be after now, but seems not")
} }
if err := scheduler.Load(); err != nil { if err := scheduler.Load(); err != nil {
t.Error(err) t.Fatal(err)
} }
if scheduler.pstore.size() != 1 { if scheduler.pstore.size() != 1 {
t.Errorf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size()) t.Fatalf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size())
} }
if err := scheduler.UnSchedule(id); err != nil { if err := scheduler.UnSchedule(id); err != nil {
t.Error(err) t.Fatal(err)
} }
if err := scheduler.Clear(); err != nil { if err := scheduler.Clear(); err != nil {
t.Error(err) t.Fatal(err)
} }
err = tests.Clear(utils.KeyPeriodicPolicy(tests.GiveMeTestNamespace()), redisPool.Get()) err = tests.Clear(utils.KeyPeriodicPolicy(tests.GiveMeTestNamespace()), redisPool.Get())
err = tests.Clear(utils.KeyPeriodicPolicyScore(tests.GiveMeTestNamespace()), redisPool.Get()) err = tests.Clear(utils.KeyPeriodicPolicyScore(tests.GiveMeTestNamespace()), redisPool.Get())
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), redisPool.Get()) err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), redisPool.Get())
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
} }
@ -58,16 +58,16 @@ func TestPubFunc(t *testing.T) {
CronSpec: "5 * * * * *", CronSpec: "5 * * * * *",
} }
if err := scheduler.AcceptPeriodicPolicy(p); err != nil { if err := scheduler.AcceptPeriodicPolicy(p); err != nil {
t.Error(err) t.Fatal(err)
} }
if scheduler.pstore.size() != 1 { if scheduler.pstore.size() != 1 {
t.Errorf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size()) t.Fatalf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size())
} }
if rmp := scheduler.RemovePeriodicPolicy("fake_ID"); rmp == nil { if rmp := scheduler.RemovePeriodicPolicy("fake_ID"); rmp == nil {
t.Error("expect none nil object returned after removing but got nil") t.Fatal("expect none nil object returned after removing but got nil")
} }
if scheduler.pstore.size() != 0 { if scheduler.pstore.size() != 0 {
t.Errorf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size()) t.Fatalf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size())
} }
} }

View File

@ -15,16 +15,16 @@ import (
func TestSweeper(t *testing.T) { func TestSweeper(t *testing.T) {
epoch := time.Now().Unix() - 1000 epoch := time.Now().Unix() - 1000
if err := createFakeScheduledJob(epoch); err != nil { if err := createFakeScheduledJob(epoch); err != nil {
t.Error(err) t.Fatal(err)
} }
ns := tests.GiveMeTestNamespace() ns := tests.GiveMeTestNamespace()
sweeper := NewSweeper(ns, redisPool, work.NewClient(ns, redisPool)) sweeper := NewSweeper(ns, redisPool, work.NewClient(ns, redisPool))
if err := sweeper.ClearOutdatedScheduledJobs(); err != nil { if err := sweeper.ClearOutdatedScheduledJobs(); err != nil {
t.Error(err) t.Fatal(err)
} }
err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get()) err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get())
if err != nil { if err != nil {
t.Error(err) t.Fatal(err)
} }
} }

View File

@ -0,0 +1,148 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package pool
import (
"context"
"encoding/json"
"errors"
"testing"
"time"
"github.com/vmware/harbor/src/jobservice_v2/opm"
"github.com/vmware/harbor/src/jobservice_v2/models"
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/vmware/harbor/src/jobservice_v2/utils"
"github.com/vmware/harbor/src/jobservice_v2/tests"
)
var redisPool = tests.GiveMeRedisPool()
func TestPublishPolicy(t *testing.T) {
ms, cancel := createMessageServer()
err := ms.Subscribe(period.EventSchedulePeriodicPolicy, func(data interface{}) error {
if _, ok := data.(*period.PeriodicJobPolicy); !ok {
t.Fatal("expect PeriodicJobPolicy but got other thing")
return errors.New("expect PeriodicJobPolicy but got other thing")
}
return nil
})
if err != nil {
t.Fatal(err)
}
err = ms.Subscribe(period.EventUnSchedulePeriodicPolicy, func(data interface{}) error {
if _, ok := data.(*period.PeriodicJobPolicy); !ok {
t.Fatal("expect PeriodicJobPolicy but got other thing")
return errors.New("expect PeriodicJobPolicy but got other thing")
}
return nil
})
if err != nil {
t.Fatal(err)
}
go func() {
defer cancel()
//wait and then publish
<-time.After(200 * time.Millisecond)
p := &period.PeriodicJobPolicy{
PolicyID: "fake_ID",
JobName: "fake_job",
CronSpec: "5 * * * *",
}
notification := &models.Message{
Event: period.EventSchedulePeriodicPolicy,
Data: p,
}
rawJSON, err := json.Marshal(notification)
if err != nil {
t.Fatal(err)
}
conn := redisPool.Get()
defer conn.Close()
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
if err != nil {
t.Fatal(err)
}
notification.Event = period.EventUnSchedulePeriodicPolicy
rawJSON, err = json.Marshal(notification)
if err != nil {
t.Fatal(err)
}
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
if err != nil {
t.Fatal(err)
}
//send quit signal
<-time.After(200 * time.Millisecond)
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn)
if err != nil {
t.Fatal(err)
}
}()
ms.Start()
}
func TestPublishHook(t *testing.T) {
ms, cancel := createMessageServer()
err := ms.Subscribe(opm.EventRegisterStatusHook, func(data interface{}) error {
if _, ok := data.(*opm.HookData); !ok {
t.Fatal("expect HookData but got other thing")
return errors.New("expect HookData but got other thing")
}
return nil
})
if err != nil {
t.Fatal(err)
}
go func() {
defer cancel()
<-time.After(200 * time.Millisecond)
hook := &opm.HookData{
JobID: "fake_job_ID",
HookURL: "http://localhost:9999/hook",
}
notification := &models.Message{
Event: opm.EventRegisterStatusHook,
Data: hook,
}
rawJSON, err := json.Marshal(notification)
if err != nil {
t.Fatal(err)
}
conn := redisPool.Get()
defer conn.Close()
err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON)
if err != nil {
t.Fatal(err)
}
//send quit signal
<-time.After(200 * time.Millisecond)
err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn)
if err != nil {
t.Fatal(err)
}
}()
ms.Start()
}
func createMessageServer() (*MessageServer, context.CancelFunc) {
ns := tests.GiveMeTestNamespace()
ctx, cancel := context.WithCancel(context.Background())
return NewMessageServer(ctx, ns, redisPool), cancel
}

View File

@ -3,6 +3,7 @@
package pool package pool
import ( import (
"fmt"
"time" "time"
"github.com/gocraft/work" "github.com/gocraft/work"
@ -72,6 +73,12 @@ func (rj *RedisJob) Run(j *work.Job) error {
} }
}() }()
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Runtime error: %s", r)
}
}()
//Start to run //Start to run
rj.jobRunning(j.ID) rj.jobRunning(j.ID)
//Inject data //Inject data

View File

@ -323,6 +323,8 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
return models.JobPoolStats{}, err return models.JobPoolStats{}, err
} }
fmt.Printf("hbs=%+#v\n", hbs[0])
//Find the heartbeat of this pool via pid //Find the heartbeat of this pool via pid
stats := make([]*models.JobPoolStatsData, 0) stats := make([]*models.JobPoolStatsData, 0)
for _, hb := range hbs { for _, hb := range hbs {

View File

@ -0,0 +1,413 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package pool
import (
"context"
"errors"
"reflect"
"sync"
"testing"
"time"
"github.com/vmware/harbor/src/jobservice_v2/errs"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/logger"
"github.com/vmware/harbor/src/jobservice_v2/opm"
"github.com/vmware/harbor/src/jobservice_v2/tests"
"github.com/vmware/harbor/src/jobservice_v2/env"
)
var rPool = tests.GiveMeRedisPool()
func TestRegisterJob(t *testing.T) {
wp, _, _ := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
t.Error(err)
}
jobs := make(map[string]interface{})
jobs["fake_job_1st"] = (*fakeJob)(nil)
jobs["fake_job_2nd"] = (*fakeJob)(nil)
if err := wp.RegisterJobs(jobs); err != nil {
t.Error(err)
}
if _, ok := wp.IsKnownJob("fake_job"); !ok {
t.Error("expect known job but seems failed to register job 'fake_job'")
}
params := make(map[string]interface{})
params["name"] = "testing:v1"
if err := wp.ValidateJobParameters((*fakeJob)(nil), params); err != nil {
t.Error(err)
}
}
func TestEnqueueJob(t *testing.T) {
wp, sysCtx, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
defer cancel()
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
t.Error(err)
}
if err := wp.RegisterJob("fake_unique_job", (*fakeUniqueJob)(nil)); err != nil {
t.Error(err)
}
go wp.Start()
time.Sleep(1 * time.Second)
params := make(map[string]interface{})
params["name"] = "testing:v1"
stats, err := wp.Enqueue("fake_job", params, false)
if err != nil {
t.Error(err)
}
if stats.Stats.JobID == "" {
t.Error("expect none nil job stats but got nil")
}
runAt := time.Now().Unix() + 20
stats, err = wp.Schedule("fake_job", params, 20, false)
if err != nil {
t.Error(err)
}
if stats.Stats.RunAt > 0 && stats.Stats.RunAt < runAt {
t.Errorf("expect returned 'RunAt' should be >= '%d' but seems not", runAt)
}
stats, err = wp.Enqueue("fake_unique_job", params, true)
if err != nil {
t.Error(err)
}
if stats.Stats.JobID == "" {
t.Error("expect none nil job stats but got nil")
}
cancel()
sysCtx.WG.Wait()
}
func TestEnqueuePeriodicJob(t *testing.T) {
wp, _, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Error(err)
}
}()
defer cancel()
if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil {
t.Error(err)
}
go wp.Start()
time.Sleep(1 * time.Second)
params := make(map[string]interface{})
params["name"] = "testing:v1"
jobStats, err := wp.PeriodicallyEnqueue("fake_job", params, "10 * * * * *")
if err != nil {
t.Error(err)
}
<-time.After(1 * time.Second)
jStats, err := wp.GetJobStats(jobStats.Stats.JobID)
if err != nil {
t.Error(err)
}
if jobStats.Stats.JobName != jStats.Stats.JobName {
t.Error("expect same job stats but got different ones")
}
if err := wp.StopJob(jStats.Stats.JobID); err != nil {
t.Error(err)
}
//cancel()
//<-time.After(1 * time.Second)
}
/*func TestCancelAndRetryJobWithHook(t *testing.T) {
wp, _, cancel := createRedisWorkerPool()
defer func() {
if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil {
t.Fatal(err)
}
}()
defer cancel()
if err := wp.RegisterJob("fake_runnable_job", (*fakeRunnableJob)(nil)); err != nil {
t.Fatal(err)
}
go wp.Start()
time.Sleep(1 * time.Second)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "ok")
}))
defer ts.Close()
params := make(map[string]interface{})
params["name"] = "testing:v1"
res, err := wp.Enqueue("fake_runnable_job", params, false)
if err != nil {
t.Fatal(err)
}
if err := wp.RegisterHook(res.Stats.JobID, ts.URL); err != nil {
t.Fatal(err)
}
//make sure it's running
timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
CHECK:
<-timer.C
if check, err := wp.GetJobStats(res.Stats.JobID); err != nil {
t.Fatal(err)
} else {
if check.Stats.Status != job.JobStatusRunning {
timer.Reset(1 * time.Second)
goto CHECK
}
}
//cancel
if err := wp.CancelJob(res.Stats.JobID); err != nil {
t.Fatal(err)
}
<-time.After(5 * time.Second)
updatedRes, err := wp.GetJobStats(res.Stats.JobID)
if err != nil {
t.Fatal(err)
}
if updatedRes.Stats.Status != job.JobStatusCancelled {
t.Fatalf("expect job staus '%s' but got '%s'\n", job.JobStatusCancelled, updatedRes.Stats.Status)
}
if updatedRes.Stats.DieAt == 0 {
t.Fatalf("expect none zero 'DieAt' but got 0 value")
}
//retry
if err := wp.RetryJob(updatedRes.Stats.JobID); err != nil {
t.Fatal(err)
}
}*/
func createRedisWorkerPool() (*GoCraftWorkPool, *env.Context, context.CancelFunc) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
envCtx := &env.Context{
SystemContext: ctx,
WG: new(sync.WaitGroup),
ErrorChan: make(chan error, 1),
JobContext: newContext(ctx),
}
return NewGoCraftWorkPool(envCtx, tests.GiveMeTestNamespace(), 3, rPool), envCtx, cancel
}
type fakeJob struct{}
func (j *fakeJob) MaxFails() uint {
return 3
}
func (j *fakeJob) ShouldRetry() bool {
return true
}
func (j *fakeJob) Validate(params map[string]interface{}) error {
if p, ok := params["name"]; ok {
if p == "testing:v1" {
return nil
}
}
return errors.New("testing error")
}
func (j *fakeJob) Run(ctx env.JobContext, params map[string]interface{}) error {
return nil
}
type fakeUniqueJob struct{}
func (j *fakeUniqueJob) MaxFails() uint {
return 3
}
func (j *fakeUniqueJob) ShouldRetry() bool {
return true
}
func (j *fakeUniqueJob) Validate(params map[string]interface{}) error {
if p, ok := params["name"]; ok {
if p == "testing:v1" {
return nil
}
}
return errors.New("testing error")
}
func (j *fakeUniqueJob) Run(ctx env.JobContext, params map[string]interface{}) error {
return nil
}
type fakeRunnableJob struct{}
func (j *fakeRunnableJob) MaxFails() uint {
return 2
}
func (j *fakeRunnableJob) ShouldRetry() bool {
return true
}
func (j *fakeRunnableJob) Validate(params map[string]interface{}) error {
if p, ok := params["name"]; ok {
if p == "testing:v1" {
return nil
}
}
return errors.New("testing error")
}
func (j *fakeRunnableJob) Run(ctx env.JobContext, params map[string]interface{}) error {
tk := time.NewTicker(1 * time.Second)
defer tk.Stop()
for {
select {
case <-tk.C:
cmd, ok := ctx.OPCommand()
if ok {
if cmd == opm.CtlCommandStop {
return errs.JobStoppedError()
}
return errs.JobCancelledError()
}
case <-ctx.SystemContext().Done():
return nil
case <-time.After(1 * time.Minute):
return errors.New("fake job timeout")
}
}
}
type fakeContext struct {
//System context
sysContext context.Context
//op command func
opCommandFunc job.CheckOPCmdFunc
//checkin func
checkInFunc job.CheckInFunc
//other required information
properties map[string]interface{}
}
func newContext(sysCtx context.Context) *fakeContext {
return &fakeContext{
sysContext: sysCtx,
properties: make(map[string]interface{}),
}
}
//Build implements the same method in env.JobContext interface
//This func will build the job execution context before running
func (c *fakeContext) Build(dep env.JobData) (env.JobContext, error) {
jContext := &fakeContext{
sysContext: c.sysContext,
properties: make(map[string]interface{}),
}
//Copy properties
if len(c.properties) > 0 {
for k, v := range c.properties {
jContext.properties[k] = v
}
}
if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok {
if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func {
if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok {
jContext.opCommandFunc = funcRef
}
}
}
if jContext.opCommandFunc == nil {
return nil, errors.New("failed to inject opCommandFunc")
}
if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok {
if reflect.TypeOf(checkInFunc).Kind() == reflect.Func {
if funcRef, ok := checkInFunc.(job.CheckInFunc); ok {
jContext.checkInFunc = funcRef
}
}
}
if jContext.checkInFunc == nil {
return nil, errors.New("failed to inject checkInFunc")
}
return jContext, nil
}
//Get implements the same method in env.JobContext interface
func (c *fakeContext) Get(prop string) (interface{}, bool) {
v, ok := c.properties[prop]
return v, ok
}
//SystemContext implements the same method in env.JobContext interface
func (c *fakeContext) SystemContext() context.Context {
return c.sysContext
}
//Checkin is bridge func for reporting detailed status
func (c *fakeContext) Checkin(status string) error {
if c.checkInFunc != nil {
c.checkInFunc(status)
} else {
return errors.New("nil check in function")
}
return nil
}
//OPCommand return the control operational command like stop/cancel if have
func (c *fakeContext) OPCommand() (string, bool) {
if c.opCommandFunc != nil {
return c.opCommandFunc()
}
return "", false
}
//GetLogger returns the logger
func (c *fakeContext) GetLogger() logger.Interface {
return nil
}

View File

@ -25,8 +25,8 @@ const (
func GiveMeRedisPool() *redis.Pool { func GiveMeRedisPool() *redis.Pool {
redisHost := getRedisHost() redisHost := getRedisHost()
redisPool := &redis.Pool{ redisPool := &redis.Pool{
MaxActive: 2, MaxActive: 6,
MaxIdle: 2, MaxIdle: 6,
Wait: true, Wait: true,
Dial: func() (redis.Conn, error) { Dial: func() (redis.Conn, error) {
return redis.Dial( return redis.Dial(
@ -58,6 +58,28 @@ func Clear(key string, conn redis.Conn) error {
return errors.New("failed to clear") return errors.New("failed to clear")
} }
//ClearAll ...
func ClearAll(namespace string, conn redis.Conn) error {
defer conn.Close()
keys, err := redis.Strings(conn.Do("KEYS", fmt.Sprintf("%s:*", namespace)))
if err != nil {
return err
}
if len(keys) == 0 {
return nil
}
for _, key := range keys {
if err := conn.Send("DEL", key); err != nil {
return err
}
}
return conn.Flush()
}
func getRedisHost() string { func getRedisHost() string {
redisHost := os.Getenv(testingRedisHost) redisHost := os.Getenv(testingRedisHost)
if redisHost == "" { if redisHost == "" {