Merge pull request #13729 from ywk253100/201210_fix

Tiny fixes for task manager
This commit is contained in:
Wenkai Yin(尹文开) 2020-12-14 18:41:46 +08:00 committed by GitHub
commit fd900889c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 57 additions and 17 deletions

View File

@ -1942,6 +1942,8 @@ paths:
tags: tags:
- replication - replication
operationId: getReplicationLog operationId: getReplicationLog
produces:
- text/plain
parameters: parameters:
- name: id - name: id
in: path in: path

View File

@ -20,6 +20,8 @@ ALTER TABLE task ADD COLUMN IF NOT EXISTS vendor_type varchar(16);
UPDATE task SET vendor_type = execution.vendor_type FROM execution WHERE task.execution_id = execution.id; UPDATE task SET vendor_type = execution.vendor_type FROM execution WHERE task.execution_id = execution.id;
ALTER TABLE task ALTER COLUMN vendor_type SET NOT NULL; ALTER TABLE task ALTER COLUMN vendor_type SET NOT NULL;
ALTER TABLE execution ADD COLUMN IF NOT EXISTS update_time timestamp;
DO $$ DO $$
DECLARE DECLARE
art RECORD; art RECORD;
@ -283,3 +285,6 @@ ALTER TABLE scan_report DROP COLUMN IF EXISTS status_code;
ALTER TABLE scan_report DROP COLUMN IF EXISTS status_rev; ALTER TABLE scan_report DROP COLUMN IF EXISTS status_rev;
ALTER TABLE scan_report DROP COLUMN IF EXISTS start_time; ALTER TABLE scan_report DROP COLUMN IF EXISTS start_time;
ALTER TABLE scan_report DROP COLUMN IF EXISTS end_time; ALTER TABLE scan_report DROP COLUMN IF EXISTS end_time;
/*add unique for vendor_type+vendor_id to avoid dup records when updating policies*/
ALTER TABLE schedule ADD CONSTRAINT unique_schedule UNIQUE (vendor_type, vendor_id);

View File

@ -59,6 +59,10 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) {
} }
id, err := ormer.Insert(schedule) id, err := ormer.Insert(schedule)
if err != nil { if err != nil {
if e := orm.AsConflictError(err, "schedule with vendor type: %s vendor ID: %d already exists",
schedule.VendorType, schedule.VendorID); e != nil {
err = e
}
return 0, err return 0, err
} }
return id, nil return id, nil
@ -118,6 +122,9 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e
} }
n, err := ormer.Update(schedule, props...) n, err := ormer.Update(schedule, props...)
if err != nil { if err != nil {
if e := orm.AsConflictError(err, "schedule with the same vendor type and vendor ID already exists"); e != nil {
err = e
}
return err return err
} }
if n == 0 { if n == 0 {

View File

@ -58,6 +58,19 @@ func (d *daoTestSuite) TearDownTest() {
func (d *daoTestSuite) TestCreate() { func (d *daoTestSuite) TestCreate() {
// the happy pass is covered in SetupTest // the happy pass is covered in SetupTest
// conflict
schedule := &schedule{
VendorType: "Vendor",
VendorID: 1,
CRON: "0 * * * * *",
CallbackFuncName: "callback_func_01",
CallbackFuncParam: "callback_func_params",
ExtraAttrs: `{"key":"value"}`,
}
_, err := d.dao.Create(d.ctx, schedule)
d.Require().NotNil(err)
d.True(errors.IsConflictErr(err))
} }
func (d *daoTestSuite) TestList() { func (d *daoTestSuite) TestList() {

View File

@ -17,11 +17,12 @@ package dao
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/goharbor/harbor/src/lib/log"
"strings" "strings"
"time"
"github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm" "github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q" "github.com/goharbor/harbor/src/lib/q"
) )
@ -234,8 +235,8 @@ func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, strin
return false, "", false, err return false, "", false, err
} }
sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?` sql := `update execution set status = ?, revision = revision+1, update_time = ? where id = ? and revision = ?`
result, err := ormer.Raw(sql, status, id, execution.Revision).Exec() result, err := ormer.Raw(sql, status, time.Now(), id, execution.Revision).Exec()
if err != nil { if err != nil {
return false, "", false, err return false, "", false, err
} }

View File

@ -38,6 +38,7 @@ type Execution struct {
Trigger string `orm:"column(trigger)"` Trigger string `orm:"column(trigger)"`
ExtraAttrs string `orm:"column(extra_attrs)"` // json string ExtraAttrs string `orm:"column(extra_attrs)"` // json string
StartTime time.Time `orm:"column(start_time)"` StartTime time.Time `orm:"column(start_time)"`
UpdateTime time.Time `orm:"column(update_time)"`
EndTime time.Time `orm:"column(end_time)"` EndTime time.Time `orm:"column(end_time)"`
Revision int64 `orm:"column(revision)"` Revision int64 `orm:"column(revision)"`
} }

View File

@ -99,33 +99,39 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
return 0, err return 0, err
} }
now := time.Now()
execution := &dao.Execution{ execution := &dao.Execution{
VendorType: vendorType, VendorType: vendorType,
VendorID: vendorID, VendorID: vendorID,
Status: job.RunningStatus.String(), Status: job.RunningStatus.String(),
Trigger: trigger, Trigger: trigger,
ExtraAttrs: string(data), ExtraAttrs: string(data),
StartTime: time.Now(), StartTime: now,
UpdateTime: now,
} }
return e.executionDAO.Create(ctx, execution) return e.executionDAO.Create(ctx, execution)
} }
func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error { func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{ return e.executionDAO.Update(ctx, &dao.Execution{
ID: id, ID: id,
Status: job.SuccessStatus.String(), Status: job.SuccessStatus.String(),
StatusMessage: message, StatusMessage: message,
EndTime: time.Now(), UpdateTime: now,
}, "Status", "StatusMessage", "EndTime") EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
} }
func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error { func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{ return e.executionDAO.Update(ctx, &dao.Execution{
ID: id, ID: id,
Status: job.ErrorStatus.String(), Status: job.ErrorStatus.String(),
StatusMessage: message, StatusMessage: message,
EndTime: time.Now(), UpdateTime: now,
}, "Status", "StatusMessage", "EndTime") EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
} }
func (e *executionManager) Stop(ctx context.Context, id int64) error { func (e *executionManager) Stop(ctx context.Context, id int64) error {
@ -146,11 +152,13 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
} }
// contains no task and the status isn't final, update the status to stop directly // contains no task and the status isn't final, update the status to stop directly
if len(tasks) == 0 && !job.Status(execution.Status).Final() { if len(tasks) == 0 && !job.Status(execution.Status).Final() {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{ return e.executionDAO.Update(ctx, &dao.Execution{
ID: id, ID: id,
Status: job.StoppedStatus.String(), Status: job.StoppedStatus.String(),
EndTime: time.Now(), UpdateTime: now,
}, "Status", "EndTime") EndTime: now,
}, "Status", "UpdateTime", "EndTime")
} }
for _, task := range tasks { for _, task := range tasks {
@ -266,6 +274,7 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
Metrics: nil, Metrics: nil,
Trigger: execution.Trigger, Trigger: execution.Trigger,
StartTime: execution.StartTime, StartTime: execution.StartTime,
UpdateTime: execution.UpdateTime,
EndTime: execution.EndTime, EndTime: execution.EndTime,
} }

View File

@ -63,14 +63,14 @@ func (e *executionManagerTestSuite) TestCreate() {
} }
func (e *executionManagerTestSuite) TestMarkDone() { func (e *executionManagerTestSuite) TestMarkDone() {
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.MarkDone(nil, 1, "success") err := e.execMgr.MarkDone(nil, 1, "success")
e.Require().Nil(err) e.Require().Nil(err)
e.execDAO.AssertExpectations(e.T()) e.execDAO.AssertExpectations(e.T())
} }
func (e *executionManagerTestSuite) TestMarkError() { func (e *executionManagerTestSuite) TestMarkError() {
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.MarkError(nil, 1, "error") err := e.execMgr.MarkError(nil, 1, "error")
e.Require().Nil(err) e.Require().Nil(err)
e.execDAO.AssertExpectations(e.T()) e.execDAO.AssertExpectations(e.T())
@ -83,7 +83,7 @@ func (e *executionManagerTestSuite) TestStop() {
Status: job.RunningStatus.String(), Status: job.RunningStatus.String(),
}, nil) }, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil) e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1) err := e.execMgr.Stop(nil, 1)
e.Require().Nil(err) e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T()) e.taskDAO.AssertExpectations(e.T())

View File

@ -49,6 +49,7 @@ type Execution struct {
// the customized attributes for different kinds of consumers // the customized attributes for different kinds of consumers
ExtraAttrs map[string]interface{} `json:"extra_attrs"` ExtraAttrs map[string]interface{} `json:"extra_attrs"`
StartTime time.Time `json:"start_time"` StartTime time.Time `json:"start_time"`
UpdateTime time.Time `json:"update_time"`
EndTime time.Time `json:"end_time"` EndTime time.Time `json:"end_time"`
} }

View File

@ -50,8 +50,9 @@ func registerRoutes() {
beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob") beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob")
beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask") beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask")
beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan") beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan")
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler()) router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler())
beego.Router("/service/token", &token.Handler{}) beego.Router("/service/token", &token.Handler{})