From 9889d4badd798327d2498af668368a74670b0adb Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Thu, 29 Mar 2018 19:50:44 +0800 Subject: [PATCH] Add UT cases for package pool --- src/jobservice_v2/api/handler_test.go | 42 +- src/jobservice_v2/config/config_test.go | 44 +- src/jobservice_v2/core/controller_test.go | 50 +-- src/jobservice_v2/logger/sweeper_test.go | 8 +- src/jobservice_v2/opm/hook_client_test.go | 4 +- .../opm/redis_job_stats_mgr_test.go | 40 +- src/jobservice_v2/period/job_policy_test.go | 10 +- .../period/redis_scheduler_test.go | 22 +- src/jobservice_v2/period/sweeper_test.go | 6 +- src/jobservice_v2/pool/message_server_test.go | 148 +++++++ src/jobservice_v2/pool/redis_job_wrapper.go | 7 + src/jobservice_v2/pool/redis_pool.go | 2 + src/jobservice_v2/pool/redis_pool_test.go | 413 ++++++++++++++++++ src/jobservice_v2/tests/utils.go | 26 +- 14 files changed, 707 insertions(+), 115 deletions(-) create mode 100644 src/jobservice_v2/pool/message_server_test.go create mode 100644 src/jobservice_v2/pool/redis_pool_test.go diff --git a/src/jobservice_v2/api/handler_test.go b/src/jobservice_v2/api/handler_test.go index e3b225a5d..354a22e60 100644 --- a/src/jobservice_v2/api/handler_test.go +++ b/src/jobservice_v2/api/handler_test.go @@ -49,14 +49,14 @@ func TestLaunchJobSucceed(t *testing.T) { res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true)) if err != nil { - t.Error(err) + t.Fatal(err) } obj, err := getResult(res) if err != nil { - t.Error(err) + t.Fatal(err) } if obj.Stats.JobID != "fake_ID_ok" { - t.Errorf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID) + t.Fatalf("expect job ID 'fake_ID_ok' but got '%s'\n", obj.Stats.JobID) } server.Stop() @@ -70,7 +70,7 @@ func TestGetJobFailed(t *testing.T) { res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port)) if e := expectFormatedError(res, err); e != nil { - t.Error(e) + t.Fatal(e) } server.Stop() @@ -84,14 +84,14 @@ func TestGetJobSucceed(t *testing.T) { res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port)) if err != nil { - t.Error(err) + t.Fatal(err) } obj, err := getResult(res) if err != nil { - t.Error(err) + t.Fatal(err) } if obj.Stats.JobName != "testing" || obj.Stats.JobID != "fake_ID_ok" { - t.Errorf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID) + t.Fatalf("expect job ID 'fake_ID_ok' of 'testing', but got '%s'\n", obj.Stats.JobID) } server.Stop() @@ -105,21 +105,21 @@ func TestJobActionFailed(t *testing.T) { actionReq, err := createJobActionReq("stop") if err != nil { - t.Error(err) + t.Fatal(err) } resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) expectFormatedError(resData, err) actionReq, err = createJobActionReq("cancel") if err != nil { - t.Error(err) + t.Fatal(err) } resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) expectFormatedError(resData, err) actionReq, err = createJobActionReq("retry") if err != nil { - t.Error(err) + t.Fatal(err) } resData, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port), actionReq) expectFormatedError(resData, err) @@ -135,29 +135,29 @@ func TestJobActionSucceed(t *testing.T) { actionReq, err := createJobActionReq("stop") if err != nil { - t.Error(err) + t.Fatal(err) } _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) if err != nil { - t.Error(err) + t.Fatal(err) } actionReq, err = createJobActionReq("cancel") if err != nil { - t.Error(err) + t.Fatal(err) } _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) if err != nil { - t.Error(err) + t.Fatal(err) } actionReq, err = createJobActionReq("retry") if err != nil { - t.Error(err) + t.Fatal(err) } _, err = postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port), actionReq) if err != nil { - t.Error(err) + t.Fatal(err) } server.Stop() @@ -171,7 +171,7 @@ func TestCheckStatus(t *testing.T) { resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port)) if err != nil { - t.Error(err) + t.Fatal(err) } poolStats := &models.JobPoolStats{ @@ -179,11 +179,11 @@ func TestCheckStatus(t *testing.T) { } err = json.Unmarshal(resData, poolStats) if err != nil { - t.Error(err) + t.Fatal(err) } if poolStats.Pools[0].WorkerPoolID != "fake_pool_ID" { - t.Errorf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID) + t.Fatalf("expect pool ID 'fake_pool_ID' but got '%s'", poolStats.Pools[0].WorkerPoolID) } server.Stop() @@ -197,11 +197,11 @@ func TestGetJobLog(t *testing.T) { resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port)) if err != nil { - t.Error(err) + t.Fatal(err) } if len(resData) == 0 { - t.Error("expect job log but got nothing") + t.Fatal("expect job log but got nothing") } server.Stop() diff --git a/src/jobservice_v2/config/config_test.go b/src/jobservice_v2/config/config_test.go index aa9b433e0..abc7c5df6 100644 --- a/src/jobservice_v2/config/config_test.go +++ b/src/jobservice_v2/config/config_test.go @@ -9,22 +9,22 @@ import ( func TestConfigLoadingFailed(t *testing.T) { cfg := &Configuration{} if err := cfg.Load("./config.not-existing.yaml", false); err == nil { - t.Errorf("Load config from none-existing document, expect none nil error but got '%s'\n", err) + t.Fatalf("Load config from none-existing document, expect none nil error but got '%s'\n", err) } } func TestConfigLoadingSucceed(t *testing.T) { if err := CreateLogDir(); err != nil { - t.Error(err) + t.Fatal(err) } cfg := &Configuration{} if err := cfg.Load("../config_test.yml", false); err != nil { - t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) + t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err) } if err := RemoveLogDir(); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -36,70 +36,70 @@ func TestConfigLoadingWithEnv(t *testing.T) { cfg := &Configuration{} if err := cfg.Load("../config_test.yml", true); err != nil { - t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) + t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err) } if cfg.Protocol != "https" { - t.Errorf("expect protocol 'https', but got '%s'\n", cfg.Protocol) + t.Fatalf("expect protocol 'https', but got '%s'\n", cfg.Protocol) } if cfg.Port != 8989 { - t.Errorf("expect port 8989 but got '%d'\n", cfg.Port) + t.Fatalf("expect port 8989 but got '%d'\n", cfg.Port) } if cfg.PoolConfig.WorkerCount != 8 { - t.Errorf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount) + t.Fatalf("expect workcount 8 but go '%d'\n", cfg.PoolConfig.WorkerCount) } if cfg.PoolConfig.RedisPoolCfg.Host != "localhost" { - t.Errorf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host) + t.Fatalf("expect redis host 'localhost' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Host) } if cfg.PoolConfig.RedisPoolCfg.Port != 7379 { - t.Errorf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port) + t.Fatalf("expect redis port '7379' but got '%d'\n", cfg.PoolConfig.RedisPoolCfg.Port) } if cfg.PoolConfig.RedisPoolCfg.Namespace != "ut_namespace" { - t.Errorf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace) + t.Fatalf("expect redis namespace 'ut_namespace' but got '%s'\n", cfg.PoolConfig.RedisPoolCfg.Namespace) } if cfg.LoggerConfig.BasePath != "/tmp" { - t.Errorf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath) + t.Fatalf("expect log base path '/tmp' but got '%s'\n", cfg.LoggerConfig.BasePath) } if cfg.LoggerConfig.LogLevel != "DEBUG" { - t.Errorf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel) + t.Fatalf("expect log level 'DEBUG' but got '%s'\n", cfg.LoggerConfig.LogLevel) } if cfg.LoggerConfig.ArchivePeriod != 5 { - t.Errorf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod) + t.Fatalf("expect log archive period 5 but got '%d'\n", cfg.LoggerConfig.ArchivePeriod) } unsetENV() if err := RemoveLogDir(); err != nil { - t.Error(err) + t.Fatal(err) } } func TestDefaultConfig(t *testing.T) { if err := CreateLogDir(); err != nil { - t.Error(err) + t.Fatal(err) } if err := DefaultConfig.Load("../config_test.yml", true); err != nil { - t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err) + t.Fatalf("Load config from yaml file, expect nil error but got error '%s'\n", err) } if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888" { - t.Errorf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint) + t.Fatalf("expect default admin server endpoint 'http://127.0.0.1:8888' but got '%s'\n", endpoint) } if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" { - t.Errorf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath) + t.Fatalf("expect default logger base path '/tmp/job_logs' but got '%s'\n", basePath) } if lvl := GetLogLevel(); lvl != "INFO" { - t.Errorf("expect default logger level 'INFO' but got '%s'\n", lvl) + t.Fatalf("expect default logger level 'INFO' but got '%s'\n", lvl) } if period := GetLogArchivePeriod(); period != 1 { - t.Errorf("expect default log archive period 1 but got '%d'\n", period) + t.Fatalf("expect default log archive period 1 but got '%d'\n", period) } if err := RemoveLogDir(); err != nil { - t.Error(err) + t.Fatal(err) } } diff --git a/src/jobservice_v2/core/controller_test.go b/src/jobservice_v2/core/controller_test.go index fcdde75cc..c54f7e36e 100644 --- a/src/jobservice_v2/core/controller_test.go +++ b/src/jobservice_v2/core/controller_test.go @@ -17,11 +17,11 @@ func TestLaunchGenericJob(t *testing.T) { req := createJobReq("Generic", false, false) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID" { - t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) } } @@ -31,11 +31,11 @@ func TestLaunchGenericJobUnique(t *testing.T) { req := createJobReq("Generic", true, false) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID" { - t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) } } @@ -45,11 +45,11 @@ func TestLaunchGenericJobWithHook(t *testing.T) { req := createJobReq("Generic", false, true) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID" { - t.Errorf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID' but got '%s'\n", res.Stats.JobID) } } @@ -59,11 +59,11 @@ func TestLaunchScheduledJob(t *testing.T) { req := createJobReq("Scheduled", false, true) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID_Scheduled" { - t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) } } @@ -73,11 +73,11 @@ func TestLaunchScheduledUniqueJob(t *testing.T) { req := createJobReq("Scheduled", true, false) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID_Scheduled" { - t.Errorf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID_Scheduled' but got '%s'\n", res.Stats.JobID) } } @@ -87,11 +87,11 @@ func TestLaunchPeriodicJob(t *testing.T) { req := createJobReq("Periodic", true, false) res, err := c.LaunchJob(req) if err != nil { - t.Error(err) + t.Fatal(err) } if res.Stats.JobID != "fake_ID_Periodic" { - t.Errorf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID) + t.Fatalf("expect enqueued job ID 'fake_ID_Periodic' but got '%s'\n", res.Stats.JobID) } } @@ -100,11 +100,11 @@ func TestGetJobStats(t *testing.T) { c := NewController(pool) stats, err := c.GetJob("fake_ID") if err != nil { - t.Error(err) + t.Fatal(err) } if stats.Stats.Status != "running" { - t.Errorf("expect stauts 'running' but got '%s'\n", stats.Stats.Status) + t.Fatalf("expect stauts 'running' but got '%s'\n", stats.Stats.Status) } } @@ -113,15 +113,15 @@ func TestJobActions(t *testing.T) { c := NewController(pool) if err := c.StopJob("fake_ID"); err != nil { - t.Error(err) + t.Fatal(err) } if err := c.CancelJob("fake_ID"); err != nil { - t.Error(err) + t.Fatal(err) } if err := c.RetryJob("fake_ID"); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -134,7 +134,7 @@ func TestGetJobLogData(t *testing.T) { t.Errorf("expect object not found error but got '%s'\n", err) } } else { - t.Error("expect error but got nil") + t.Fatal("expect error but got nil") } } @@ -144,15 +144,15 @@ func TestCheckStatus(t *testing.T) { st, err := c.CheckStatus() if err != nil { - t.Error(err) + t.Fatal(err) } if len(st.Pools) == 0 { - t.Error("expect status data but got zero list") + t.Fatal("expect status data but got zero list") } if st.Pools[0].Status != "running" { - t.Errorf("expect status 'running' but got '%s'\n", st.Pools[0].Status) + t.Fatalf("expect status 'running' but got '%s'\n", st.Pools[0].Status) } } @@ -170,23 +170,23 @@ func TestInvalidCheck(t *testing.T) { } if _, err := c.LaunchJob(req); err == nil { - t.Error("error expected but got nil") + t.Fatal("error expected but got nil") } req.Job.Name = "fake" if _, err := c.LaunchJob(req); err == nil { - t.Error("error expected but got nil") + t.Fatal("error expected but got nil") } req.Job.Metadata.JobKind = "Scheduled" if _, err := c.LaunchJob(req); err == nil { - t.Error("error expected but got nil") + t.Fatal("error expected but got nil") } req.Job.Metadata.JobKind = "Periodic" req.Job.Metadata.Cron = "x x x x x x" if _, err := c.LaunchJob(req); err == nil { - t.Error("error expected but got nil") + t.Fatal("error expected but got nil") } } diff --git a/src/jobservice_v2/logger/sweeper_test.go b/src/jobservice_v2/logger/sweeper_test.go index 424ba4e27..c241d250c 100644 --- a/src/jobservice_v2/logger/sweeper_test.go +++ b/src/jobservice_v2/logger/sweeper_test.go @@ -13,11 +13,11 @@ func TestSweeper(t *testing.T) { workDir := "/tmp/sweeper_logs" if err := os.MkdirAll(workDir, 0755); err != nil { - t.Error(err) + t.Fatal(err) } _, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir)) if err != nil { - t.Error(err) + t.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) @@ -28,9 +28,9 @@ func TestSweeper(t *testing.T) { <-time.After(100 * time.Millisecond) if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil { - t.Error(err) + t.Fatal(err) } if err := os.Remove(workDir); err != nil { - t.Error(err) + t.Fatal(err) } } diff --git a/src/jobservice_v2/opm/hook_client_test.go b/src/jobservice_v2/opm/hook_client_test.go index 0e8091a5c..dd5958556 100644 --- a/src/jobservice_v2/opm/hook_client_test.go +++ b/src/jobservice_v2/opm/hook_client_test.go @@ -21,7 +21,7 @@ func TestHookClient(t *testing.T) { Status: "running", }) if err != nil { - t.Error(err) + t.Fatal(err) } } @@ -37,6 +37,6 @@ func TestReportStatusFailed(t *testing.T) { Status: "running", }) if err == nil { - t.Error("expect error but got nil") + t.Fatal("expect error but got nil") } } diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go index 6fd1eb8ee..d62833d72 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go @@ -57,16 +57,16 @@ func TestSetJobStatus(t *testing.T) { <-time.After(100 * time.Millisecond) stats, err := mgr.Retrieve("fake_job_ID") if err != nil { - t.Error(err) + t.Fatal(err) } if stats.Stats.Status != "running" { - t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status) + t.Fatalf("expect job status 'running' but got '%s'\n", stats.Stats.Status) } key := utils.KeyJobStats(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -77,20 +77,20 @@ func TestCommand(t *testing.T) { <-time.After(200 * time.Millisecond) if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil { - t.Error(err) + t.Fatal(err) } if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil { - t.Error(err) + t.Fatal(err) } else { if cmd != CtlCommandStop { - t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd) + t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd) } } key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -105,7 +105,7 @@ func TestDieAt(t *testing.T) { dieAt := time.Now().Unix() if err := createDeadJob(redisPool.Get(), dieAt); err != nil { - t.Error(err) + t.Fatal(err) } <-time.After(200 * time.Millisecond) mgr.DieAt("fake_job_ID", dieAt) @@ -113,20 +113,20 @@ func TestDieAt(t *testing.T) { stats, err := mgr.Retrieve("fake_job_ID") if err != nil { - t.Error(err) + t.Fatal(err) } if stats.Stats.DieAt != dieAt { - t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt) + t.Fatalf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt) } key := utils.KeyJobStats(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } key2 := utils.RedisKeyDead(testingNamespace) if err := clear(key2, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -137,12 +137,12 @@ func TestRegisterHook(t *testing.T) { <-time.After(200 * time.Millisecond) if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil { - t.Error(err) + t.Fatal(err) } key := utils.KeyJobStats(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -158,12 +158,12 @@ func TestExpireJobStats(t *testing.T) { <-time.After(200 * time.Millisecond) if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil { - t.Error(err) + t.Fatal(err) } key := utils.KeyJobStats(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -185,7 +185,7 @@ func TestCheckIn(t *testing.T) { defer ts.Close() if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil { - t.Error(err) + t.Fatal(err) } mgr.CheckIn("fake_job_ID", "checkin") @@ -193,16 +193,16 @@ func TestCheckIn(t *testing.T) { stats, err := mgr.Retrieve("fake_job_ID") if err != nil { - t.Error(err) + t.Fatal(err) } if stats.Stats.CheckIn != "checkin" { - t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn) + t.Fatalf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn) } key := utils.KeyJobStats(testingNamespace, "fake_job_ID") if err := clear(key, redisPool.Get()); err != nil { - t.Error(err) + t.Fatal(err) } } diff --git a/src/jobservice_v2/period/job_policy_test.go b/src/jobservice_v2/period/job_policy_test.go index 23888e7df..c30f431fe 100644 --- a/src/jobservice_v2/period/job_policy_test.go +++ b/src/jobservice_v2/period/job_policy_test.go @@ -14,11 +14,11 @@ func TestPeriodicJobPolicy(t *testing.T) { data, err := p.Serialize() if err != nil { - t.Error(err) + t.Fatal(err) } if err := p.DeSerialize(data); err != nil { - t.Error(err) + t.Fatal(err) } } @@ -37,17 +37,17 @@ func TestPeriodicJobPolicyStore(t *testing.T) { pl = append(pl, createPolicy("")) ps.addAll(pl) if ps.size() != 3 { - t.Errorf("expect size 3 but got '%d'\n", ps.size()) + t.Fatalf("expect size 3 but got '%d'\n", ps.size()) } l := ps.list() if l == nil || len(l) != 3 { - t.Error("expect a policy list with 3 items but got invalid list") + t.Fatal("expect a policy list with 3 items but got invalid list") } rp := ps.remove("fake_ID_Steven") if rp == nil { - t.Error("expect none nil policy object but got nil") + t.Fatal("expect none nil policy object but got nil") } } diff --git a/src/jobservice_v2/period/redis_scheduler_test.go b/src/jobservice_v2/period/redis_scheduler_test.go index c1141bba7..3b9604589 100644 --- a/src/jobservice_v2/period/redis_scheduler_test.go +++ b/src/jobservice_v2/period/redis_scheduler_test.go @@ -20,33 +20,33 @@ func TestScheduler(t *testing.T) { params["image"] = "testing:v1" id, runAt, err := scheduler.Schedule("fake_job", params, "5 * * * * *") if err != nil { - t.Error(err) + t.Fatal(err) } if time.Now().Unix() >= runAt { - t.Error("the running at time of scheduled job should be after now, but seems not") + t.Fatal("the running at time of scheduled job should be after now, but seems not") } if err := scheduler.Load(); err != nil { - t.Error(err) + t.Fatal(err) } if scheduler.pstore.size() != 1 { - t.Errorf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size()) + t.Fatalf("expect 1 item in pstore but got '%d'\n", scheduler.pstore.size()) } if err := scheduler.UnSchedule(id); err != nil { - t.Error(err) + t.Fatal(err) } if err := scheduler.Clear(); err != nil { - t.Error(err) + t.Fatal(err) } err = tests.Clear(utils.KeyPeriodicPolicy(tests.GiveMeTestNamespace()), redisPool.Get()) err = tests.Clear(utils.KeyPeriodicPolicyScore(tests.GiveMeTestNamespace()), redisPool.Get()) err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), redisPool.Get()) if err != nil { - t.Error(err) + t.Fatal(err) } } @@ -58,16 +58,16 @@ func TestPubFunc(t *testing.T) { CronSpec: "5 * * * * *", } if err := scheduler.AcceptPeriodicPolicy(p); err != nil { - t.Error(err) + t.Fatal(err) } if scheduler.pstore.size() != 1 { - t.Errorf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size()) + t.Fatalf("expect 1 item in pstore but got '%d' after accepting \n", scheduler.pstore.size()) } if rmp := scheduler.RemovePeriodicPolicy("fake_ID"); rmp == nil { - t.Error("expect none nil object returned after removing but got nil") + t.Fatal("expect none nil object returned after removing but got nil") } if scheduler.pstore.size() != 0 { - t.Errorf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size()) + t.Fatalf("expect 0 item in pstore but got '%d' \n", scheduler.pstore.size()) } } diff --git a/src/jobservice_v2/period/sweeper_test.go b/src/jobservice_v2/period/sweeper_test.go index bdb023084..17cda17b0 100644 --- a/src/jobservice_v2/period/sweeper_test.go +++ b/src/jobservice_v2/period/sweeper_test.go @@ -15,16 +15,16 @@ import ( func TestSweeper(t *testing.T) { epoch := time.Now().Unix() - 1000 if err := createFakeScheduledJob(epoch); err != nil { - t.Error(err) + t.Fatal(err) } ns := tests.GiveMeTestNamespace() sweeper := NewSweeper(ns, redisPool, work.NewClient(ns, redisPool)) if err := sweeper.ClearOutdatedScheduledJobs(); err != nil { - t.Error(err) + t.Fatal(err) } err := tests.Clear(utils.RedisKeyScheduled(ns), redisPool.Get()) if err != nil { - t.Error(err) + t.Fatal(err) } } diff --git a/src/jobservice_v2/pool/message_server_test.go b/src/jobservice_v2/pool/message_server_test.go new file mode 100644 index 000000000..56f6defb2 --- /dev/null +++ b/src/jobservice_v2/pool/message_server_test.go @@ -0,0 +1,148 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. +package pool + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/vmware/harbor/src/jobservice_v2/opm" + + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/period" + "github.com/vmware/harbor/src/jobservice_v2/utils" + + "github.com/vmware/harbor/src/jobservice_v2/tests" +) + +var redisPool = tests.GiveMeRedisPool() + +func TestPublishPolicy(t *testing.T) { + ms, cancel := createMessageServer() + err := ms.Subscribe(period.EventSchedulePeriodicPolicy, func(data interface{}) error { + if _, ok := data.(*period.PeriodicJobPolicy); !ok { + t.Fatal("expect PeriodicJobPolicy but got other thing") + return errors.New("expect PeriodicJobPolicy but got other thing") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + err = ms.Subscribe(period.EventUnSchedulePeriodicPolicy, func(data interface{}) error { + if _, ok := data.(*period.PeriodicJobPolicy); !ok { + t.Fatal("expect PeriodicJobPolicy but got other thing") + return errors.New("expect PeriodicJobPolicy but got other thing") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + go func() { + defer cancel() + //wait and then publish + <-time.After(200 * time.Millisecond) + + p := &period.PeriodicJobPolicy{ + PolicyID: "fake_ID", + JobName: "fake_job", + CronSpec: "5 * * * *", + } + notification := &models.Message{ + Event: period.EventSchedulePeriodicPolicy, + Data: p, + } + + rawJSON, err := json.Marshal(notification) + if err != nil { + t.Fatal(err) + } + + conn := redisPool.Get() + defer conn.Close() + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON) + if err != nil { + t.Fatal(err) + } + + notification.Event = period.EventUnSchedulePeriodicPolicy + rawJSON, err = json.Marshal(notification) + if err != nil { + t.Fatal(err) + } + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON) + if err != nil { + t.Fatal(err) + } + + //send quit signal + <-time.After(200 * time.Millisecond) + err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn) + if err != nil { + t.Fatal(err) + } + }() + + ms.Start() +} + +func TestPublishHook(t *testing.T) { + ms, cancel := createMessageServer() + err := ms.Subscribe(opm.EventRegisterStatusHook, func(data interface{}) error { + if _, ok := data.(*opm.HookData); !ok { + t.Fatal("expect HookData but got other thing") + return errors.New("expect HookData but got other thing") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + go func() { + defer cancel() + + <-time.After(200 * time.Millisecond) + hook := &opm.HookData{ + JobID: "fake_job_ID", + HookURL: "http://localhost:9999/hook", + } + notification := &models.Message{ + Event: opm.EventRegisterStatusHook, + Data: hook, + } + + rawJSON, err := json.Marshal(notification) + if err != nil { + t.Fatal(err) + } + + conn := redisPool.Get() + defer conn.Close() + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON) + if err != nil { + t.Fatal(err) + } + + //send quit signal + <-time.After(200 * time.Millisecond) + err = tests.Clear(utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), conn) + if err != nil { + t.Fatal(err) + } + }() + + ms.Start() +} + +func createMessageServer() (*MessageServer, context.CancelFunc) { + ns := tests.GiveMeTestNamespace() + ctx, cancel := context.WithCancel(context.Background()) + return NewMessageServer(ctx, ns, redisPool), cancel +} diff --git a/src/jobservice_v2/pool/redis_job_wrapper.go b/src/jobservice_v2/pool/redis_job_wrapper.go index 91a08e0bb..51f561948 100644 --- a/src/jobservice_v2/pool/redis_job_wrapper.go +++ b/src/jobservice_v2/pool/redis_job_wrapper.go @@ -3,6 +3,7 @@ package pool import ( + "fmt" "time" "github.com/gocraft/work" @@ -72,6 +73,12 @@ func (rj *RedisJob) Run(j *work.Job) error { } }() + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("Runtime error: %s", r) + } + }() + //Start to run rj.jobRunning(j.ID) //Inject data diff --git a/src/jobservice_v2/pool/redis_pool.go b/src/jobservice_v2/pool/redis_pool.go index 12fd017f9..c5b0e295f 100644 --- a/src/jobservice_v2/pool/redis_pool.go +++ b/src/jobservice_v2/pool/redis_pool.go @@ -323,6 +323,8 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) { return models.JobPoolStats{}, err } + fmt.Printf("hbs=%+#v\n", hbs[0]) + //Find the heartbeat of this pool via pid stats := make([]*models.JobPoolStatsData, 0) for _, hb := range hbs { diff --git a/src/jobservice_v2/pool/redis_pool_test.go b/src/jobservice_v2/pool/redis_pool_test.go new file mode 100644 index 000000000..8e96dd1d6 --- /dev/null +++ b/src/jobservice_v2/pool/redis_pool_test.go @@ -0,0 +1,413 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. +package pool + +import ( + "context" + "errors" + "reflect" + "sync" + "testing" + "time" + + "github.com/vmware/harbor/src/jobservice_v2/errs" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/logger" + "github.com/vmware/harbor/src/jobservice_v2/opm" + + "github.com/vmware/harbor/src/jobservice_v2/tests" + + "github.com/vmware/harbor/src/jobservice_v2/env" +) + +var rPool = tests.GiveMeRedisPool() + +func TestRegisterJob(t *testing.T) { + wp, _, _ := createRedisWorkerPool() + defer func() { + if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil { + t.Error(err) + } + }() + + if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil { + t.Error(err) + } + + jobs := make(map[string]interface{}) + jobs["fake_job_1st"] = (*fakeJob)(nil) + jobs["fake_job_2nd"] = (*fakeJob)(nil) + if err := wp.RegisterJobs(jobs); err != nil { + t.Error(err) + } + + if _, ok := wp.IsKnownJob("fake_job"); !ok { + t.Error("expect known job but seems failed to register job 'fake_job'") + } + + params := make(map[string]interface{}) + params["name"] = "testing:v1" + if err := wp.ValidateJobParameters((*fakeJob)(nil), params); err != nil { + t.Error(err) + } +} + +func TestEnqueueJob(t *testing.T) { + wp, sysCtx, cancel := createRedisWorkerPool() + defer func() { + if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil { + t.Error(err) + } + }() + defer cancel() + + if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil { + t.Error(err) + } + if err := wp.RegisterJob("fake_unique_job", (*fakeUniqueJob)(nil)); err != nil { + t.Error(err) + } + + go wp.Start() + time.Sleep(1 * time.Second) + + params := make(map[string]interface{}) + params["name"] = "testing:v1" + stats, err := wp.Enqueue("fake_job", params, false) + if err != nil { + t.Error(err) + } + if stats.Stats.JobID == "" { + t.Error("expect none nil job stats but got nil") + } + + runAt := time.Now().Unix() + 20 + stats, err = wp.Schedule("fake_job", params, 20, false) + if err != nil { + t.Error(err) + } + + if stats.Stats.RunAt > 0 && stats.Stats.RunAt < runAt { + t.Errorf("expect returned 'RunAt' should be >= '%d' but seems not", runAt) + } + + stats, err = wp.Enqueue("fake_unique_job", params, true) + if err != nil { + t.Error(err) + } + if stats.Stats.JobID == "" { + t.Error("expect none nil job stats but got nil") + } + + cancel() + sysCtx.WG.Wait() +} + +func TestEnqueuePeriodicJob(t *testing.T) { + wp, _, cancel := createRedisWorkerPool() + defer func() { + if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil { + t.Error(err) + } + }() + defer cancel() + + if err := wp.RegisterJob("fake_job", (*fakeJob)(nil)); err != nil { + t.Error(err) + } + + go wp.Start() + time.Sleep(1 * time.Second) + + params := make(map[string]interface{}) + params["name"] = "testing:v1" + jobStats, err := wp.PeriodicallyEnqueue("fake_job", params, "10 * * * * *") + if err != nil { + t.Error(err) + } + <-time.After(1 * time.Second) + + jStats, err := wp.GetJobStats(jobStats.Stats.JobID) + if err != nil { + t.Error(err) + } + + if jobStats.Stats.JobName != jStats.Stats.JobName { + t.Error("expect same job stats but got different ones") + } + + if err := wp.StopJob(jStats.Stats.JobID); err != nil { + t.Error(err) + } + + //cancel() + //<-time.After(1 * time.Second) +} + +/*func TestCancelAndRetryJobWithHook(t *testing.T) { + wp, _, cancel := createRedisWorkerPool() + defer func() { + if err := tests.ClearAll(tests.GiveMeTestNamespace(), redisPool.Get()); err != nil { + t.Fatal(err) + } + }() + defer cancel() + + if err := wp.RegisterJob("fake_runnable_job", (*fakeRunnableJob)(nil)); err != nil { + t.Fatal(err) + } + + go wp.Start() + time.Sleep(1 * time.Second) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "ok") + })) + defer ts.Close() + + params := make(map[string]interface{}) + params["name"] = "testing:v1" + res, err := wp.Enqueue("fake_runnable_job", params, false) + if err != nil { + t.Fatal(err) + } + if err := wp.RegisterHook(res.Stats.JobID, ts.URL); err != nil { + t.Fatal(err) + } + //make sure it's running + timer := time.NewTimer(1 * time.Second) + defer timer.Stop() + +CHECK: + <-timer.C + if check, err := wp.GetJobStats(res.Stats.JobID); err != nil { + t.Fatal(err) + } else { + if check.Stats.Status != job.JobStatusRunning { + timer.Reset(1 * time.Second) + goto CHECK + } + } + + //cancel + if err := wp.CancelJob(res.Stats.JobID); err != nil { + t.Fatal(err) + } + <-time.After(5 * time.Second) + updatedRes, err := wp.GetJobStats(res.Stats.JobID) + if err != nil { + t.Fatal(err) + } + if updatedRes.Stats.Status != job.JobStatusCancelled { + t.Fatalf("expect job staus '%s' but got '%s'\n", job.JobStatusCancelled, updatedRes.Stats.Status) + } + if updatedRes.Stats.DieAt == 0 { + t.Fatalf("expect none zero 'DieAt' but got 0 value") + } + + //retry + if err := wp.RetryJob(updatedRes.Stats.JobID); err != nil { + t.Fatal(err) + } +}*/ + +func createRedisWorkerPool() (*GoCraftWorkPool, *env.Context, context.CancelFunc) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + envCtx := &env.Context{ + SystemContext: ctx, + WG: new(sync.WaitGroup), + ErrorChan: make(chan error, 1), + JobContext: newContext(ctx), + } + + return NewGoCraftWorkPool(envCtx, tests.GiveMeTestNamespace(), 3, rPool), envCtx, cancel +} + +type fakeJob struct{} + +func (j *fakeJob) MaxFails() uint { + return 3 +} + +func (j *fakeJob) ShouldRetry() bool { + return true +} + +func (j *fakeJob) Validate(params map[string]interface{}) error { + if p, ok := params["name"]; ok { + if p == "testing:v1" { + return nil + } + } + + return errors.New("testing error") +} + +func (j *fakeJob) Run(ctx env.JobContext, params map[string]interface{}) error { + return nil +} + +type fakeUniqueJob struct{} + +func (j *fakeUniqueJob) MaxFails() uint { + return 3 +} + +func (j *fakeUniqueJob) ShouldRetry() bool { + return true +} + +func (j *fakeUniqueJob) Validate(params map[string]interface{}) error { + if p, ok := params["name"]; ok { + if p == "testing:v1" { + return nil + } + } + + return errors.New("testing error") +} + +func (j *fakeUniqueJob) Run(ctx env.JobContext, params map[string]interface{}) error { + return nil +} + +type fakeRunnableJob struct{} + +func (j *fakeRunnableJob) MaxFails() uint { + return 2 +} + +func (j *fakeRunnableJob) ShouldRetry() bool { + return true +} + +func (j *fakeRunnableJob) Validate(params map[string]interface{}) error { + if p, ok := params["name"]; ok { + if p == "testing:v1" { + return nil + } + } + + return errors.New("testing error") +} + +func (j *fakeRunnableJob) Run(ctx env.JobContext, params map[string]interface{}) error { + tk := time.NewTicker(1 * time.Second) + defer tk.Stop() + + for { + select { + case <-tk.C: + cmd, ok := ctx.OPCommand() + if ok { + if cmd == opm.CtlCommandStop { + return errs.JobStoppedError() + } + + return errs.JobCancelledError() + } + case <-ctx.SystemContext().Done(): + return nil + case <-time.After(1 * time.Minute): + return errors.New("fake job timeout") + } + } +} + +type fakeContext struct { + //System context + sysContext context.Context + + //op command func + opCommandFunc job.CheckOPCmdFunc + + //checkin func + checkInFunc job.CheckInFunc + + //other required information + properties map[string]interface{} +} + +func newContext(sysCtx context.Context) *fakeContext { + return &fakeContext{ + sysContext: sysCtx, + properties: make(map[string]interface{}), + } +} + +//Build implements the same method in env.JobContext interface +//This func will build the job execution context before running +func (c *fakeContext) Build(dep env.JobData) (env.JobContext, error) { + jContext := &fakeContext{ + sysContext: c.sysContext, + properties: make(map[string]interface{}), + } + + //Copy properties + if len(c.properties) > 0 { + for k, v := range c.properties { + jContext.properties[k] = v + } + } + + if opCommandFunc, ok := dep.ExtraData["opCommandFunc"]; ok { + if reflect.TypeOf(opCommandFunc).Kind() == reflect.Func { + if funcRef, ok := opCommandFunc.(job.CheckOPCmdFunc); ok { + jContext.opCommandFunc = funcRef + } + } + } + if jContext.opCommandFunc == nil { + return nil, errors.New("failed to inject opCommandFunc") + } + + if checkInFunc, ok := dep.ExtraData["checkInFunc"]; ok { + if reflect.TypeOf(checkInFunc).Kind() == reflect.Func { + if funcRef, ok := checkInFunc.(job.CheckInFunc); ok { + jContext.checkInFunc = funcRef + } + } + } + + if jContext.checkInFunc == nil { + return nil, errors.New("failed to inject checkInFunc") + } + + return jContext, nil +} + +//Get implements the same method in env.JobContext interface +func (c *fakeContext) Get(prop string) (interface{}, bool) { + v, ok := c.properties[prop] + return v, ok +} + +//SystemContext implements the same method in env.JobContext interface +func (c *fakeContext) SystemContext() context.Context { + return c.sysContext +} + +//Checkin is bridge func for reporting detailed status +func (c *fakeContext) Checkin(status string) error { + if c.checkInFunc != nil { + c.checkInFunc(status) + } else { + return errors.New("nil check in function") + } + + return nil +} + +//OPCommand return the control operational command like stop/cancel if have +func (c *fakeContext) OPCommand() (string, bool) { + if c.opCommandFunc != nil { + return c.opCommandFunc() + } + + return "", false +} + +//GetLogger returns the logger +func (c *fakeContext) GetLogger() logger.Interface { + return nil +} diff --git a/src/jobservice_v2/tests/utils.go b/src/jobservice_v2/tests/utils.go index 17c8fe7fb..e42d71291 100644 --- a/src/jobservice_v2/tests/utils.go +++ b/src/jobservice_v2/tests/utils.go @@ -25,8 +25,8 @@ const ( func GiveMeRedisPool() *redis.Pool { redisHost := getRedisHost() redisPool := &redis.Pool{ - MaxActive: 2, - MaxIdle: 2, + MaxActive: 6, + MaxIdle: 6, Wait: true, Dial: func() (redis.Conn, error) { return redis.Dial( @@ -58,6 +58,28 @@ func Clear(key string, conn redis.Conn) error { return errors.New("failed to clear") } +//ClearAll ... +func ClearAll(namespace string, conn redis.Conn) error { + defer conn.Close() + + keys, err := redis.Strings(conn.Do("KEYS", fmt.Sprintf("%s:*", namespace))) + if err != nil { + return err + } + + if len(keys) == 0 { + return nil + } + + for _, key := range keys { + if err := conn.Send("DEL", key); err != nil { + return err + } + } + + return conn.Flush() +} + func getRedisHost() string { redisHost := os.Getenv(testingRedisHost) if redisHost == "" {