diff --git a/src/jobservice/job/impl/gc/garbage_collection.go b/src/jobservice/job/impl/gc/garbage_collection.go index 96413e7c6..374cfe2ae 100644 --- a/src/jobservice/job/impl/gc/garbage_collection.go +++ b/src/jobservice/job/impl/gc/garbage_collection.go @@ -16,14 +16,15 @@ package gc import ( "fmt" + "os" + "time" + "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/controller/artifact" "github.com/goharbor/harbor/src/controller/project" "github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/pkg/artifactrash" "github.com/goharbor/harbor/src/pkg/blob" - "os" - "time" "github.com/garyburd/redigo/redis" "github.com/goharbor/harbor/src/common" @@ -81,6 +82,11 @@ func (gc *GarbageCollector) MaxFails() uint { return 1 } +// MaxCurrency is implementation of same method in Interface. +func (gc *GarbageCollector) MaxCurrency() uint { + return 1 +} + // ShouldRetry implements the interface in job/Interface func (gc *GarbageCollector) ShouldRetry() bool { return false diff --git a/src/jobservice/job/impl/notification/webhook_job.go b/src/jobservice/job/impl/notification/webhook_job.go index 612fad284..7c292cd69 100644 --- a/src/jobservice/job/impl/notification/webhook_job.go +++ b/src/jobservice/job/impl/notification/webhook_job.go @@ -3,12 +3,13 @@ package notification import ( "bytes" "fmt" - commonhttp "github.com/goharbor/harbor/src/common/http" - "github.com/goharbor/harbor/src/jobservice/job" - "github.com/goharbor/harbor/src/jobservice/logger" "net/http" "os" "strconv" + + commonhttp "github.com/goharbor/harbor/src/common/http" + "github.com/goharbor/harbor/src/jobservice/job" + "github.com/goharbor/harbor/src/jobservice/logger" ) // Max retry has the same meaning as max fails. @@ -36,6 +37,11 @@ func (wj *WebhookJob) MaxFails() uint { return 10 } +// MaxCurrency is implementation of same method in Interface. +func (wj *WebhookJob) MaxCurrency() uint { + return 0 +} + // ShouldRetry ... func (wj *WebhookJob) ShouldRetry() bool { return true diff --git a/src/jobservice/job/impl/replication/replication.go b/src/jobservice/job/impl/replication/replication.go index 4cdb63acf..aeb818107 100644 --- a/src/jobservice/job/impl/replication/replication.go +++ b/src/jobservice/job/impl/replication/replication.go @@ -60,6 +60,11 @@ func (r *Replication) MaxFails() uint { return 3 } +// MaxCurrency is implementation of same method in Interface. +func (r *Replication) MaxCurrency() uint { + return 0 +} + // ShouldRetry always returns true which means the job is needed to be restarted when fails func (r *Replication) ShouldRetry() bool { return true diff --git a/src/jobservice/job/impl/replication/scheduler.go b/src/jobservice/job/impl/replication/scheduler.go index 6539117af..6c15230a6 100644 --- a/src/jobservice/job/impl/replication/scheduler.go +++ b/src/jobservice/job/impl/replication/scheduler.go @@ -43,6 +43,11 @@ func (s *Scheduler) MaxFails() uint { return 0 } +// MaxCurrency is implementation of same method in Interface. +func (s *Scheduler) MaxCurrency() uint { + return 0 +} + // Validate .... func (s *Scheduler) Validate(params job.Parameters) error { return nil diff --git a/src/jobservice/job/impl/sample/job.go b/src/jobservice/job/impl/sample/job.go index e4c32cb04..afb72d990 100644 --- a/src/jobservice/job/impl/sample/job.go +++ b/src/jobservice/job/impl/sample/job.go @@ -32,6 +32,11 @@ func (j *Job) MaxFails() uint { return 3 } +// MaxCurrency is implementation of same method in Interface. +func (j *Job) MaxCurrency() uint { + return 1 +} + // ShouldRetry ... func (j *Job) ShouldRetry() bool { return true diff --git a/src/jobservice/job/interface.go b/src/jobservice/job/interface.go index 40683a553..6ea1bdad4 100644 --- a/src/jobservice/job/interface.go +++ b/src/jobservice/job/interface.go @@ -22,6 +22,12 @@ type Interface interface { // uint: the failure count allowed. If it is set to 0, then default value 4 is used. MaxFails() uint + // Max currency of the job. Unlike the WorkerPool concurrency, it controls the limit on the number jobs of that type + // that can be active at one time by within a single redis instance. + // + // The default value is 0, which means "no limit on job concurrency". + MaxCurrency() uint + // Tell the worker worker if retry the failed job when the fails is // still less that the number declared by the method 'MaxFails'. // diff --git a/src/jobservice/runner/redis_test.go b/src/jobservice/runner/redis_test.go index d64a448a7..b3a5fb1cd 100644 --- a/src/jobservice/runner/redis_test.go +++ b/src/jobservice/runner/redis_test.go @@ -196,6 +196,10 @@ func (j *fakeParentJob) MaxFails() uint { return 1 } +func (j *fakeParentJob) MaxCurrency() uint { + return 0 +} + func (j *fakeParentJob) ShouldRetry() bool { return false } @@ -217,6 +221,10 @@ func (j *fakePanicJob) MaxFails() uint { return 1 } +func (j *fakePanicJob) MaxCurrency() uint { + return 0 +} + func (j *fakePanicJob) ShouldRetry() bool { return false } diff --git a/src/jobservice/worker/cworker/c_worker.go b/src/jobservice/worker/cworker/c_worker.go index bc58a0222..01d7bf4ae 100644 --- a/src/jobservice/worker/cworker/c_worker.go +++ b/src/jobservice/worker/cworker/c_worker.go @@ -410,8 +410,9 @@ func (w *basicWorker) registerJob(name string, j interface{}) (err error) { w.pool.JobWithOptions( name, work.JobOptions{ - MaxFails: theJ.MaxFails(), - SkipDead: true, + MaxFails: theJ.MaxFails(), + MaxConcurrency: theJ.MaxCurrency(), + SkipDead: true, }, // Use generic handler to handle as we do not accept context with this way. func(job *work.Job) error { diff --git a/src/jobservice/worker/cworker/c_worker_test.go b/src/jobservice/worker/cworker/c_worker_test.go index 7d81cb280..584315706 100644 --- a/src/jobservice/worker/cworker/c_worker_test.go +++ b/src/jobservice/worker/cworker/c_worker_test.go @@ -226,6 +226,10 @@ func (j *fakeJob) MaxFails() uint { return 3 } +func (j *fakeJob) MaxCurrency() uint { + return 0 +} + func (j *fakeJob) ShouldRetry() bool { return false } @@ -253,6 +257,10 @@ func (j *fakeLongRunJob) MaxFails() uint { return 3 } +func (j *fakeLongRunJob) MaxCurrency() uint { + return 0 +} + func (j *fakeLongRunJob) ShouldRetry() bool { return false } diff --git a/src/pkg/retention/job.go b/src/pkg/retention/job.go index 3d18c62f0..417f0cd8c 100644 --- a/src/pkg/retention/job.go +++ b/src/pkg/retention/job.go @@ -18,10 +18,11 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/goharbor/harbor/src/lib/selector" "strings" "time" + "github.com/goharbor/harbor/src/lib/selector" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/logger" "github.com/goharbor/harbor/src/pkg/retention/dep" @@ -46,6 +47,11 @@ func (pj *Job) MaxFails() uint { return 3 } +// MaxCurrency is implementation of same method in Interface. +func (pj *Job) MaxCurrency() uint { + return 0 +} + // ShouldRetry indicates job can be retried if failed func (pj *Job) ShouldRetry() bool { return true diff --git a/src/pkg/scan/all/job.go b/src/pkg/scan/all/job.go index 68912be7d..6341a3e1f 100644 --- a/src/pkg/scan/all/job.go +++ b/src/pkg/scan/all/job.go @@ -35,6 +35,11 @@ func (sa *Job) MaxFails() uint { return 1 } +// MaxCurrency is implementation of same method in Interface. +func (sa *Job) MaxCurrency() uint { + return 0 +} + // ShouldRetry implements the interface in job/Interface func (sa *Job) ShouldRetry() bool { return false diff --git a/src/pkg/scan/job.go b/src/pkg/scan/job.go index c050c0f9e..aac05ca29 100644 --- a/src/pkg/scan/job.go +++ b/src/pkg/scan/job.go @@ -95,6 +95,11 @@ func (j *Job) MaxFails() uint { return 3 } +// MaxCurrency is implementation of same method in Interface. +func (j *Job) MaxCurrency() uint { + return 0 +} + // ShouldRetry indicates if the job should be retried func (j *Job) ShouldRetry() bool { return true diff --git a/src/pkg/scheduler/periodic_job.go b/src/pkg/scheduler/periodic_job.go index f52f47af5..eefafed3b 100644 --- a/src/pkg/scheduler/periodic_job.go +++ b/src/pkg/scheduler/periodic_job.go @@ -34,6 +34,11 @@ func (pj *PeriodicJob) MaxFails() uint { return 3 } +// MaxCurrency is implementation of same method in Interface. +func (pj *PeriodicJob) MaxCurrency() uint { + return 0 +} + // ShouldRetry indicates job can be retried if failed func (pj *PeriodicJob) ShouldRetry() bool { return true