Merge pull request #8675 from ywk253100/190814_retention_task

Handle the retention task status updating in concurrency
This commit is contained in:
Steven Zou 2019-08-20 17:07:21 +08:00 committed by GitHub
commit 217252a097
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 107 additions and 51 deletions

View File

@ -132,6 +132,7 @@ create table retention_task
repository varchar(255), repository varchar(255),
job_id varchar(64), job_id varchar(64),
status varchar(32), status varchar(32),
status_code integer,
start_time timestamp default CURRENT_TIMESTAMP, start_time timestamp default CURRENT_TIMESTAMP,
end_time timestamp default CURRENT_TIMESTAMP, end_time timestamp default CURRENT_TIMESTAMP,
total integer, total integer,

View File

@ -24,6 +24,7 @@ import (
"github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/common/utils/log" "github.com/goharbor/harbor/src/common/utils/log"
"github.com/goharbor/harbor/src/core/api" "github.com/goharbor/harbor/src/core/api"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/notification" "github.com/goharbor/harbor/src/pkg/notification"
"github.com/goharbor/harbor/src/pkg/retention" "github.com/goharbor/harbor/src/pkg/retention"
"github.com/goharbor/harbor/src/replication" "github.com/goharbor/harbor/src/replication"
@ -109,42 +110,52 @@ func (h *Handler) HandleReplicationTask() {
// HandleRetentionTask handles the webhook of retention task // HandleRetentionTask handles the webhook of retention task
func (h *Handler) HandleRetentionTask() { func (h *Handler) HandleRetentionTask() {
log.Debugf("received retention task status update event: task-%d, status-%s", h.id, h.status) taskID := h.id
status := h.rawStatus
log.Debugf("received retention task status update event: task-%d, status-%s", taskID, status)
mgr := &retention.DefaultManager{} mgr := &retention.DefaultManager{}
props := []string{"Status"} // handle checkin
task := &retention.Task{
ID: h.id,
Status: h.status,
}
if h.status == models.JobFinished || h.status == models.JobError ||
h.status == models.JobStopped {
task.EndTime = time.Now()
props = append(props, "EndTime")
} else if h.status == models.JobRunning {
if h.checkIn != "" { if h.checkIn != "" {
var retainObj struct { var retainObj struct {
Total int `json:"total"` Total int `json:"total"`
Retained int `json:"retained"` Retained int `json:"retained"`
} }
if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil { if err := json.Unmarshal([]byte(h.checkIn), &retainObj); err != nil {
log.Errorf("failed to resolve checkin of retention task %d: %v", h.id, err) log.Errorf("failed to resolve checkin of retention task %d: %v", taskID, err)
} else { return
if retainObj.Total > 0 {
task.Total = retainObj.Total
props = append(props, "Total")
} }
if retainObj.Retained > 0 { task := &retention.Task{
task.Retained = retainObj.Retained ID: taskID,
props = append(props, "Retained") Total: retainObj.Total,
Retained: retainObj.Retained,
} }
} if err := mgr.UpdateTask(task, "Total", "Retained"); err != nil {
} log.Errorf("failed to update of retention task %d: %v", taskID, err)
}
if err := mgr.UpdateTask(task, props...); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", h.id, err)
h.SendInternalServerError(err) h.SendInternalServerError(err)
return return
} }
return
}
// handle status updating
if err := mgr.UpdateTaskStatus(taskID, status); err != nil {
log.Errorf("failed to update the status of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
// if the status is the final status, update the end time
if status == jjob.StoppedStatus.String() || status == jjob.SuccessStatus.String() ||
status == jjob.ErrorStatus.String() {
task := &retention.Task{
ID: taskID,
EndTime: time.Now(),
}
if err := mgr.UpdateTask(task, "EndTime"); err != nil {
log.Errorf("failed to update of retention task %d: %v", taskID, err)
h.SendInternalServerError(err)
return
}
}
} }
// HandleNotificationJob handles the hook of notification job // HandleNotificationJob handles the hook of notification job

View File

@ -54,6 +54,7 @@ type RetentionTask struct {
Repository string `orm:"column(repository)"` Repository string `orm:"column(repository)"`
JobID string `orm:"column(job_id)"` JobID string `orm:"column(job_id)"`
Status string `orm:"column(status)"` Status string `orm:"column(status)"`
StatusCode int `orm:"column(status_code)"`
StartTime time.Time `orm:"column(start_time)"` StartTime time.Time `orm:"column(start_time)"`
EndTime time.Time `orm:"column(end_time)"` EndTime time.Time `orm:"column(end_time)"`
Total int `orm:"column(total)"` Total int `orm:"column(total)"`

View File

@ -3,12 +3,13 @@ package dao
import ( import (
"errors" "errors"
"fmt" "fmt"
"strconv"
"github.com/astaxie/beego/orm" "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
jobmodels "github.com/goharbor/harbor/src/common/models" jobmodels "github.com/goharbor/harbor/src/common/models"
"github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/q" "github.com/goharbor/harbor/src/pkg/retention/q"
"strconv"
) )
// CreatePolicy Create Policy // CreatePolicy Create Policy
@ -228,6 +229,18 @@ func UpdateTask(task *models.RetentionTask, cols ...string) error {
return err return err
} }
// UpdateTaskStatus updates the status of task whose status code is less than the statusCode provided
func UpdateTaskStatus(taskID int64, status string, statusCode int) error {
_, err := dao.GetOrmer().QueryTable(&models.RetentionTask{}).
Filter("ID", taskID).
Filter("StatusCode__lt", statusCode).
Update(orm.Params{
"Status": status,
"StatusCode": statusCode,
})
return err
}
// DeleteTask deletes the task record specified by ID in database // DeleteTask deletes the task record specified by ID in database
func DeleteTask(id int64) error { func DeleteTask(id int64) error {
_, err := dao.GetOrmer().Delete(&models.RetentionTask{ _, err := dao.GetOrmer().Delete(&models.RetentionTask{

View File

@ -194,8 +194,12 @@ func TestTask(t *testing.T) {
// update // update
task.ID = id task.ID = id
task.Status = "running" task.Total = 1
err = UpdateTask(task, "Status") err = UpdateTask(task, "Total")
require.Nil(t, err)
// update status
err = UpdateTaskStatus(id, "running", 1)
require.Nil(t, err) require.Nil(t, err)
// list // list
@ -205,8 +209,10 @@ func TestTask(t *testing.T) {
}) })
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(tasks)) require.Equal(t, 1, len(tasks))
assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, int64(1), tasks[0].ExecutionID) assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, "running", tasks[0].Status) assert.Equal(t, "running", tasks[0].Status)
assert.Equal(t, 1, tasks[0].StatusCode)
// delete // delete
err = DeleteTask(id) err = DeleteTask(id)

View File

@ -126,6 +126,9 @@ func (f *fakeRetentionManager) CreateTask(task *Task) (int64, error) {
func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error { func (f *fakeRetentionManager) UpdateTask(task *Task, cols ...string) error {
return nil return nil
} }
func (f *fakeRetentionManager) UpdateTaskStatus(int64, string) error {
return nil
}
func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) { func (f *fakeRetentionManager) GetTaskLog(taskID int64) ([]byte, error) {
return nil, nil return nil, nil
} }

View File

@ -21,7 +21,8 @@ import (
"time" "time"
"github.com/astaxie/beego/orm" "github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/job" cjob "github.com/goharbor/harbor/src/common/job"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/dao" "github.com/goharbor/harbor/src/pkg/retention/dao"
"github.com/goharbor/harbor/src/pkg/retention/dao/models" "github.com/goharbor/harbor/src/pkg/retention/dao/models"
"github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy"
@ -58,6 +59,12 @@ type Manager interface {
CreateTask(task *Task) (int64, error) CreateTask(task *Task) (int64, error)
// Update the specified task // Update the specified task
UpdateTask(task *Task, cols ...string) error UpdateTask(task *Task, cols ...string) error
// Update the status of the specified task
// The status is updated only when it is behind the one stored
// in the database.
// e.g. if the status is running but the status stored
// in database is failed, the updating doesn't take effect
UpdateTaskStatus(taskID int64, status string) error
// Get the task specified by the task ID // Get the task specified by the task ID
GetTask(taskID int64) (*Task, error) GetTask(taskID int64) (*Task, error)
// Get the log of the specified task // Get the log of the specified task
@ -217,6 +224,7 @@ func (d *DefaultManager) ListTasks(query ...*q.TaskQuery) ([]*Task, error) {
Repository: t.Repository, Repository: t.Repository,
JobID: t.JobID, JobID: t.JobID,
Status: t.Status, Status: t.Status,
StatusCode: t.StatusCode,
StartTime: t.StartTime, StartTime: t.StartTime,
EndTime: t.EndTime, EndTime: t.EndTime,
Total: t.Total, Total: t.Total,
@ -252,6 +260,15 @@ func (d *DefaultManager) UpdateTask(task *Task, cols ...string) error {
}, cols...) }, cols...)
} }
// UpdateTaskStatus updates the status of the specified task
func (d *DefaultManager) UpdateTaskStatus(taskID int64, status string) error {
if taskID <= 0 {
return fmt.Errorf("invalid task ID: %d", taskID)
}
st := job.Status(status)
return dao.UpdateTaskStatus(taskID, status, st.Code())
}
// GetTask returns the task specified by task ID // GetTask returns the task specified by task ID
func (d *DefaultManager) GetTask(taskID int64) (*Task, error) { func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
if taskID <= 0 { if taskID <= 0 {
@ -267,6 +284,7 @@ func (d *DefaultManager) GetTask(taskID int64) (*Task, error) {
Repository: task.Repository, Repository: task.Repository,
JobID: task.JobID, JobID: task.JobID,
Status: task.Status, Status: task.Status,
StatusCode: task.StatusCode,
StartTime: task.StartTime, StartTime: task.StartTime,
EndTime: task.EndTime, EndTime: task.EndTime,
Total: task.Total, Total: task.Total,
@ -283,7 +301,7 @@ func (d *DefaultManager) GetTaskLog(taskID int64) ([]byte, error) {
if task == nil { if task == nil {
return nil, fmt.Errorf("task %d not found", taskID) return nil, fmt.Errorf("task %d not found", taskID)
} }
return job.GlobalClient.GetJobLog(task.JobID) return cjob.GlobalClient.GetJobLog(task.JobID)
} }
// NewManager ... // NewManager ...

View File

@ -6,8 +6,8 @@ import (
"time" "time"
"github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/job" "github.com/goharbor/harbor/src/common/job"
jjob "github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/retention/policy" "github.com/goharbor/harbor/src/pkg/retention/policy"
"github.com/goharbor/harbor/src/pkg/retention/policy/rule" "github.com/goharbor/harbor/src/pkg/retention/policy/rule"
"github.com/goharbor/harbor/src/pkg/retention/q" "github.com/goharbor/harbor/src/pkg/retention/q"
@ -171,7 +171,9 @@ func TestTask(t *testing.T) {
task := &Task{ task := &Task{
ExecutionID: 1, ExecutionID: 1,
JobID: "1", JobID: "1",
Status: TaskStatusPending, Status: jjob.PendingStatus.String(),
StatusCode: jjob.PendingStatus.Code(),
Total: 0,
StartTime: time.Now(), StartTime: time.Now(),
} }
// create // create
@ -185,23 +187,29 @@ func TestTask(t *testing.T) {
// update // update
task.ID = id task.ID = id
task.Status = TaskStatusInProgress task.Total = 1
err = m.UpdateTask(task, "Status") err = m.UpdateTask(task, "Total")
require.Nil(t, err)
// update status to success which is a final status
err = m.UpdateTaskStatus(id, jjob.SuccessStatus.String())
require.Nil(t, err)
// try to update status to running, as the status has already
// been updated to a final status, this updating shouldn't take effect
err = m.UpdateTaskStatus(id, jjob.RunningStatus.String())
require.Nil(t, err) require.Nil(t, err)
// list // list
tasks, err := m.ListTasks(&q.TaskQuery{ tasks, err := m.ListTasks(&q.TaskQuery{
ExecutionID: 1, ExecutionID: 1,
Status: TaskStatusInProgress,
}) })
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(tasks)) require.Equal(t, 1, len(tasks))
assert.Equal(t, int64(1), tasks[0].ExecutionID) assert.Equal(t, int64(1), tasks[0].ExecutionID)
assert.Equal(t, TaskStatusInProgress, tasks[0].Status) assert.Equal(t, 1, tasks[0].Total)
assert.Equal(t, jjob.SuccessStatus.String(), tasks[0].Status)
task.Status = TaskStatusFailed assert.Equal(t, jjob.SuccessStatus.Code(), tasks[0].StatusCode)
err = m.UpdateTask(task, "Status")
require.Nil(t, err)
// get task log // get task log
job.GlobalClient = &tjob.MockJobClient{ job.GlobalClient = &tjob.MockJobClient{

View File

@ -23,12 +23,6 @@ const (
ExecutionStatusFailed string = "Failed" ExecutionStatusFailed string = "Failed"
ExecutionStatusStopped string = "Stopped" ExecutionStatusStopped string = "Stopped"
TaskStatusPending string = "Pending"
TaskStatusInProgress string = "InProgress"
TaskStatusSucceed string = "Succeed"
TaskStatusFailed string = "Failed"
TaskStatusStopped string = "Stopped"
CandidateKindImage string = "image" CandidateKindImage string = "image"
CandidateKindChart string = "chart" CandidateKindChart string = "chart"
@ -54,6 +48,7 @@ type Task struct {
Repository string `json:"repository"` Repository string `json:"repository"`
JobID string `json:"job_id"` JobID string `json:"job_id"`
Status string `json:"status"` Status string `json:"status"`
StatusCode int `json:"status_code"`
StartTime time.Time `json:"start_time"` StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"` EndTime time.Time `json:"end_time"`
Total int `json:"total"` Total int `json:"total"`