From d392e27ef96da63ba426fdc899903907f5656a82 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Tue, 28 Jul 2020 20:48:08 +0800 Subject: [PATCH] fix(preheat):enhance preheat job - add job stop check points in preheat job - add missing digest property for the preheat request sent to the provider Signed-off-by: Steven Zou --- src/controller/p2p/preheat/enforcer.go | 1 + src/pkg/p2p/preheat/job.go | 28 +++++++++++++++++++++++++- src/pkg/p2p/preheat/job_test.go | 1 + 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/controller/p2p/preheat/enforcer.go b/src/controller/p2p/preheat/enforcer.go index f52101fdf..3e54eaecf 100644 --- a/src/controller/p2p/preheat/enforcer.go +++ b/src/controller/p2p/preheat/enforcer.go @@ -434,6 +434,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can }, ImageName: fmt.Sprintf("%s/%s", candidate.Namespace, candidate.Repository), Tag: candidate.Tags[0], + Digest: candidate.Digest, } piData, err := pi.ToJSON() diff --git a/src/pkg/p2p/preheat/job.go b/src/pkg/p2p/preheat/job.go index 1edef2178..c11a87d9f 100644 --- a/src/pkg/p2p/preheat/job.go +++ b/src/pkg/p2p/preheat/job.go @@ -76,20 +76,34 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { return errors.Wrap(err, "preheat job running error") } + // shouldStop checks if the job should be stopped + shouldStop := func() bool { + if cmd, ok := ctx.OPCommand(); ok && cmd == job.StopCommand { + return true + } + + return false + } + // Parse parameters, ignore errors as they have been validated already p, _ := parseParamProvider(params) pi, _ := parseParamImage(params) // Print related info to log first myLogger.Infof( - "Preheating image '%s:%s' to the target preheat provider: %s %s:%s\n", + "Preheating image '%s:%s@%s' to the target preheat provider: %s %s:%s\n", pi.ImageName, pi.Tag, + pi.Digest, p.Vendor, p.Name, p.Endpoint, ) + if shouldStop() { + return nil + } + // Get driver factory for the given provider fac, ok := pr.GetProvider(p.Vendor) if !ok { @@ -119,6 +133,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { myLogger.Infof("Check health of preheat provider instance: %s", pr.DriverStatusHealthy) + if shouldStop() { + return nil + } + // Then send the preheat requests to the target provider. st, err := d.Preheat(pi) if err != nil { @@ -145,6 +163,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { return preheatJobRunningError(err) } + if shouldStop() { + return nil + } + myLogger.Info("Start to loop check the preheating status until it's success or timeout(30m)") // If process is not completed, loop check the status until it's ready. tk := time.NewTicker(checkInterval) @@ -167,6 +189,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { if s.Status == provider.PreheatingStatusSuccess { return nil } + + if shouldStop() { + return nil + } case <-tm.C: return preheatJobRunningError(errors.Errorf("status check timeout: %v", checkTimeout)) } diff --git a/src/pkg/p2p/preheat/job_test.go b/src/pkg/p2p/preheat/job_test.go index 393329239..e7f56eda9 100644 --- a/src/pkg/p2p/preheat/job_test.go +++ b/src/pkg/p2p/preheat/job_test.go @@ -65,6 +65,7 @@ func (suite *JobTestSuite) SetupSuite() { ctx := &jobservice.MockJobContext{} logger := &jobservice.MockJobLogger{} ctx.On("GetLogger").Return(logger) + ctx.On("OPCommand").Return(job.StopCommand, false) suite.context = ctx }