Tiny fixes for task manager

1. Add update time for execution
2. Add unique constraint for schedule to avoid dup records when updating policies
3. Format replication log
4. Keep the webhook handler for legacy replication jobs to avoid jobservice resending the status change request

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2020-12-10 20:26:54 +08:00
parent 949379f7bc
commit 69808f033e
10 changed files with 57 additions and 17 deletions

View File

@ -1931,6 +1931,8 @@ paths:
tags:
- replication
operationId: getReplicationLog
produces:
- text/plain
parameters:
- name: id
in: path

View File

@ -20,6 +20,8 @@ ALTER TABLE task ADD COLUMN IF NOT EXISTS vendor_type varchar(16);
UPDATE task SET vendor_type = execution.vendor_type FROM execution WHERE task.execution_id = execution.id;
ALTER TABLE task ALTER COLUMN vendor_type SET NOT NULL;
ALTER TABLE execution ADD COLUMN IF NOT EXISTS update_time timestamp;
DO $$
DECLARE
art RECORD;
@ -283,3 +285,6 @@ ALTER TABLE scan_report DROP COLUMN IF EXISTS status_code;
ALTER TABLE scan_report DROP COLUMN IF EXISTS status_rev;
ALTER TABLE scan_report DROP COLUMN IF EXISTS start_time;
ALTER TABLE scan_report DROP COLUMN IF EXISTS end_time;
/*add unique for vendor_type+vendor_id to avoid dup records when updating policies*/
ALTER TABLE schedule ADD CONSTRAINT unique_schedule UNIQUE (vendor_type, vendor_id);

View File

@ -59,6 +59,10 @@ func (d *dao) Create(ctx context.Context, schedule *schedule) (int64, error) {
}
id, err := ormer.Insert(schedule)
if err != nil {
if e := orm.AsConflictError(err, "schedule with vendor type: %s vendor ID: %d already exists",
schedule.VendorType, schedule.VendorID); e != nil {
err = e
}
return 0, err
}
return id, nil
@ -118,6 +122,9 @@ func (d *dao) Update(ctx context.Context, schedule *schedule, props ...string) e
}
n, err := ormer.Update(schedule, props...)
if err != nil {
if e := orm.AsConflictError(err, "schedule with the same vendor type and vendor ID already exists"); e != nil {
err = e
}
return err
}
if n == 0 {

View File

@ -58,6 +58,19 @@ func (d *daoTestSuite) TearDownTest() {
func (d *daoTestSuite) TestCreate() {
// the happy pass is covered in SetupTest
// conflict
schedule := &schedule{
VendorType: "Vendor",
VendorID: 1,
CRON: "0 * * * * *",
CallbackFuncName: "callback_func_01",
CallbackFuncParam: "callback_func_params",
ExtraAttrs: `{"key":"value"}`,
}
_, err := d.dao.Create(d.ctx, schedule)
d.Require().NotNil(err)
d.True(errors.IsConflictErr(err))
}
func (d *daoTestSuite) TestList() {

View File

@ -17,11 +17,12 @@ package dao
import (
"context"
"fmt"
"github.com/goharbor/harbor/src/lib/log"
"strings"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/lib/orm"
"github.com/goharbor/harbor/src/lib/q"
)
@ -234,8 +235,8 @@ func (e *executionDAO) refreshStatus(ctx context.Context, id int64) (bool, strin
return false, "", false, err
}
sql := `update execution set status = ?, revision = revision+1 where id = ? and revision = ?`
result, err := ormer.Raw(sql, status, id, execution.Revision).Exec()
sql := `update execution set status = ?, revision = revision+1, update_time = ? where id = ? and revision = ?`
result, err := ormer.Raw(sql, status, time.Now(), id, execution.Revision).Exec()
if err != nil {
return false, "", false, err
}

View File

@ -38,6 +38,7 @@ type Execution struct {
Trigger string `orm:"column(trigger)"`
ExtraAttrs string `orm:"column(extra_attrs)"` // json string
StartTime time.Time `orm:"column(start_time)"`
UpdateTime time.Time `orm:"column(update_time)"`
EndTime time.Time `orm:"column(end_time)"`
Revision int64 `orm:"column(revision)"`
}

View File

@ -99,33 +99,39 @@ func (e *executionManager) Create(ctx context.Context, vendorType string, vendor
return 0, err
}
now := time.Now()
execution := &dao.Execution{
VendorType: vendorType,
VendorID: vendorID,
Status: job.RunningStatus.String(),
Trigger: trigger,
ExtraAttrs: string(data),
StartTime: time.Now(),
StartTime: now,
UpdateTime: now,
}
return e.executionDAO.Create(ctx, execution)
}
func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.SuccessStatus.String(),
StatusMessage: message,
EndTime: time.Now(),
}, "Status", "StatusMessage", "EndTime")
UpdateTime: now,
EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
}
func (e *executionManager) MarkError(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.ErrorStatus.String(),
StatusMessage: message,
EndTime: time.Now(),
}, "Status", "StatusMessage", "EndTime")
UpdateTime: now,
EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
}
func (e *executionManager) Stop(ctx context.Context, id int64) error {
@ -146,11 +152,13 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
}
// contains no task and the status isn't final, update the status to stop directly
if len(tasks) == 0 && !job.Status(execution.Status).Final() {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.StoppedStatus.String(),
EndTime: time.Now(),
}, "Status", "EndTime")
ID: id,
Status: job.StoppedStatus.String(),
UpdateTime: now,
EndTime: now,
}, "Status", "UpdateTime", "EndTime")
}
for _, task := range tasks {
@ -266,6 +274,7 @@ func (e *executionManager) populateExecution(ctx context.Context, execution *dao
Metrics: nil,
Trigger: execution.Trigger,
StartTime: execution.StartTime,
UpdateTime: execution.UpdateTime,
EndTime: execution.EndTime,
}

View File

@ -63,14 +63,14 @@ func (e *executionManagerTestSuite) TestCreate() {
}
func (e *executionManagerTestSuite) TestMarkDone() {
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.MarkDone(nil, 1, "success")
e.Require().Nil(err)
e.execDAO.AssertExpectations(e.T())
}
func (e *executionManagerTestSuite) TestMarkError() {
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.MarkError(nil, 1, "error")
e.Require().Nil(err)
e.execDAO.AssertExpectations(e.T())
@ -83,7 +83,7 @@ func (e *executionManagerTestSuite) TestStop() {
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())

View File

@ -49,6 +49,7 @@ type Execution struct {
// the customized attributes for different kinds of consumers
ExtraAttrs map[string]interface{} `json:"extra_attrs"`
StartTime time.Time `json:"start_time"`
UpdateTime time.Time `json:"update_time"`
EndTime time.Time `json:"end_time"`
}

View File

@ -50,8 +50,9 @@ func registerRoutes() {
beego.Router("/service/notifications/jobs/webhook/:id([0-9]+)", &jobs.Handler{}, "post:HandleNotificationJob")
beego.Router("/service/notifications/jobs/retention/task/:id([0-9]+)", &jobs.Handler{}, "post:HandleRetentionTask")
beego.Router("/service/notifications/jobs/scan/:uuid", &jobs.Handler{}, "post:HandleScan")
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/schedules/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication scheduler
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/jobs/replication/task/:id([0-9]+)").Handler(handler.NewJobStatusHandler()) // legacy job status hook endpoint for replication task
router.NewRoute().Method(http.MethodPost).Path("/service/notifications/tasks/:id").Handler(handler.NewJobStatusHandler())
beego.Router("/service/token", &token.Handler{})