Merge pull request #14023 from ywk253100/210115_scheduled

Fix the legacy scheduled job issue for GC/scan all
This commit is contained in:
Steven Zou 2021-01-18 14:01:01 +08:00 committed by GitHub
commit 42559479e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 73 additions and 107 deletions

View File

@ -312,7 +312,7 @@ BEGIN
VALUES ('GARBAGE_COLLECTION', -1,
(SELECT schd.cron_str::json->>'cron'),
'GARBAGE_COLLECTION',
(SELECT json_build_object('trigger', null, 'deleteuntagged', schd.job_parameters::json->'delete_untagged', 'dryrun', false, 'job_parameters', schd.job_parameters)),
(SELECT json_build_object('trigger', null, 'deleteuntagged', schd.job_parameters::json->'delete_untagged', 'dryrun', false, 'extra_attrs', schd.job_parameters::json)),
(SELECT schd.cron_str::json->>'type'),
(SELECT json_build_object('delete_untagged', schd.job_parameters::json->'delete_untagged')),
schd.creation_time, schd.update_time) RETURNING id INTO new_schd_id;
@ -414,7 +414,7 @@ BEGIN
LOOP
INSERT INTO schedule (vendor_type, vendor_id, cron, callback_func_name,
cron_type, creation_time, update_time)
VALUES ('IMAGE_SCAN_ALL', 0,
VALUES ('SCAN_ALL', 0,
(SELECT schd.cron_str::json->>'cron'),
'scanAll',
(SELECT schd.cron_str::json->>'type'),

View File

@ -78,7 +78,7 @@ func (c *controller) Start(ctx context.Context, policy Policy, trigger string) (
return -1, err
}
_, err = c.taskMgr.Create(ctx, execID, &task.Job{
Name: job.ImageGC,
Name: job.GarbageCollection,
Metadata: &job.Metadata{
JobKind: job.KindGeneric,
},

View File

@ -48,7 +48,10 @@ import (
// DefaultController is a default singleton scan API controller.
var DefaultController = NewController()
// const definitions
const (
VendorTypeScanAll = "SCAN_ALL"
configRegistryEndpoint = "registryEndpoint"
configCoreInternalAddr = "coreInternalAddr"
@ -62,7 +65,7 @@ const (
func init() {
// keep only the latest created 5 scan all execution records
task.SetExecutionSweeperCount(job.ImageScanAllJob, 5)
task.SetExecutionSweeperCount(VendorTypeScanAll, 5)
}
// uuidGenerator is a func template which is for generating UUID.
@ -277,7 +280,7 @@ func (bc *basicController) Scan(ctx context.Context, artifact *ar.Artifact, opti
}
func (bc *basicController) ScanAll(ctx context.Context, trigger string, async bool) (int64, error) {
executionID, err := bc.execMgr.Create(ctx, job.ImageScanAllJob, 0, trigger)
executionID, err := bc.execMgr.Create(ctx, VendorTypeScanAll, 0, trigger)
if err != nil {
return 0, err
}

View File

@ -479,7 +479,7 @@ func (suite *ControllerTestSuite) TestScanAll() {
executionID := int64(1)
suite.execMgr.On(
"Create", ctx, "IMAGE_SCAN_ALL", int64(0), "SCHEDULE",
"Create", ctx, "SCAN_ALL", int64(0), "SCHEDULE",
).Return(executionID, nil).Once()
mock.OnAnything(suite.artifactCtl, "List").Return([]*artifact.Artifact{}, nil).Once()
@ -501,7 +501,7 @@ func (suite *ControllerTestSuite) TestScanAll() {
executionID := int64(1)
suite.execMgr.On(
"Create", ctx, "IMAGE_SCAN_ALL", int64(0), "SCHEDULE",
"Create", ctx, "SCAN_ALL", int64(0), "SCHEDULE",
).Return(executionID, nil).Once()
mock.OnAnything(suite.artifactCtl, "List").Return([]*artifact.Artifact{suite.artifact}, nil).Once()

View File

@ -47,7 +47,7 @@ func init() {
}
// NOTE: the vendor type of execution for the scan job trigger by the scan all is job.ImageScanAllJob
if err := task.RegisterCheckInProcessor(job.ImageScanAllJob, scanTaskCheckInProcessor); err != nil {
if err := task.RegisterCheckInProcessor(VendorTypeScanAll, scanTaskCheckInProcessor); err != nil {
log.Fatalf("failed to register the checkin processor for the scan all job, error %v", err)
}

View File

@ -164,7 +164,7 @@ func (suite *CallbackTestSuite) TestScanAllCallback() {
{
// create execution failed
suite.execMgr.On(
"Create", context.TODO(), "IMAGE_SCAN_ALL", int64(0), "SCHEDULE",
"Create", context.TODO(), "SCAN_ALL", int64(0), "SCHEDULE",
).Return(int64(0), fmt.Errorf("failed")).Once()
suite.Error(scanAllCallback(context.TODO(), ""))
@ -174,7 +174,7 @@ func (suite *CallbackTestSuite) TestScanAllCallback() {
executionID := int64(1)
suite.execMgr.On(
"Create", context.TODO(), "IMAGE_SCAN_ALL", int64(0), "SCHEDULE",
"Create", context.TODO(), "SCAN_ALL", int64(0), "SCHEDULE",
).Return(executionID, nil).Once()
suite.execMgr.On(

View File

@ -0,0 +1,36 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package legacy
import "github.com/goharbor/harbor/src/pkg/scheduler"
// As one job implementation can only be registered with one name. Define the following three
// schedulers which are wrapper of pkg/scheduler.PeriodicJob for the legacy periodical jobs
// They can be removed after several releases
// ReplicationScheduler is the legacy scheduler for replication
type ReplicationScheduler struct {
scheduler.PeriodicJob
}
// GarbageCollectionScheduler is the legacy scheduler for garbage collection
type GarbageCollectionScheduler struct {
scheduler.PeriodicJob
}
// ScanAllScheduler is the legacy scheduler for scan all
type ScanAllScheduler struct {
scheduler.PeriodicJob
}

View File

@ -1,81 +0,0 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package replication
import (
"fmt"
"net/http"
"os"
"github.com/goharbor/harbor/src/common/api"
common_http "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/replication/model"
)
// Scheduler is a job running in Jobservice which can be used as
// a scheduler when submitting it as a scheduled job. It receives
// a URL and data, and post the data to the URL when it is running
type Scheduler struct {
ctx job.Context
}
// ShouldRetry ...
func (s *Scheduler) ShouldRetry() bool {
return false
}
// MaxFails ...
func (s *Scheduler) MaxFails() uint {
return 0
}
// MaxCurrency is implementation of same method in Interface.
func (s *Scheduler) MaxCurrency() uint {
return 0
}
// Validate ....
func (s *Scheduler) Validate(params job.Parameters) error {
return nil
}
// Run ...
func (s *Scheduler) Run(ctx job.Context, params job.Parameters) error {
cmd, exist := ctx.OPCommand()
if exist && cmd == job.StopCommand {
return nil
}
logger := ctx.GetLogger()
url := params["url"].(string)
url = fmt.Sprintf("%s/api/%s/replication/executions?trigger=%s", url, api.APIVersion, model.TriggerTypeScheduled)
policyID := (int64)(params["policy_id"].(float64))
cred := auth.NewSecretAuthorizer(os.Getenv("JOBSERVICE_SECRET"))
client := common_http.NewClient(&http.Client{
Transport: common_http.GetHTTPTransport(common_http.SecureTransport),
}, cred)
if err := client.Post(url, struct {
PolicyID int64 `json:"policy_id"`
}{
PolicyID: policyID,
}); err != nil {
logger.Errorf("failed to run the schedule job: %v", err)
return err
}
logger.Info("the schedule job finished")
return nil
}

View File

@ -22,16 +22,12 @@ const (
// ImageScanJob is name of scan job it will be used as key to register to job service.
ImageScanJob = "IMAGE_SCAN"
// ImageScanAllJob is the name of "scanall" job in job service
ImageScanAllJob = "IMAGE_SCAN_ALL"
// ImageGC the name of image garbage collection job in job service
ImageGC = "IMAGE_GC"
// GarbageCollection job name
GarbageCollection = "GARBAGE_COLLECTION"
// ImageGCReadOnly the name of image garbage collection read only job in job service
ImageGCReadOnly = "IMAGE_GC_READ_ONLY"
// Replication : the name of the replication job in job service
Replication = "REPLICATION"
// ReplicationScheduler : the name of the replication scheduler job in job service
ReplicationScheduler = "IMAGE_REPLICATE"
// WebhookJob : the name of the webhook job in job service
WebhookJob = "WEBHOOK"
// SlackJob : the name of the slack job in job service

View File

@ -75,7 +75,7 @@ func (suite *ManagerTestSuite) SetupTest() {
"id",
id,
"name",
job.ImageGC,
jobNameGarbageCollection,
"kind",
job.KindPeriodic,
"unique",
@ -104,7 +104,7 @@ func (suite *ManagerTestSuite) SetupTest() {
params["redis_url_reg"] = "redis://redis:6379/1"
policy := make(map[string]interface{})
policy["job_name"] = job.ImageGC
policy["job_name"] = jobNameGarbageCollection
policy["job_params"] = params
policy["cron_spec"] = "0 0 17 * * *"

View File

@ -29,6 +29,12 @@ import (
"github.com/gomodule/redigo/redis"
)
const (
jobNameGarbageCollection = "IMAGE_GC"
jobNameScanAll = "IMAGE_SCAN_ALL"
jobNameReplicationScheduler = "IMAGE_REPLICATE"
)
// PolicyMigrator migrate the cron job policy to new schema
type PolicyMigrator struct {
// namespace of rdb
@ -299,9 +305,9 @@ func clearDuplicatedPolicies(conn redis.Conn, ns string) error {
continue
}
if p.JobName == job.ImageScanAllJob ||
p.JobName == job.ImageGC ||
p.JobName == job.ReplicationScheduler {
if p.JobName == jobNameScanAll ||
p.JobName == jobNameGarbageCollection ||
p.JobName == jobNameReplicationScheduler {
score, _ := strconv.ParseInt(string(bytes[i+1].([]byte)), 10, 64)
key := hashKey(p)

View File

@ -33,6 +33,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/job/impl"
"github.com/goharbor/harbor/src/jobservice/job/impl/gc"
"github.com/goharbor/harbor/src/jobservice/job/impl/gcreadonly"
"github.com/goharbor/harbor/src/jobservice/job/impl/legacy"
"github.com/goharbor/harbor/src/jobservice/job/impl/notification"
"github.com/goharbor/harbor/src/jobservice/job/impl/replication"
"github.com/goharbor/harbor/src/jobservice/job/impl/sample"
@ -255,15 +256,20 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
job.SampleJob: (*sample.Job)(nil),
// Functional jobs
job.ImageScanJob: (*scan.Job)(nil),
job.ImageGC: (*gc.GarbageCollector)(nil),
job.GarbageCollection: (*gc.GarbageCollector)(nil),
job.ImageGCReadOnly: (*gcreadonly.GarbageCollector)(nil),
job.Replication: (*replication.Replication)(nil),
job.ReplicationScheduler: (*replication.Scheduler)(nil),
job.Retention: (*retention.Job)(nil),
scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
job.WebhookJob: (*notification.WebhookJob)(nil),
job.SlackJob: (*notification.SlackJob)(nil),
job.P2PPreheat: (*preheat.Job)(nil),
// In v2.2 we migrate the scheduled replication, garbage collection and scan all to
// the scheduler mechanism, the following three jobs are kept for the legacy jobs
// and they can be removed after several releases
"IMAGE_REPLICATE": (*legacy.ReplicationScheduler)(nil),
"IMAGE_GC": (*legacy.GarbageCollectionScheduler)(nil),
"IMAGE_SCAN_ALL": (*legacy.ScanAllScheduler)(nil),
}); err != nil {
// exit
return nil, err

View File

@ -176,11 +176,11 @@ func (s *scanAllAPI) createOrUpdateScanAllSchedule(ctx context.Context, cronType
}
}
return s.scheduler.Schedule(ctx, job.ImageScanAllJob, 0, cronType, cron, scan.ScanAllCallback, nil, nil)
return s.scheduler.Schedule(ctx, scan.VendorTypeScanAll, 0, cronType, cron, scan.ScanAllCallback, nil, nil)
}
func (s *scanAllAPI) getScanAllSchedule(ctx context.Context) (*scheduler.Schedule, error) {
query := q.New(q.KeyWords{"vendor_type": job.ImageScanAllJob})
query := q.New(q.KeyWords{"vendor_type": scan.VendorTypeScanAll})
schedules, err := s.scheduler.ListSchedules(ctx, query.First("-creation_time"))
if err != nil {
return nil, err
@ -235,7 +235,7 @@ func (s *scanAllAPI) getMetrics(ctx context.Context, trigger ...string) (*models
}
func (s *scanAllAPI) getLatestScanAllExecution(ctx context.Context, trigger ...string) (*task.Execution, error) {
query := q.New(q.KeyWords{"vendor_type": job.ImageScanAllJob})
query := q.New(q.KeyWords{"vendor_type": scan.VendorTypeScanAll})
if len(trigger) > 0 {
query.Keywords["trigger"] = trigger[0]
}