Mirror of https://github.com/goharbor/harbor.git
Change queue name for purge audit log and filter the old one (#18182)
Fixes #18121

Refactor the job name constants with a VendorType suffix and make sure the job queue name and the vendor type recorded in executions and tasks are identical.

Signed-off-by: stonezdj <daojunz@vmware.com>
parent d03f0dcf2d
commit 58054c0ce8
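The whole change follows one pattern: each job now has a single job.<Name>VendorType constant that is used both as the job name registered with the job service (which determines the queue name) and as the vendor type stored on execution and task records, so queue names and VendorType queries always agree. Below is a minimal sketch of that pattern, loosely based on the purge-audit controller hunks in this diff; the manager fields and error handling are simplified assumptions, not the exact Harbor code:

	// The same constant names the job (and therefore its queue) and tags the
	// execution record, so later queries by VendorType match what was queued.
	execID, err := c.exeMgr.Create(ctx, job.PurgeAuditVendorType, -1, trigger, para)
	if err != nil {
		return -1, err
	}
	_, err = c.taskMgr.Create(ctx, execID, &task.Job{
		Name:     job.PurgeAuditVendorType, // job/queue name == vendor type
		Metadata: &job.Metadata{JobKind: job.KindGeneric},
	})

The old queue name PURGE_AUDIT is also added to skippedJobTypes in the job-service monitor, which is how the queue left over from earlier releases gets filtered out.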
@@ -57,7 +57,7 @@ func init() {
 	_ = notifier.Subscribe(event.TopicPullArtifact, &internal.Handler{})
 	_ = notifier.Subscribe(event.TopicPushArtifact, &internal.Handler{})

-	_ = task.RegisterTaskStatusChangePostFunc(job.Replication, func(ctx context.Context, taskID int64, status string) error {
+	_ = task.RegisterTaskStatusChangePostFunc(job.ReplicationVendorType, func(ctx context.Context, taskID int64, status string) error {
 		notification.AddEvent(ctx, &metadata.ReplicationMetaData{
 			ReplicationTaskID: taskID,
 			Status:            status,
@@ -29,12 +29,12 @@ import (
 )

 func init() {
-	err := scheduler.RegisterCallbackFunc(SchedulerCallback, gcCallback)
+	err := scheduler.RegisterCallbackFunc(job.GarbageCollectionVendorType, gcCallback)
 	if err != nil {
 		log.Fatalf("failed to registry GC call back, %v", err)
 	}

-	if err := task.RegisterTaskStatusChangePostFunc(GCVendorType, gcTaskStatusChange); err != nil {
+	if err := task.RegisterTaskStatusChangePostFunc(job.GarbageCollectionVendorType, gcTaskStatusChange); err != nil {
 		log.Fatalf("failed to register the task status change post for the gc job, error %v", err)
 	}
 }

@@ -12,7 +12,7 @@ import (

 func init() {
 	// keep only the latest created 50 gc execution records
-	task.SetExecutionSweeperCount(GCVendorType, 50)
+	task.SetExecutionSweeperCount(job.GarbageCollectionVendorType, 50)
 }

 var (
@@ -20,13 +20,6 @@ var (
 	Ctl = NewController()
 )

-const (
-	// SchedulerCallback ...
-	SchedulerCallback = "GARBAGE_COLLECTION"
-	// GCVendorType ...
-	GCVendorType = "GARBAGE_COLLECTION"
-)
-
 // Controller manages the tags
 type Controller interface {
 	// Start start a manual gc job
@@ -79,12 +72,12 @@ func (c *controller) Start(ctx context.Context, policy Policy, trigger string) (
 	para["redis_url_reg"] = policy.ExtraAttrs["redis_url_reg"]
 	para["time_window"] = policy.ExtraAttrs["time_window"]

-	execID, err := c.exeMgr.Create(ctx, GCVendorType, -1, trigger, para)
+	execID, err := c.exeMgr.Create(ctx, job.GarbageCollectionVendorType, -1, trigger, para)
 	if err != nil {
 		return -1, err
 	}
 	_, err = c.taskMgr.Create(ctx, execID, &task.Job{
-		Name: job.GarbageCollection,
+		Name: job.GarbageCollectionVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},
@@ -103,14 +96,14 @@ func (c *controller) Stop(ctx context.Context, id int64) error {

 // ExecutionCount ...
 func (c *controller) ExecutionCount(ctx context.Context, query *q.Query) (int64, error) {
-	query.Keywords["VendorType"] = GCVendorType
+	query.Keywords["VendorType"] = job.GarbageCollectionVendorType
 	return c.exeMgr.Count(ctx, query)
 }

 // ListExecutions ...
 func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Execution, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = GCVendorType
+	query.Keywords["VendorType"] = job.GarbageCollectionVendorType

 	execs, err := c.exeMgr.List(ctx, query)
 	if err != nil {
@@ -128,7 +121,7 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er
 	execs, err := c.exeMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
 			"ID": id,
-			"VendorType": GCVendorType,
+			"VendorType": job.GarbageCollectionVendorType,
 		},
 	})
 	if err != nil {
@@ -146,7 +139,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
 	tasks, err := c.taskMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
 			"ID": id,
-			"VendorType": GCVendorType,
+			"VendorType": job.GarbageCollectionVendorType,
 		},
 	})
 	if err != nil {
@@ -162,7 +155,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
 // ListTasks ...
 func (c *controller) ListTasks(ctx context.Context, query *q.Query) ([]*Task, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = GCVendorType
+	query.Keywords["VendorType"] = job.GarbageCollectionVendorType
 	tks, err := c.taskMgr.List(ctx, query)
 	if err != nil {
 		return nil, err
@@ -185,7 +178,7 @@ func (c *controller) GetTaskLog(ctx context.Context, id int64) ([]byte, error) {

 // GetSchedule ...
 func (c *controller) GetSchedule(ctx context.Context) (*scheduler.Schedule, error) {
-	sch, err := c.schedulerMgr.ListSchedules(ctx, q.New(q.KeyWords{"VendorType": GCVendorType}))
+	sch, err := c.schedulerMgr.ListSchedules(ctx, q.New(q.KeyWords{"VendorType": job.GarbageCollectionVendorType}))
 	if err != nil {
 		return nil, err
 	}
@@ -202,12 +195,12 @@ func (c *controller) GetSchedule(ctx context.Context) (*scheduler.Schedule, erro
 func (c *controller) CreateSchedule(ctx context.Context, cronType, cron string, policy Policy) (int64, error) {
 	extras := make(map[string]interface{})
 	extras["delete_untagged"] = policy.DeleteUntagged
-	return c.schedulerMgr.Schedule(ctx, GCVendorType, -1, cronType, cron, SchedulerCallback, policy, extras)
+	return c.schedulerMgr.Schedule(ctx, job.GarbageCollectionVendorType, -1, cronType, cron, job.GarbageCollectionVendorType, policy, extras)
 }

 // DeleteSchedule ...
 func (c *controller) DeleteSchedule(ctx context.Context) error {
-	return c.schedulerMgr.UnScheduleByVendor(ctx, GCVendorType, -1)
+	return c.schedulerMgr.UnScheduleByVendor(ctx, job.GarbageCollectionVendorType, -1)
 }

 func convertExecution(exec *task.Execution) *Execution {

@@ -80,7 +80,7 @@ func (g *gcCtrTestSuite) TestGetExecution() {
 		{
 			ID: 1,
 			Trigger: "Manual",
-			VendorType: GCVendorType,
+			VendorType: job.GarbageCollectionVendorType,
 			StatusMessage: "Success",
 		},
 	}, nil)
@@ -49,6 +49,7 @@ var skippedJobTypes = []string{
 	"IMAGE_REPLICATE",
 	"IMAGE_SCAN_ALL",
 	"IMAGE_GC",
+	"PURGE_AUDIT",
 }

 // MonitorController defines the worker pool operations

@@ -21,6 +21,7 @@ import (
 	"github.com/stretchr/testify/suite"

 	"github.com/goharbor/harbor/src/controller/purge"
+	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/pkg/queuestatus"
 	"github.com/goharbor/harbor/src/pkg/scheduler"
 	"github.com/goharbor/harbor/src/testing/mock"
@@ -50,27 +51,27 @@ func (s *ScheduleTestSuite) TestCreateSchedule() {

 	dataMap := make(map[string]interface{})
 	p := purge.JobPolicy{}
-	id, err := s.ctl.Create(nil, purge.VendorType, "Daily", "* * * * * *", purge.SchedulerCallback, p, dataMap)
+	id, err := s.ctl.Create(nil, job.PurgeAuditVendorType, "Daily", "* * * * * *", purge.SchedulerCallback, p, dataMap)
 	s.Nil(err)
 	s.Equal(int64(1), id)
 }

 func (s *ScheduleTestSuite) TestDeleteSchedule() {
 	s.scheduler.On("UnScheduleByVendor", mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	s.Nil(s.ctl.Delete(nil, purge.VendorType))
+	s.Nil(s.ctl.Delete(nil, job.PurgeAuditVendorType))
 }

 func (s *ScheduleTestSuite) TestGetSchedule() {
 	s.scheduler.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler.Schedule{
 		{
 			ID: 1,
-			VendorType: purge.VendorType,
+			VendorType: job.PurgeAuditVendorType,
 		},
 	}, nil).Once()

-	schedule, err := s.ctl.Get(nil, purge.VendorType)
+	schedule, err := s.ctl.Get(nil, job.PurgeAuditVendorType)
 	s.Nil(err)
-	s.Equal(purge.VendorType, schedule.VendorType)
+	s.Equal(job.PurgeAuditVendorType, schedule.VendorType)
 }

 func (s *ScheduleTestSuite) TestListSchedule() {
@@ -292,7 +292,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
 		len(schema.Trigger.Settings.Cron) > 0 {
 		// schedule and update policy
 		extras := make(map[string]interface{})
-		if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, "", schema.Trigger.Settings.Cron,
+		if _, err = c.scheduler.Schedule(ctx, job.P2PPreheatVendorType, id, "", schema.Trigger.Settings.Cron,
 			SchedulerCallback, TriggerParam{PolicyID: id}, extras); err != nil {
 			return 0, err
 		}
@@ -302,7 +302,7 @@ func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Sche
 	}

 	if err != nil {
-		if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil {
+		if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, id); e != nil {
 			return 0, errors.Wrap(e, err.Error())
 		}

@@ -375,7 +375,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche

 	// unschedule old
 	if needUn {
-		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID)
+		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, schema.ID)
 		if err != nil {
 			return err
 		}
@@ -384,7 +384,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Sche
 	// schedule new
 	if needSch {
 		extras := make(map[string]interface{})
-		if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, "", cron, SchedulerCallback,
+		if _, err := c.scheduler.Schedule(ctx, job.P2PPreheatVendorType, schema.ID, "", cron, SchedulerCallback,
 			TriggerParam{PolicyID: schema.ID}, extras); err != nil {
 			return err
 		}
@@ -408,7 +408,7 @@ func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
 		return err
 	}
 	if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 {
-		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id)
+		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheatVendorType, id)
 		if err != nil {
 			return err
 		}
@@ -440,7 +440,7 @@ func (c *controller) DeletePoliciesOfProject(ctx context.Context, project int64)
 func (c *controller) deleteExecs(ctx context.Context, vendorID int64) error {
 	executions, err := c.executionMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
-			"VendorType": job.P2PPreheat,
+			"VendorType": job.P2PPreheatVendorType,
 			"VendorID": vendorID,
 		},
 	})

@@ -48,7 +48,7 @@ import (

 func init() {
 	// keep only the latest created 50 p2p preheat execution records
-	task.SetExecutionSweeperCount(job.P2PPreheat, 50)
+	task.SetExecutionSweeperCount(job.P2PPreheatVendorType, 50)
 }

 const (
@@ -383,7 +383,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
 		attrs[extraAttrTriggerSetting] = "-"
 	}

-	eid, err := de.executionMgr.Create(ctx, job.P2PPreheat, pl.ID, pl.Trigger.Type, attrs)
+	eid, err := de.executionMgr.Create(ctx, job.P2PPreheatVendorType, pl.ID, pl.Trigger.Type, attrs)
 	if err != nil {
 		return -1, err
 	}
@@ -458,7 +458,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can
 	}

 	j := &task.Job{
-		Name: job.P2PPreheat,
+		Name: job.P2PPreheatVendorType,
 		Parameters: job.Parameters{
 			preheat.PreheatParamProvider: instance,
 			preheat.PreheatParamImage: piData,
@@ -29,8 +29,6 @@ import (
 const (
 	// SchedulerCallback ...
 	SchedulerCallback = "PURGE_AUDIT_LOG_CALLBACK"
-	// VendorType ...
-	VendorType = "PURGE_AUDIT_LOG"
 )

 // Ctrl a global purge controller instance
@@ -76,12 +74,12 @@ func (c *controller) Start(ctx context.Context, policy JobPolicy, trigger string
 	para[common.PurgeAuditRetentionHour] = policy.RetentionHour
 	para[common.PurgeAuditIncludeOperations] = policy.IncludeOperations

-	execID, err := c.exeMgr.Create(ctx, VendorType, -1, trigger, para)
+	execID, err := c.exeMgr.Create(ctx, job.PurgeAuditVendorType, -1, trigger, para)
 	if err != nil {
 		return -1, err
 	}
 	_, err = c.taskMgr.Create(ctx, execID, &task.Job{
-		Name: job.PurgeAudit,
+		Name: job.PurgeAuditVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},
@@ -37,7 +37,7 @@ import (

 func init() {
 	// keep only the latest created 50 replication execution records
-	task.SetExecutionSweeperCount(job.Replication, 50)
+	task.SetExecutionSweeperCount(job.ReplicationVendorType, 50)
 }

 // Ctl is a global replication controller instance
@@ -109,7 +109,7 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
 			WithMessage("the policy %d is disabled", policy.ID)
 	}
 	// create an execution record
-	id, err := c.execMgr.Create(ctx, job.Replication, policy.ID, trigger)
+	id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger)
 	if err != nil {
 		return 0, err
 	}
@@ -191,7 +191,7 @@ func (c *controller) ListExecutions(ctx context.Context, query *q.Query) ([]*Exe
 func (c *controller) buildExecutionQuery(query *q.Query) *q.Query {
 	// as the following logic may change the content of the query, clone it first
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = job.Replication
+	query.Keywords["VendorType"] = job.ReplicationVendorType
 	// convert the query keyword "PolicyID" or "policy_id" to the "VendorID"
 	if value, exist := query.Keywords["PolicyID"]; exist {
 		query.Keywords["VendorID"] = value
@@ -208,7 +208,7 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er
 	execs, err := c.execMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
 			"ID": id,
-			"VendorType": job.Replication,
+			"VendorType": job.ReplicationVendorType,
 		},
 	})
 	if err != nil {
@@ -223,13 +223,13 @@ func (c *controller) GetExecution(ctx context.Context, id int64) (*Execution, er

 func (c *controller) TaskCount(ctx context.Context, query *q.Query) (int64, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = job.Replication
+	query.Keywords["VendorType"] = job.ReplicationVendorType
 	return c.taskMgr.Count(ctx, query)
 }

 func (c *controller) ListTasks(ctx context.Context, query *q.Query) ([]*Task, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = job.Replication
+	query.Keywords["VendorType"] = job.ReplicationVendorType
 	tks, err := c.taskMgr.List(ctx, query)
 	if err != nil {
 		return nil, err
@@ -245,7 +245,7 @@ func (c *controller) GetTask(ctx context.Context, id int64) (*Task, error) {
 	tasks, err := c.taskMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
 			"ID": id,
-			"VendorType": job.Replication,
+			"VendorType": job.ReplicationVendorType,
 		},
 	})
 	if err != nil {

@@ -108,7 +108,7 @@ func (r *replicationTestSuite) TestStop() {
 	r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
 		{
 			ID: 1,
-			VendorType: job.Replication,
+			VendorType: job.ReplicationVendorType,
 			VendorID: 1,
 			Status: job.RunningStatus.String(),
 			Metrics: &dao.Metrics{
@@ -138,7 +138,7 @@ func (r *replicationTestSuite) TestListExecutions() {
 	r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
 		{
 			ID: 1,
-			VendorType: job.Replication,
+			VendorType: job.ReplicationVendorType,
 			VendorID: 1,
 			Status: job.RunningStatus.String(),
 			Metrics: &dao.Metrics{
@@ -162,7 +162,7 @@ func (r *replicationTestSuite) TestGetExecution() {
 	r.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
 		{
 			ID: 1,
-			VendorType: job.Replication,
+			VendorType: job.ReplicationVendorType,
 			VendorID: 1,
 			Status: job.RunningStatus.String(),
 			Metrics: &dao.Metrics{

@@ -132,7 +132,7 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
 	}

 	job := &task.Job{
-		Name: job.Replication,
+		Name: job.ReplicationVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},

@@ -74,7 +74,7 @@ func (d *deletionFlow) createTasks(ctx context.Context, srcResources, dstResourc
 	}

 	job := &task.Job{
-		Name: job.Replication,
+		Name: job.ReplicationVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},

@@ -121,7 +121,7 @@ func (c *controller) CreatePolicy(ctx context.Context, policy *model.Policy) (in
 	}
 	// create schedule if needed
 	if policy.IsScheduledTrigger() {
-		if _, err = c.scheduler.Schedule(ctx, job.Replication, id, "", policy.Trigger.Settings.Cron,
+		if _, err = c.scheduler.Schedule(ctx, job.ReplicationVendorType, id, "", policy.Trigger.Settings.Cron,
 			callbackFuncName, id, map[string]interface{}{}); err != nil {
 			return 0, err
 		}
@@ -134,7 +134,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
 		return err
 	}
 	// delete the schedule
-	if err := c.scheduler.UnScheduleByVendor(ctx, job.Replication, policy.ID); err != nil {
+	if err := c.scheduler.UnScheduleByVendor(ctx, job.ReplicationVendorType, policy.ID); err != nil {
 		return err
 	}

@@ -148,7 +148,7 @@ func (c *controller) UpdatePolicy(ctx context.Context, policy *model.Policy, pro
 	}
 	// create schedule if needed
 	if policy.IsScheduledTrigger() {
-		if _, err := c.scheduler.Schedule(ctx, job.Replication, policy.ID, "", policy.Trigger.Settings.Cron,
+		if _, err := c.scheduler.Schedule(ctx, job.ReplicationVendorType, policy.ID, "", policy.Trigger.Settings.Cron,
 			callbackFuncName, policy.ID, map[string]interface{}{}); err != nil {
 			return err
 		}
@@ -175,11 +175,11 @@ func (c *controller) validatePolicy(ctx context.Context, policy *model.Policy) e

 func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
 	// delete the executions
-	if err := c.execMgr.DeleteByVendor(ctx, job.Replication, id); err != nil {
+	if err := c.execMgr.DeleteByVendor(ctx, job.ReplicationVendorType, id); err != nil {
 		return err
 	}
 	// delete the schedule
-	if err := c.scheduler.UnScheduleByVendor(ctx, job.Replication, id); err != nil {
+	if err := c.scheduler.UnScheduleByVendor(ctx, job.ReplicationVendorType, id); err != nil {
 		return err
 	}
 	// delete the policy
@@ -33,7 +33,7 @@ import (

 func init() {
 	// keep only the latest created 50 retention execution records
-	task.SetExecutionSweeperCount(job.Retention, 50)
+	task.SetExecutionSweeperCount(job.RetentionVendorType, 50)
 }

 // go:generate mockery -name Controller -case snake
@@ -86,7 +86,7 @@ type defaultController struct {
 const (
 	// SchedulerCallback ...
 	SchedulerCallback = "RETENTION"
-	schedulerVendorType = job.Retention
+	schedulerVendorType = job.RetentionVendorType
 )

 // TriggerParam ...
@@ -207,7 +207,7 @@ func (r *defaultController) DeleteRetention(ctx context.Context, id int64) error
 func (r *defaultController) deleteExecs(ctx context.Context, vendorID int64) error {
 	executions, err := r.execMgr.List(ctx, &q.Query{
 		Keywords: map[string]interface{}{
-			"VendorType": job.Retention,
+			"VendorType": job.RetentionVendorType,
 			"VendorID": vendorID,
 		},
 	})
@@ -232,7 +232,7 @@ func (r *defaultController) TriggerRetentionExec(ctx context.Context, policyID i
 		return 0, err
 	}

-	id, err := r.execMgr.Create(ctx, job.Retention, policyID, trigger,
+	id, err := r.execMgr.Create(ctx, job.RetentionVendorType, policyID, trigger,
 		map[string]interface{}{
 			"dry_run": dryRun,
 		},
@@ -284,7 +284,7 @@ func (r *defaultController) GetRetentionExec(ctx context.Context, executionID in
 // ListRetentionExecs List Retention Executions
 func (r *defaultController) ListRetentionExecs(ctx context.Context, policyID int64, query *q.Query) ([]*retention.Execution, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = job.Retention
+	query.Keywords["VendorType"] = job.RetentionVendorType
 	query.Keywords["VendorID"] = policyID
 	execs, err := r.execMgr.List(ctx, query)
 	if err != nil {
@@ -314,7 +314,7 @@ func convertExecution(exec *task.Execution) *retention.Execution {
 func (r *defaultController) GetTotalOfRetentionExecs(ctx context.Context, policyID int64) (int64, error) {
 	return r.execMgr.Count(ctx, &q.Query{
 		Keywords: map[string]interface{}{
-			"VendorType": job.Retention,
+			"VendorType": job.RetentionVendorType,
 			"VendorID": policyID,
 		},
 	})
@@ -323,7 +323,7 @@ func (r *defaultController) GetTotalOfRetentionExecs(ctx context.Context, policy
 // ListRetentionExecTasks List Retention Execution Histories
 func (r *defaultController) ListRetentionExecTasks(ctx context.Context, executionID int64, query *q.Query) ([]*retention.Task, error) {
 	query = q.MustClone(query)
-	query.Keywords["VendorType"] = job.Retention
+	query.Keywords["VendorType"] = job.RetentionVendorType
 	query.Keywords["ExecutionID"] = executionID
 	tks, err := r.taskMgr.List(ctx, query)
 	if err != nil {
@@ -355,7 +355,7 @@ func convertTask(t *task.Task) *retention.Task {
 func (r *defaultController) GetTotalOfRetentionExecTasks(ctx context.Context, executionID int64) (int64, error) {
 	return r.taskMgr.Count(ctx, &q.Query{
 		Keywords: map[string]interface{}{
-			"VendorType": job.Retention,
+			"VendorType": job.RetentionVendorType,
 			"ExecutionID": executionID,
 		},
 	})
@@ -301,7 +301,7 @@ func (bc *basicController) Scan(ctx context.Context, artifact *ar.Artifact, opti
 			"name": r.Name,
 		},
 	}
-	executionID, err := bc.execMgr.Create(ctx, job.ImageScanJob, r.ID, task.ExecutionTriggerManual, extraAttrs)
+	executionID, err := bc.execMgr.Create(ctx, job.ImageScanJobVendorType, r.ID, task.ExecutionTriggerManual, extraAttrs)
 	if err != nil {
 		return err
 	}
@@ -955,7 +955,7 @@ func (bc *basicController) launchScanJob(ctx context.Context, param *launchScanJ
 	params[sca.JobParameterRobot] = robotJSON

 	j := &task.Job{
-		Name: job.ImageScanJob,
+		Name: job.ImageScanJobVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},

@@ -50,7 +50,7 @@ func init() {
 		log.Fatalf("failed to register the task status change post for the scan all job, error %v", err)
 	}

-	if err := task.RegisterTaskStatusChangePostFunc(job.ImageScanJob, scanTaskStatusChange); err != nil {
+	if err := task.RegisterTaskStatusChangePostFunc(job.ImageScanJobVendorType, scanTaskStatusChange); err != nil {
 		log.Fatalf("failed to register the task status change post for the scan job, error %v", err)
 	}
 }

@@ -17,7 +17,7 @@ import (
 )

 func init() {
-	task.SetExecutionSweeperCount(job.ScanDataExport, 50)
+	task.SetExecutionSweeperCount(job.ScanDataExportVendorType, 50)
 }

 var Ctl = NewController()
@@ -48,7 +48,7 @@ type controller struct {

 func (c *controller) ListExecutions(ctx context.Context, userName string) ([]*export.Execution, error) {
 	keywords := make(map[string]interface{})
-	keywords["VendorType"] = job.ScanDataExport
+	keywords["VendorType"] = job.ScanDataExportVendorType
 	keywords[fmt.Sprintf("ExtraAttrs.%s", export.UserNameAttribute)] = userName

 	q := q2.New(q2.KeyWords{})
@@ -69,7 +69,7 @@ func (c *controller) GetTask(ctx context.Context, executionID int64) (*task.Task
 	query := q2.New(q2.KeyWords{})

 	keywords := make(map[string]interface{})
-	keywords["VendorType"] = job.ScanDataExport
+	keywords["VendorType"] = job.ScanDataExportVendorType
 	keywords["ExecutionID"] = executionID
 	query.Keywords = keywords
 	query.Sorts = append(query.Sorts, &q2.Sort{
@@ -119,7 +119,7 @@ func (c *controller) Start(ctx context.Context, request export.Request) (executi
 	extraAttrs[export.ProjectIDsAttribute] = request.Projects
 	extraAttrs[export.JobNameAttribute] = request.JobName
 	extraAttrs[export.UserNameAttribute] = request.UserName
-	id, err := c.execMgr.Create(ctx, job.ScanDataExport, vendorID, task.ExecutionTriggerManual, extraAttrs)
+	id, err := c.execMgr.Create(ctx, job.ScanDataExportVendorType, vendorID, task.ExecutionTriggerManual, extraAttrs)
 	logger.Infof("Created an execution record with id : %d for vendorID: %d", id, vendorID)
 	if err != nil {
 		logger.Errorf("Encountered error when creating job : %v", err)
@@ -133,7 +133,7 @@ func (c *controller) Start(ctx context.Context, request export.Request) (executi
 	params[export.JobModeKey] = export.JobModeExport

 	j := &task.Job{
-		Name: job.ScanDataExport,
+		Name: job.ScanDataExportVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},

@@ -332,7 +332,7 @@ func (suite *ScanDataExportExecutionTestSuite) validateExecutionManagerInvocatio
 		userName, userNamePresent := m[export.UserNameAttribute]
 		return jobNamePresent && userNamePresent && jobName == "test-job" && userName == "test-user"
 	})
-	suite.execMgr.AssertCalled(suite.T(), "Create", ctx, job.ScanDataExport, int64(-1), task.ExecutionTriggerManual, extraAttsMatcher)
+	suite.execMgr.AssertCalled(suite.T(), "Create", ctx, job.ScanDataExportVendorType, int64(-1), task.ExecutionTriggerManual, extraAttsMatcher)
 }

 func TestScanDataExportExecutionTestSuite(t *testing.T) {
@@ -16,9 +16,8 @@ import (
 )

 const (
-	VendorTypeSystemArtifactCleanup = "SYSTEM_ARTIFACT_CLEANUP"
-	cronTypeDaily                   = "Daily"
-	cronSpec                        = "0 0 0 * * *"
+	cronTypeDaily = "Daily"
+	cronSpec      = "0 0 0 * * *"
 )

 var (
@@ -26,7 +25,7 @@ var (
 )

 func init() {
-	task.SetExecutionSweeperCount(VendorTypeSystemArtifactCleanup, 50)
+	task.SetExecutionSweeperCount(job.SystemArtifactCleanupVendorType, 50)
 }

 var Ctl = NewController()
@@ -52,7 +51,7 @@ type controller struct {
 }

 func (c *controller) Start(ctx context.Context, async bool, trigger string) error {
-	execID, err := c.execMgr.Create(ctx, VendorTypeSystemArtifactCleanup, 0, trigger)
+	execID, err := c.execMgr.Create(ctx, job.SystemArtifactCleanupVendorType, 0, trigger)
 	if err != nil {
 		return err
 	}
@@ -92,7 +91,7 @@ func (c *controller) Start(ctx context.Context, async bool, trigger string) erro

 func (c *controller) createCleanupTask(ctx context.Context, jobParams job.Parameters, execID int64) error {
 	j := &task.Job{
-		Name: job.SystemArtifactCleanup,
+		Name: job.SystemArtifactCleanupVendorType,
 		Metadata: &job.Metadata{
 			JobKind: job.KindGeneric,
 		},
@@ -133,7 +132,7 @@ func scheduleSystemArtifactCleanJob(ctx context.Context) {
 		logger.Debugf(" Export data cleanup job already scheduled with ID : %v.", schedule.ID)
 		return
 	}
-	scheduleID, err := sched.Schedule(ctx, VendorTypeSystemArtifactCleanup, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil)
+	scheduleID, err := sched.Schedule(ctx, job.SystemArtifactCleanupVendorType, 0, cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, nil)
 	if err != nil {
 		log.Errorf("Encountered error when scheduling scan data export cleanup job : %v", err)
 		return
@@ -142,7 +141,7 @@ func scheduleSystemArtifactCleanJob(ctx context.Context) {
 }

 func getSystemArtifactCleanupSchedule(ctx context.Context) (*scheduler.Schedule, error) {
-	query := q.New(map[string]interface{}{"vendor_type": VendorTypeSystemArtifactCleanup})
+	query := q.New(map[string]interface{}{"vendor_type": job.SystemArtifactCleanupVendorType})
 	schedules, err := sched.ListSchedules(ctx, query)
 	if err != nil {
 		logger.Errorf("Unable to check if export data cleanup job is already scheduled : %v", err)

@@ -8,6 +8,7 @@ import (
 	testifymock "github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"

+	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/lib/orm"
 	scheduler2 "github.com/goharbor/harbor/src/pkg/scheduler"
 	"github.com/goharbor/harbor/src/pkg/task"
@@ -138,7 +139,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobNoPreviousSch

 	var extraAttrs map[string]interface{}
 	suite.sched.On("Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs).Return(int64(1), nil)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs).Return(int64(1), nil)
 	suite.sched.On("ListSchedules", mock.Anything, mock.Anything).Return(make([]*scheduler2.Schedule, 0), nil)
 	sched = suite.sched
 	ctx := context.TODO()
@@ -146,7 +147,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobNoPreviousSch
 	ScheduleCleanupTask(ctx)

 	suite.sched.AssertCalled(suite.T(), "Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs)
 }

 func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousSchedule() {
@@ -164,7 +165,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousSched

 	var extraAttrs map[string]interface{}
 	suite.sched.On("Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs).Return(int64(1), nil)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs).Return(int64(1), nil)

 	existingSchedule := scheduler2.Schedule{ID: int64(10)}
 	suite.sched.On("ListSchedules", mock.Anything, mock.Anything).Return([]*scheduler2.Schedule{&existingSchedule}, nil)
@@ -174,7 +175,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousSched
 	ScheduleCleanupTask(ctx)

 	suite.sched.AssertNotCalled(suite.T(), "Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttrs)
 }

 func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousScheduleError() {
@@ -191,7 +192,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousSched
 	}

 	suite.sched.On("Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, mock.Anything).Return(int64(1), nil)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, mock.Anything).Return(int64(1), nil)

 	suite.sched.On("ListSchedules", mock.Anything, mock.Anything).Return(nil, errors.New("test error"))
 	sched = suite.sched
@@ -203,7 +204,7 @@ func (suite *SystemArtifactCleanupTestSuite) TestScheduleCleanupJobPreviousSched
 		return len(attrs) == 0
 	})
 	suite.sched.AssertNotCalled(suite.T(), "Schedule", mock.Anything,
-		VendorTypeSystemArtifactCleanup, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttributesMatcher)
+		job.SystemArtifactCleanupVendorType, int64(0), cronTypeDaily, cronSpec, SystemArtifactCleanupCallback, nil, extraAttributesMatcher)
 }

 func (suite *SystemArtifactCleanupTestSuite) TearDownSuite() {
@@ -20,24 +20,24 @@ const (
 	// SampleJob is name of demo job
 	SampleJob = "DEMO"

-	// ImageScanJob is name of scan job it will be used as key to register to job service.
-	ImageScanJob = "IMAGE_SCAN"
-	// GarbageCollection job name
-	GarbageCollection = "GARBAGE_COLLECTION"
-	// Replication : the name of the replication job in job service
-	Replication = "REPLICATION"
-	// WebhookJob : the name of the webhook job in job service
-	WebhookJob = "WEBHOOK"
-	// SlackJob : the name of the slack job in job service
-	SlackJob = "SLACK"
-	// Retention : the name of the retention job
-	Retention = "RETENTION"
-	// P2PPreheat : the name of the P2P preheat job
-	P2PPreheat = "P2P_PREHEAT"
-	// PurgeAudit : the name of purge audit job
-	PurgeAudit = "PURGE_AUDIT"
-	// SystemArtifactCleanup : the name of the SystemArtifact cleanup job
-	SystemArtifactCleanup = "SYSTEM_ARTIFACT_CLEANUP"
-	// ScanDataExport : the name of the scan data export job
-	ScanDataExport = "SCAN_DATA_EXPORT"
+	// ImageScanJobVendorType is name of scan job it will be used as key to register to job service.
+	ImageScanJobVendorType = "IMAGE_SCAN"
+	// GarbageCollectionVendorType job name
+	GarbageCollectionVendorType = "GARBAGE_COLLECTION"
+	// ReplicationVendorType : the name of the replication job in job service
+	ReplicationVendorType = "REPLICATION"
+	// WebhookJobVendorType : the name of the webhook job in job service
+	WebhookJobVendorType = "WEBHOOK"
+	// SlackJobVendorType : the name of the slack job in job service
+	SlackJobVendorType = "SLACK"
+	// RetentionVendorType : the name of the retention job
+	RetentionVendorType = "RETENTION"
+	// P2PPreheatVendorType : the name of the P2P preheat job
+	P2PPreheatVendorType = "P2P_PREHEAT"
+	// PurgeAuditVendorType : the name of purge audit job
+	PurgeAuditVendorType = "PURGE_AUDIT_LOG"
+	// SystemArtifactCleanupVendorType : the name of the SystemArtifact cleanup job
+	SystemArtifactCleanupVendorType = "SYSTEM_ARTIFACT_CLEANUP"
+	// ScanDataExportVendorType : the name of the scan data export job
+	ScanDataExportVendorType = "SCAN_DATA_EXPORT"
 )

@@ -42,7 +42,7 @@ func (ps *defaultSampler) For(job string) uint {
 	// As an example, sample job has the lowest priority
 	case SampleJob:
 		return 1
-	case SlackJob:
+	case SlackJobVendorType:
 		return 1
 	// add more cases here if specified job priority is required
 	// case XXX:

@@ -42,12 +42,12 @@ func (suite *PrioritySamplerSuite) Test() {
 	p1 := suite.sampler.For(SampleJob)
 	suite.Equal((uint)(1), p1, "Job priority for %s", SampleJob)

-	p2 := suite.sampler.For(Retention)
-	suite.Equal(defaultPriority, p2, "Job priority for %s", Retention)
+	p2 := suite.sampler.For(RetentionVendorType)
+	suite.Equal(defaultPriority, p2, "Job priority for %s", RetentionVendorType)

-	p3 := suite.sampler.For(Replication)
-	suite.Equal(defaultPriority, p3, "Job priority for %s", Replication)
+	p3 := suite.sampler.For(ReplicationVendorType)
+	suite.Equal(defaultPriority, p3, "Job priority for %s", ReplicationVendorType)

-	p4 := suite.sampler.For(SlackJob)
-	suite.Equal((uint)(1), p4, "Job priority for %s", SlackJob)
+	p4 := suite.sampler.For(SlackJobVendorType)
+	suite.Equal((uint)(1), p4, "Job priority for %s", SlackJobVendorType)
 }

@@ -314,23 +314,23 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
 		// Only for debugging and testing purpose
 		job.SampleJob: (*sample.Job)(nil),
 		// Functional jobs
-		job.ImageScanJob:           (*scan.Job)(nil),
-		job.PurgeAudit:             (*purge.Job)(nil),
-		job.GarbageCollection:      (*gc.GarbageCollector)(nil),
-		job.Replication:            (*replication.Replication)(nil),
-		job.Retention:              (*retention.Job)(nil),
-		scheduler.JobNameScheduler: (*scheduler.PeriodicJob)(nil),
-		job.WebhookJob:             (*notification.WebhookJob)(nil),
-		job.SlackJob:               (*notification.SlackJob)(nil),
-		job.P2PPreheat:             (*preheat.Job)(nil),
-		job.ScanDataExport:         (*scandataexport.ScanDataExport)(nil),
+		job.ImageScanJobVendorType:      (*scan.Job)(nil),
+		job.PurgeAuditVendorType:        (*purge.Job)(nil),
+		job.GarbageCollectionVendorType: (*gc.GarbageCollector)(nil),
+		job.ReplicationVendorType:       (*replication.Replication)(nil),
+		job.RetentionVendorType:         (*retention.Job)(nil),
+		scheduler.JobNameScheduler:      (*scheduler.PeriodicJob)(nil),
+		job.WebhookJobVendorType:        (*notification.WebhookJob)(nil),
+		job.SlackJobVendorType:          (*notification.SlackJob)(nil),
+		job.P2PPreheatVendorType:        (*preheat.Job)(nil),
+		job.ScanDataExportVendorType:    (*scandataexport.ScanDataExport)(nil),
 		// In v2.2 we migrate the scheduled replication, garbage collection and scan all to
 		// the scheduler mechanism, the following three jobs are kept for the legacy jobs
 		// and they can be removed after several releases
-		"IMAGE_REPLICATE":         (*legacy.ReplicationScheduler)(nil),
-		"IMAGE_GC":                (*legacy.GarbageCollectionScheduler)(nil),
-		"IMAGE_SCAN_ALL":          (*legacy.ScanAllScheduler)(nil),
-		job.SystemArtifactCleanup: (*systemartifact.Cleanup)(nil),
+		"IMAGE_REPLICATE":                   (*legacy.ReplicationScheduler)(nil),
+		"IMAGE_GC":                          (*legacy.GarbageCollectionScheduler)(nil),
+		"IMAGE_SCAN_ALL":                    (*legacy.ScanAllScheduler)(nil),
+		job.SystemArtifactCleanupVendorType: (*systemartifact.Cleanup)(nil),
 	}); err != nil {
 		// exit
 		return nil, err
@@ -45,7 +45,7 @@ func (h *HTTPHandler) process(ctx context.Context, event *model.HookEvent) error
 			JobKind: job.KindGeneric,
 		},
 	}
-	j.Name = job.WebhookJob
+	j.Name = job.WebhookJobVendorType

 	payload, err := json.Marshal(event.Payload)
 	if err != nil {

@@ -96,7 +96,7 @@ func (s *SlackHandler) process(ctx context.Context, event *model.HookEvent) erro
 		},
 	}
 	// Create a slackJob to send message to slack
-	j.Name = job.SlackJob
+	j.Name = job.SlackJobVendorType

 	// Convert payload to slack format
 	payload, err := s.convert(event.Payload)

@@ -13,7 +13,7 @@ import (
 )

 func init() {
-	if err := task.RegisterCheckInProcessor(job.Retention, retentionTaskCheckInProcessor); err != nil {
+	if err := task.RegisterCheckInProcessor(job.RetentionVendorType, retentionTaskCheckInProcessor); err != nil {
 		log.Fatalf("failed to register the checkin processor for the retention job, error %v", err)
 	}
 }

@@ -65,7 +65,7 @@ type injectVendorType struct{}
 // injectVendorType injects vendor type to request header.
 func (i *injectVendorType) Modify(req *http.Request) error {
 	if req != nil {
-		req.Header.Set("VendorType", job.Retention)
+		req.Header.Set("VendorType", job.RetentionVendorType)
 	}

 	return nil

@@ -210,7 +210,7 @@ func createJobs(repositoryRules map[selector.Repository]*lwp.Metadata, isDryRun
 	for repository, policy := range repositoryRules {
 		jobData := &jobData{
 			Repository: repository,
-			JobName: job.Retention,
+			JobName: job.RetentionVendorType,
 			JobParams: make(map[string]interface{}, 3),
 		}
 		// set dry run

@@ -34,7 +34,7 @@ func FromJobservice(req *http.Request) bool {
 // FromJobRetention detects whether this request is from tag retention job.
 func FromJobRetention(req *http.Request) bool {
 	if req != nil && req.Header != nil {
-		return req.Header.Get("VendorType") == job.Retention
+		return req.Header.Get("VendorType") == job.RetentionVendorType
 	}

 	return false
@@ -11,6 +11,7 @@ import (

 	"github.com/goharbor/harbor/src/common/rbac"
 	"github.com/goharbor/harbor/src/controller/gc"
+	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/lib/config"
 	"github.com/goharbor/harbor/src/lib/errors"
 	"github.com/goharbor/harbor/src/lib/q"
@@ -161,7 +162,7 @@ func (g *gcAPI) GetGCHistory(ctx context.Context, params operation.GetGCHistoryP
 		}
 		hs = append(hs, &model.GCHistory{
 			ID: exec.ID,
-			Name: gc.GCVendorType,
+			Name: job.GarbageCollectionVendorType,
 			Kind: exec.Trigger,
 			Parameters: string(extraAttrsString),
 			Schedule: &model.ScheduleParam{
@@ -200,7 +201,7 @@ func (g *gcAPI) GetGC(ctx context.Context, params operation.GetGCParams) middlew

 	res := &model.GCHistory{
 		ID: exec.ID,
-		Name: gc.GCVendorType,
+		Name: job.GarbageCollectionVendorType,
 		Kind: exec.Trigger,
 		Parameters: string(extraAttrsString),
 		Status: exec.Status,

@@ -333,7 +333,7 @@ func (api *preheatAPI) DeletePolicy(ctx context.Context, params operation.Delete
 		return nil
 	}
 	executions, err := api.executionCtl.List(ctx, &q.Query{Keywords: map[string]interface{}{
-		"vendor_type": job.P2PPreheat,
+		"vendor_type": job.P2PPreheatVendorType,
 		"vendor_id": policy.ID,
 	}})
 	if err != nil {
@@ -654,7 +654,7 @@ func (api *preheatAPI) ListExecutions(ctx context.Context, params operation.List
 	}

 	if query != nil {
-		query.Keywords["vendor_type"] = job.P2PPreheat
+		query.Keywords["vendor_type"] = job.P2PPreheatVendorType
 		query.Keywords["vendor_id"] = policy.ID
 	}

@@ -822,7 +822,7 @@ func (api *preheatAPI) requireExecutionInProject(ctx context.Context, projectNam
 		return err
 	}

-	if exec != nil && exec.VendorType == job.P2PPreheat && exec.VendorID == plc.ID {
+	if exec != nil && exec.VendorType == job.P2PPreheatVendorType && exec.VendorID == plc.ID {
 		return nil
 	}


@@ -30,6 +30,7 @@ import (
 	"github.com/goharbor/harbor/src/controller/jobservice"
 	pg "github.com/goharbor/harbor/src/controller/purge"
 	"github.com/goharbor/harbor/src/controller/task"
+	"github.com/goharbor/harbor/src/jobservice/job"
 	"github.com/goharbor/harbor/src/lib/errors"
 	"github.com/goharbor/harbor/src/lib/q"
 	taskPkg "github.com/goharbor/harbor/src/pkg/task"
@@ -62,7 +63,7 @@ func (p *purgeAPI) CreatePurgeSchedule(ctx context.Context, params purge.CreateP
 	if err := verifyCreateRequest(params); err != nil {
 		return p.SendError(ctx, err)
 	}
-	id, err := p.kick(ctx, pg.VendorType, params.Schedule.Schedule.Type, params.Schedule.Schedule.Cron, params.Schedule.Parameters)
+	id, err := p.kick(ctx, job.PurgeAuditVendorType, params.Schedule.Schedule.Type, params.Schedule.Schedule.Cron, params.Schedule.Parameters)
 	if err != nil {
 		return p.SendError(ctx, err)
 	}
@@ -151,7 +152,7 @@ func (p *purgeAPI) GetPurgeHistory(ctx context.Context, params purge.GetPurgeHis
 		return p.SendError(ctx, err)
 	}
 	query, err := p.BuildQuery(ctx, params.Q, params.Sort, params.Page, params.PageSize)
-	query.Keywords["VendorType"] = pg.VendorType
+	query.Keywords["VendorType"] = job.PurgeAuditVendorType
 	if err != nil {
 		return p.SendError(ctx, err)
 	}
@@ -172,7 +173,7 @@ func (p *purgeAPI) GetPurgeHistory(ctx context.Context, params purge.GetPurgeHis
 		}
 		hs = append(hs, &model.ExecHistory{
 			ID: exec.ID,
-			Name: pg.VendorType,
+			Name: job.PurgeAuditVendorType,
 			Kind: exec.Trigger,
 			Parameters: string(extraAttrsString),
 			Schedule: &model.ScheduleParam{
@@ -200,7 +201,7 @@ func (p *purgeAPI) GetPurgeJob(ctx context.Context, params purge.GetPurgeJobPara
 	}

 	exec, err := p.executionCtl.Get(ctx, params.PurgeID)
-	if exec.VendorType != pg.VendorType {
+	if exec.VendorType != job.PurgeAuditVendorType {
 		return p.SendError(ctx, fmt.Errorf("purge job with id %d not found", params.PurgeID))
 	}
 	if err != nil {
@@ -214,7 +215,7 @@ func (p *purgeAPI) GetPurgeJob(ctx context.Context, params purge.GetPurgeJobPara

 	res := &model.ExecHistory{
 		ID: exec.ID,
-		Name: pg.VendorType,
+		Name: job.PurgeAuditVendorType,
 		Kind: exec.Trigger,
 		Parameters: string(extraAttrsString),
 		Status: exec.Status,
@@ -234,7 +235,7 @@ func (p *purgeAPI) GetPurgeJobLog(ctx context.Context, params purge.GetPurgeJobL
 	}
 	tasks, err := p.taskCtl.List(ctx, q.New(q.KeyWords{
 		"ExecutionID": params.PurgeID,
-		"VendorType": pg.VendorType,
+		"VendorType": job.PurgeAuditVendorType,
 	}))
 	if err != nil {
 		return p.SendError(ctx, err)
@@ -255,7 +256,7 @@ func (p *purgeAPI) GetPurgeSchedule(ctx context.Context, params purge.GetPurgeSc
 	if err := p.RequireSystemAccess(ctx, rbac.ActionRead, rbac.ResourcePurgeAuditLog); err != nil {
 		return p.SendError(ctx, err)
 	}
-	sch, err := p.schedulerCtl.Get(ctx, pg.VendorType)
+	sch, err := p.schedulerCtl.Get(ctx, job.PurgeAuditVendorType)
 	if errors.IsNotFoundErr(err) {
 		return purge.NewGetPurgeScheduleOK()
 	}
@@ -287,7 +288,7 @@ func (p *purgeAPI) UpdatePurgeSchedule(ctx context.Context, params purge.UpdateP
 	if err := verifyUpdateRequest(params); err != nil {
 		return p.SendError(ctx, err)
 	}
-	_, err := p.kick(ctx, pg.VendorType, params.Schedule.Schedule.Type, params.Schedule.Schedule.Cron, params.Schedule.Parameters)
+	_, err := p.kick(ctx, job.PurgeAuditVendorType, params.Schedule.Schedule.Type, params.Schedule.Schedule.Cron, params.Schedule.Parameters)
 	if err != nil {
 		return p.SendError(ctx, err)
 	}

@@ -429,7 +429,7 @@ func (r *retentionAPI) requireExecutionInProject(ctx context.Context, p *policy.
 	if exec.PolicyID != p.ID {
 		return errors.New(nil).WithMessage("project: %d, execution id %d not found", p.Scope.Reference, executionID).WithCode(errors.NotFoundCode)
 	}
-	if exec.Type != job.Retention {
+	if exec.Type != job.RetentionVendorType {
 		return errors.New(nil).WithMessage("project: %d, execution id %d not found", p.Scope.Reference, executionID).WithCode(errors.NotFoundCode)
 	}
 	return nil

@@ -33,7 +33,7 @@ class TestJobServiceDashboard(unittest.TestCase, object):
         self.registry = Registry()
         self.scan_all = ScanAll()
         self.schedule = Schedule()
-        self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"]
+        self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT_LOG", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"]
         self.cron_type = "Custom"
         self.cron = "0 0 0 * * 0"