diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 32d9bcbf7..eada98c57 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -1942,6 +1942,8 @@ paths: tags: - replication operationId: getReplicationLog + produces: + - text/plain parameters: - name: id in: path diff --git a/make/migrations/postgresql/0050_2.2.0_schema.up.sql b/make/migrations/postgresql/0050_2.2.0_schema.up.sql index 4df827c63..7fcdce803 100644 --- a/make/migrations/postgresql/0050_2.2.0_schema.up.sql +++ b/make/migrations/postgresql/0050_2.2.0_schema.up.sql @@ -20,6 +20,8 @@ ALTER TABLE task ADD COLUMN IF NOT EXISTS vendor_type varchar(16); UPDATE task SET vendor_type = execution.vendor_type FROM execution WHERE task.execution_id = execution.id; ALTER TABLE task ALTER COLUMN vendor_type SET NOT NULL; +ALTER TABLE execution ADD COLUMN IF NOT EXISTS update_time timestamp; + DO $$ DECLARE art RECORD; @@ -283,3 +285,6 @@ ALTER TABLE scan_report DROP COLUMN IF EXISTS status_code; ALTER TABLE scan_report DROP COLUMN IF EXISTS status_rev; ALTER TABLE scan_report DROP COLUMN IF EXISTS start_time; ALTER TABLE scan_report DROP COLUMN IF EXISTS end_time; + +/*add unique for vendor_type+vendor_id to avoid dup records when updating policies*/ +ALTER TABLE schedule ADD CONSTRAINT unique_schedule UNIQUE (vendor_type, vendor_id); diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index a303ac0d4..658a44626 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -59,6 +59,10 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) { } id, err := ormer.Insert(schedule) if err != nil { + if e := orm.AsConflictError(err, "schedule with vendor type: %s vendor ID: %d already exists", + schedule.VendorType, schedule.VendorID); e != nil { + err = e + } return 0, err } return id, nil @@ -118,6 +122,9 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e } n, err := ormer.Update(schedule, props...) if err != nil { + if e := orm.AsConflictError(err, "schedule with the same vendor type and vendor ID already exists"); e != nil { + err = e + } return err } if n == 0 { diff --git a/src/pkg/scheduler/dao_test.go b/src/pkg/scheduler/dao_test.go index 67f83d11e..4236805a0 100644 --- a/src/pkg/scheduler/dao_test.go +++ b/src/pkg/scheduler/dao_test.go @@ -58,6 +58,19 @@ func (d *daoTestSuite) TearDownTest() { func (d *daoTestSuite) TestCreate() { // the happy pass is covered in SetupTest + + // conflict + schedule := &schedule{ + VendorType: "Vendor", + VendorID: 1, + CRON: "0 * * * * *", + CallbackFuncName: "callback_func_01", + CallbackFuncParam: "callback_func_params", + ExtraAttrs: `{"key":"value"}`, + } + _, err := d.dao.Create(d.ctx, schedule) + d.Require().NotNil(err) + d.True(errors.IsConflictErr(err)) } func (d *daoTestSuite) TestList() { diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index cbf7ecf77..8f6fa2db5 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -17,11 +17,12 @@ package dao import ( "context" "fmt" - "github.com/goharbor/harbor/src/lib/log" "strings" + "time" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" ) @@ -234,8 +235,8 @@ func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, strin return false, "", false, err } - sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?` - result, err := ormer.Raw(sql, status, id, execution.Revision).Exec() + sql := `update execution set status = ?, revision = revision+1, update_time = ? where id = ? and revision = ?` + result, err := ormer.Raw(sql, status, time.Now(), id, execution.Revision).Exec() if err != nil { return false, "", false, err } diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go index 83659a130..53d8123de 100644 --- a/src/pkg/task/dao/model.go +++ b/src/pkg/task/dao/model.go @@ -38,6 +38,7 @@ type Execution struct { Trigger string `orm:"column(trigger)"` ExtraAttrs string `orm:"column(extra_attrs)"` // json string StartTime time.Time `orm:"column(start_time)"` + UpdateTime time.Time `orm:"column(update_time)"` EndTime time.Time `orm:"column(end_time)"` Revision int64 `orm:"column(revision)"` } diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index fff64fa21..900412134 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -99,33 +99,39 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor return 0, err } + now := time.Now() execution := &dao.Execution{ VendorType: vendorType, VendorID: vendorID, Status: job.RunningStatus.String(), Trigger: trigger, ExtraAttrs: string(data), - StartTime: time.Now(), + StartTime: now, + UpdateTime: now, } return e.executionDAO.Create(ctx, execution) } func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ ID: id, Status: job.SuccessStatus.String(), StatusMessage: message, - EndTime: time.Now(), - }, "Status", "StatusMessage", "EndTime") + UpdateTime: now, + EndTime: now, + }, "Status", "StatusMessage", "UpdateTime", "EndTime") } func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ ID: id, Status: job.ErrorStatus.String(), StatusMessage: message, - EndTime: time.Now(), - }, "Status", "StatusMessage", "EndTime") + UpdateTime: now, + EndTime: now, + }, "Status", "StatusMessage", "UpdateTime", "EndTime") } func (e *executionManager) Stop(ctx context.Context, id int64) error { @@ -146,11 +152,13 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error { } // contains no task and the status isn't final, update the status to stop directly if len(tasks) == 0 && !job.Status(execution.Status).Final() { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ - ID: id, - Status: job.StoppedStatus.String(), - EndTime: time.Now(), - }, "Status", "EndTime") + ID: id, + Status: job.StoppedStatus.String(), + UpdateTime: now, + EndTime: now, + }, "Status", "UpdateTime", "EndTime") } for _, task := range tasks { @@ -266,6 +274,7 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao Metrics: nil, Trigger: execution.Trigger, StartTime: execution.StartTime, + UpdateTime: execution.UpdateTime, EndTime: execution.EndTime, } diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index c5b50993e..8d497d7bc 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -63,14 +63,14 @@ func (e *executionManagerTestSuite) TestCreate() { } func (e *executionManagerTestSuite) TestMarkDone() { - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.MarkDone(nil, 1, "success") e.Require().Nil(err) e.execDAO.AssertExpectations(e.T()) } func (e *executionManagerTestSuite) TestMarkError() { - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.MarkError(nil, 1, "error") e.Require().Nil(err) e.execDAO.AssertExpectations(e.T()) @@ -83,7 +83,7 @@ func (e *executionManagerTestSuite) TestStop() { Status: job.RunningStatus.String(), }, nil) e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.Stop(nil, 1) e.Require().Nil(err) e.taskDAO.AssertExpectations(e.T()) diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go index 4750a50ce..fcd01ff96 100644 --- a/src/pkg/task/model.go +++ b/src/pkg/task/model.go @@ -49,6 +49,7 @@ type Execution struct { // the customized attributes for different kinds of consumers ExtraAttrs map[string]interface{} `json:"extra_attrs"` StartTime time.Time `json:"start_time"` + UpdateTime time.Time `json:"update_time"` EndTime time.Time `json:"end_time"` } diff --git a/src/server/route.go b/src/server/route.go index 96071f486..25713a2b9 100644 --- a/src/server/route.go +++ b/src/server/route.go @@ -50,8 +50,9 @@ func registerRoutes() { beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob") beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask") beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan") - router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler - router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler()) beego.Router("/service/token", &token.Handler{})