Merge pull request #12612 from steven-zou/fix/preheat_job_stopped

fix(preheat):enhance preheat job
This commit is contained in:
Steven Zou 2020-07-28 22:55:51 +08:00 committed by GitHub
commit 86bf6df0d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 1 deletions

View File

@ -434,6 +434,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can
},
ImageName: fmt.Sprintf("%s/%s", candidate.Namespace, candidate.Repository),
Tag: candidate.Tags[0],
Digest: candidate.Digest,
}
piData, err := pi.ToJSON()

View File

@ -76,20 +76,34 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
return errors.Wrap(err, "preheat job running error")
}
// shouldStop checks if the job should be stopped
shouldStop := func() bool {
if cmd, ok := ctx.OPCommand(); ok && cmd == job.StopCommand {
return true
}
return false
}
// Parse parameters, ignore errors as they have been validated already
p, _ := parseParamProvider(params)
pi, _ := parseParamImage(params)
// Print related info to log first
myLogger.Infof(
"Preheating image '%s:%s' to the target preheat provider: %s %s:%s\n",
"Preheating image '%s:%s@%s' to the target preheat provider: %s %s:%s\n",
pi.ImageName,
pi.Tag,
pi.Digest,
p.Vendor,
p.Name,
p.Endpoint,
)
if shouldStop() {
return nil
}
// Get driver factory for the given provider
fac, ok := pr.GetProvider(p.Vendor)
if !ok {
@ -119,6 +133,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
myLogger.Infof("Check health of preheat provider instance: %s", pr.DriverStatusHealthy)
if shouldStop() {
return nil
}
// Then send the preheat requests to the target provider.
st, err := d.Preheat(pi)
if err != nil {
@ -145,6 +163,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
return preheatJobRunningError(err)
}
if shouldStop() {
return nil
}
myLogger.Info("Start to loop check the preheating status until it's success or timeout(30m)")
// If process is not completed, loop check the status until it's ready.
tk := time.NewTicker(checkInterval)
@ -167,6 +189,10 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
if s.Status == provider.PreheatingStatusSuccess {
return nil
}
if shouldStop() {
return nil
}
case <-tm.C:
return preheatJobRunningError(errors.Errorf("status check timeout: %v", checkTimeout))
}

View File

@ -65,6 +65,7 @@ func (suite *JobTestSuite) SetupSuite() {
ctx := &jobservice.MockJobContext{}
logger := &jobservice.MockJobLogger{}
ctx.On("GetLogger").Return(logger)
ctx.On("OPCommand").Return(job.StopCommand, false)
suite.context = ctx
}