fix: wrap schedule/exec/task creation as orm tx (#18458)

Wrap the schedule, execution and task creation in a single ORM transaction
when the scheduler creates the task, because submitting the job may fail
depending on the state of the jobservice.

Fixes: #18452

Signed-off-by: chlins <chenyuzh@vmware.com>
This commit is contained in:
Chlins Zhang 2023-04-04 08:42:18 +08:00 committed by GitHub
parent 02c51c6b70
commit a7cef5e24f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 46 deletions

View File

@ -5,7 +5,6 @@ import (
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
@ -63,7 +62,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro
return err
}
logger.Info("Created job for scan data export successfully")
log.Info("Created job for scan data export successfully")
return nil
}
go func(ctx context.Context) {
@ -77,7 +76,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro
}
err = c.createCleanupTask(ctx, jobParams, execID)
if err != nil {
logger.Errorf("Encountered error in scan data artifact cleanup : %v", err)
log.Errorf("Encountered error in scan data artifact cleanup : %v", err)
return
}
}(c.makeCtx())
@ -97,7 +96,7 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame
_, err := c.taskMgr.Create(ctx, execID, j)
if err != nil {
logger.Errorf("Unable to create a scan data export job in clean-up mode : %v", err)
log.Errorf("Unable to create a scan data export job in clean-up mode : %v", err)
c.markError(ctx, execID, err)
return err
}
@ -107,44 +106,45 @@ func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parame
func (c *controller) markError(ctx context.Context, executionID int64, err error) {
// try to stop the execution first in case that some tasks are already created
if err := c.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
logger.Errorf("failed to stop the execution %d: %v", executionID, err)
log.Errorf("failed to stop the execution %d: %v", executionID, err)
}
if err := c.execMgr.MarkError(ctx, executionID, err.Error()); err != nil {
logger.Errorf("failed to mark error for the execution %d: %v", executionID, err)
log.Errorf("failed to mark error for the execution %d: %v", executionID, err)
}
}
// ScheduleCleanupTask schedules a system artifact cleanup task
func ScheduleCleanupTask(ctx context.Context) {
scheduleSystemArtifactCleanJob(ctx)
func ScheduleCleanupTask(ctx context.Context) error {
return scheduleSystemArtifactCleanJob(ctx)
}
func scheduleSystemArtifactCleanJob(ctx context.Context) {
func scheduleSystemArtifactCleanJob(ctx context.Context) error {
schedule, err := getSystemArtifactCleanupSchedule(ctx)
if err != nil {
return
return err
}
if schedule != nil {
logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID)
return
log.Debugf("Export data cleanup job already scheduled with ID : %v.", schedule.ID)
return nil
}
scheduleID, err := sched.Schedule(ctx, job.SystemArtifactCleanupVendorType, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil)
if err != nil {
log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err)
return
return err
}
log.Infof("Scheduled scan data export cleanup job with ID : %v", scheduleID)
return nil
}
func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) {
query := q.New(map[string]interface{}{"vendor_type": job.SystemArtifactCleanupVendorType})
schedules, err := sched.ListSchedules(ctx, query)
if err != nil {
logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)
log.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)
return nil, err
}
if len(schedules) > 0 {
logger.Infof("Found export data cleanup job with schedule id : %v", schedules[0].ID)
log.Debugf("Found export data cleanup job with schedule id : %v", schedules[0].ID)
return schedules[0], nil
}
return nil, nil

View File

@ -259,11 +259,27 @@ func main() {
log.Errorf("failed to check the jobservice health status: timeout, error: %v", err)
return
}
// schedule the system jobs with retry as the operation depends on the jobservice,
// retry to handle the failure case caused by jobservice.
ctx := orm.Context()
options = []retry.Option{
retry.InitialInterval(time.Millisecond * 500),
retry.MaxInterval(time.Second * 10),
retry.Timeout(time.Minute * 5),
retry.Callback(func(err error, sleep time.Duration) {
log.Debugf("failed to schedule system job, retry after %s : %v", sleep, err)
}),
}
// schedule system artifact cleanup job
systemartifact.ScheduleCleanupTask(ctx)
if err := retry.Retry(func() error {
return systemartifact.ScheduleCleanupTask(ctx)
}, options...); err != nil {
log.Errorf("failed to schedule system artifact cleanup job, error: %v", err)
}
// schedule system execution sweep job
if err := task.ScheduleSweepJob(ctx); err != nil {
if err := retry.Retry(func() error {
return task.ScheduleSweepJob(ctx)
}, options...); err != nil {
log.Errorf("failed to schedule system execution sweep job, error: %v", err)
}
}()

View File

@ -24,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/task"
)
@ -133,29 +134,40 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in
}
sched.ExtraAttrs = string(extrasData)
// create schedule record
// when checkin hook comes, the database record must exist,
// so the database record must be created first before submitting job
id, err := s.dao.Create(ctx, sched)
if err != nil {
var scheduleID, taskID int64
// ensureTask makes sure the task has been created at the end
ensureTask := func(ctx context.Context) error {
// create schedule record
// when checkin hook comes, the database record must exist,
// so the database record must be created first before submitting job
scheduleID, err = s.dao.Create(ctx, sched)
if err != nil {
return err
}
// create execution by schedule id
execID, err := s.execMgr.Create(ctx, JobNameScheduler, scheduleID, task.ExecutionTriggerManual, params)
if err != nil {
return err
}
// create task by execution id; this may fail if submitting the job to jobservice fails,
// so wrap these 3 actions in a transaction.
taskID, err = s.taskMgr.Create(ctx, execID, &task.Job{
Name: JobNameScheduler,
Metadata: &job.Metadata{
JobKind: job.KindPeriodic,
Cron: cron,
},
})
if err != nil {
return err
}
return nil
}
if err = orm.WithTransaction(ensureTask)(orm.SetTransactionOpNameToContext(ctx, "tx-ensure-schedule-task")); err != nil {
return 0, err
}
execID, err := s.execMgr.Create(ctx, JobNameScheduler, id, task.ExecutionTriggerManual, params)
if err != nil {
return 0, err
}
taskID, err := s.taskMgr.Create(ctx, execID, &task.Job{
Name: JobNameScheduler,
Metadata: &job.Metadata{
JobKind: job.KindPeriodic,
Cron: cron,
},
})
if err != nil {
return 0, err
}
// make sure the created task is stopped if got any error in the following steps
defer func() {
if err == nil {
@ -179,7 +191,7 @@ func (s *scheduler) Schedule(ctx context.Context, vendorType string, vendorID in
return 0, err
}
return id, nil
return scheduleID, nil
}
func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {

View File

@ -22,13 +22,16 @@ import (
"github.com/stretchr/testify/suite"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/pkg/task"
ormtesting "github.com/goharbor/harbor/src/testing/lib/orm"
"github.com/goharbor/harbor/src/testing/mock"
tasktesting "github.com/goharbor/harbor/src/testing/pkg/task"
)
type schedulerTestSuite struct {
suite.Suite
ctx context.Context
scheduler *scheduler
dao *mockDAO
execMgr *tasktesting.ExecutionManager
@ -40,6 +43,7 @@ func (s *schedulerTestSuite) SetupTest() {
err := RegisterCallbackFunc("callback", func(context.Context, string) error { return nil })
s.Require().Nil(err)
s.ctx = orm.NewContext(nil, &ormtesting.FakeOrmer{})
s.dao = &mockDAO{}
s.execMgr = &tasktesting.ExecutionManager{}
s.taskMgr = &tasktesting.Manager{}
@ -54,15 +58,15 @@ func (s *schedulerTestSuite) SetupTest() {
func (s *schedulerTestSuite) TestSchedule() {
// empty vendor type
extras := make(map[string]interface{})
id, err := s.scheduler.Schedule(nil, "", 0, "", "0 * * * * *", "callback", nil, extras)
id, err := s.scheduler.Schedule(s.ctx, "", 0, "", "0 * * * * *", "callback", nil, extras)
s.NotNil(err)
// invalid cron
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "", "callback", nil, extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "", "callback", nil, extras)
s.NotNil(err)
// callback function not exist
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "not-exist", nil, extras)
s.NotNil(err)
// failed to submit to jobservice
@ -75,7 +79,7 @@ func (s *schedulerTestSuite) TestSchedule() {
Status: job.ErrorStatus.String(),
}, nil)
s.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
_, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
_, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
s.Require().NotNil(err)
s.dao.AssertExpectations(s.T())
s.execMgr.AssertExpectations(s.T())
@ -93,7 +97,7 @@ func (s *schedulerTestSuite) TestSchedule() {
ExecutionID: 1,
Status: job.SuccessStatus.String(),
}, nil)
id, err = s.scheduler.Schedule(nil, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
id, err = s.scheduler.Schedule(s.ctx, "vendor", 1, "", "0 * * * * *", "callback", "param", extras)
s.Require().Nil(err)
s.Equal(int64(1), id)
s.dao.AssertExpectations(s.T())
@ -161,7 +165,7 @@ func (s *schedulerTestSuite) TestGetSchedule() {
CRON: "0 * * * * *",
}, nil)
s.execMgr.On("List", mock.Anything, mock.Anything).Return(nil, nil)
schd, err := s.scheduler.GetSchedule(nil, 1)
schd, err := s.scheduler.GetSchedule(s.ctx, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.ErrorStatus.String(), schd.Status)
@ -184,7 +188,7 @@ func (s *schedulerTestSuite) TestGetSchedule() {
Status: job.SuccessStatus.String(),
},
}, nil)
schd, err = s.scheduler.GetSchedule(nil, 1)
schd, err = s.scheduler.GetSchedule(s.ctx, 1)
s.Require().Nil(err)
s.Equal("0 * * * * *", schd.CRON)
s.Equal(job.SuccessStatus.String(), schd.Status)