fix(job currency):introduce max corrency of job (#11589)

- update Job interface to introdcue MaxCurrency method for declaring the max currency of the specified job
- change the downstream jobs to implement the new interface method
  - GC and sample jobs are set to 1
  - other jobs are set to 0 that means unlimited
- add max currency optiot when doing job registration
- resolve issue #11586
  - probably resolve issue #11281
  - resolve issue #11570

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2020-04-13 18:07:54 +08:00 committed by GitHub
parent 45518d5daa
commit 3ad5b2ba06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 79 additions and 8 deletions

View File

@ -16,14 +16,15 @@ package gc
import (
"fmt"
"os"
"time"
"github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/controller/artifact"
"github.com/goharbor/harbor/src/controller/project"
"github.com/goharbor/harbor/src/lib/q"
"github.com/goharbor/harbor/src/pkg/artifactrash"
"github.com/goharbor/harbor/src/pkg/blob"
"os"
"time"
"github.com/garyburd/redigo/redis"
"github.com/goharbor/harbor/src/common"
@ -81,6 +82,11 @@ func (gc *GarbageCollector) MaxFails() uint {
return 1
}
// MaxCurrency is implementation of same method in Interface.
func (gc *GarbageCollector) MaxCurrency() uint {
return 1
}
// ShouldRetry implements the interface in job/Interface
func (gc *GarbageCollector) ShouldRetry() bool {
return false

View File

@ -3,12 +3,13 @@ package notification
import (
"bytes"
"fmt"
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"net/http"
"os"
"strconv"
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
)
// Max retry has the same meaning as max fails.
@ -36,6 +37,11 @@ func (wj *WebhookJob) MaxFails() uint {
return 10
}
// MaxCurrency is implementation of same method in Interface.
func (wj *WebhookJob) MaxCurrency() uint {
return 0
}
// ShouldRetry ...
func (wj *WebhookJob) ShouldRetry() bool {
return true

View File

@ -60,6 +60,11 @@ func (r *Replication) MaxFails() uint {
return 3
}
// MaxCurrency is implementation of same method in Interface.
func (r *Replication) MaxCurrency() uint {
return 0
}
// ShouldRetry always returns true which means the job is needed to be restarted when fails
func (r *Replication) ShouldRetry() bool {
return true

View File

@ -43,6 +43,11 @@ func (s *Scheduler) MaxFails() uint {
return 0
}
// MaxCurrency is implementation of same method in Interface.
func (s *Scheduler) MaxCurrency() uint {
return 0
}
// Validate ....
func (s *Scheduler) Validate(params job.Parameters) error {
return nil

View File

@ -32,6 +32,11 @@ func (j *Job) MaxFails() uint {
return 3
}
// MaxCurrency is implementation of same method in Interface.
func (j *Job) MaxCurrency() uint {
return 1
}
// ShouldRetry ...
func (j *Job) ShouldRetry() bool {
return true

View File

@ -22,6 +22,12 @@ type Interface interface {
// uint: the failure count allowed. If it is set to 0, then default value 4 is used.
MaxFails() uint
// Max currency of the job. Unlike the WorkerPool concurrency, it controls the limit on the number jobs of that type
// that can be active at one time by within a single redis instance.
//
// The default value is 0, which means "no limit on job concurrency".
MaxCurrency() uint
// Tell the worker worker if retry the failed job when the fails is
// still less that the number declared by the method 'MaxFails'.
//

View File

@ -196,6 +196,10 @@ func (j *fakeParentJob) MaxFails() uint {
return 1
}
func (j *fakeParentJob) MaxCurrency() uint {
return 0
}
func (j *fakeParentJob) ShouldRetry() bool {
return false
}
@ -217,6 +221,10 @@ func (j *fakePanicJob) MaxFails() uint {
return 1
}
func (j *fakePanicJob) MaxCurrency() uint {
return 0
}
func (j *fakePanicJob) ShouldRetry() bool {
return false
}

View File

@ -410,8 +410,9 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) {
w.pool.JobWithOptions(
name,
work.JobOptions{
MaxFails: theJ.MaxFails(),
SkipDead: true,
MaxFails: theJ.MaxFails(),
MaxConcurrency: theJ.MaxCurrency(),
SkipDead: true,
},
// Use generic handler to handle as we do not accept context with this way.
func(job *work.Job) error {

View File

@ -226,6 +226,10 @@ func (j *fakeJob) MaxFails() uint {
return 3
}
func (j *fakeJob) MaxCurrency() uint {
return 0
}
func (j *fakeJob) ShouldRetry() bool {
return false
}
@ -253,6 +257,10 @@ func (j *fakeLongRunJob) MaxFails() uint {
return 3
}
func (j *fakeLongRunJob) MaxCurrency() uint {
return 0
}
func (j *fakeLongRunJob) ShouldRetry() bool {
return false
}

View File

@ -18,10 +18,11 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/goharbor/harbor/src/lib/selector"
"strings"
"time"
"github.com/goharbor/harbor/src/lib/selector"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/pkg/retention/dep"
@ -46,6 +47,11 @@ func (pj *Job) MaxFails() uint {
return 3
}
// MaxCurrency is implementation of same method in Interface.
func (pj *Job) MaxCurrency() uint {
return 0
}
// ShouldRetry indicates job can be retried if failed
func (pj *Job) ShouldRetry() bool {
return true

View File

@ -35,6 +35,11 @@ func (sa *Job) MaxFails() uint {
return 1
}
// MaxCurrency is implementation of same method in Interface.
func (sa *Job) MaxCurrency() uint {
return 0
}
// ShouldRetry implements the interface in job/Interface
func (sa *Job) ShouldRetry() bool {
return false

View File

@ -95,6 +95,11 @@ func (j *Job) MaxFails() uint {
return 3
}
// MaxCurrency is implementation of same method in Interface.
func (j *Job) MaxCurrency() uint {
return 0
}
// ShouldRetry indicates if the job should be retried
func (j *Job) ShouldRetry() bool {
return true

View File

@ -34,6 +34,11 @@ func (pj *PeriodicJob) MaxFails() uint {
return 3
}
// MaxCurrency is implementation of same method in Interface.
func (pj *PeriodicJob) MaxCurrency() uint {
return 0
}
// ShouldRetry indicates job can be retried if failed
func (pj *PeriodicJob) ShouldRetry() bool {
return true