Fix dangling state execution (#18272)

Add max_update_hours and max_dangling_hours settings to config.yaml
  Fixes #17611

Signed-off-by: stonezdj <daojunz@vmware.com>
stonezdj(Daojun Zhang) 2023-03-21 18:48:14 +08:00 committed by GitHub
parent 67d3f9add8
commit 46fa91f866
9 changed files with 97 additions and 4 deletions

View File

@@ -52,5 +52,11 @@ metric:
   port: {{ metric.port }}
 {% endif %}
 
+reaper:
+  # the max time to wait for a task to finish; if a task is still unfinished after max_update_hours it will be marked as error, but it will continue to run, default value is 24
+  max_update_hours: 24
+  # the max time an execution can stay in running state without any new task created
+  max_dangling_hours: 168
+
 # the max size of job log returned by API, default is 10M
 max_retrieve_size_mb: 10
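As an aside (illustrative, not part of the diff): the keys above map onto the ReaperConfig struct added later in this commit. A minimal sketch of how gopkg.in/yaml.v2 decodes them, using an inline YAML snippet for demonstration:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// ReaperConfig mirrors the struct introduced in this commit.
type ReaperConfig struct {
	MaxUpdateHour   int `yaml:"max_update_hours"`
	MaxDanglingHour int `yaml:"max_dangling_hours"`
}

func main() {
	raw := []byte("reaper:\n  max_update_hours: 24\n  max_dangling_hours: 168\n")

	var cfg struct {
		Reaper *ReaperConfig `yaml:"reaper"`
	}
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Reaper.MaxUpdateHour, cfg.Reaper.MaxDanglingHour) // 24 168
}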

View File

@@ -22,6 +22,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"time"
 
 	"gopkg.in/yaml.v2"
@@ -83,6 +84,9 @@ type Configuration struct {
 	// Metric configurations
 	Metric *MetricConfig `yaml:"metric,omitempty"`
 
+	// Reaper configurations
+	ReaperConfig *ReaperConfig `yaml:"reaper,omitempty"`
+
 	// MaxLogSizeReturnedMB is the max size of log returned by job log API
 	MaxLogSizeReturnedMB int `yaml:"max_retrieve_size_mb,omitempty"`
 }
@@ -135,6 +139,11 @@ type LoggerConfig struct {
 	Sweeper *LogSweeperConfig `yaml:"sweeper"`
 }
 
+type ReaperConfig struct {
+	MaxUpdateHour   int `yaml:"max_update_hours"`
+	MaxDanglingHour int `yaml:"max_dangling_hours"`
+}
+
 // Load the configuration options from the specified yaml file.
 // If the yaml file is specified and existing, load configurations from yaml file first;
 // If detecting env variables is specified, load configurations from env variables;
@@ -348,3 +357,19 @@ func (c *Configuration) validate() error {
 
 	return nil // valid
 }
+
+// MaxUpdateDuration returns the max duration within which an execution can be updated by its tasks
+func MaxUpdateDuration() time.Duration {
+	if DefaultConfig != nil && DefaultConfig.ReaperConfig != nil && DefaultConfig.ReaperConfig.MaxUpdateHour > 24 {
+		return time.Duration(DefaultConfig.ReaperConfig.MaxUpdateHour) * time.Hour
+	}
+	return 24 * time.Hour
+}
+
+// MaxDanglingHour returns the max number of hours an execution may stay in a dangling state
+func MaxDanglingHour() int {
+	if DefaultConfig != nil && DefaultConfig.ReaperConfig != nil && DefaultConfig.ReaperConfig.MaxDanglingHour > 24*7 {
+		return DefaultConfig.ReaperConfig.MaxDanglingHour
+	}
+	return 24 * 7
+}
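A side note, not part of the commit: both accessors treat the defaults as floors, so a configured value only takes effect when it exceeds 24 hours (or 168 hours for the dangling threshold). A minimal sketch of that behavior:

package main

import (
	"fmt"
	"time"
)

// effectiveUpdateDuration mimics the fallback rule in MaxUpdateDuration:
// only values above 24 hours override the default.
func effectiveUpdateDuration(configuredHours int) time.Duration {
	if configuredHours > 24 {
		return time.Duration(configuredHours) * time.Hour
	}
	return 24 * time.Hour
}

// effectiveDanglingHours mimics the fallback rule in MaxDanglingHour:
// only values above 168 (7 days) override the default.
func effectiveDanglingHours(configuredHours int) int {
	if configuredHours > 24*7 {
		return configuredHours
	}
	return 24 * 7
}

func main() {
	fmt.Println(effectiveUpdateDuration(12)) // 24h0m0s: below the floor, the default wins
	fmt.Println(effectiveUpdateDuration(48)) // 48h0m0s
	fmt.Println(effectiveDanglingHours(100)) // 168: below the floor, the default wins
	fmt.Println(effectiveDanglingHours(336)) // 336
}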

View File

@@ -15,6 +15,7 @@ package config
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -86,6 +87,12 @@ func (suite *ConfigurationTestSuite) TestDefaultConfig() {
 	err := DefaultConfig.Load("../config_test.yml", true)
 	require.Nil(suite.T(), err, "load config from yaml file, expect nil error but got error '%s'", err)
 
+	maxUpdateHour := MaxUpdateDuration()
+	require.Equal(suite.T(), 24*time.Hour, maxUpdateHour, "expect max update duration to be 24h but got %v", maxUpdateHour)
+
+	maxDangling := MaxDanglingHour()
+	require.Equal(suite.T(), 168, maxDangling, "expect max dangling hours to be 168 but got %d", maxDangling)
+
 	assert.Equal(suite.T(), 10, DefaultConfig.MaxLogSizeReturnedMB, "expect max log size returned 10MB but got %d", DefaultConfig.MaxLogSizeReturnedMB)
 	redisURL := DefaultConfig.PoolConfig.RedisPoolCfg.RedisURL
 	assert.Equal(suite.T(), "redis://localhost:6379", redisURL, "expect redisURL '%s' but got '%s'", "redis://localhost:6379", redisURL)

View File

@@ -40,4 +40,9 @@ loggers:
   - name: "STD_OUTPUT" # Same with above
     level: "DEBUG"
 
+reaper:
+  # the max time to wait for a job to finish; if it is still unfinished after max_update_hours, the job will be marked as error, default value is 24
+  max_update_hours: 24
+  max_dangling_hours: 168
+
 max_retrieve_size_mb: 10

View File

@@ -23,6 +23,7 @@ import (
 	"github.com/gomodule/redigo/redis"
 
 	"github.com/goharbor/harbor/src/jobservice/common/rds"
+	"github.com/goharbor/harbor/src/jobservice/config"
 	"github.com/goharbor/harbor/src/jobservice/errs"
 	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/jobservice/lcm"
@@ -31,7 +32,6 @@ import (
 )
 
 const (
-	maxUpdateDuration       = 24 * time.Hour
 	reapLoopInterval        = 1 * time.Hour
 	initialReapLoopInterval = 5 * time.Minute
 )
@@ -171,7 +171,7 @@ func (r *reaper) syncOutdatedStats() error {
 			}
 		} else {
 			// Ongoing, check the update timestamp to make sure it is not hung
-			if time.Unix(t.Job().Info.UpdateTime, 0).Add(maxUpdateDuration).Before(time.Now()) {
+			if time.Unix(t.Job().Info.UpdateTime, 0).Add(config.MaxUpdateDuration()).Before(time.Now()) {
 				// Status hung
 				// Mark job status to error state
 				if err = t.Fail(); err != nil {
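For illustration only (not commit content): the hung check above asks whether a task's last update plus the configured max update duration already lies in the past. A tiny sketch with the default 24h threshold:

package main

import (
	"fmt"
	"time"
)

func main() {
	maxUpdate := 24 * time.Hour // default returned by config.MaxUpdateDuration()

	// A task last updated 30 hours ago exceeds the threshold and is treated as hung.
	lastUpdate := time.Now().Add(-30 * time.Hour)
	hung := lastUpdate.Add(maxUpdate).Before(time.Now())
	fmt.Println(hung) // true
}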

View File

@@ -27,6 +27,20 @@ func (_m *mockSweepManager) Clean(ctx context.Context, execID []int64) error {
 	return r0
 }
 
+// FixDanglingStateExecution provides a mock function with given fields: ctx
+func (_m *mockSweepManager) FixDanglingStateExecution(ctx context.Context) error {
+	ret := _m.Called(ctx)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context) error); ok {
+		r0 = rf(ctx)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
 // ListCandidates provides a mock function with given fields: ctx, vendorType, retainCnt
 func (_m *mockSweepManager) ListCandidates(ctx context.Context, vendorType string, retainCnt int64) ([]int64, error) {
 	ret := _m.Called(ctx, vendorType, retainCnt)

View File

@@ -90,6 +90,10 @@ func (sj *SweepJob) Run(ctx job.Context, params job.Parameters) error {
 	sj.logger.Info("start to run sweep job")
 
+	if err := sj.mgr.FixDanglingStateExecution(ctx.SystemContext()); err != nil {
+		sj.logger.Errorf("failed to fix dangling state executions, error: %v", err)
+	}
+
 	var errs errors.Errors
 	for vendor, cnt := range sj.execRetainCountsMap {
 		if sj.shouldStop(ctx) {

View File

@@ -18,9 +18,10 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/suite"
+
 	"github.com/goharbor/harbor/src/jobservice/job"
 	mockjobservice "github.com/goharbor/harbor/src/testing/jobservice"
-	"github.com/stretchr/testify/suite"
 )
 
@@ -46,8 +47,9 @@ func (suite *sweepJobTestSuite) TestRun() {
 		},
 	}
 
 	// test stop case
-	j := &SweepJob{}
+	j := &SweepJob{mgr: suite.sweepMgr}
 	suite.jobCtx.On("OPCommand").Return(job.StopCommand, true).Once()
+	suite.sweepMgr.On("FixDanglingStateExecution", context.TODO()).Return(nil)
 	err := j.Run(suite.jobCtx, params)
 	suite.NoError(err, "stop job should not return error")
 
@@ -66,6 +68,7 @@ func (suite *sweepJobTestSuite) TestRun() {
 	suite.sweepMgr.On("ListCandidates", ctx, "REPLICATION", int64(20)).Return([]int64{2}, nil)
 	suite.sweepMgr.On("Clean", ctx, []int64{1}).Return(nil)
 	suite.sweepMgr.On("Clean", ctx, []int64{2}).Return(nil)
+	suite.sweepMgr.On("FixDanglingStateExecution", ctx).Return(nil)
 	err = j.Run(suite.jobCtx, params)
 	suite.NoError(err)
 }

View File

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/goharbor/harbor/src/jobservice/config"
 	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/lib/errors"
 	"github.com/goharbor/harbor/src/lib/orm"
@@ -40,6 +41,8 @@ type SweepManager interface {
 	ListCandidates(ctx context.Context, vendorType string, retainCnt int64) (execIDs []int64, err error)
 	// Clean deletes the tasks belonging to the execution which in final status and deletes executions.
 	Clean(ctx context.Context, execID []int64) (err error)
+	// FixDanglingStateExecution fixes the executions which are in a dangling state.
+	FixDanglingStateExecution(ctx context.Context) error
 }
 
 // sweepManager implements the interface SweepManager.
@@ -182,6 +185,32 @@ func (sm *sweepManager) Clean(ctx context.Context, execIDs []int64) error {
 
 	return nil
 }
 
+// FixDanglingStateExecution updates executions that have been stuck in the Running state
+func (sm *sweepManager) FixDanglingStateExecution(ctx context.Context) error {
+	ormer, err := orm.FromContext(ctx)
+	if err != nil {
+		return err
+	}
+	sql := `UPDATE execution
+			SET status =
+				CASE
+					WHEN EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Error') THEN 'Error'
+					WHEN EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Stopped') THEN 'Stopped'
+					ELSE 'Success'
+				END
+			WHERE status = 'Running'
+			AND start_time < now() - INTERVAL ?
+			AND NOT EXISTS (SELECT 1 FROM task WHERE execution_id = execution.id AND status = 'Running')`
+	intervalStr := fmt.Sprintf("%d hour", config.MaxDanglingHour())
+	_, err = ormer.Raw(sql, intervalStr).Exec()
+	if err != nil {
+		return errors.Wrap(err, "failed to fix dangling state execution")
+	}
+	return nil
+}
+
 func NewSweepManager() SweepManager {
 	return &sweepManager{
 		execDAO: dao.NewExecutionDAO(),
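As an editorial aside (not part of the commit): the UPDATE above derives a dangling execution's final status from its tasks, with Error taking precedence over Stopped, and Success used when neither is present. A small Go sketch of that precedence rule, using hypothetical status strings that match the query:

package main

import "fmt"

// resolveDanglingStatus mirrors the CASE expression in the UPDATE statement:
// any 'Error' task wins, otherwise any 'Stopped' task, otherwise 'Success'.
func resolveDanglingStatus(taskStatuses []string) string {
	hasStopped := false
	for _, s := range taskStatuses {
		switch s {
		case "Error":
			return "Error"
		case "Stopped":
			hasStopped = true
		}
	}
	if hasStopped {
		return "Stopped"
	}
	return "Success"
}

func main() {
	fmt.Println(resolveDanglingStatus([]string{"Success", "Stopped"})) // Stopped
	fmt.Println(resolveDanglingStatus([]string{"Stopped", "Error"}))   // Error
	fmt.Println(resolveDanglingStatus(nil))                            // Success
}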