From 6c6dbbe7ff72746b6d2b72ba90c08f92db2ca030 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Mon, 2 Apr 2018 17:04:03 +0800 Subject: [PATCH] Fix issue of stopping periodic job improve op command by using cache return 404 if no job found to stop --- src/jobservice/job/impl/demo_job.go | 6 +- src/jobservice/opm/op_commands.go | 166 ++++++++++++++++++ src/jobservice/opm/redis_job_stats_mgr.go | 64 +++---- .../opm/redis_job_stats_mgr_test.go | 5 - src/jobservice/period/redis_scheduler.go | 6 + src/jobservice/pool/message_server.go | 3 + src/jobservice/pool/message_server_test.go | 51 ++++++ src/jobservice/pool/redis_pool.go | 47 ++++- src/jobservice/utils/utils.go | 14 ++ 9 files changed, 306 insertions(+), 56 deletions(-) create mode 100644 src/jobservice/opm/op_commands.go diff --git a/src/jobservice/job/impl/demo_job.go b/src/jobservice/job/impl/demo_job.go index dd3930ddd..2be68d4b4 100644 --- a/src/jobservice/job/impl/demo_job.go +++ b/src/jobservice/job/impl/demo_job.go @@ -55,6 +55,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error logger.Info("I'm finished, exit!") fmt.Println("I'm finished, exit!") }() + fmt.Println("I'm running") logger.Info("=======Replication job running=======") logger.Infof("params: %#v\n", params) logger.Infof("context: %#v\n", ctx) @@ -81,11 +82,12 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error //HOLD ON FOR A WHILE logger.Error("Holding for 20 sec") - <-time.After(10 * time.Second) + <-time.After(15 * time.Second) //logger.Fatal("I'm back, check if I'm stopped/cancelled") if cmd, ok := ctx.OPCommand(); ok { logger.Infof("cmd=%s\n", cmd) + fmt.Printf("Receive OP command: %s\n", cmd) if cmd == opm.CtlCommandCancel { logger.Info("exit for receiving cancel signal") return errs.JobCancelledError() @@ -95,7 +97,7 @@ func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error return errs.JobStoppedError() } - fmt.Println("I'm here") + fmt.Println("I'm close to end") return nil } diff --git a/src/jobservice/opm/op_commands.go b/src/jobservice/opm/op_commands.go new file mode 100644 index 000000000..c3668ac19 --- /dev/null +++ b/src/jobservice/opm/op_commands.go @@ -0,0 +1,166 @@ +// Copyright 2018 The Harbor Authors. All rights reserved. + +package opm + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/garyburd/redigo/redis" + "github.com/vmware/harbor/src/jobservice/logger" + "github.com/vmware/harbor/src/jobservice/models" + "github.com/vmware/harbor/src/jobservice/utils" +) + +const ( + commandValidTime = 5 * time.Minute + commandSweepTickerTime = 1 * time.Hour + //EventFireCommand for firing command event + EventFireCommand = "fire_command" +) + +type oPCommand struct { + command string + fireTime int64 +} + +//oPCommands maintain commands list +type oPCommands struct { + lock *sync.RWMutex + commands map[string]*oPCommand + context context.Context + redisPool *redis.Pool + namespace string + stopChan chan struct{} + doneChan chan struct{} +} + +//newOPCommands is constructor of OPCommands +func newOPCommands(ctx context.Context, ns string, redisPool *redis.Pool) *oPCommands { + return &oPCommands{ + lock: new(sync.RWMutex), + commands: make(map[string]*oPCommand), + context: ctx, + redisPool: redisPool, + namespace: ns, + stopChan: make(chan struct{}, 1), + doneChan: make(chan struct{}, 1), + } +} + +//Start the command sweeper +func (opc *oPCommands) Start() { + go opc.loop() + logger.Info("OP commands sweeper is started") +} + +//Stop the command sweeper +func (opc *oPCommands) Stop() { + opc.stopChan <- struct{}{} + <-opc.doneChan +} + +//Fire command +func (opc *oPCommands) Fire(jobID string, command string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + if command != CtlCommandStop && command != CtlCommandCancel { + return fmt.Errorf("Unsupported command %s", command) + } + + notification := &models.Message{ + Event: EventFireCommand, + Data: []string{jobID, command}, + } + + rawJSON, err := json.Marshal(notification) + if err != nil { + return err + } + + conn := opc.redisPool.Get() + defer conn.Close() + + _, err = conn.Do("PUBLISH", utils.KeyPeriodicNotification(opc.namespace), rawJSON) + + return err +} + +//Push command into the list +func (opc *oPCommands) Push(jobID string, command string) error { + if utils.IsEmptyStr(jobID) { + return errors.New("empty job ID") + } + + if command != CtlCommandStop && command != CtlCommandCancel { + return fmt.Errorf("Unsupported command %s", command) + } + + opc.lock.Lock() + defer opc.lock.Unlock() + + opc.commands[jobID] = &oPCommand{ + command: command, + fireTime: time.Now().Unix(), + } + + return nil +} + +//Pop out the command if existing +func (opc *oPCommands) Pop(jobID string) (string, bool) { + if utils.IsEmptyStr(jobID) { + return "", false + } + + opc.lock.RLock() + defer opc.lock.RUnlock() + + c, ok := opc.commands[jobID] + if ok { + if time.Unix(c.fireTime, 0).Add(commandValidTime).After(time.Now()) { + delete(opc.commands, jobID) + return c.command, true + } + } + + return "", false +} + +func (opc *oPCommands) loop() { + defer func() { + logger.Info("OP commands is stopped") + opc.doneChan <- struct{}{} + }() + + tk := time.NewTicker(commandSweepTickerTime) + defer tk.Stop() + + for { + select { + case <-tk.C: + opc.sweepCommands() + case <-opc.context.Done(): + return + case <-opc.stopChan: + return + } + } +} + +func (opc *oPCommands) sweepCommands() { + opc.lock.Lock() + defer opc.lock.Unlock() + + for k, v := range opc.commands { + if time.Unix(v.fireTime, 0).Add(commandValidTime).After(time.Now()) { + delete(opc.commands, k) + } + } +} diff --git a/src/jobservice/opm/redis_job_stats_mgr.go b/src/jobservice/opm/redis_job_stats_mgr.go index 086c21131..675413a7b 100644 --- a/src/jobservice/opm/redis_job_stats_mgr.go +++ b/src/jobservice/opm/redis_job_stats_mgr.go @@ -57,7 +57,8 @@ type RedisJobStatsManager struct { doneChan chan struct{} processChan chan *queueItem isRunning *atomic.Value - hookStore *HookStore //cache the hook here to avoid requesting backend + hookStore *HookStore //cache the hook here to avoid requesting backend + opCommands *oPCommands //maintain the OP commands } //NewRedisJobStatsManager is constructor of RedisJobStatsManager @@ -74,6 +75,7 @@ func NewRedisJobStatsManager(ctx context.Context, namespace string, redisPool *r processChan: make(chan *queueItem, processBufferSize), hookStore: NewHookStore(), isRunning: isRunning, + opCommands: newOPCommands(ctx, namespace, redisPool), } } @@ -83,6 +85,7 @@ func (rjs *RedisJobStatsManager) Start() { return } go rjs.loop() + rjs.opCommands.Start() rjs.isRunning.Store(true) logger.Info("Redis job stats manager is started") @@ -97,6 +100,8 @@ func (rjs *RedisJobStatsManager) Shutdown() { if !(rjs.isRunning.Load().(bool)) { return } + + rjs.opCommands.Stop() rjs.stopChan <- struct{}{} <-rjs.doneChan } @@ -213,7 +218,12 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error return errors.New("unknown command") } - return rjs.writeCtlCommand(jobID, command) + if err := rjs.opCommands.Fire(jobID, command); err != nil { + return err + } + + //Directly add to op commands maintaining list + return rjs.opCommands.Push(jobID, command) } //CheckIn mesage @@ -239,7 +249,12 @@ func (rjs *RedisJobStatsManager) CtlCommand(jobID string) (string, error) { return "", errors.New("empty job ID") } - return rjs.getCrlCommand(jobID) + c, ok := rjs.opCommands.Pop(jobID) + if !ok { + return "", fmt.Errorf("no OP command fired to job %s", jobID) + } + + return c, nil } //DieAt marks the failed jobs with the time they put into dead queue. @@ -262,7 +277,7 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa return errors.New("empty job ID") } - if utils.IsEmptyStr(hookURL) { + if !utils.IsValidURL(hookURL) { return errors.New("invalid hook url") } @@ -302,7 +317,7 @@ func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, if !ok { //Retrieve from backend hookURL, err = rjs.getHook(jobID) - if err != nil { + if err != nil || !utils.IsValidURL(hookURL) { //logged and exit logger.Warningf("no status hook found for job %s\n, abandon status reporting", jobID) return @@ -328,45 +343,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che return DefaultHookClient.ReportStatus(hookURL, reportingStatus) } -func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) { - conn := rjs.redisPool.Get() - defer conn.Close() - - key := utils.KeyJobCtlCommands(rjs.namespace, jobID) - cmd, err := redis.String(conn.Do("HGET", key, "command")) - if err != nil { - return "", err - } - //try to DEL it after getting the command - //Ignore the error,leave it as dirty data - _, err = conn.Do("DEL", key) - if err != nil { - //only logged - logger.Errorf("del key %s failed with error: %s\n", key, err) - } - - return cmd, nil -} - -func (rjs *RedisJobStatsManager) writeCtlCommand(jobID string, command string) error { - conn := rjs.redisPool.Get() - defer conn.Close() - - key := utils.KeyJobCtlCommands(rjs.namespace, jobID) - args := make([]interface{}, 0, 5) - args = append(args, key, "command", command, "fire_time", time.Now().Unix()) - if err := conn.Send("HMSET", args...); err != nil { - return err - } - - expireTime := 24*60*60 + rand.Int63n(10) - if err := conn.Send("EXPIRE", key, expireTime); err != nil { - return err - } - - return conn.Flush() -} - func (rjs *RedisJobStatsManager) updateJobStatus(jobID string, status string) error { conn := rjs.redisPool.Get() defer conn.Close() diff --git a/src/jobservice/opm/redis_job_stats_mgr_test.go b/src/jobservice/opm/redis_job_stats_mgr_test.go index 59e6d188f..a5c5cc8cb 100644 --- a/src/jobservice/opm/redis_job_stats_mgr_test.go +++ b/src/jobservice/opm/redis_job_stats_mgr_test.go @@ -87,11 +87,6 @@ func TestCommand(t *testing.T) { t.Fatalf("expect '%s' but got '%s'", CtlCommandStop, cmd) } } - - key := utils.KeyJobCtlCommands(testingNamespace, "fake_job_ID") - if err := clear(key, redisPool.Get()); err != nil { - t.Fatal(err) - } } func TestDieAt(t *testing.T) { diff --git a/src/jobservice/period/redis_scheduler.go b/src/jobservice/period/redis_scheduler.go index 3b74cc19f..7f80c35b4 100644 --- a/src/jobservice/period/redis_scheduler.go +++ b/src/jobservice/period/redis_scheduler.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/vmware/harbor/src/jobservice/errs" + "github.com/robfig/cron" "github.com/garyburd/redigo/redis" @@ -156,6 +158,10 @@ func (rps *RedisPeriodicScheduler) UnSchedule(cronJobPolicyID string) error { } score, err := rps.getScoreByID(cronJobPolicyID) + if err == redis.ErrNil { + return errs.NoObjectFoundError(err.Error()) + } + if err != nil { return err } diff --git a/src/jobservice/pool/message_server.go b/src/jobservice/pool/message_server.go index 05f91d368..471cff980 100644 --- a/src/jobservice/pool/message_server.go +++ b/src/jobservice/pool/message_server.go @@ -89,6 +89,9 @@ func (ms *MessageServer) Start() error { dt, _ := json.Marshal(m.Data) json.Unmarshal(dt, hookObject) converted = hookObject + case opm.EventFireCommand: + //no need to convert []string + converted = m.Data } res := callback.Call([]reflect.Value{reflect.ValueOf(converted)}) e := res[0].Interface() diff --git a/src/jobservice/pool/message_server_test.go b/src/jobservice/pool/message_server_test.go index d53efb6d0..6ba5e38cd 100644 --- a/src/jobservice/pool/message_server_test.go +++ b/src/jobservice/pool/message_server_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "testing" "time" @@ -141,6 +142,56 @@ func TestPublishHook(t *testing.T) { ms.Start() } +func TestPublishCommands(t *testing.T) { + ms, cancel := createMessageServer() + err := ms.Subscribe(opm.EventFireCommand, func(data interface{}) error { + cmds, ok := data.([]string) + if !ok { + t.Fatal("expect fired command but got other thing") + return errors.New("expect fired command but got other thing") + } + if len(cmds) != 2 { + t.Fatalf("expect a array with 2 items but only got '%d' items", len(cmds)) + return fmt.Errorf("expect a array with 2 items but only got '%d' items", len(cmds)) + } + if cmds[1] != "stop" { + t.Fatalf("expect command 'stop' but got '%s'", cmds[1]) + return fmt.Errorf("expect command 'stop' but got '%s'", cmds[1]) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + go func() { + defer cancel() + <-time.After(200 * time.Millisecond) + + notification := &models.Message{ + Event: opm.EventRegisterStatusHook, + Data: []string{"fake_job_ID", "stop"}, + } + + rawJSON, err := json.Marshal(notification) + if err != nil { + t.Fatal(err) + } + + conn := redisPool.Get() + defer conn.Close() + err = conn.Send("PUBLISH", utils.KeyPeriodicNotification(tests.GiveMeTestNamespace()), rawJSON) + if err != nil { + t.Fatal(err) + } + + //hold for a while + <-time.After(200 * time.Millisecond) + }() + + ms.Start() +} + func createMessageServer() (*MessageServer, context.CancelFunc) { ns := tests.GiveMeTestNamespace() ctx, cancel := context.WithCancel(context.Background()) diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index efc1c4fd8..384b96d3e 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -123,6 +123,12 @@ func (gcwp *GoCraftWorkPool) Start() { }); err != nil { return } + if err = gcwp.messageServer.Subscribe(opm.EventFireCommand, + func(data interface{}) error { + return gcwp.handleOPCommandFiring(data) + }); err != nil { + return + } //Start message server if err = gcwp.messageServer.Start(); err != nil { @@ -323,8 +329,6 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) { return models.JobPoolStats{}, err } - fmt.Printf("hbs=%+#v\n", hbs[0]) - //Find the heartbeat of this pool via pid stats := make([]*models.JobPoolStatsData, 0) for _, hb := range hbs { @@ -367,9 +371,14 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { return err } + needSetStopStatus := false + switch theJob.Stats.JobKind { case job.JobKindGeneric: - //nothing need to do + //Only running job can be stopped + if theJob.Stats.Status != job.JobStatusRunning { + return fmt.Errorf("job '%s' is not a running job", jobID) + } case job.JobKindScheduled: //we need to delete the scheduled job in the queue if it is not running yet //otherwise, nothing need to do @@ -377,6 +386,7 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil { return err } + needSetStopStatus = true } case job.JobKindPeriodic: //firstly delete the periodic job policy @@ -390,6 +400,8 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { //only logged logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err) } + + needSetStopStatus = true default: break } @@ -400,6 +412,13 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil { return err } + //The job running instance will set the status to 'stopped' + needSetStopStatus = false + } + + //If needed, update the job status to 'stopped' + if needSetStopStatus { + gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped) } return nil @@ -475,8 +494,8 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error { return errors.New("empty job ID") } - if utils.IsEmptyStr(hookURL) { - return errors.New("empty hook url") + if !utils.IsValidURL(hookURL) { + return errors.New("invalid hook url") } return gcwp.statsManager.RegisterHook(jobID, hookURL, false) @@ -550,6 +569,24 @@ func (gcwp *GoCraftWorkPool) handleRegisterStatusHook(data interface{}) error { return gcwp.statsManager.RegisterHook(hook.JobID, hook.HookURL, true) } +func (gcwp *GoCraftWorkPool) handleOPCommandFiring(data interface{}) error { + if data == nil { + return errors.New("nil data interface") + } + + commands, ok := data.([]interface{}) + if !ok || len(commands) != 2 { + return errors.New("malformed op commands object") + } + jobID, ok := commands[0].(string) + command, ok := commands[1].(string) + if !ok { + return errors.New("malformed op command info") + } + + return gcwp.statsManager.SendCommand(jobID, command) +} + //log the job func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error { logger.Infof("Job incoming: %s:%s", job.Name, job.ID) diff --git a/src/jobservice/utils/utils.go b/src/jobservice/utils/utils.go index 7fcfaa185..db34310b2 100644 --- a/src/jobservice/utils/utils.go +++ b/src/jobservice/utils/utils.go @@ -5,6 +5,7 @@ package utils import ( "errors" + "net/url" "os" "strings" @@ -57,6 +58,19 @@ func IsValidPort(port uint) bool { return port != 0 && port < 65536 } +//IsValidURL validates if the url is well-formted +func IsValidURL(address string) bool { + if IsEmptyStr(address) { + return false + } + + if _, err := url.Parse(address); err != nil { + return false + } + + return true +} + //JobScore represents the data item with score in the redis db. type JobScore struct { JobBytes []byte