From 69808f033e92ba2e4646e3a41265594bd9209633 Mon Sep 17 00:00:00 2001 From: Wenkai Yin Date: Thu, 10 Dec 2020 20:26:54 +0800 Subject: [PATCH] Tiny fixes for task manager 1. Add update time for execution 2. Add unique constraint for schedule to avoid dup records when updating policies 3. Format replication log 4. Keep the webhook handler for legacy replication jobs to avoid jobservice resending the status change request Signed-off-by: Wenkai Yin --- api/v2.0/swagger.yaml | 2 ++ .../postgresql/0050_2.2.0_schema.up.sql | 5 ++++ src/pkg/scheduler/dao.go | 7 +++++ src/pkg/scheduler/dao_test.go | 13 +++++++++ src/pkg/task/dao/execution.go | 7 ++--- src/pkg/task/dao/model.go | 1 + src/pkg/task/execution.go | 27 ++++++++++++------- src/pkg/task/execution_test.go | 6 ++--- src/pkg/task/model.go | 1 + src/server/route.go | 5 ++-- 10 files changed, 57 insertions(+), 17 deletions(-) diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 550e78321..27a610ef6 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -1931,6 +1931,8 @@ paths: tags: - replication operationId: getReplicationLog + produces: + - text/plain parameters: - name: id in: path diff --git a/make/migrations/postgresql/0050_2.2.0_schema.up.sql b/make/migrations/postgresql/0050_2.2.0_schema.up.sql index 4df827c63..7fcdce803 100644 --- a/make/migrations/postgresql/0050_2.2.0_schema.up.sql +++ b/make/migrations/postgresql/0050_2.2.0_schema.up.sql @@ -20,6 +20,8 @@ ALTER TABLE task ADD COLUMN IF NOT EXISTS vendor_type varchar(16); UPDATE task SET vendor_type = execution.vendor_type FROM execution WHERE task.execution_id = execution.id; ALTER TABLE task ALTER COLUMN vendor_type SET NOT NULL; +ALTER TABLE execution ADD COLUMN IF NOT EXISTS update_time timestamp; + DO $$ DECLARE art RECORD; @@ -283,3 +285,6 @@ ALTER TABLE scan_report DROP COLUMN IF EXISTS status_code; ALTER TABLE scan_report DROP COLUMN IF EXISTS status_rev; ALTER TABLE scan_report DROP COLUMN IF EXISTS start_time; ALTER TABLE scan_report DROP COLUMN IF EXISTS end_time; + +/*add unique for vendor_type+vendor_id to avoid dup records when updating policies*/ +ALTER TABLE schedule ADD CONSTRAINT unique_schedule UNIQUE (vendor_type, vendor_id); diff --git a/src/pkg/scheduler/dao.go b/src/pkg/scheduler/dao.go index a303ac0d4..658a44626 100644 --- a/src/pkg/scheduler/dao.go +++ b/src/pkg/scheduler/dao.go @@ -59,6 +59,10 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) { } id, err := ormer.Insert(schedule) if err != nil { + if e := orm.AsConflictError(err, "schedule with vendor type: %s vendor ID: %d already exists", + schedule.VendorType, schedule.VendorID); e != nil { + err = e + } return 0, err } return id, nil @@ -118,6 +122,9 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e } n, err := ormer.Update(schedule, props...) if err != nil { + if e := orm.AsConflictError(err, "schedule with the same vendor type and vendor ID already exists"); e != nil { + err = e + } return err } if n == 0 { diff --git a/src/pkg/scheduler/dao_test.go b/src/pkg/scheduler/dao_test.go index 67f83d11e..4236805a0 100644 --- a/src/pkg/scheduler/dao_test.go +++ b/src/pkg/scheduler/dao_test.go @@ -58,6 +58,19 @@ func (d *daoTestSuite) TearDownTest() { func (d *daoTestSuite) TestCreate() { // the happy pass is covered in SetupTest + + // conflict + schedule := &schedule{ + VendorType: "Vendor", + VendorID: 1, + CRON: "0 * * * * *", + CallbackFuncName: "callback_func_01", + CallbackFuncParam: "callback_func_params", + ExtraAttrs: `{"key":"value"}`, + } + _, err := d.dao.Create(d.ctx, schedule) + d.Require().NotNil(err) + d.True(errors.IsConflictErr(err)) } func (d *daoTestSuite) TestList() { diff --git a/src/pkg/task/dao/execution.go b/src/pkg/task/dao/execution.go index cbf7ecf77..8f6fa2db5 100644 --- a/src/pkg/task/dao/execution.go +++ b/src/pkg/task/dao/execution.go @@ -17,11 +17,12 @@ package dao import ( "context" "fmt" - "github.com/goharbor/harbor/src/lib/log" "strings" + "time" "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/lib/errors" + "github.com/goharbor/harbor/src/lib/log" "github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/q" ) @@ -234,8 +235,8 @@ func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, strin return false, "", false, err } - sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?` - result, err := ormer.Raw(sql, status, id, execution.Revision).Exec() + sql := `update execution set status = ?, revision = revision+1, update_time = ? where id = ? and revision = ?` + result, err := ormer.Raw(sql, status, time.Now(), id, execution.Revision).Exec() if err != nil { return false, "", false, err } diff --git a/src/pkg/task/dao/model.go b/src/pkg/task/dao/model.go index 83659a130..53d8123de 100644 --- a/src/pkg/task/dao/model.go +++ b/src/pkg/task/dao/model.go @@ -38,6 +38,7 @@ type Execution struct { Trigger string `orm:"column(trigger)"` ExtraAttrs string `orm:"column(extra_attrs)"` // json string StartTime time.Time `orm:"column(start_time)"` + UpdateTime time.Time `orm:"column(update_time)"` EndTime time.Time `orm:"column(end_time)"` Revision int64 `orm:"column(revision)"` } diff --git a/src/pkg/task/execution.go b/src/pkg/task/execution.go index fff64fa21..900412134 100644 --- a/src/pkg/task/execution.go +++ b/src/pkg/task/execution.go @@ -99,33 +99,39 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor return 0, err } + now := time.Now() execution := &dao.Execution{ VendorType: vendorType, VendorID: vendorID, Status: job.RunningStatus.String(), Trigger: trigger, ExtraAttrs: string(data), - StartTime: time.Now(), + StartTime: now, + UpdateTime: now, } return e.executionDAO.Create(ctx, execution) } func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ ID: id, Status: job.SuccessStatus.String(), StatusMessage: message, - EndTime: time.Now(), - }, "Status", "StatusMessage", "EndTime") + UpdateTime: now, + EndTime: now, + }, "Status", "StatusMessage", "UpdateTime", "EndTime") } func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ ID: id, Status: job.ErrorStatus.String(), StatusMessage: message, - EndTime: time.Now(), - }, "Status", "StatusMessage", "EndTime") + UpdateTime: now, + EndTime: now, + }, "Status", "StatusMessage", "UpdateTime", "EndTime") } func (e *executionManager) Stop(ctx context.Context, id int64) error { @@ -146,11 +152,13 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error { } // contains no task and the status isn't final, update the status to stop directly if len(tasks) == 0 && !job.Status(execution.Status).Final() { + now := time.Now() return e.executionDAO.Update(ctx, &dao.Execution{ - ID: id, - Status: job.StoppedStatus.String(), - EndTime: time.Now(), - }, "Status", "EndTime") + ID: id, + Status: job.StoppedStatus.String(), + UpdateTime: now, + EndTime: now, + }, "Status", "UpdateTime", "EndTime") } for _, task := range tasks { @@ -266,6 +274,7 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao Metrics: nil, Trigger: execution.Trigger, StartTime: execution.StartTime, + UpdateTime: execution.UpdateTime, EndTime: execution.EndTime, } diff --git a/src/pkg/task/execution_test.go b/src/pkg/task/execution_test.go index c5b50993e..8d497d7bc 100644 --- a/src/pkg/task/execution_test.go +++ b/src/pkg/task/execution_test.go @@ -63,14 +63,14 @@ func (e *executionManagerTestSuite) TestCreate() { } func (e *executionManagerTestSuite) TestMarkDone() { - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.MarkDone(nil, 1, "success") e.Require().Nil(err) e.execDAO.AssertExpectations(e.T()) } func (e *executionManagerTestSuite) TestMarkError() { - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.MarkError(nil, 1, "error") e.Require().Nil(err) e.execDAO.AssertExpectations(e.T()) @@ -83,7 +83,7 @@ func (e *executionManagerTestSuite) TestStop() { Status: job.RunningStatus.String(), }, nil) e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) - e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := e.execMgr.Stop(nil, 1) e.Require().Nil(err) e.taskDAO.AssertExpectations(e.T()) diff --git a/src/pkg/task/model.go b/src/pkg/task/model.go index 4750a50ce..fcd01ff96 100644 --- a/src/pkg/task/model.go +++ b/src/pkg/task/model.go @@ -49,6 +49,7 @@ type Execution struct { // the customized attributes for different kinds of consumers ExtraAttrs map[string]interface{} `json:"extra_attrs"` StartTime time.Time `json:"start_time"` + UpdateTime time.Time `json:"update_time"` EndTime time.Time `json:"end_time"` } diff --git a/src/server/route.go b/src/server/route.go index 96071f486..25713a2b9 100644 --- a/src/server/route.go +++ b/src/server/route.go @@ -50,8 +50,9 @@ func registerRoutes() { beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob") beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask") beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan") - router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler - router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler + router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler()) beego.Router("/service/token", &token.Handler{})