From dd156ca24347cbb8f0a06e3601dd1949a0b5664f Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Sat, 24 Mar 2018 20:22:24 +0800 Subject: [PATCH 1/4] Handle replication job status hook --- src/common/job/client.go | 14 ++++++++- src/common/job/const.go | 3 ++ src/replication/replicator/replicator.go | 5 +-- src/ui/api/replication_job.go | 26 +++++++++++---- src/ui/service/notifications/jobs/handler.go | 33 +++++++++++++++----- 5 files changed, 64 insertions(+), 17 deletions(-) diff --git a/src/common/job/client.go b/src/common/job/client.go index 078e6ddc3..331cd06a0 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -16,7 +16,8 @@ import ( type Client interface { SubmitJob(*models.JobData) (string, error) GetJobLog(uuid string) ([]byte, error) - //TODO actions or stop? Redirect joblog when we see there's memory issue. + PostAction(uuid, action string) error + //TODO Redirect joblog when we see there's memory issue. } // DefaultClient is the default implementation of Client interface @@ -101,3 +102,14 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) { } return data, nil } + +// PostAction call jobservice's API to operate action for job specified by uuid +func (d *DefaultClient) PostAction(uuid, action string) error { + url := d.endpoint + "/api/v1/jobs/" + uuid + req := struct { + Action string `json:"action"` + }{ + Action: action, + } + return d.client.Post(url, req) +} diff --git a/src/common/job/const.go b/src/common/job/const.go index 900efe26a..13ef4169c 100644 --- a/src/common/job/const.go +++ b/src/common/job/const.go @@ -31,4 +31,7 @@ const ( JobServiceStatusSuccess = "Success" //JobServiceStatusScheduled : job status scheduled JobServiceStatusScheduled = "Scheduled" + + // JobActionStop : the action to stop the job + JobActionStop = "stop" ) diff --git a/src/replication/replicator/replicator.go b/src/replication/replicator/replicator.go index faa284422..d7d47a0ff 100644 --- a/src/replication/replicator/replicator.go +++ b/src/replication/replicator/replicator.go @@ -15,6 +15,7 @@ package replicator import ( + "fmt" "strings" "github.com/vmware/harbor/src/common/dao" @@ -83,8 +84,8 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error { Metadata: &job_models.JobMetadata{ JobKind: common_job.JobKindGeneric, }, - // TODO - StatusHook: "", + StatusHook: fmt.Sprintf("%s/service/notifications/jobs/replication/%d", + config.InternalUIURL(), id), } if operation == common_models.RepOpTransfer { diff --git a/src/ui/api/replication_job.go b/src/ui/api/replication_job.go index f2446688f..c4ea967ca 100644 --- a/src/ui/api/replication_job.go +++ b/src/ui/api/replication_job.go @@ -21,11 +21,11 @@ import ( "time" "github.com/vmware/harbor/src/common/dao" + common_job "github.com/vmware/harbor/src/common/job" "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/common/utils/log" "github.com/vmware/harbor/src/replication/core" api_models "github.com/vmware/harbor/src/ui/api/models" - "github.com/vmware/harbor/src/ui/config" "github.com/vmware/harbor/src/ui/utils" ) @@ -176,10 +176,17 @@ func (ra *RepJobAPI) GetLog() { return } - url := buildJobLogURL(strconv.FormatInt(ra.jobID, 10), ReplicationJobType) - err = utils.RequestAsUI(http.MethodGet, url, nil, utils.NewJobLogRespHandler(&ra.BaseAPI)) + logBytes, err := utils.GetJobServiceClient().GetJobLog(job.UUID) if err != nil { - ra.RenderError(http.StatusInternalServerError, err.Error()) + ra.HandleInternalServerError(fmt.Sprintf("failed to get log of job %s: %v", + job.UUID, err)) + return + } + ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Length"), strconv.Itoa(len(logBytes))) + ra.Ctx.ResponseWriter.Header().Set(http.CanonicalHeaderKey("Content-Type"), "text/plain") + _, err = ra.Ctx.ResponseWriter.Write(logBytes) + if err != nil { + ra.HandleInternalServerError(fmt.Sprintf("failed to write log of job %s: %v", job.UUID, err)) return } } @@ -199,10 +206,17 @@ func (ra *RepJobAPI) StopJobs() { ra.CustomAbort(http.StatusNotFound, fmt.Sprintf("policy %d not found", req.PolicyID)) } - if err = config.GlobalJobserviceClient.StopReplicationJobs(req.PolicyID); err != nil { - ra.HandleInternalServerError(fmt.Sprintf("failed to stop replication jobs of policy %d: %v", req.PolicyID, err)) + jobs, err := dao.GetRepJobByPolicy(policy.ID) + if err != nil { + ra.HandleInternalServerError(fmt.Sprintf("failed to list jobs of policy %d: %v", policy.ID, err)) return } + for _, job := range jobs { + if err = utils.GetJobServiceClient().PostAction(job.UUID, common_job.JobActionStop); err != nil { + log.Errorf("failed to stop job id-%d uuid-%s: %v", job.ID, job.UUID, err) + continue + } + } } //TODO:add Post handler to call job service API to submit jobs by policy diff --git a/src/ui/service/notifications/jobs/handler.go b/src/ui/service/notifications/jobs/handler.go index ada6126af..87f126f0c 100644 --- a/src/ui/service/notifications/jobs/handler.go +++ b/src/ui/service/notifications/jobs/handler.go @@ -37,33 +37,50 @@ var statusMap = map[string]string{ // Handler handles reqeust on /service/notifications/jobs/*, which listens to the webhook of jobservice. type Handler struct { api.BaseController + id int64 + status string } -// HandleScan handles the webhook of scan job -func (h *Handler) HandleScan() { +// Prepare ... +func (h *Handler) Prepare() { id, err := h.GetInt64FromPath(":id") if err != nil { log.Errorf("Failed to get job ID, error: %v", err) //Avoid job service from resending... + h.Abort("200") return } + h.id = id var data jobmodels.JobStatusChange err = json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &data) if err != nil { log.Errorf("Failed to decode job status change, job ID: %d, error: %v", id, err) + h.Abort("200") return } - status, ok := statusMap[data.Status] log.Debugf("Received scan job status update for job: %d, status: %s", id, data.Status) - if ok { - if err := dao.UpdateScanJobStatus(id, status); err != nil { - log.Errorf("Failed to update job status, id: %d, data: %v", id, data) - h.HandleInternalServerError(err.Error()) - } + status, ok := statusMap[data.Status] + if !ok { + h.Abort("200") + return } + h.status = status +} +// HandleScan handles the webhook of scan job +func (h *Handler) HandleScan() { + if err := dao.UpdateScanJobStatus(h.id, h.status); err != nil { + log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status) + h.HandleInternalServerError(err.Error()) + return + } } //HandleReplication handles the webhook of replication job func (h *Handler) HandleReplication() { + if err := dao.UpdateRepJobStatus(h.id, h.status); err != nil { + log.Errorf("Failed to update job status, id: %d, status: %s", h.id, h.status) + h.HandleInternalServerError(err.Error()) + return + } } From 3dc8610bb754b37cf0d6dd19b2b5eb671ad30bf2 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 28 Mar 2018 14:54:41 +0800 Subject: [PATCH 2/4] Add UT cases for package opm (update travis yaml to start redis container) --- .travis.yml | 3 + src/jobservice_v2/opm/redis_job_stats_mgr.go | 2 + .../opm/redis_job_stats_mgr_test.go | 267 +++++++++++++++++- tests/docker-compose.test.yml | 7 + 4 files changed, 277 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 536583496..e598e200a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,11 +28,13 @@ env: AUTH_MODE: db_auth SELF_REGISTRATION: on KEY_PATH: /data/secretkey + REDIS_HOST: localhost before_install: - sudo ./tests/hostcfg.sh - sudo ./tests/generateCerts.sh - sudo ./make/prepare + - sudo mkdir -p "/data/redis" install: - sudo apt-get update && sudo apt-get install -y libldap2-dev @@ -114,6 +116,7 @@ script: - ./tests/swaggerchecker.sh - ./tests/startuptest.sh - ./tests/userlogintest.sh ${HARBOR_ADMIN} ${HARBOR_ADMIN_PASSWD} + - export REDIS_HOST=$IP # - sudo ./tests/testprepare.sh # - go test -v ./tests/apitests diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go index 99339a33c..31d9ccdec 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -153,6 +153,8 @@ func (rjs *RedisJobStatsManager) loop() { if err := rjs.process(item); err != nil { item.fails++ if item.fails < maxFails { + logger.Warningf("Failed to process '%s' request with error: %s\n", item.op, err) + //Retry after a random interval go func() { timer := time.NewTimer(time.Duration(backoff(item.fails)) * time.Second) diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go index 54a798737..6fd1eb8ee 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go @@ -1,6 +1,269 @@ // Copyright 2018 The Harbor Authors. All rights reserved. package opm -import "testing" +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" -func TestRetrieveJob(t *testing.T) {} + "github.com/garyburd/redigo/redis" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +const ( + dialConnectionTimeout = 30 * time.Second + healthCheckPeriod = time.Minute + dialReadTimeout = healthCheckPeriod + 10*time.Second + dialWriteTimeout = 10 * time.Second + testingRedisHost = "REDIS_HOST" + testingNamespace = "testing_job_service_v2" +) + +var redisHost = getRedisHost() +var redisPool = &redis.Pool{ + MaxActive: 2, + MaxIdle: 2, + Wait: true, + Dial: func() (redis.Conn, error) { + return redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", redisHost, 6379), + redis.DialConnectTimeout(dialConnectionTimeout), + redis.DialReadTimeout(dialReadTimeout), + redis.DialWriteTimeout(dialWriteTimeout), + ) + }, +} + +func TestSetJobStatus(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + mgr.SetJobStatus("fake_job_ID", "running") + <-time.After(100 * time.Millisecond) + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.Status != "running" { + t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestCommand(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil { + t.Error(err) + } + + if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil { + t.Error(err) + } else { + if cmd != CtlCommandStop { + t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd) + } + } + + key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestDieAt(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + testingStats := createFakeStats() + mgr.Save(testingStats) + + dieAt := time.Now().Unix() + if err := createDeadJob(redisPool.Get(), dieAt); err != nil { + t.Error(err) + } + <-time.After(200 * time.Millisecond) + mgr.DieAt("fake_job_ID", dieAt) + <-time.After(300 * time.Millisecond) + + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.DieAt != dieAt { + t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } + key2 := utils.RedisKeyDead(testingNamespace) + if err := clear(key2, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestRegisterHook(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil { + t.Error(err) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestExpireJobStats(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil { + t.Error(err) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestCheckIn(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + //Start http server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "ok") + })) + defer ts.Close() + + if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil { + t.Error(err) + } + + mgr.CheckIn("fake_job_ID", "checkin") + <-time.After(200 * time.Millisecond) + + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.CheckIn != "checkin" { + t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func getRedisHost() string { + redisHost := os.Getenv(testingRedisHost) + if redisHost == "" { + redisHost = "10.160.178.186" //for local test + } + + return redisHost +} + +func createStatsManager(redisPool *redis.Pool) JobStatsManager { + ctx := context.Background() + return NewRedisJobStatsManager(ctx, testingNamespace, redisPool) +} + +func clear(key string, conn redis.Conn) error { + if conn != nil { + defer conn.Close() + _, err := conn.Do("DEL", key) + return err + } + + return errors.New("failed to clear") +} + +func createFakeStats() models.JobStats { + testingStats := models.JobStats{ + Stats: &models.JobStatData{ + JobID: "fake_job_ID", + JobKind: job.JobKindPeriodic, + JobName: "fake_job", + Status: "Pending", + IsUnique: false, + RefLink: "/api/v1/jobs/fake_job_ID", + CronSpec: "5 * * * * *", + EnqueueTime: time.Now().Unix(), + UpdateTime: time.Now().Unix(), + }, + } + + return testingStats +} + +func createDeadJob(conn redis.Conn, dieAt int64) error { + dead := make(map[string]interface{}) + dead["name"] = "fake_job" + dead["id"] = "fake_job_ID" + dead["args"] = make(map[string]interface{}) + dead["fails"] = 3 + dead["err"] = "testing error" + dead["failed_at"] = dieAt + + rawJSON, err := json.Marshal(&dead) + if err != nil { + return err + } + + defer conn.Close() + key := utils.RedisKeyDead(testingNamespace) + _, err = conn.Do("ZADD", key, dieAt, rawJSON) + return err +} diff --git a/tests/docker-compose.test.yml b/tests/docker-compose.test.yml index 61596b1fb..d989a95f2 100644 --- a/tests/docker-compose.test.yml +++ b/tests/docker-compose.test.yml @@ -34,3 +34,10 @@ services: - /data/:/data/ ports: - 8888:8080 + redis: + image: vmware/redis-photon:4.0 + restart: always + volumes: + - /data/redis:/data + ports: + - 6379:6379 From d3909bb633ec801eba86a4ffa00c59c8cc1c593a Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 28 Mar 2018 14:54:41 +0800 Subject: [PATCH 3/4] Add UT cases for package opm (update travis yaml to start redis container) Fix data race issue in opm package --- .travis.yml | 3 + src/jobservice_v2/opm/redis_job_stats_mgr.go | 19 +- .../opm/redis_job_stats_mgr_test.go | 267 +++++++++++++++++- tests/docker-compose.test.yml | 7 + 4 files changed, 288 insertions(+), 8 deletions(-) diff --git a/.travis.yml b/.travis.yml index 536583496..e598e200a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,11 +28,13 @@ env: AUTH_MODE: db_auth SELF_REGISTRATION: on KEY_PATH: /data/secretkey + REDIS_HOST: localhost before_install: - sudo ./tests/hostcfg.sh - sudo ./tests/generateCerts.sh - sudo ./make/prepare + - sudo mkdir -p "/data/redis" install: - sudo apt-get update && sudo apt-get install -y libldap2-dev @@ -114,6 +116,7 @@ script: - ./tests/swaggerchecker.sh - ./tests/startuptest.sh - ./tests/userlogintest.sh ${HARBOR_ADMIN} ${HARBOR_ADMIN_PASSWD} + - export REDIS_HOST=$IP # - sudo ./tests/testprepare.sh # - go test -v ./tests/apitests diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr.go b/src/jobservice_v2/opm/redis_job_stats_mgr.go index 99339a33c..0f9951561 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr.go @@ -10,6 +10,7 @@ import ( "math" "math/rand" "strconv" + "sync/atomic" "time" "github.com/vmware/harbor/src/jobservice_v2/errs" @@ -55,12 +56,15 @@ type RedisJobStatsManager struct { stopChan chan struct{} doneChan chan struct{} processChan chan *queueItem - isRunning bool //no need to sync + isRunning *atomic.Value hookStore *HookStore //cache the hook here to avoid requesting backend } //NewRedisJobStatsManager is constructor of RedisJobStatsManager func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *redis.Pool) *RedisJobStatsManager { + isRunning := &atomic.Value{} + isRunning.Store(false) + return &RedisJobStatsManager{ namespace: namespace, context: ctx, @@ -69,16 +73,17 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r doneChan: make(chan struct{}, 1), processChan: make(chan *queueItem, processBufferSize), hookStore: NewHookStore(), + isRunning: isRunning, } } //Start is implementation of same method in JobStatsManager interface. func (rjs *RedisJobStatsManager) Start() { - if rjs.isRunning { + if rjs.isRunning.Load().(bool) { return } go rjs.loop() - rjs.isRunning = true + rjs.isRunning.Store(true) logger.Info("Redis job stats manager is started") } @@ -86,10 +91,10 @@ func (rjs *RedisJobStatsManager) Start() { //Shutdown is implementation of same method in JobStatsManager interface. func (rjs *RedisJobStatsManager) Shutdown() { defer func() { - rjs.isRunning = false + rjs.isRunning.Store(false) }() - if !rjs.isRunning { + if !(rjs.isRunning.Load().(bool)) { return } rjs.stopChan <- struct{}{} @@ -139,7 +144,7 @@ func (rjs *RedisJobStatsManager) loop() { controlChan := make(chan struct{}) defer func() { - rjs.isRunning = false + rjs.isRunning.Store(false) //Notify other sub goroutines close(controlChan) logger.Info("Redis job stats manager is stopped") @@ -153,6 +158,8 @@ func (rjs *RedisJobStatsManager) loop() { if err := rjs.process(item); err != nil { item.fails++ if item.fails < maxFails { + logger.Warningf("Failed to process '%s' request with error: %s\n", item.op, err) + //Retry after a random interval go func() { timer := time.NewTimer(time.Duration(backoff(item.fails)) * time.Second) diff --git a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go index 54a798737..6fd1eb8ee 100644 --- a/src/jobservice_v2/opm/redis_job_stats_mgr_test.go +++ b/src/jobservice_v2/opm/redis_job_stats_mgr_test.go @@ -1,6 +1,269 @@ // Copyright 2018 The Harbor Authors. All rights reserved. package opm -import "testing" +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" -func TestRetrieveJob(t *testing.T) {} + "github.com/garyburd/redigo/redis" + "github.com/vmware/harbor/src/jobservice_v2/job" + "github.com/vmware/harbor/src/jobservice_v2/models" + "github.com/vmware/harbor/src/jobservice_v2/utils" +) + +const ( + dialConnectionTimeout = 30 * time.Second + healthCheckPeriod = time.Minute + dialReadTimeout = healthCheckPeriod + 10*time.Second + dialWriteTimeout = 10 * time.Second + testingRedisHost = "REDIS_HOST" + testingNamespace = "testing_job_service_v2" +) + +var redisHost = getRedisHost() +var redisPool = &redis.Pool{ + MaxActive: 2, + MaxIdle: 2, + Wait: true, + Dial: func() (redis.Conn, error) { + return redis.Dial( + "tcp", + fmt.Sprintf("%s:%d", redisHost, 6379), + redis.DialConnectTimeout(dialConnectionTimeout), + redis.DialReadTimeout(dialReadTimeout), + redis.DialWriteTimeout(dialWriteTimeout), + ) + }, +} + +func TestSetJobStatus(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + mgr.SetJobStatus("fake_job_ID", "running") + <-time.After(100 * time.Millisecond) + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.Status != "running" { + t.Errorf("expect job status 'running' but got '%s'\n", stats.Stats.Status) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestCommand(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil { + t.Error(err) + } + + if cmd, err := mgr.CtlCommand("fake_job_ID"); err != nil { + t.Error(err) + } else { + if cmd != CtlCommandStop { + t.Errorf("expect '%s' but got '%s'", CtlCommandStop, cmd) + } + } + + key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestDieAt(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + testingStats := createFakeStats() + mgr.Save(testingStats) + + dieAt := time.Now().Unix() + if err := createDeadJob(redisPool.Get(), dieAt); err != nil { + t.Error(err) + } + <-time.After(200 * time.Millisecond) + mgr.DieAt("fake_job_ID", dieAt) + <-time.After(300 * time.Millisecond) + + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.DieAt != dieAt { + t.Errorf("expect die at '%d' but got '%d'\n", dieAt, stats.Stats.DieAt) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } + key2 := utils.RedisKeyDead(testingNamespace) + if err := clear(key2, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestRegisterHook(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + if err := mgr.RegisterHook("fake_job_ID", "http://localhost:9999", false); err != nil { + t.Error(err) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestExpireJobStats(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + if err := mgr.ExpirePeriodicJobStats("fake_job_ID"); err != nil { + t.Error(err) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func TestCheckIn(t *testing.T) { + mgr := createStatsManager(redisPool) + mgr.Start() + defer mgr.Shutdown() + <-time.After(200 * time.Millisecond) + + //make sure data existing + testingStats := createFakeStats() + mgr.Save(testingStats) + <-time.After(200 * time.Millisecond) + + //Start http server + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "ok") + })) + defer ts.Close() + + if err := mgr.RegisterHook("fake_job_ID", ts.URL, false); err != nil { + t.Error(err) + } + + mgr.CheckIn("fake_job_ID", "checkin") + <-time.After(200 * time.Millisecond) + + stats, err := mgr.Retrieve("fake_job_ID") + if err != nil { + t.Error(err) + } + + if stats.Stats.CheckIn != "checkin" { + t.Errorf("expect check in info 'checkin' but got '%s'\n", stats.Stats.CheckIn) + } + + key := utils.KeyJobStats(testingNamespace, "fake_job_ID") + if err := clear(key, redisPool.Get()); err != nil { + t.Error(err) + } +} + +func getRedisHost() string { + redisHost := os.Getenv(testingRedisHost) + if redisHost == "" { + redisHost = "10.160.178.186" //for local test + } + + return redisHost +} + +func createStatsManager(redisPool *redis.Pool) JobStatsManager { + ctx := context.Background() + return NewRedisJobStatsManager(ctx, testingNamespace, redisPool) +} + +func clear(key string, conn redis.Conn) error { + if conn != nil { + defer conn.Close() + _, err := conn.Do("DEL", key) + return err + } + + return errors.New("failed to clear") +} + +func createFakeStats() models.JobStats { + testingStats := models.JobStats{ + Stats: &models.JobStatData{ + JobID: "fake_job_ID", + JobKind: job.JobKindPeriodic, + JobName: "fake_job", + Status: "Pending", + IsUnique: false, + RefLink: "/api/v1/jobs/fake_job_ID", + CronSpec: "5 * * * * *", + EnqueueTime: time.Now().Unix(), + UpdateTime: time.Now().Unix(), + }, + } + + return testingStats +} + +func createDeadJob(conn redis.Conn, dieAt int64) error { + dead := make(map[string]interface{}) + dead["name"] = "fake_job" + dead["id"] = "fake_job_ID" + dead["args"] = make(map[string]interface{}) + dead["fails"] = 3 + dead["err"] = "testing error" + dead["failed_at"] = dieAt + + rawJSON, err := json.Marshal(&dead) + if err != nil { + return err + } + + defer conn.Close() + key := utils.RedisKeyDead(testingNamespace) + _, err = conn.Do("ZADD", key, dieAt, rawJSON) + return err +} diff --git a/tests/docker-compose.test.yml b/tests/docker-compose.test.yml index 61596b1fb..d989a95f2 100644 --- a/tests/docker-compose.test.yml +++ b/tests/docker-compose.test.yml @@ -34,3 +34,10 @@ services: - /data/:/data/ ports: - 8888:8080 + redis: + image: vmware/redis-photon:4.0 + restart: always + volumes: + - /data/redis:/data + ports: + - 6379:6379 From 33c3603af2b2fb22e351f27b4fd41f9dc34f2c79 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Wed, 28 Mar 2018 15:57:59 +0800 Subject: [PATCH 4/4] Add waiting time before triggering http request --- src/jobservice_v2/api/handler_test.go | 8 +++ src/jobservice_v2/period/job_policy_test.go | 68 +++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/jobservice_v2/period/job_policy_test.go diff --git a/src/jobservice_v2/api/handler_test.go b/src/jobservice_v2/api/handler_test.go index 78bfaa150..e3b225a5d 100644 --- a/src/jobservice_v2/api/handler_test.go +++ b/src/jobservice_v2/api/handler_test.go @@ -31,6 +31,7 @@ var client = &http.Client{ func TestLaunchJobFailed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) resData, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(false)) if e := expectFormatedError(resData, err); e != nil { @@ -44,6 +45,7 @@ func TestLaunchJobFailed(t *testing.T) { func TestLaunchJobSucceed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) res, err := postReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs", port), createJobReq(true)) if err != nil { @@ -64,6 +66,7 @@ func TestLaunchJobSucceed(t *testing.T) { func TestGetJobFailed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job", port)) if e := expectFormatedError(res, err); e != nil { @@ -77,6 +80,7 @@ func TestGetJobFailed(t *testing.T) { func TestGetJobSucceed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) res, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok", port)) if err != nil { @@ -97,6 +101,7 @@ func TestGetJobSucceed(t *testing.T) { func TestJobActionFailed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) actionReq, err := createJobActionReq("stop") if err != nil { @@ -126,6 +131,7 @@ func TestJobActionFailed(t *testing.T) { func TestJobActionSucceed(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) actionReq, err := createJobActionReq("stop") if err != nil { @@ -161,6 +167,7 @@ func TestJobActionSucceed(t *testing.T) { func TestCheckStatus(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/stats", port)) if err != nil { @@ -186,6 +193,7 @@ func TestCheckStatus(t *testing.T) { func TestGetJobLog(t *testing.T) { server, port, ctx := createServer() server.Start() + <-time.After(200 * time.Millisecond) resData, err := getReq(fmt.Sprintf("http://localhost:%d/api/v1/jobs/fake_job_ok/log", port)) if err != nil { diff --git a/src/jobservice_v2/period/job_policy_test.go b/src/jobservice_v2/period/job_policy_test.go new file mode 100644 index 000000000..23888e7df --- /dev/null +++ b/src/jobservice_v2/period/job_policy_test.go @@ -0,0 +1,68 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. +package period + +import ( + "fmt" + "math/rand" + "sync" + "testing" + "time" +) + +func TestPeriodicJobPolicy(t *testing.T) { + p := createPolicy("") + + data, err := p.Serialize() + if err != nil { + t.Error(err) + } + + if err := p.DeSerialize(data); err != nil { + t.Error(err) + } +} + +func TestPeriodicJobPolicyStore(t *testing.T) { + ps := &periodicJobPolicyStore{ + lock: new(sync.RWMutex), + policies: make(map[string]*PeriodicJobPolicy), + } + + ps.add(createPolicy("fake_ID_Steven")) + if ps.size() != 1 { + t.Errorf("expect size 1 but got '%d'\n", ps.size()) + } + pl := make([]*PeriodicJobPolicy, 0) + pl = append(pl, createPolicy("")) + pl = append(pl, createPolicy("")) + ps.addAll(pl) + if ps.size() != 3 { + t.Errorf("expect size 3 but got '%d'\n", ps.size()) + } + + l := ps.list() + if l == nil || len(l) != 3 { + t.Error("expect a policy list with 3 items but got invalid list") + } + + rp := ps.remove("fake_ID_Steven") + if rp == nil { + t.Error("expect none nil policy object but got nil") + } +} + +func createPolicy(id string) *PeriodicJobPolicy { + theID := id + if theID == "" { + theID = fmt.Sprintf("fake_ID_%d", time.Now().UnixNano()+int64(rand.Intn(1000))) + } + p := &PeriodicJobPolicy{ + PolicyID: theID, + JobName: "fake_job", + JobParameters: make(map[string]interface{}), + CronSpec: "5 * * * * *", + } + p.JobParameters["image"] = "testing:v1" + + return p +}