diff --git a/make/photon/prepare/templates/jobservice/config.yml.jinja b/make/photon/prepare/templates/jobservice/config.yml.jinja index bf30f99374..14dbddf69a 100644 --- a/make/photon/prepare/templates/jobservice/config.yml.jinja +++ b/make/photon/prepare/templates/jobservice/config.yml.jinja @@ -52,5 +52,11 @@ metric: port: {{ metric.port }} {% endif %} +reaper: + # the max time in hours to wait for a task to finish; if unfinished after max_update_hours, the task will be marked as error, but the task will continue to run. Default value is 24 (smaller values are ignored) + max_update_hours: 24 + # the max time in hours an execution may stay in running state without a new task being created. Default value is 168 (smaller values are ignored) + max_dangling_hours: 168 + # the max size of job log returned by API, default is 10M max_retrieve_size_mb: 10 \ No newline at end of file diff --git a/src/jobservice/config/config.go b/src/jobservice/config/config.go index 7fc7f29fed..2fc974a679 100644 --- a/src/jobservice/config/config.go +++ b/src/jobservice/config/config.go @@ -22,6 +22,7 @@ import ( "os" "strconv" "strings" + "time" "gopkg.in/yaml.v2" @@ -83,6 +84,9 @@ type Configuration struct { // Metric configurations Metric *MetricConfig `yaml:"metric,omitempty"` + // Reaper configurations + ReaperConfig *ReaperConfig `yaml:"reaper,omitempty"` + // MaxLogSizeReturnedMB is the max size of log returned by job log API MaxLogSizeReturnedMB int `yaml:"max_retrieve_size_mb,omitempty"` } @@ -135,6 +139,11 @@ type LoggerConfig struct { Sweeper *LogSweeperConfig `yaml:"sweeper"` } +type ReaperConfig struct { + MaxUpdateHour int `yaml:"max_update_hours"` + MaxDanglingHour int `yaml:"max_dangling_hours"` +} + // Load the configuration options from the specified yaml file. 
// If the yaml file is specified and existing, load configurations from yaml file first; // If detecting env variables is specified, load configurations from env variables; @@ -348,3 +357,19 @@ func (c *Configuration) validate() error { return nil // valid } + +// MaxUpdateDuration returns the reaper max_update_hours setting as a duration when it is configured above 24, otherwise the 24-hour default. +func MaxUpdateDuration() time.Duration { + if DefaultConfig != nil && DefaultConfig.ReaperConfig != nil && DefaultConfig.ReaperConfig.MaxUpdateHour > 24 { + return time.Duration(DefaultConfig.ReaperConfig.MaxUpdateHour) * time.Hour + } + return 24 * time.Hour +} + +// MaxDanglingHour returns the reaper max_dangling_hours setting, in hours, when it is configured above 168 (24*7), otherwise the 168-hour default. +func MaxDanglingHour() int { + if DefaultConfig != nil && DefaultConfig.ReaperConfig != nil && DefaultConfig.ReaperConfig.MaxDanglingHour > 24*7 { + return DefaultConfig.ReaperConfig.MaxDanglingHour + } + return 24 * 7 +} diff --git a/src/jobservice/config/config_test.go b/src/jobservice/config/config_test.go index df9b41bd3b..1d239c31d1 100644 --- a/src/jobservice/config/config_test.go +++ b/src/jobservice/config/config_test.go @@ -15,6 +15,7 @@ package config import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -86,6 +87,12 @@ func (suite *ConfigurationTestSuite) TestDefaultConfig() { err := DefaultConfig.Load("../config_test.yml", true) require.Nil(suite.T(), err, "load config from yaml file, expect nil error but got error '%s'", err) + maxUpdateHour := MaxUpdateDuration() + require.Equal(suite.T(), 24*time.Hour, maxUpdateHour, "expect max update duration to be 24h but got %v", maxUpdateHour) + + maxDangling := MaxDanglingHour() + require.Equal(suite.T(), 168, maxDangling, "expect max dangling hours to be 168 but got %d", maxDangling) + assert.Equal(suite.T(), 10, DefaultConfig.MaxLogSizeReturnedMB, "expect max log size returned 10MB but got %d", DefaultConfig.MaxLogSizeReturnedMB) redisURL := DefaultConfig.PoolConfig.RedisPoolCfg.RedisURL 
assert.Equal(suite.T(), "redis://localhost:6379", redisURL, "expect redisURL '%s' but got '%s'", "redis://localhost:6379", redisURL) diff --git a/src/jobservice/config_test.yml b/src/jobservice/config_test.yml index c8d7bb45af..c4f74fb652 100644 --- a/src/jobservice/config_test.yml +++ b/src/jobservice/config_test.yml @@ -40,4 +40,9 @@ loggers: - name: "STD_OUTPUT" # Same with above level: "DEBUG" +reaper: + # the max time in hours to wait for the job to finish; if not finished, the job will be marked as error, default value is 24 + max_update_hours: 24 + max_dangling_hours: 168 + max_retrieve_size_mb: 10 \ No newline at end of file diff --git a/src/jobservice/worker/cworker/reaper.go b/src/jobservice/worker/cworker/reaper.go index 5c84d1b572..688df83cb4 100644 --- a/src/jobservice/worker/cworker/reaper.go +++ b/src/jobservice/worker/cworker/reaper.go @@ -23,6 +23,7 @@ import ( "github.com/gomodule/redigo/redis" "github.com/goharbor/harbor/src/jobservice/common/rds" + "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/errs" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/lcm" @@ -31,7 +32,6 @@ import ( ) const ( - maxUpdateDuration = 24 * time.Hour reapLoopInterval = 1 * time.Hour initialReapLoopInterval = 5 * time.Minute ) @@ -171,7 +171,7 @@ func (r *reaper) syncOutdatedStats() error { } } else { // Ongoing, check the update timestamp to make sure it is not hung - if time.Unix(t.Job().Info.UpdateTime, 0).Add(maxUpdateDuration).Before(time.Now()) { + if time.Unix(t.Job().Info.UpdateTime, 0).Add(config.MaxUpdateDuration()).Before(time.Now()) { // Status hung // Mark job status to error state if err = t.Fail(); err != nil { diff --git a/src/pkg/task/mock_sweep_manager_test.go b/src/pkg/task/mock_sweep_manager_test.go index e3c84850d7..abafd57cb1 100644 --- a/src/pkg/task/mock_sweep_manager_test.go +++ b/src/pkg/task/mock_sweep_manager_test.go @@ -27,6 +27,20 @@ func (_m *mockSweepManager) 
Clean(ctx context.Context, execID []int64) error { return r0 } +// FixDanglingStateExecution provides a mock function with given fields: ctx +func (_m *mockSweepManager) FixDanglingStateExecution(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ListCandidates provides a mock function with given fields: ctx, vendorType, retainCnt func (_m *mockSweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) { ret := _m.Called(ctx, vendorType, retainCnt) diff --git a/src/pkg/task/sweep_job.go b/src/pkg/task/sweep_job.go index 9bf3a1b949..78f3573146 100644 --- a/src/pkg/task/sweep_job.go +++ b/src/pkg/task/sweep_job.go @@ -90,6 +90,10 @@ func (sj *SweepJob) Run(ctx job.Context, params job.Parameters) error { sj.logger.Info("start to run sweep job") + if err := sj.mgr.FixDanglingStateExecution(ctx.SystemContext()); err != nil { + sj.logger.Errorf("failed to fix dangling state executions, error: %v", err) + } + var errs errors.Errors for vendor, cnt := range sj.execRetainCountsMap { if sj.shouldStop(ctx) { diff --git a/src/pkg/task/sweep_job_test.go b/src/pkg/task/sweep_job_test.go index 1f471a3213..47fd5f97b6 100644 --- a/src/pkg/task/sweep_job_test.go +++ b/src/pkg/task/sweep_job_test.go @@ -18,9 +18,10 @@ import ( "context" "testing" + "github.com/stretchr/testify/suite" + "github.com/goharbor/harbor/src/jobservice/job" mockjobservice "github.com/goharbor/harbor/src/testing/jobservice" - "github.com/stretchr/testify/suite" ) type sweepJobTestSuite struct { @@ -46,8 +47,9 @@ func (suite *sweepJobTestSuite) TestRun() { }, } // test stop case - j := &SweepJob{} + j := &SweepJob{mgr: suite.sweepMgr} suite.jobCtx.On("OPCommand").Return(job.StopCommand, true).Once() + suite.sweepMgr.On("FixDanglingStateExecution", context.TODO()).Return(nil) err := j.Run(suite.jobCtx, params) 
suite.NoError(err, "stop job should not return error") @@ -66,6 +68,7 @@ func (suite *sweepJobTestSuite) TestRun() { suite.sweepMgr.On("ListCandidates", ctx, "REPLICATION", int64(20)).Return([]int64{2}, nil) suite.sweepMgr.On("Clean", ctx, []int64{1}).Return(nil) suite.sweepMgr.On("Clean", ctx, []int64{2}).Return(nil) + suite.sweepMgr.On("FixDanglingStateExecution", ctx).Return(nil) err = j.Run(suite.jobCtx, params) suite.NoError(err) } diff --git a/src/pkg/task/sweep_manager.go b/src/pkg/task/sweep_manager.go index f697b6b2d9..135b63d060 100644 --- a/src/pkg/task/sweep_manager.go +++ b/src/pkg/task/sweep_manager.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/goharbor/harbor/src/jobservice/config" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/orm" @@ -40,6 +41,8 @@ type SweepManager interface { ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error) // Clean deletes the tasks belonging to the execution which in final status and deletes executions. Clean(ctx context.Context, execID []int64) (err error) + // FixDanglingStateExecution fixes the dangling state execution. + FixDanglingStateExecution(ctx context.Context) error } // sweepManager implements the interface SweepManager. 
@@ -182,6 +185,32 @@ func (sm *sweepManager) Clean(ctx context.Context, execIDs []int64) error { return nil } +// FixDanglingStateExecution settles executions stuck in Running status: every execution started more than config.MaxDanglingHour() hours ago that has no Running task is set to Error if any of its tasks is Error, else Stopped if any is Stopped, else Success. NOTE(review): an execution with no tasks at all also matches and becomes Success - confirm this is intended. +func (sm *sweepManager) FixDanglingStateExecution(ctx context.Context) error { + ormer, err := orm.FromContext(ctx) + if err != nil { + return err + } + + sql := `UPDATE execution + SET status = + CASE + WHEN EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Error') THEN 'Error' + WHEN EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Stopped') THEN 'Stopped' + ELSE 'Success' + END +WHERE status = 'Running' + AND start_time < now() - INTERVAL ? + AND NOT EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Running')` + + intervalStr := fmt.Sprintf("%d hour", config.MaxDanglingHour()) + _, err = ormer.Raw(sql, intervalStr).Exec() + if err != nil { + return errors.Wrap(err, "failed to fix dangling state execution") + } + return nil +} + func NewSweepManager() SweepManager { return &sweepManager{ execDAO: dao.NewExecutionDAO(),